Skip to content

[FLINK-XXXXX][table] Add config option and Catalog API for table to materialized table conversion#28287

Draft
raminqaf wants to merge 4 commits into
apache:masterfrom
raminqaf:flip-578-table-to-mt-conversion
Draft

[FLINK-XXXXX][table] Add config option and Catalog API for table to materialized table conversion#28287
raminqaf wants to merge 4 commits into
apache:masterfrom
raminqaf:flip-578-table-to-mt-conversion

Conversation

@raminqaf
Copy link
Copy Markdown
Contributor

@raminqaf raminqaf commented Jun 1, 2026

What is the purpose of the change

This is the implementation of the FLIP-578: In-place Table to Materialized Table conversion

Introduce the building blocks for in-place conversion of a regular table to a materialized table use the already existing CoA command.

Brief change log

(for example:)

  • Add the cluster-level table.materialized-table.conversion-from-table.enabled option (default false) that gates the conversion. It is read from the root configuration, so a session-level SET has no effect.

  • Add Catalog#convertTableToMaterializedTable, which swaps an existing regular table's catalog entry for a materialized table in place while preserving its identity and storage. The default throws UnsupportedOperationException; GenericInMemoryCatalog overrides it with a table-kind check. CatalogManager resolves both tables, delegates to the catalog, and fires an AlterTableEvent. Launching the refresh job is left to the executor, not the catalog.

  • handleCreateOrAlter now dispatches on the kind of the existing object: a materialized table is altered as before, a view is rejected, and a regular table is converted in place to a materialized table when table.materialized-table.conversion-from-table.enabled is set on the cluster (otherwise the statement is rejected as before). The conversion carries over the source table's watermark and primary key when the DDL omits them, rejecting a source with more than one watermark. It emits the structured TableChanges (columns, constraint, watermark, definition query, options, distribution, start mode) by reusing the shared change-building helpers, and returns a ConvertTableToMaterializedTableOperation.

Verifying this change

  • Added test in SqlNodeToOperationConvertTableToMaterializedTableTest
  • Added ITs in MaterializedTableStatementITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs/JavaDocs

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Opus 4.8

raminqaf added 4 commits May 30, 2026 09:53
…aterialized table conversion

Introduce the building blocks for in-place conversion of a regular table to a materialized table. Neither part is wired into SQL execution on its own.

Add the cluster-level table.materialized-table.conversion-from-table.enabled option (default false) that gates the conversion. It is read from the root configuration, so a session-level SET has no effect.

Add Catalog#convertTableToMaterializedTable, which swaps an existing regular table's catalog entry for a materialized table in place while preserving its identity and storage. The default throws UnsupportedOperationException; GenericInMemoryCatalog overrides it with a table-kind check. CatalogManager resolves both tables, delegates to the catalog, and fires an AlterTableEvent. Launching the refresh job is left to the executor, not the catalog.
…CREATE OR ALTER

handleCreateOrAlter now dispatches on the kind of the existing object: a materialized table is altered as before, a view is rejected, and a regular table is converted in place to a materialized table when table.materialized-table.conversion-from-table.enabled is set on the cluster (otherwise the statement is rejected as before). The conversion carries over the source table's watermark and primary key when the DDL omits them, rejecting a source with more than one watermark. It emits the structured TableChanges (columns, constraint, watermark, definition query, options, distribution, start mode) by reusing the shared change-building helpers, and returns a ConvertTableToMaterializedTableOperation.
…materialized table

The SQL gateway's MaterializedTableManager now handles ConvertTableToMaterializedTableOperation: it swaps the catalog entry and then launches the refresh job (a continuous streaming job or a full-mode workflow) and persists the refresh handler, mirroring CREATE MATERIALIZED TABLE. Without this the gateway rejected the operation as unsupported, so conversion swapped the catalog entry but never started refreshing.

On a refresh-job-launch failure the conversion makes a best-effort rollback: it drops the materialized table and recreates the original regular table from the operation's original table. Unlike CREATE there is no clean reverse path (the catalog only converts table to materialized table, and alterTable rejects kind changes), so this restores the user's prior state rather than dropping the table outright.

TestFileSystemCatalog gains the conversion override and the materialized table ITCase base enables the cluster-level conversion flag so the path can be exercised end to end.
…version

Document converting a regular table to a materialized table via CREATE OR ALTER MATERIALIZED TABLE on the materialized table statements page: the cluster-level enablement option, watermark and primary key inheritance, and the refresh-job behavior. The same content is mirrored in the Chinese docs (in English, pending translation).
@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Jun 1, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@AHeise AHeise left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass.

Comment on lines +266 to +268
`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage; only the table kind and the materialized-table metadata (query definition, freshness, refresh mode, and refresh status) change. After the conversion, a refresh job is launched just as it is for a newly created materialized table.

This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swap the sentences, the technical explanation last.


**Enabling conversion**

Conversion is disabled by default. To enable it, set the following option in the cluster configuration file `config.yaml`:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add: What happens if it's disabled?


**Watermark and primary key inheritance**

When the conversion statement does not declare a `WATERMARK` or a `PRIMARY KEY`, the corresponding definition is inherited from the source table:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm what's the CoA behavior? Wouldn't we want to drop things not declared?

```

<span class="label label-danger">Note</span>
- The conversion is one-way and cannot be undone. To revert, drop the materialized table and recreate the original table.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First explain that you can simply suspend and use it as if it was a regular table for input/output.


- **If the materialized table does not exist**: Creates a new materialized table with the specified options
- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
- **If a regular table with the same name exists**: Converts it in place into a materialized table, when enabled (see [Converting a Table to a Materialized Table](#converting-a-table-to-a-materialized-table))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whole section needs to be translated. Did you file a translation ticket already?

default:
throw new ValidationException(
String.format(
"Unsupported table kind %s for %s.",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't change the error message unnecessarily.

context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);

final List<TableChange> tableChanges =
buildConversionTableChanges(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build changes lazily like CoA

return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, context, identifier);
}

private Operation handleConvert(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having two methods with near-identical signatures is throwing me off a bit. Can you just merge, I don't see what's the difference on them from the signature.

* @throws ValidationException if the source table carries more than one watermark
* specification, which materialized tables do not support
*/
private static Schema inheritWatermarkAndPrimaryKey(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree with this part. Was that in the FLIP? This destroys the idempotency mindset that we have for CoA.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What actually happens if I run the CoA twice (first conversion, then alter). Would it drop watermark/pk then?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that especially those two table properties are for consumers only. So it's a hard sell that it's needed for keeping the data intact imho.

createExistingMaterializedTable();
}

// --------------------------------------------------------------------------------------------
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested tests are superior to comment blocks...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants