Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand Down Expand Up @@ -1080,6 +1081,34 @@ void createTemporarySystemFunction(
*/
Table from(String path);

/**
* Reads a registered table and applies dynamic options, returning the corresponding {@link
* Table}.
*
* <p>Dynamic options override the table's static options defined at creation time (DDL).
* This is the Table API equivalent of SQL's {@code OPTIONS} hint:
*
* <pre>{@code
* // Table API (this method)
* Table tab = tableEnv.from("kafka_table1", Map.of("scan.startup.mode", "earliest-offset"));
*
* // Equivalent SQL
* // SELECT * FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') * /
* }</pre>
*
* <p>The configuration option {@code table.dynamic-table-options.enabled} must be set to
* {@code true} (the default) for dynamic options to take effect.
*
* <p>Note: Dynamic options cannot be applied to views.
*
* @param path The path of a table API object to scan.
* @param dynamicOptions A map of option key-value pairs to override on the table.
* @return The {@link Table} object describing the pipeline for further transformations.
* @throws ValidationException if the table is not found, is a view, or dynamic options are
* disabled.
*/
Table from(String path, Map<String, String> dynamicOptions);

/**
* Returns a {@link Table} backed by the given {@link TableDescriptor descriptor}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,48 @@ public Table from(String path) {
"Table %s was not found.", unresolvedIdentifier)));
}

@Override
public Table from(String path, Map<String, String> dynamicOptions) {
Preconditions.checkNotNull(dynamicOptions, "Dynamic options must not be null.");

if (!tableConfig.get(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED)) {
throw new ValidationException(
String.format(
"Dynamic table options are disabled. Set '%s' to 'true' to enable them.",
TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key()));
}

UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
ContextResolvedTable contextResolvedTable =
catalogManager
.getTable(tableIdentifier)
.orElseThrow(
() ->
new ValidationException(
String.format(
"Table %s was not found.",
unresolvedIdentifier)));

if (dynamicOptions.isEmpty()) {
return createTable(new SourceQueryOperation(contextResolvedTable));
}

if (contextResolvedTable.getResolvedTable().getTableKind()
== CatalogBaseTable.TableKind.VIEW) {
throw new ValidationException(
String.format(
"View '%s' cannot be enriched with dynamic options.",
unresolvedIdentifier));
}

Map<String, String> mergedOptions =
new HashMap<>(contextResolvedTable.getResolvedTable().getOptions());
mergedOptions.putAll(dynamicOptions);
ContextResolvedTable withOptions = contextResolvedTable.copy(mergedOptions);
return createTable(new SourceQueryOperation(withOptions));
}

@Override
public Table from(TableDescriptor descriptor) {
Preconditions.checkNotNull(descriptor, "Table descriptor must not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,80 @@ void testListModels() {
assertThat(tEnv.listModels()).containsExactly("M1", "M2");
}

@Test
void testFromWithDynamicOptions() {
tEnv.createTable("T", TEST_DESCRIPTOR);

final Table table = tEnv.from("T", Map.of("b", "Override"));

assertThat(table.getQueryOperation())
.asInstanceOf(type(SourceQueryOperation.class))
.extracting(SourceQueryOperation::getContextResolvedTable)
.satisfies(
crt -> {
assertThat(crt.getResolvedTable().getOptions())
.contains(entry("a", "Test"), entry("b", "Override"));
});
}

@Test
void testFromWithDynamicOptionsOverridesExisting() {
tEnv.createTable("T", TEST_DESCRIPTOR);

final Table table = tEnv.from("T", Map.of("a", "Overridden"));

assertThat(table.getQueryOperation())
.asInstanceOf(type(SourceQueryOperation.class))
.extracting(SourceQueryOperation::getContextResolvedTable)
.satisfies(
crt -> {
assertThat(crt.getResolvedTable().getOptions())
.containsEntry("a", "Overridden");
});
}

@Test
void testFromWithEmptyDynamicOptions() {
tEnv.createTable("T", TEST_DESCRIPTOR);

final Table table = tEnv.from("T", Map.of());

assertThat(table.getQueryOperation())
.asInstanceOf(type(SourceQueryOperation.class))
.extracting(SourceQueryOperation::getContextResolvedTable)
.satisfies(
crt -> {
assertThat(crt.getResolvedTable().getOptions())
.containsEntry("a", "Test");
});
}

@Test
void testFromWithDynamicOptionsOnViewThrows() {
tEnv.createTable("T", TEST_DESCRIPTOR);
tEnv.createView("V", tEnv.from("T"));

assertThatThrownBy(() -> tEnv.from("V", Map.of("key", "value")))
.isInstanceOf(ValidationException.class);
}

@Test
void testFromWithDynamicOptionsDisabledThrows() {
tEnv.getConfig().set("table.dynamic-table-options.enabled", "false");
tEnv.createTable("T", TEST_DESCRIPTOR);

assertThatThrownBy(() -> tEnv.from("T", Map.of("key", "value")))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Dynamic table options are disabled");
}

@Test
void testFromWithDynamicOptionsTableNotFound() {
assertThatThrownBy(() -> tEnv.from("NonExistent", Map.of("key", "value")))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("was not found");
}

private static void assertCreateTableFromDescriptor(
TableEnvironmentMock tEnv, Schema schema, boolean ignoreIfExists)
throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.util.Preconditions;

import java.util.Map;
import java.util.function.Function;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
Expand Down Expand Up @@ -104,6 +105,11 @@ public Table sqlQuery(String query) {
return env.sqlQuery(query);
}

@Override
public Table from(String path, Map<String, String> dynamicOptions) {
return null;
}

@Override
public Model fromModel(String modelPath) {
return env.fromModel(modelPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.types.AbstractDataType;

import java.util.Map;
import java.util.function.Function;

/** Test step for execution of a Table API. Similar to {@link SqlTestStep}. */
Expand Down Expand Up @@ -84,6 +85,11 @@ public Table sqlQuery(String query) {
return env.sqlQuery(query);
}

@Override
public Table from(String path, Map<String, String> dynamicOptions) {
return env.from(path, dynamicOptions);
}

@Override
public Model fromModel(String modelPath) {
return env.fromModel(modelPath);
Expand Down Expand Up @@ -137,6 +143,8 @@ public interface TableEnvAccessor {
/** See {@link TableEnvironment#sqlQuery(String)}. */
Table sqlQuery(String query);

Table from(String path, Map<String, String> dynamicOptions);

/** See {@link TableEnvironment#fromModel(String)}. */
Model fromModel(String modelPath);

Expand Down