diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index b2313903ba5dc..b2218c9324fad 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -20,7 +20,6 @@ import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.table.api.SqlParserException; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogManager; @@ -36,7 +35,6 @@ import org.apache.flink.table.operations.ddl.CreateTableASOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; -import org.apache.flink.table.planner.calcite.SqlExprToRexConverter; import org.apache.flink.table.planner.delegation.ParserImpl; import org.apache.flink.table.planner.delegation.PlannerContext; import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseException; @@ -80,7 +78,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.function.Function; import java.util.function.Supplier; /** A Parser that uses Hive's planner to parse a statement. */ @@ -176,13 +173,12 @@ public class HiveParser extends ParserImpl { CatalogManager catalogManager, Supplier validatorSupplier, Supplier calciteParserSupplier, - Function sqlExprToRexConverterCreator, PlannerContext plannerContext) { super( catalogManager, validatorSupplier, calciteParserSupplier, - sqlExprToRexConverterCreator); + plannerContext.getSqlExprToRexConverterFactory()); this.plannerContext = plannerContext; this.catalogReader = plannerContext.createCatalogReader( diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java index b23603e1226a8..dab4c5eaa2628 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java @@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory; import org.apache.flink.table.planner.delegation.ParserFactory; import org.apache.flink.table.planner.delegation.PlannerContext; @@ -36,8 +35,6 @@ public class HiveParserFactory implements ParserFactory { @Override public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) { - SqlExprToRexConverterFactory sqlExprToRexConverterFactory = - plannerContext::createSqlExprToRexConverter; return new HiveParser( catalogManager, () -> @@ -45,9 +42,6 @@ public Parser create(CatalogManager catalogManager, PlannerContext plannerContex catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()), plannerContext::createCalciteParser, - 
tableSchema -> - sqlExprToRexConverterFactory.create( - plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)), plannerContext); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index ccf488187110a..6541a96da035b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -261,9 +261,11 @@ protected TableEnvironmentImpl( return Optional.empty(); } }, - (sqlExpression, inputSchema) -> { + (sqlExpression, inputRowType, outputType) -> { try { - return getParser().parseSqlExpression(sqlExpression, inputSchema); + return getParser() + .parseSqlExpression( + sqlExpression, inputRowType, outputType); } catch (Throwable t) { throw new ValidationException( String.format("Invalid SQL expression: %s", sqlExpression), diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java index 5b008ccd9608d..0993669ebf4cf 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java @@ -144,7 +144,8 @@ private ComputedColumn resolveComputedColumn( UnresolvedComputedColumn unresolvedColumn, List inputColumns) { final ResolvedExpression resolvedExpression; try { - resolvedExpression = resolveExpression(inputColumns, unresolvedColumn.getExpression()); + resolvedExpression = + resolveExpression(inputColumns, unresolvedColumn.getExpression(), null); } catch (Exception e) { throw new ValidationException( String.format( @@ -189,22 +190,26 @@ private List resolveWatermarkSpecs( final ResolvedExpression watermarkExpression; try { watermarkExpression = - resolveExpression(inputColumns, watermarkSpec.getWatermarkExpression()); + resolveExpression( + inputColumns, + watermarkSpec.getWatermarkExpression(), + validatedTimeColumn.getDataType()); } catch (Exception e) { throw new ValidationException( String.format( "Invalid expression for watermark '%s'.", watermarkSpec.toString()), e); } - validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType()); + final LogicalType outputType = watermarkExpression.getOutputDataType().getLogicalType(); + final LogicalType timeColumnType = validatedTimeColumn.getDataType().getLogicalType(); + validateWatermarkExpression(outputType); - if (!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot() - == validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) { + if (outputType.getTypeRoot() != timeColumnType.getTypeRoot()) { throw new ValidationException( String.format( - "The watermark output type %s is different from input time filed type %s.", - watermarkExpression.getOutputDataType(), - validatedTimeColumn.getDataType())); + "The watermark declaration's output data type '%s' is different " + + "from the time field's data type '%s'.", + outputType, timeColumnType)); } return Collections.singletonList( @@ -348,13 +353,15 @@ private void validatePrimaryKey(UniqueConstraint primaryKey, List column } } - private ResolvedExpression 
resolveExpression(List columns, Expression expression) { + private ResolvedExpression resolveExpression( + List columns, Expression expression, @Nullable DataType outputDataType) { final LocalReferenceExpression[] localRefs = columns.stream() .map(c -> localRef(c.getName(), c.getDataType())) .toArray(LocalReferenceExpression[]::new); return resolverBuilder .withLocalReferences(localRefs) + .withOutputDataType(outputDataType) .build() .resolve(Collections.singletonList(expression)) .get(0); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java index 20f07e11bd940..0ff4567db244a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java @@ -19,11 +19,14 @@ package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; import java.util.List; @@ -58,11 +61,13 @@ public interface Parser { * Entry point for parsing SQL expressions expressed as a String. * * @param sqlExpression the SQL expression to parse - * @param inputSchema the schema of the fields in sql expression + * @param inputRowType the fields available in the SQL expression + * @param outputType expected top-level output type if available * @return resolved expression * @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression */ - ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema); + ResolvedExpression parseSqlExpression( + String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType); /** * Returns completion hints for the given statement at the given cursor position. 
The completion diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java index 3c58f0f0228dd..fce73093515fe 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java @@ -43,6 +43,8 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -117,6 +119,8 @@ public static List getAllResolverRules() { private final Map localReferences; + private final @Nullable DataType outputDataType; + private final Map localOverWindows; private final boolean isGroupedAggregation; @@ -130,6 +134,7 @@ private ExpressionResolver( FieldReferenceLookup fieldLookup, List localOverWindows, List localReferences, + @Nullable DataType outputDataType, boolean isGroupedAggregation) { this.config = Preconditions.checkNotNull(config).getConfiguration(); this.tableLookup = Preconditions.checkNotNull(tableLookup); @@ -149,6 +154,7 @@ private ExpressionResolver( "Duplicate local reference: " + u); }, LinkedHashMap::new)); + this.outputDataType = outputDataType; this.localOverWindows = prepareOverWindows(localOverWindows); this.isGroupedAggregation = isGroupedAggregation; } @@ -323,6 +329,11 @@ public List getLocalReferences() { return new ArrayList<>(localReferences.values()); } + @Override + public Optional getOutputDataType() { + return Optional.ofNullable(outputDataType); + } + @Override public Optional getOverWindow(Expression alias) { return Optional.ofNullable(localOverWindows.get(alias)); @@ -443,6 +454,7 @@ public static class ExpressionResolverBuilder { private final SqlExpressionResolver sqlExpressionResolver; private List logicalOverWindows = new ArrayList<>(); private List localReferences = new ArrayList<>(); + private @Nullable DataType outputDataType; private boolean isGroupedAggregation; private ExpressionResolverBuilder( @@ -471,6 +483,11 @@ public ExpressionResolverBuilder withLocalReferences( return this; } + public ExpressionResolverBuilder withOutputDataType(@Nullable DataType outputDataType) { + this.outputDataType = outputDataType; + return this; + } + public ExpressionResolverBuilder withGroupedAggregation(boolean isGroupedAggregation) { this.isGroupedAggregation = isGroupedAggregation; return this; @@ -486,6 +503,7 @@ public ExpressionResolver build() { new FieldReferenceLookup(queryOperations), logicalOverWindows, localReferences, + outputDataType, isGroupedAggregation); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java index fd1bd7a0e35cb..516837df80bff 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java @@ -19,14 +19,18 @@ package org.apache.flink.table.expressions.resolver; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.TableSchema; 
import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; /** Translates a SQL expression string into a {@link ResolvedExpression}. */ @Internal public interface SqlExpressionResolver { /** Translates the given SQL expression string or throws a {@link ValidationException}. */ - ResolvedExpression resolveExpression(String sqlExpression, TableSchema inputSchema); + ResolvedExpression resolveExpression( + String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java index cf2d74befcdde..fb8de721fb49f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java @@ -19,13 +19,19 @@ package org.apache.flink.table.expressions.resolver.rules; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.SqlCallExpression; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.expressions.resolver.SqlExpressionResolver; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -37,32 +43,49 @@ final class ResolveSqlCallRule implements ResolverRule { @Override public List apply(List expression, ResolutionContext context) { - return expression.stream() - .map(expr -> expr.accept(new TranslateSqlCallsVisitor(context))) - .collect(Collectors.toList()); + // only the top-level expressions may access the output data type + final LogicalType outputType = + context.getOutputDataType().map(DataType::getLogicalType).orElse(null); + final TranslateSqlCallsVisitor visitor = new TranslateSqlCallsVisitor(context, outputType); + return expression.stream().map(expr -> expr.accept(visitor)).collect(Collectors.toList()); } private static class TranslateSqlCallsVisitor extends RuleExpressionVisitor { - TranslateSqlCallsVisitor(ResolutionContext resolutionContext) { + private final @Nullable LogicalType outputType; + + TranslateSqlCallsVisitor( + ResolutionContext resolutionContext, @Nullable LogicalType outputType) { super(resolutionContext); + this.outputType = outputType; } @Override public Expression visit(SqlCallExpression sqlCall) { final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver(); - final TableSchema.Builder builder = TableSchema.builder(); + final List fields = new ArrayList<>(); // input references resolutionContext .referenceLookup() .getAllInputFields() - .forEach(f -> builder.field(f.getName(), f.getOutputDataType())); + .forEach( + f -> + fields.add( + new RowField( + f.getName(), + 
f.getOutputDataType().getLogicalType()))); // local references resolutionContext .getLocalReferences() - .forEach(refs -> builder.field(refs.getName(), refs.getOutputDataType())); - return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build()); + .forEach( + refs -> + fields.add( + new RowField( + refs.getName(), + refs.getOutputDataType().getLogicalType()))); + return resolver.resolveExpression( + sqlCall.getSqlExpression(), new RowType(false, fields), outputType); } @Override @@ -76,8 +99,10 @@ protected Expression defaultMethod(Expression expression) { } private List resolveChildren(List lookupChildren) { + final TranslateSqlCallsVisitor visitor = + new TranslateSqlCallsVisitor(resolutionContext, null); return lookupChildren.stream() - .map(child -> child.accept(this)) + .map(child -> child.accept(visitor)) .collect(Collectors.toList()); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java index bc212ce4d517d..0ee8c5e90cbad 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java @@ -31,6 +31,7 @@ import org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup; import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; import java.util.List; import java.util.Optional; @@ -87,6 +88,9 @@ interface ResolutionContext { /** Access to available local references. */ List getLocalReferences(); + /** Access to the expected top-level output data type. */ + Optional getOutputDataType(); + /** Access to available local over windows. 
*/ Optional getOverWindow(Expression alias); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index cef78b30c2632..5085ad7b7756b 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -26,11 +26,15 @@ import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.table.utils.ExpressionResolverMocks; import org.junit.Test; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -268,7 +272,7 @@ private static T resolveCatalogBaseTable( } private static ResolvedExpression resolveSqlExpression( - String sqlExpression, TableSchema inputSchema) { + String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) { switch (sqlExpression) { case COMPUTED_SQL: return COMPUTED_COLUMN_RESOLVED; diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index aeb5bda6a242e..6e0edd65e3770 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -21,18 +21,20 @@ import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampKind; import org.apache.flink.table.types.utils.DataTypeFactoryMock; import org.apache.flink.table.utils.ExpressionResolverMocks; import org.junit.Test; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; @@ -217,7 +219,8 @@ public void testSchemaResolutionErrors() { .column("ts", DataTypes.TIMESTAMP(3)) .watermark("ts", callSql(INVALID_WATERMARK_SQL)) .build(), - "The watermark output type TIMESTAMP_LTZ(3) is different from input time filed type TIMESTAMP(3)."); + "The watermark declaration's output data type 'TIMESTAMP_LTZ(3)' is " + + "different from the time field's data type 'TIMESTAMP(3)'."); testError( Schema.newBuilder() @@ -440,27 +443,27 @@ private static ResolvedSchema resolveSchema(Schema schema, boolean isStreamingMo } private static ResolvedExpression resolveSqlExpression( - String sqlExpression, TableSchema inputSchema) { + String sqlExpression, RowType 
inputRowType, @Nullable LogicalType outputType) { switch (sqlExpression) { case COMPUTED_SQL: assertThat( - inputSchema.getFieldDataType("orig_ts").orElse(null), - equalTo(DataTypes.TIMESTAMP(3))); + getType(inputRowType, "orig_ts"), + equalTo(DataTypes.TIMESTAMP(3).getLogicalType())); return COMPUTED_COLUMN_RESOLVED; case COMPUTED_SQL_WITH_TS_LTZ: assertThat( - inputSchema.getFieldDataType("ts_ltz").orElse(null), - equalTo(DataTypes.TIMESTAMP_LTZ(3))); + getType(inputRowType, "ts_ltz"), + equalTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType())); return COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ; case WATERMARK_SQL: assertThat( - inputSchema.getFieldDataType("ts").orElse(null), - equalTo(DataTypes.TIMESTAMP(3))); + getType(inputRowType, "ts"), + equalTo(DataTypes.TIMESTAMP(3).getLogicalType())); return WATERMARK_RESOLVED; case WATERMARK_SQL_WITH_TS_LTZ: assertThat( - inputSchema.getFieldDataType("ts1").orElse(null), - equalTo(DataTypes.TIMESTAMP_LTZ(3))); + getType(inputRowType, "ts1"), + equalTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType())); return WATERMARK_RESOLVED_WITH_TS_LTZ; case PROCTIME_SQL: return PROCTIME_RESOLVED; @@ -478,4 +481,9 @@ private static LogicalType getType(ResolvedSchema resolvedSchema, String column) .getDataType() .getLogicalType(); } + + private static LogicalType getType(RowType inputRowType, String field) { + final int pos = inputRowType.getFieldIndex(field); + return inputRowType.getTypeAt(pos); + } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java index f48033d1bab3f..b89d09b9d8cba 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java @@ -332,7 +332,7 @@ public ExpressionResolver getResolver() { name -> Optional.empty(), new FunctionLookupMock(functions), new DataTypeFactoryMock(), - (sqlExpression, inputSchema) -> { + (sqlExpression, inputRowType, outputType) -> { throw new UnsupportedOperationException(); }, Arrays.stream(schemas) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java index a18c1bedc5b20..d011b9c3d73a1 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java @@ -496,7 +496,7 @@ public OperationTreeBuilder getTreeBuilder() { new FunctionLookupMock(Collections.emptyMap()), new DataTypeFactoryMock(), name -> Optional.empty(), // do not support - (sqlExpression, inputSchema) -> { + (sqlExpression, inputRowType, outputType) -> { throw new UnsupportedOperationException(); }, true); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java index 4deb785db0089..68970fe2f1dd9 100644 --- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java @@ -47,7 +47,7 @@ public static ExpressionResolverBuilder forSqlExpression(SqlExpressionResolver r public static ExpressionResolverBuilder dummyResolver() { return forSqlExpression( - (sqlExpression, inputSchema) -> { + (sqlExpression, inputRowType, outputType) -> { throw new UnsupportedOperationException(); }); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java index c56ee9ccb500e..526c682fb1d3b 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java @@ -18,11 +18,14 @@ package org.apache.flink.table.utils; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; import java.util.List; @@ -39,7 +42,8 @@ public UnresolvedIdentifier parseIdentifier(String identifier) { } @Override - public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) { + public ResolvedExpression parseSqlExpression( + String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) { return null; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java index 74f94f80e5011..ad58228d00624 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.connector.RuntimeConverter; @@ -96,8 +96,7 @@ public interface DynamicTableSink { * interfaces might be located in other Flink modules. * *

-     * <p>Independent of the provider interface, the table runtime expects that a sink
-     * implementation accepts internal data structures (see {@link
-     * org.apache.flink.table.data.RowData} for more information).
+     * <p>Independent of the provider interface, the table runtime expects that a sink
+     * implementation accepts internal data structures (see {@link RowData} for more information).
      *
      * <p>
The given {@link Context} offers utilities by the planner for creating runtime * implementation with minimal dependencies to internal data structures. @@ -146,7 +145,7 @@ interface Context { * Creates type information describing the internal data structures of the given {@link * DataType}. * - * @see TableSchema#toPhysicalRowDataType() + * @see ResolvedSchema#toPhysicalRowDataType() */ TypeInformation createTypeInformation(DataType consumedDataType); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ce4f359724781..7d626a26e5cd7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -117,7 +117,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .name("SOURCE_WATERMARK") .kind(SCALAR) .inputTypeStrategy(NO_ARGS) - .outputTypeStrategy(explicit(DataTypes.TIMESTAMP(3))) + .outputTypeStrategy(TypeStrategies.SOURCE_WATERMARK) .runtimeClass( "org.apache.flink.table.runtime.functions.scalar.SourceWatermarkFunction") .build(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java index 1815ed14b221c..d65b1519c4bfc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java @@ -69,6 +69,7 @@ public final class TypeStrategies { /** Placeholder for a missing type strategy. */ public static final TypeStrategy MISSING = new MissingTypeStrategy(); + /** Type strategy that returns a common, least restrictive type of all arguments. */ public static final TypeStrategy COMMON = new CommonTypeStrategy(); /** Type strategy that returns a fixed {@link DataType}. */ @@ -418,6 +419,21 @@ public static TypeStrategy varyingString(TypeStrategy initialStrategy) { return Optional.of(nullReplacementDataType); }; + /** Type strategy specific for source watermarks that depend on the output type. */ + public static final TypeStrategy SOURCE_WATERMARK = + callContext -> { + final DataType timestampDataType = + callContext + .getOutputDataType() + .filter( + dt -> + hasFamily( + dt.getLogicalType(), + LogicalTypeFamily.TIMESTAMP)) + .orElse(DataTypes.TIMESTAMP_LTZ(3)); + return Optional.of(timestampDataType); + }; + /** * Type strategy specific for aggregations that partially produce different nullability * depending whether the result is grouped or not. 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index 6ca42021fb1c1..bd5a4a812f2ee 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -42,6 +42,7 @@ import java.math.BigDecimal; import java.util.List; +import java.util.Optional; import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; @@ -49,6 +50,10 @@ @Internal public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { + // Enables CallContext#getOutputDataType() when validating SQL expressions. + private SqlNode sqlNodeForExpectedOutputType; + private RelDataType expectedOutputType; + public FlinkCalciteSqlValidator( SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, @@ -57,6 +62,18 @@ public FlinkCalciteSqlValidator( super(opTab, catalogReader, typeFactory, config); } + public void setExpectedOutputType(SqlNode sqlNode, RelDataType expectedOutputType) { + this.sqlNodeForExpectedOutputType = sqlNode; + this.expectedOutputType = expectedOutputType; + } + + public Optional getExpectedOutputType(SqlNode sqlNode) { + if (sqlNode == sqlNodeForExpectedOutputType) { + return Optional.of(expectedOutputType); + } + return Optional.empty(); + } + @Override public void validateLiteral(SqlLiteral literal) { if (literal.getTypeName() == DECIMAL) { diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java index 473d4d431e4fe..49294c1162bd4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java @@ -18,9 +18,12 @@ package org.apache.flink.table.planner.calcite; +import org.apache.flink.annotation.Internal; + import org.apache.calcite.rex.RexNode; /** Converts SQL expressions to {@link RexNode}. */ +@Internal public interface SqlExprToRexConverter { /** diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java index e1a8cc93120c9..83ed640efb74d 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java @@ -18,13 +18,28 @@ package org.apache.flink.table.planner.calcite; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +import javax.annotation.Nullable; /** Factory to create {@link SqlExprToRexConverter}. 
*/ +@Internal public interface SqlExprToRexConverterFactory { /** - * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL expression to RexNode. + * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL expression to {@link + * RexNode}. + */ + SqlExprToRexConverter create(RelDataType inputRowType, @Nullable RelDataType outputType); + + /** + * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL expression to {@link + * RexNode}. */ - SqlExprToRexConverter create(RelDataType tableRowType); + SqlExprToRexConverter create(RowType inputRowType, @Nullable LogicalType outputType); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java index c0fa221dde0b6..8bc2d0034cbbe 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java @@ -31,6 +31,8 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.FrameworkConfig; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.Properties; import java.util.stream.Stream; @@ -42,14 +44,17 @@ public class SqlExprToRexConverterImpl implements SqlExprToRexConverter { private final SqlDialect sqlDialect; - private final RelDataType tableRowType; + private final RelDataType inputRowType; + + private final @Nullable RelDataType outputType; public SqlExprToRexConverterImpl( FrameworkConfig config, FlinkTypeFactory typeFactory, RelOptCluster cluster, SqlDialect sqlDialect, - RelDataType tableRowType) { + RelDataType inputRowType, + @Nullable RelDataType outputType) { this.planner = new FlinkPlannerImpl( config, @@ -57,20 +62,22 @@ public SqlExprToRexConverterImpl( typeFactory, cluster); this.sqlDialect = sqlDialect; - this.tableRowType = tableRowType; + this.inputRowType = inputRowType; + this.outputType = outputType; } @Override public String expand(String expr) { final CalciteParser parser = planner.parser(); final SqlNode node = parser.parseExpression(expr); - final SqlNode validated = planner.validateExpression(node, tableRowType); + final SqlNode validated = planner.validateExpression(node, inputRowType, outputType); return validated.toSqlString(sqlDialect).getSql(); } @Override public RexNode convertToRexNode(String expr) { - return convertToRexNodes(new String[] {expr})[0]; + final CalciteParser parser = planner.parser(); + return planner.rex(parser.parseExpression(expr), inputRowType, outputType); } @Override @@ -78,7 +85,7 @@ public RexNode[] convertToRexNodes(String[] exprs) { final CalciteParser parser = planner.parser(); return Stream.of(exprs) .map(parser::parseExpression) - .map(node -> planner.rex(node, tableRowType)) + .map(node -> planner.rex(node, inputRowType, null)) .toArray(RexNode[]::new); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java index af968a06468ff..99631f12901d4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java +++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java @@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory; import java.util.Collections; import java.util.List; @@ -33,8 +32,6 @@ public class DefaultParserFactory implements ParserFactory { @Override public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) { - SqlExprToRexConverterFactory sqlExprToRexConverterFactory = - plannerContext::createSqlExprToRexConverter; return new ParserImpl( catalogManager, () -> @@ -42,9 +39,7 @@ public Parser create(CatalogManager catalogManager, PlannerContext plannerContex catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()), plannerContext::createCalciteParser, - tableSchema -> - sqlExprToRexConverterFactory.create( - plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema))); + plannerContext.getSqlExprToRexConverterFactory()); } @Override diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java index e9136a5f5f156..e4f7d53baa768 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.delegation; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.delegation.Parser; @@ -28,11 +27,13 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.SqlExprToRexConverter; +import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory; import org.apache.flink.table.planner.expressions.RexNodeExpression; import org.apache.flink.table.planner.operations.SqlToOperationConverter; import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.parse.ExtendedParser; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.rex.RexNode; @@ -41,12 +42,13 @@ import org.apache.calcite.sql.advise.SqlAdvisor; import org.apache.calcite.sql.advise.SqlAdvisorValidator; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -60,18 +62,18 @@ public class ParserImpl implements Parser { // multiple statements parsing private final Supplier validatorSupplier; private final Supplier calciteParserSupplier; - private final Function sqlExprToRexConverterCreator; + private final SqlExprToRexConverterFactory sqlExprToRexConverterFactory; private static final ExtendedParser EXTENDED_PARSER = 
ExtendedParser.INSTANCE; public ParserImpl( CatalogManager catalogManager, Supplier validatorSupplier, Supplier calciteParserSupplier, - Function sqlExprToRexConverterCreator) { + SqlExprToRexConverterFactory sqlExprToRexConverterFactory) { this.catalogManager = catalogManager; this.validatorSupplier = validatorSupplier; this.calciteParserSupplier = calciteParserSupplier; - this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator; + this.sqlExprToRexConverterFactory = sqlExprToRexConverterFactory; } /** @@ -109,9 +111,10 @@ public UnresolvedIdentifier parseIdentifier(String identifier) { } @Override - public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) { + public ResolvedExpression parseSqlExpression( + String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) { final SqlExprToRexConverter sqlExprToRexConverter = - sqlExprToRexConverterCreator.apply(inputSchema); + sqlExprToRexConverterFactory.create(inputRowType, outputType); final RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression); final LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType()); // expand expression for serializable expression strings similar to views diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index 2d856d9f036a7..f9c939789d01c 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -37,6 +37,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.FlinkTypeSystem; import org.apache.flink.table.planner.calcite.SqlExprToRexConverter; +import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory; import org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl; import org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable; import org.apache.flink.table.planner.codegen.ExpressionReducer; @@ -47,6 +48,8 @@ import org.apache.flink.table.planner.plan.cost.FlinkCostFactory; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.CalciteSchema; @@ -69,6 +72,8 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; +import javax.annotation.Nullable; + import java.util.List; import static java.util.Arrays.asList; @@ -87,6 +92,8 @@ public class PlannerContext { private final RelDataTypeSystem typeSystem = new FlinkTypeSystem(); private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem); + private final SqlExprToRexConverterFactory rexConverterFactory = + new DefaultSqlExprToRexConverterFactory(); private final TableConfig tableConfig; private final RelOptCluster cluster; private final FlinkContext context; @@ -104,10 +111,7 @@ public PlannerContext( this.context = new FlinkContextImpl( - tableConfig, - functionCatalog, - catalogManager, - this::createSqlExprToRexConverter); + tableConfig, functionCatalog, catalogManager, 
rexConverterFactory); this.rootSchema = rootSchema; this.traitDefs = traitDefs; @@ -125,13 +129,8 @@ public PlannerContext( this.cluster = FlinkRelOptClusterFactory.create(planner, new FlinkRexBuilder(typeFactory)); } - public SqlExprToRexConverter createSqlExprToRexConverter(RelDataType rowType) { - return new SqlExprToRexConverterImpl( - checkNotNull(frameworkConfig), - checkNotNull(typeFactory), - checkNotNull(cluster), - checkNotNull(getCalciteSqlDialect()), - rowType); + public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() { + return rexConverterFactory; } public FrameworkConfig createFrameworkConfig() { @@ -328,4 +327,38 @@ private SqlOperatorTable getBuiltinSqlOperatorTable() { typeFactory), FlinkSqlOperatorTable.instance()); } + + // -------------------------------------------------------------------------------------------- + // DefaultSqlExprToRexConverterFactory + // -------------------------------------------------------------------------------------------- + + private class DefaultSqlExprToRexConverterFactory implements SqlExprToRexConverterFactory { + + @Override + public SqlExprToRexConverter create( + RelDataType inputRowType, @Nullable RelDataType outputType) { + return new SqlExprToRexConverterImpl( + checkNotNull(frameworkConfig), + checkNotNull(typeFactory), + checkNotNull(cluster), + checkNotNull(getCalciteSqlDialect()), + inputRowType, + outputType); + } + + @Override + public SqlExprToRexConverter create( + RowType inputRowType, @Nullable LogicalType outputType) { + final RelDataType convertedInputRowType = typeFactory.buildRelNodeRowType(inputRowType); + + final RelDataType convertedOutputType; + if (outputType != null) { + convertedOutputType = typeFactory.createFieldTypeFromLogicalType(outputType); + } else { + convertedOutputType = null; + } + + return create(convertedInputRowType, convertedOutputType); + } + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java index 9f2edacf410bf..8d37c483db35c 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java @@ -52,7 +52,7 @@ public CallExpressionResolver(RelBuilder relBuilder) { "We should not need to lookup any expressions at this point"); }), context.getCatalogManager().getDataTypeFactory(), - (sqlExpression, inputSchema) -> { + (sqlExpression, inputRowType, outputType) -> { throw new TableException( "SQL expression parsing is not supported at this location."); }) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java index 3e3ffd250a22e..8bb177b6535ef 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.ValidationException; import 
org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeInference; @@ -29,9 +30,12 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.type.SqlReturnTypeInference; +import javax.annotation.Nullable; + import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException; @@ -63,7 +67,11 @@ public TypeInferenceReturnInference( @Override public RelDataType inferReturnType(SqlOperatorBinding opBinding) { final CallContext callContext = - new OperatorBindingCallContext(dataTypeFactory, definition, opBinding, null); + new OperatorBindingCallContext( + dataTypeFactory, + definition, + opBinding, + extractExpectedOutputType(opBinding)); try { return inferReturnTypeOrError(unwrapTypeFactory(opBinding), callContext); } catch (ValidationException e) { @@ -75,6 +83,16 @@ public RelDataType inferReturnType(SqlOperatorBinding opBinding) { // -------------------------------------------------------------------------------------------- + private @Nullable RelDataType extractExpectedOutputType(SqlOperatorBinding opBinding) { + if (opBinding instanceof SqlCallBinding) { + final SqlCallBinding binding = (SqlCallBinding) opBinding; + final FlinkCalciteSqlValidator validator = + (FlinkCalciteSqlValidator) binding.getValidator(); + return validator.getExpectedOutputType(binding.getCall()).orElse(null); + } + return null; + } + private RelDataType inferReturnTypeOrError( FlinkTypeFactory typeFactory, CallContext callContext) { final LogicalType inferredType = diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java index 713eea3d4a54b..1afc2c1336a69 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java @@ -106,7 +106,7 @@ public static SupportsFilterPushDown.Result apply( "We should not need to lookup any expressions at this point"); }), context.getCatalogManager().getDataTypeFactory(), - (sqlExpression, inputSchema) -> { + (sqlExpression, inputRowType, outputType) -> { throw new TableException( "SQL expression parsing is not supported at this location."); }) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 6e3afbc79955e..e036e17713d4d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -40,6 +40,8 @@ import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} import org.apache.flink.sql.parser.ddl.SqlUseModules import org.apache.flink.table.planner.parse.CalciteParser +import javax.annotation.Nullable + import java.lang.{Boolean => JBoolean} import java.util import java.util.function.{Function => JFunction} @@ -179,35 +181,43 @@ class FlinkPlannerImpl( } } - def validateExpression(sqlNode: SqlNode, inputRowType: RelDataType): SqlNode = { - validateExpression(sqlNode, getOrCreateSqlValidator(), inputRowType) + def validateExpression( + sqlNode: SqlNode, + inputRowType: RelDataType, + @Nullable outputType: RelDataType): SqlNode = { + validateExpression(sqlNode, getOrCreateSqlValidator(), inputRowType, outputType) } private def validateExpression( sqlNode: SqlNode, sqlValidator: FlinkCalciteSqlValidator, - inputRowType: RelDataType): SqlNode = { - val nameToTypeMap = inputRowType - .getFieldList + inputRowType: RelDataType, + @Nullable outputType: RelDataType) + : SqlNode = { + val nameToTypeMap = new util.HashMap[String, RelDataType]() + inputRowType.getFieldList .asScala - .map { field => - (field.getName, field.getType) - } - .toMap[String, RelDataType] - .asJava + .foreach(f => nameToTypeMap.put(f.getName, f.getType)) + if (outputType != null) { + sqlValidator.setExpectedOutputType(sqlNode, outputType) + } sqlValidator.validateParameterizedExpression(sqlNode, nameToTypeMap) } - def rex(sqlNode: SqlNode, inputRowType: RelDataType): RexNode = { - rex(sqlNode, getOrCreateSqlValidator(), inputRowType) + def rex( + sqlNode: SqlNode, + inputRowType: RelDataType, + @Nullable outputType: RelDataType): RexNode = { + rex(sqlNode, getOrCreateSqlValidator(), inputRowType, outputType) } private def rex( sqlNode: SqlNode, sqlValidator: FlinkCalciteSqlValidator, - inputRowType: RelDataType) = { + inputRowType: RelDataType, + @Nullable outputType: RelDataType) = { try { - val validatedSqlNode = validateExpression(sqlNode, sqlValidator, inputRowType) + val validatedSqlNode = validateExpression(sqlNode, sqlValidator, inputRowType, outputType) val sqlToRelConverter = createSqlToRelConverter(sqlValidator) val nameToNodeMap = inputRowType .getFieldList diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 8a0902bd6d83b..e676771003a3d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -92,11 +92,6 @@ abstract class PlannerBase( // temporary utility until we don't use planner expressions anymore functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) - private val sqlExprToRexConverterFactory = new SqlExprToRexConverterFactory { - override def create(tableRowType: RelDataType): SqlExprToRexConverter = - plannerContext.createSqlExprToRexConverter(tableRowType) - } - private var parser: Parser = _ private var currentDialect: SqlDialect = getTableConfig.getSqlDialect @@ -110,6 +105,8 @@ abstract class PlannerBase( getTraitDefs.toList ) + private val sqlExprToRexConverterFactory = plannerContext.getSqlExprToRexConverterFactory + /** Returns the 
[[FlinkRelBuilder]] of this TableEnvironment. */ private[flink] def getRelBuilder: FlinkRelBuilder = { val currentCatalogName = catalogManager.getCurrentCatalog diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala index b60ae51d794f7..c0e8a76ad5fdd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala @@ -135,7 +135,7 @@ class LegacyCatalogSourceTable[T]( s"`$name`" } }.toArray - val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs) + val rexNodes = toRexFactory.create(newRelTable.getRowType, null).convertToRexNodes(fieldExprs) relBuilder.projectNamed(rexNodes.toList, fieldNames, true) } @@ -158,7 +158,7 @@ class LegacyCatalogSourceTable[T]( } val rowtimeIndex = fieldNames.indexOf(rowtime) val watermarkRexNode = toRexFactory - .create(actualRowType) + .create(actualRowType, null) .convertToRexNode(watermarkSpec.get.getWatermarkExpr) relBuilder.watermark(rowtimeIndex, watermarkRexNode) } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java index fc70477100d90..1c241735a4754 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java @@ -76,9 +76,7 @@ public class ParserImplTest { catalogManager, plannerSupplier, () -> plannerSupplier.get().parser(), - t -> - plannerContext.createSqlExprToRexConverter( - plannerContext.getTypeFactory().buildRelNodeRowType(t))); + plannerContext.getSqlExprToRexConverterFactory()); private static final List TEST_SPECS = Arrays.asList( diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index cc5f7bfaa0c7a..26cd1cc8d6328 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -129,17 +129,6 @@ public class SqlToOperationConverterTest { .createFlinkPlanner( catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()); - private final Parser parser = - new ParserImpl( - catalogManager, - plannerSupplier, - () -> plannerSupplier.get().parser(), - t -> - getPlannerContext() - .createSqlExprToRexConverter( - getPlannerContext() - .getTypeFactory() - .buildRelNodeRowType(t))); private final PlannerContext plannerContext = new PlannerContext( tableConfig, @@ -148,6 +137,13 @@ public class SqlToOperationConverterTest { asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), Collections.emptyList()); + private final Parser parser = + new ParserImpl( + catalogManager, + 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
index b60ae51d794f7..c0e8a76ad5fdd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
@@ -135,7 +135,7 @@ class LegacyCatalogSourceTable[T](
         s"`$name`"
       }
     }.toArray
-    val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
+    val rexNodes = toRexFactory.create(newRelTable.getRowType, null).convertToRexNodes(fieldExprs)
     relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
   }
 
@@ -158,7 +158,7 @@ class LegacyCatalogSourceTable[T](
     }
     val rowtimeIndex = fieldNames.indexOf(rowtime)
     val watermarkRexNode = toRexFactory
-      .create(actualRowType)
+      .create(actualRowType, null)
       .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
     relBuilder.watermark(rowtimeIndex, watermarkRexNode)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
index fc70477100d90..1c241735a4754 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
@@ -76,9 +76,7 @@ public class ParserImplTest {
                     catalogManager,
                     plannerSupplier,
                     () -> plannerSupplier.get().parser(),
-                    t ->
-                            plannerContext.createSqlExprToRexConverter(
-                                    plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+                    plannerContext.getSqlExprToRexConverterFactory());
 
     private static final List TEST_SPECS =
             Arrays.asList(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index cc5f7bfaa0c7a..26cd1cc8d6328 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -129,17 +129,6 @@ public class SqlToOperationConverterTest {
                     .createFlinkPlanner(
                             catalogManager.getCurrentCatalog(),
                             catalogManager.getCurrentDatabase());
-    private final Parser parser =
-            new ParserImpl(
-                    catalogManager,
-                    plannerSupplier,
-                    () -> plannerSupplier.get().parser(),
-                    t ->
-                            getPlannerContext()
-                                    .createSqlExprToRexConverter(
-                                            getPlannerContext()
-                                                    .getTypeFactory()
-                                                    .buildRelNodeRowType(t)));
     private final PlannerContext plannerContext =
             new PlannerContext(
                     tableConfig,
@@ -148,6 +137,13 @@
                     asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
                     Collections.emptyList());
 
+    private final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    getPlannerContext().getSqlExprToRexConverterFactory());
+
     private PlannerContext getPlannerContext() {
         return plannerContext;
     }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 9ead35740ce5d..2f0566acf8e65 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -28,11 +28,13 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Column;
@@ -48,6 +50,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Collector;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -57,6 +60,8 @@ import org.junit.runners.Parameterized.Parameters;
 
 import java.time.DayOfWeek;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -246,7 +251,7 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 tableEnv.fromDataStream(
                         dataStream,
                         Schema.newBuilder()
-                                .columnByMetadata("rowtime", "TIMESTAMP(3)")
+                                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                                 .watermark("rowtime", "SOURCE_WATERMARK()")
                                 .build());
 
@@ -257,12 +262,14 @@
                         Column.physical("f0", DataTypes.BIGINT().notNull()),
                         Column.physical("f1", DataTypes.INT().notNull()),
                         Column.physical("f2", DataTypes.STRING()),
-                        Column.metadata("rowtime", DataTypes.TIMESTAMP(3), null, false)),
+                        Column.metadata(
+                                "rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false)),
                 Collections.singletonList(
                         WatermarkSpec.of(
                                 "rowtime",
                                 ResolvedExpressionMock.of(
-                                        DataTypes.TIMESTAMP(3), "`SOURCE_WATERMARK`()"))),
+                                        DataTypes.TIMESTAMP_LTZ(3),
+                                        "`SOURCE_WATERMARK`()"))),
                 null));
 
         tableEnv.createTemporaryView("t", table);
@@ -305,7 +312,7 @@ public void testFromAndToChangelogStreamEventTime() throws Exception {
                 tableEnv.fromChangelogStream(
                         changelogStream,
                         Schema.newBuilder()
-                                .columnByMetadata("rowtime", DataTypes.TIMESTAMP(3))
+                                .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
                                 .watermark("rowtime", "SOURCE_WATERMARK()")
                                 .build());
         tableEnv.createTemporaryView("t", table);
@@ -431,6 +438,62 @@ public void testFromAndToChangelogStreamUpsert() throws Exception {
                 getOutput(inputOrOutput));
     }
 
+    @Test
+    public void testToDataStreamCustomEventTime() throws Exception {
+        final TableConfig config = tableEnv.getConfig();
+
+        // session time zone should not have an impact on the conversion
+        final ZoneId originalZone = config.getLocalTimeZone();
+        config.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+
+        final LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");
+        final LocalDateTime localDateTime2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
+
+        final DataStream<Tuple2<LocalDateTime, String>> dataStream =
+                env.fromElements(
+                        new Tuple2<>(localDateTime1, "alice"), new Tuple2<>(localDateTime2, "bob"));
+
+        final Table table =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", "TIMESTAMP(3)")
+                                .column("f1", "STRING")
+                                .watermark("f0", "SOURCE_WATERMARK()")
+                                .build());
+
+        testSchema(
+                table,
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("f0", DataTypes.TIMESTAMP(3)),
+                                Column.physical("f1", DataTypes.STRING())),
+                        Collections.singletonList(
+                                WatermarkSpec.of(
+                                        "f0",
+                                        ResolvedExpressionMock.of(
+                                                DataTypes.TIMESTAMP(3), "`SOURCE_WATERMARK`()"))),
+                        null));
+
+        final DataStream<Long> rowtimeStream =
+                tableEnv.toDataStream(table)
+                        .process(
+                                new ProcessFunction<Row, Long>() {
+                                    @Override
+                                    public void processElement(
+                                            Row value, Context ctx, Collector<Long> out) {
+                                        out.collect(ctx.timestamp());
+                                    }
+                                });
+
+        testResult(
+                rowtimeStream,
+                localDateTime1.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(),
+                localDateTime2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli());
+
+        config.setLocalTimeZone(originalZone);
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helper methods
     // --------------------------------------------------------------------------------------------
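Reviewer note on the new test above: because `f0` is a zone-less TIMESTAMP(3), the expected StreamRecord timestamps are the local date-times interpreted at UTC, regardless of the Europe/Berlin session time zone. A self-contained sketch of the asserted arithmetic:

import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Demonstrates the epoch values the test asserts on.
public class RowtimeEpochExample {
    public static void main(String[] args) {
        long t1 = LocalDateTime.parse("1970-01-01T00:00:00.000")
                .atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(); // 0L
        long t2 = LocalDateTime.parse("1970-01-01T01:00:00.000")
                .atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(); // 3600000L
        System.out.println(t1 + ", " + t2); // prints "0, 3600000"
    }
}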
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index f7dcf76a19f1c..becbed1f53556 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -59,9 +59,7 @@ public static FlinkPlannerImpl createDefaultPlanner() {
                         catalogManager,
                         () -> planner,
                         planner::parser,
-                        t ->
-                                plannerContext.createSqlExprToRexConverter(
-                                        plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+                        plannerContext.getSqlExprToRexConverterFactory());
         catalogManager.initSchemaResolver(
                 isStreamingMode,
                 ExpressionResolver.resolverFor(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 90c5010e8525f..00350e00479cf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -60,27 +60,6 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
   val config = new TableConfig
   val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager)
-  private val sqlExprToRexConverterFactory = new SqlExprToRexConverterFactory {
-    override def create(tableRowType: RelDataType): SqlExprToRexConverter =
-      createSqlExprToRexConverter(tableRowType)
-  }
-  private val parser: Parser = new ParserImpl(
-    catalogManager,
-    new JSupplier[FlinkPlannerImpl] {
-      override def get(): FlinkPlannerImpl = getPlanner
-    },
-    // we do not cache the parser in order to use the most up to
-    // date configuration. Users might change parser configuration in TableConfig in between
-    // parsing statements
-    new JSupplier[CalciteParser] {
-      override def get(): CalciteParser = plannerContext.createCalciteParser()
-    },
-    new JFunction[TableSchema, SqlExprToRexConverter] {
-      override def apply(t: TableSchema): SqlExprToRexConverter = {
-        sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
-      }
-    }
-  )
   val plannerContext = new PlannerContext(
     config,
     functionCatalog,
@@ -102,9 +81,6 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
     GenericRowData.of(TimestampData.fromEpochMillis(6000L), JInt.valueOf(8))
   )
 
-  private def createSqlExprToRexConverter(tableRowType: RelDataType): SqlExprToRexConverter =
-    plannerContext.createSqlExprToRexConverter(tableRowType)
-
   @Test
   def testAscendingWatermark(): Unit = {
     val generator = generateWatermarkGenerator("ts - INTERVAL '0.001' SECOND",
@@ -202,7 +178,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
       .getContext
      .unwrap(classOf[FlinkContext])
       .getSqlExprToRexConverterFactory
-      .create(tableRowType)
+      .create(tableRowType, null)
     val rexNode = converter.convertToRexNode(expr)
 
     if (useDefinedConstructor) {
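Reviewer note: the move of the `parser` field below `plannerContext` in SqlToOperationConverterTest (further up) is required, not cosmetic, because Java runs instance-field initializers in declaration order; an initializer that reads `plannerContext` before it is assigned would see null. A minimal, self-contained illustration with made-up names:

// Demonstrates why a field initializer must be declared after the fields it reads.
public class InitOrderExample {

    // Calling a method side-steps the compiler's forward-reference check,
    // but the field it returns has not been assigned yet at this point.
    private final String dependent = String.valueOf(getDependency());

    private final Object dependency = new Object();

    private Object getDependency() {
        return dependency;
    }

    public static void main(String[] args) {
        System.out.println(new InitOrderExample().dependent); // prints "null"
    }
}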
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 595eb41d98b59..1dbbb54c354fd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -251,7 +251,7 @@ class FlinkRelMdHandlerTestBase {
       .unwrap(classOf[FlinkContext])
     val watermarkRexNode = flinkContext
       .getSqlExprToRexConverterFactory
-      .create(scan.getTable.getRowType)
+      .create(scan.getTable.getRowType, null)
       .convertToRexNode("rowtime - INTERVAL '10' SECOND")
     relBuilder.push(scan)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
index 2c2c4ef4c486f..11c0d028bc9e2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner;
 
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -29,12 +28,16 @@ import org.apache.flink.table.parse.CalciteParser;
 import org.apache.flink.table.parse.ExtendedParser;
 import org.apache.flink.table.sqlexec.SqlToOperationConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.advise.SqlAdvisor;
 import org.apache.calcite.sql.advise.SqlAdvisorValidator;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -105,7 +108,8 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
     }
 
     @Override
-    public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+    public ResolvedExpression parseSqlExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
         throw new UnsupportedOperationException(
                 "Computed columns is only supported by the Blink planner.");
     }
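Reviewer note: the legacy planner keeps throwing here, but the three-argument contract above is what Blink-planner call sites now use. A sketch of the intended call pattern against the Parser interface; the helper class and its arguments are illustrative, not part of the change:

import javax.annotation.Nullable;

import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/** Sketch only: shows how the widened Parser contract is meant to be called. */
final class ParseExpressionExample {

    static ResolvedExpression resolve(
            Parser parser,
            String sqlExpression,
            RowType inputRowType,
            @Nullable LogicalType expectedOutputType) {
        // A null output type means "derive the type from the expression" (computed
        // columns); a non-null type lets the validator coerce the result, e.g. a
        // watermark expression to the time column's TIMESTAMP/TIMESTAMP_LTZ type.
        return parser.parseSqlExpression(sqlExpression, inputRowType, expectedOutputType);
    }
}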
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 89845502c55ba..0cadaefb32b10 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -44,6 +44,7 @@ import org.apache.flink.table.parse.CalciteParser
 import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder}
 import org.apache.flink.table.sinks.{BatchSelectTableSink, BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 import org.apache.flink.table.types.{AbstractDataType, DataType}
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.table.utils.PrintUtils
@@ -112,7 +113,10 @@ abstract class TableEnvImpl(
       catalogManager.getDataTypeFactory,
       tableLookup,
       new SqlExpressionResolver {
-        override def resolveExpression(sqlExpression: String, inputSchema: TableSchema)
+        override def resolveExpression(
+            sqlExpression: String,
+            inputRowType: RowType,
+            outputType: LogicalType)
           : ResolvedExpression = {
           throw new UnsupportedOperationException(
             "SQL expression parsing is only supported in the Blink planner.")
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
index c24775f7b6513..c8cf4fab7009f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
@@ -70,9 +70,11 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
         final RowData rowData = element.getValue();
 
         if (consumeRowtimeMetadata) {
+            // timestamp is TIMESTAMP_LTZ
             final long rowtime = rowData.getTimestamp(rowData.getArity() - 1, 3).getMillisecond();
             outRecord.setTimestamp(rowtime);
         } else if (rowtimeIndex != -1) {
+            // timestamp might be TIMESTAMP or TIMESTAMP_LTZ
             final long rowtime = rowData.getTimestamp(rowtimeIndex, 3).getMillisecond();
             outRecord.setTimestamp(rowtime);
         }
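Reviewer note on the two comments above: the same `getMillisecond()` call works for both cases because `TimestampData` stores an epoch-based millisecond value either way; for TIMESTAMP it encodes the local date-time as if it were at UTC, for TIMESTAMP_LTZ it is the actual instant. A small sketch of that invariant, assuming `TimestampData`'s standard factory methods:

import java.time.Instant;
import java.time.LocalDateTime;

import org.apache.flink.table.data.TimestampData;

// Demonstrates that both construction paths yield the same milliseconds here.
public class TimestampDataExample {
    public static void main(String[] args) {
        // TIMESTAMP(3) semantics: a local date-time, internally stored "at UTC".
        TimestampData local =
                TimestampData.fromLocalDateTime(LocalDateTime.parse("1970-01-01T01:00:00"));
        // TIMESTAMP_LTZ(3) semantics: an absolute instant.
        TimestampData instant = TimestampData.fromInstant(Instant.ofEpochMilli(3600000L));

        System.out.println(local.getMillisecond());   // 3600000
        System.out.println(instant.getMillisecond()); // 3600000
    }
}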