From 956b290e8c64ad1dd75a53ad53d70b1849548d8e Mon Sep 17 00:00:00 2001 From: Shengkai <1059623455@qq.com> Date: Fri, 24 Jul 2020 19:32:52 +0800 Subject: [PATCH 1/7] [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement --- docs/dev/table/hive/hive_dialect.md | 0 docs/dev/table/hive/hive_dialect.zh.md | 0 docs/dev/table/sql/show.md | 0 docs/dev/table/sql/show.zh.md | 0 .../table/client/cli/SqlCommandParser.java | 0 .../client/cli/SqlCommandParserTest.java | 0 .../src/main/codegen/data/Parser.tdd | 2 + .../src/main/codegen/includes/parserImpls.ftl | 16 ++++ .../hive/FlinkHiveSqlParserImplTest.java | 6 ++ .../src/main/codegen/data/Parser.tdd | 2 + .../src/main/codegen/includes/parserImpls.ftl | 16 ++++ .../sql/parser/dql/SqlShowCreateTable.java | 69 +++++++++++++++ .../sql/parser/FlinkSqlParserImplTest.java | 6 ++ .../api/internal/TableEnvironmentImpl.java | 85 ++++++++++++++++++- .../operations/ShowCreateTableOperation.java | 42 +++++++++ .../operations/SqlToOperationConverter.java | 13 ++- .../planner/catalog/CatalogTableITCase.scala | 24 ++++++ .../sqlexec/SqlToOperationConverter.java | 17 +++- .../table/api/internal/TableEnvImpl.scala | 84 ++++++++++++++++++ .../table/catalog/CatalogTableITCase.scala | 20 +++++ 20 files changed, 398 insertions(+), 4 deletions(-) create mode 100644 docs/dev/table/hive/hive_dialect.md create mode 100644 docs/dev/table/hive/hive_dialect.zh.md create mode 100644 docs/dev/table/sql/show.md create mode 100644 docs/dev/table/sql/show.zh.md create mode 100644 flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java create mode 100644 flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java diff --git a/docs/dev/table/hive/hive_dialect.md b/docs/dev/table/hive/hive_dialect.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/docs/dev/table/hive/hive_dialect.zh.md b/docs/dev/table/hive/hive_dialect.zh.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/docs/dev/table/sql/show.md b/docs/dev/table/sql/show.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/docs/dev/table/sql/show.zh.md b/docs/dev/table/sql/show.zh.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd index a778bf7e9f1d2..920145476b2ee 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd @@ -87,6 +87,7 @@ "org.apache.flink.sql.parser.dql.SqlShowFunctions" "org.apache.flink.sql.parser.dql.SqlShowModules" 
"org.apache.flink.sql.parser.dql.SqlShowTables" + "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowPartitions" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" "org.apache.flink.sql.parser.dql.SqlRichExplain" @@ -530,6 +531,7 @@ "SqlAlterDatabase()" "SqlDescribeDatabase()" "SqlShowTables()" + "SqlShowCreateTable()" "SqlRichDescribeTable()" "SqlShowFunctions()" "SqlAlterTable()" diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl index 6457f32a144e4..d1324704992c2 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl @@ -252,6 +252,22 @@ SqlShowTables SqlShowTables() : } } +/** +* Parse a "Show Create Table" query command. +*/ +SqlShowCreateTable SqlShowCreateTable() : +{ + SqlIdentifier tableName; + SqlParserPos pos; +} +{ + { pos = getPos();} + tableName = CompoundIdentifier() + { + return new SqlShowCreateTable(pos, tableName); + } +} + /** * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. */ diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java index 0bdf11b619178..5d053b12e96a6 100644 --- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java +++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java @@ -111,6 +111,12 @@ public void testShowTables() { sql("show tables").ok("SHOW TABLES"); } + @Test + public void testShowCreateTable() { + sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); + sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); + } + @Test public void testDescribeTable() { // TODO: support describe partition and columns diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 5ad878b684cf6..8134d906428af 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -69,6 +69,7 @@ "org.apache.flink.sql.parser.dql.SqlShowFunctions" "org.apache.flink.sql.parser.dql.SqlShowModules" "org.apache.flink.sql.parser.dql.SqlShowTables" + "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowViews" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" "org.apache.flink.sql.parser.dql.SqlUnloadModule" @@ -478,6 +479,7 @@ "SqlAlterFunction()" "SqlShowFunctions()" "SqlShowTables()" + "SqlShowCreateTable()" "SqlRichDescribeTable()" "SqlAlterTable()" "SqlShowModules()" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index b4cfc34d8fbae..f65b8fe1c5876 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -422,6 +422,22 @@ SqlShowTables SqlShowTables() : } } +/** +* Parse a "Show Create Table" query command. 
+*/
+SqlShowCreateTable SqlShowCreateTable() :
+{
+    SqlIdentifier tableName;
+    SqlParserPos pos;
+}
+{
+    <SHOW> <CREATE> <TABLE>
{ pos = getPos();} + tableName = CompoundIdentifier() + { + return new SqlShowCreateTable(pos, tableName); + } +} + /** * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java new file mode 100644 index 0000000000000..e6be0e67152b2 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** + * SHOW CREATE TABLE sql call. 
+ * */ +public class SqlShowCreateTable extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW CREATE TABLE", SqlKind.OTHER_DDL); + private final SqlIdentifier tableName; + + public SqlShowCreateTable(SqlParserPos pos, SqlIdentifier tableName) { + super(pos); + this.tableName = tableName; + } + + public SqlIdentifier getTableName() { + return tableName; + } + + public String[] getFullTableName() { + return tableName.names.toArray(new String[0]); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(tableName); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("SHOW CREATE TABLE"); + tableName.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a222321f3ac97..e3aaba262450e 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -191,6 +191,12 @@ public void testShowTables() { sql("show tables").ok("SHOW TABLES"); } + @Test + public void testShowCreateTable() { + sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); + sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); + } + @Test public void testDescribeTable() { sql("describe tbl").ok("DESCRIBE `TBL`"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1259cdf5e289e..c40fbabe5543b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -39,6 +39,7 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogFunction; @@ -96,6 +97,7 @@ import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.SelectSinkOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowCreateTableOperation; import org.apache.flink.table.operations.ShowCurrentCatalogOperation; import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; @@ -178,6 +180,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { private final ModuleManager moduleManager; private final OperationTreeBuilder operationTreeBuilder; private final List bufferedModifyOperations = new ArrayList<>(); + private final String printIndent = " "; protected final TableConfig tableConfig; protected final Executor execEnv; @@ -1121,7 +1124,18 @@ public TableResult executeInternal(Operation operation) { return 
TableResultImpl.TABLE_RESULT_OK;
         } else if (operation instanceof ShowCatalogsOperation) {
             return buildShowResult("catalog name", listCatalogs());
-        } else if (operation instanceof ShowCurrentCatalogOperation) {
+        } else if (operation instanceof ShowCreateTableOperation) {
+            ShowCreateTableOperation showCreateTableOperation = (ShowCreateTableOperation) operation;
+            Optional<CatalogManager.TableLookupResult> result =
+                    catalogManager.getTable(showCreateTableOperation.getSqlIdentifier());
+            if (result.isPresent()) {
+                return buildShowCreateTableResult(result.get().getTable(), showCreateTableOperation.getSqlIdentifier());
+            } else {
+                throw new ValidationException(String.format(
+                    "Table with identifier '%s' does not exist.",
+                    showCreateTableOperation.getSqlIdentifier().asSummaryString()));
+            }
+        } else if (operation instanceof ShowCurrentCatalogOperation) {
             return buildShowResult(
                     "current catalog name", new String[] {catalogManager.getCurrentCatalog()});
         } else if (operation instanceof ShowDatabasesOperation) {
@@ -1293,6 +1307,75 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) {
+        StringBuilder sb = new StringBuilder("CREATE TABLE ");
+        TableSchema schema = table.getSchema();
+        String comment = table.getComment();
+        Map<String, String> options = table.getOptions();
+
+        sb.append(String.format("`%s` (\n", sqlIdentifier.getObjectName()));
+        // append columns
+        sb.append(String.join(",\n",
+            schema
+                .getTableColumns()
+                .stream()
+                .map(col -> {
+                    if (col.getExpr().isPresent()) {
+                        return String.format("%s`%s` AS %s", printIndent, col.getName(), col.getExpr().get());
+                    } else {
+                        return String.format("%s`%s` %s", printIndent, col.getName(), col.getType());
+                    }
+                }).collect(Collectors.toList())));
+
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n") // add delimiter for last line
+                .append(String.join(",\n", schema.getWatermarkSpecs().stream().map(
+                    spec -> String.format("%sWATERMARK FOR `%s` AS %s", printIndent, spec.getRowtimeAttribute(), spec.getWatermarkExpr())
+                ).collect(Collectors.toList())));
+        }
+        // append constraint
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint constraint = schema.getPrimaryKey().get();
+            sb.append(",\n") // add delimiter for last line
+                .append(String.format("%s%s", printIndent, constraint.asCanonicalString()));
+        }
+        sb.append("\n) ");
+        // append comment
+        if (comment != null) {
+            sb.append(String.format("COMMENT '%s'\n", comment));
+        }
+        // append partitions
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                sb.append("PARTITIONED BY (")
+                    .append(String.join(", ",
+                        catalogTable
+                            .getPartitionKeys()
+                            .stream()
+                            .map(key -> String.format("`%s`", key))
+                            .collect(Collectors.toList())))
+                    .append(")\n");
+            }
+        }
+        // append `with` properties
+        sb.append("WITH (\n")
+            .append(String.join(",\n",
+                options
+                    .entrySet()
+                    .stream()
+                    .map(entry -> String.format("%s'%s' = '%s'", printIndent, entry.getKey(), entry.getValue()))
+                    .collect(Collectors.toList())))
+            .append("\n)\n");
+
+        Object[][] rows = new Object[][]{new Object[]{sb.toString()}};
+        return buildResult(
+            new String[]{"create table"},
+            new DataType[]{DataTypes.STRING()},
+            rows);
+    }
+
     private TableResult buildShowFullModulesResult(ModuleEntry[] moduleEntries) {
         Object[][] rows =
Arrays.stream(moduleEntries) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java new file mode 100644 index 0000000000000..83374dca0d516 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** + * Operation to describe a SHOW CREATE TABLE statement. + * */ +public class ShowCreateTableOperation implements ShowOperation{ + + private final ObjectIdentifier sqlIdentifier; + + public ShowCreateTableOperation(ObjectIdentifier sqlIdentifier) { + this.sqlIdentifier = sqlIdentifier; + } + + public ObjectIdentifier getSqlIdentifier() { + return sqlIdentifier; + } + + @Override + public String asSummaryString() { + return String.format("SHOW CREATE TABLE %s", sqlIdentifier.asSummaryString()); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 538b9094e1c6c..3fe4a1259f10f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -55,6 +55,7 @@ import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlRichExplain; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; +import org.apache.flink.sql.parser.dql.SqlShowCreateTable; import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog; import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase; import org.apache.flink.sql.parser.dql.SqlShowDatabases; @@ -94,6 +95,7 @@ import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowCreateTableOperation; import org.apache.flink.table.operations.ShowCurrentCatalogOperation; import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; @@ -256,7 +258,9 @@ public static Optional convert( return Optional.of(converter.convertDropFunction((SqlDropFunction) validated)); } else if (validated instanceof SqlAlterFunction) { 
return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));
-        } else if (validated instanceof SqlShowFunctions) {
+        } else if (validated instanceof SqlShowCreateTable) {
+            return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated));
+        } else if (validated instanceof SqlShowFunctions) {
             return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));
         } else if (validated instanceof SqlShowPartitions) {
             return Optional.of(converter.convertShowPartitions((SqlShowPartitions) validated));
@@ -781,6 +785,13 @@ private Operation convertShowTables(SqlShowTables sqlShowTables) {
         return new ShowTablesOperation();
     }
 
+    /** Convert SHOW CREATE TABLE statement. */
+    private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) {
+        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlShowCreateTable.getFullTableName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        return new ShowCreateTableOperation(identifier);
+    }
+
     /** Convert SHOW FUNCTIONS statement. */
     private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) {
         return new ShowFunctionsOperation(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index da8a69c9f1b68..3dc5159d8361e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -985,6 +985,30 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
   }
 
+  @Test
+  def testCreateTableAndShowCreateTable(): Unit = {
+    val createDDL =
+      """|CREATE TABLE `TBL1` (
+         |  `A` BIGINT NOT NULL,
+         |  `H` STRING,
+         |  `G` AS 2 * (`A` + 1),
+         |  `B` STRING NOT NULL,
+         |  `TS` TIMESTAMP(3),
+         |  `PROC` AS PROCTIME(),
+         |  WATERMARK FOR `TS` AS `TS` - INTERVAL '5' SECOND,
+         |  CONSTRAINT test_constraint PRIMARY KEY (`A`, `B`) NOT ENFORCED
+         |) COMMENT 'test show create table statement'
+         |PARTITIONED BY (`B`, `H`)
+         |WITH (
+         |  'connector' = 'kafka',
+         |  'kafka.topic' = 'log.test'
+         |)
+         |""".stripMargin
+    tableEnv.executeSql(createDDL)
+    val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next()
+    assertEquals(createDDL, row.getField(0))
+  }
+
   @Test
   def testUseCatalogAndShowCurrentCatalog(): Unit = {
     tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 70acb49ec6347..d4178aa0658d1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -39,6 +39,7 @@
 import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 import org.apache.flink.sql.parser.dql.SqlRichExplain;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
+import org.apache.flink.sql.parser.dql.SqlShowCreateTable;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase;
 import org.apache.flink.sql.parser.dql.SqlShowDatabases;
@@ -71,6 +72,7 @@
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
+import org.apache.flink.table.operations.ShowCreateTableOperation;
 import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
 import org.apache.flink.table.operations.ShowCurrentDatabaseOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -180,7 +182,9 @@ public static Optional<Operation> convert(
             return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
         } else if (validated instanceof SqlShowTables) {
             return Optional.of(converter.convertShowTables((SqlShowTables) validated));
-        } else if (validated instanceof SqlCreateView) {
+        } else if (validated instanceof SqlShowCreateTable) {
+            return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated));
+        } else if (validated instanceof SqlCreateView) {
             return Optional.of(converter.convertCreateView((SqlCreateView) validated));
         } else if (validated instanceof SqlDropView) {
             return Optional.of(converter.convertDropView((SqlDropView) validated));
@@ -571,7 +575,14 @@ private Operation convertShowTables(SqlShowTables sqlShowTables) {
         return new ShowTablesOperation();
     }
 
-    /** Convert SHOW FUNCTIONS statement. */
+    /** Convert SHOW CREATE TABLE statement. */
+    private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) {
+        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlShowCreateTable.getFullTableName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        return new ShowCreateTableOperation(identifier);
+    }
+
+    /** Convert SHOW FUNCTIONS statement. */
     private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) {
         return new ShowFunctionsOperation(
             sqlShowFunctions.requireUser() ? FunctionScope.USER : FunctionScope.ALL);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 91846baaf359c..d7ed5bd06e54a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -158,6 +158,8 @@ abstract class TableEnvImpl(
       "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, " +
       "SHOW VIEWS, INSERT, DESCRIBE."
+  private val PRINT_INDENT = "  "
+
   private def isStreamingMode: Boolean = this match {
     case _: BatchTableEnvImpl => false
     case _ => true
   }
@@ -784,6 +786,15 @@ abstract class TableEnvImpl(
         buildShowResult("current database name", Array(catalogManager.getCurrentDatabase))
       case _: ShowTablesOperation =>
         buildShowResult("table name", listTables())
+      case showCreateTableOperation: ShowCreateTableOperation =>
+        val result = catalogManager.getTable(showCreateTableOperation.getSqlIdentifier)
+        if (result.isPresent) {
+          buildShowCreateResult(result.get().getTable, showCreateTableOperation.getSqlIdentifier)
+        } else {
+          throw new ValidationException(String.format(
+            "Table or view with identifier '%s' doesn't exist",
+            showCreateTableOperation.getSqlIdentifier.asSummaryString()))
+        }
       case showFunctionsOperation: ShowFunctionsOperation =>
         val functionScope = showFunctionsOperation.getFunctionScope()
         val functionNames = functionScope match {
@@ -853,6 +864,79 @@ abstract class TableEnvImpl(
     buildResult(Array(columnName), Array(DataTypes.STRING), rows)
   }
 
+  private def buildShowCreateResult(
+      table: CatalogBaseTable,
+      sqlIdentifier: ObjectIdentifier): TableResult = {
+    val schema = table.getSchema
+    val fieldCount = schema.getFieldCount
+    val fieldDecls =
+      new Array[String](fieldCount +
+        schema.getWatermarkSpecs.size() +
+        (if (schema.getPrimaryKey.isPresent) 1 else 0))
+
+    schema.getTableColumns().asScala.zipWithIndex.foreach {
+      case (col, ind) => {
+        fieldDecls(ind) =
+          if (col.getExpr.isPresent) {
+            String.format("%s`%s` AS %s", PRINT_INDENT, col.getName, col.getExpr.get)
+          } else {
+            String.format("%s`%s` %s", PRINT_INDENT, col.getName, col.getType)
+          }
+      }
+    }
+    if (!schema.getWatermarkSpecs.isEmpty) {
+      schema.getWatermarkSpecs.asScala.zipWithIndex.foreach {
+        case (spec, ind) => {
+          fieldDecls(fieldCount + ind) =
+            String.format("%sWATERMARK FOR `%s` AS %s",
+              PRINT_INDENT, spec.getRowtimeAttribute, spec.getWatermarkExpr)
+        }
+      }
+    }
+    if (schema.getPrimaryKey.isPresent) {
+      fieldDecls(fieldCount + schema.getWatermarkSpecs.size()) =
+        String.format("%s%s", PRINT_INDENT, schema.getPrimaryKey.get.asCanonicalString())
+    }
+
+    val withOptions = table.getOptions.asScala.map(
+      option => String.format("%s'%s' = '%s'", PRINT_INDENT, option._1, option._2)
+    ).toArray
+
+    // assemble all components
+    val sb = StringBuilder.newBuilder
+    sb.append(String.format("CREATE TABLE `%s` (\n", sqlIdentifier.getObjectName))
+      .append(fieldDecls.mkString(",\n"))
+      .append("\n) ")
+
+    if (table.getComment != null) {
+      sb.append(s"COMMENT '${table.getComment}'\n")
+    }
+
+    table match {
+      case partitionedTable: CatalogTable => {
+        if (partitionedTable.isPartitioned) {
+          val keys = partitionedTable.getPartitionKeys.asScala
+          if (keys.nonEmpty) {
+            sb.append("PARTITIONED BY (")
+              .append(keys
+                .map(key => String.format("`%s`", key))
+                .mkString(", "))
+              .append(")\n")
+          }
+        }
+      }
+      // views and other catalog objects carry no partition spec
+      case _ =>
+    }
+    sb.append(s"WITH (\n${withOptions.mkString(",\n")}\n)\n")
+
+    buildResult(
+      Array("create table"),
+      Array(DataTypes.STRING),
+      Array(Array(sb.toString))
+    )
+  }
+
   private def buildDescribeResult(schema: TableSchema): TableResult = {
     val fieldToWatermark =
       schema
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index de3c69fc8923e..20689f3145a8b 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala @@ -750,6 +750,26 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase { assertEquals(expectedProperty, database.getProperties) } + @Test + def testExecuteSqlWithCreateAndShowCreateTable(): Unit = { + val createDDL = + """|CREATE TABLE `TBL1` ( + | `A` BIGINT, + | `H` STRING, + | `B` STRING, + | `TS` TIMESTAMP(3) + |) COMMENT 'test show create table statement' + |PARTITIONED BY (`B`, `H`) + |WITH ( + | 'connector' = 'kafka', + | 'kafka.topic' = 'log.test' + |) + |""".stripMargin + tableEnv.executeSql(createDDL) + val showResult = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next(); + assertEquals(createDDL, showResult.getField(0)) + } + @Test(expected = classOf[ValidationException]) def testCreateViewTwice(): Unit = { val sourceData = List( From e7301817d4de3454583df6db449addd992a8872b Mon Sep 17 00:00:00 2001 From: Jane Chan Date: Mon, 12 Apr 2021 21:01:38 +0800 Subject: [PATCH 2/7] [FLINK-16384][table] Support 'SHOW CREATE TABLE' statement. This commit tries to 1. resolve the conflicts 2. revert the changes made on old planner 3. apply spotless formatting 4. fix DDL missing `TEMPORARY` keyword for temporary table 5. display table's full object path as catalog.db.table 6. support displaying the expanded query for view 7. add view test in CatalogTableITCase, and adapt sql client test to the new test framework 8. adapt docs --- .../connectors/table/hive/hive_dialect.md | 1 + docs/content.zh/docs/dev/table/sql/show.md | 94 +++++++- .../connectors/table/hive/hive_dialect.md | 1 + docs/content/docs/dev/table/sql/show.md | 95 +++++++- docs/dev/table/hive/hive_dialect.md | 0 docs/dev/table/hive/hive_dialect.zh.md | 0 docs/dev/table/sql/show.md | 0 docs/dev/table/sql/show.zh.md | 0 .../table/client/cli/SqlCommandParser.java | 0 .../client/cli/SqlCommandParserTest.java | 0 .../src/test/resources/sql/table.q | 59 +++++ .../src/test/resources/sql/view.q | 13 + .../hive/FlinkHiveSqlParserImplTest.java | 8 +- .../sql/parser/dql/SqlShowCreateTable.java | 55 +++-- .../sql/parser/FlinkSqlParserImplTest.java | 8 +- .../api/internal/TableEnvironmentImpl.java | 223 +++++++++++------- .../operations/ShowCreateTableOperation.java | 28 +-- .../flink/table/catalog/UniqueConstraint.java | 27 +-- .../operations/SqlToOperationConverter.java | 15 +- .../planner/catalog/CatalogTableITCase.scala | 84 +++++-- .../sqlexec/SqlToOperationConverter.java | 17 +- .../table/api/internal/TableEnvImpl.scala | 84 ------- .../table/catalog/CatalogTableITCase.scala | 20 -- 23 files changed, 542 insertions(+), 290 deletions(-) delete mode 100644 docs/dev/table/hive/hive_dialect.md delete mode 100644 docs/dev/table/hive/hive_dialect.zh.md delete mode 100644 docs/dev/table/sql/show.md delete mode 100644 docs/dev/table/sql/show.zh.md delete mode 100644 flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java delete mode 100644 flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java diff --git a/docs/content.zh/docs/connectors/table/hive/hive_dialect.md b/docs/content.zh/docs/connectors/table/hive/hive_dialect.md index c70554429a4f2..3d6815f3914d0 100644 --- a/docs/content.zh/docs/connectors/table/hive/hive_dialect.md +++ b/docs/content.zh/docs/connectors/table/hive/hive_dialect.md @@ -162,6 +162,7 @@ 
USE database_name; ```sql SHOW TABLES; +SHOW CREATE TABLE [catalog_name.][db_name.]table_name; ``` #### Create diff --git a/docs/content.zh/docs/dev/table/sql/show.md b/docs/content.zh/docs/dev/table/sql/show.md index 0054d665dfd24..a4b5a15291aa4 100644 --- a/docs/content.zh/docs/dev/table/sql/show.md +++ b/docs/content.zh/docs/dev/table/sql/show.md @@ -28,7 +28,7 @@ under the License. -SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出当前 catalog 和当前 database 中所有的 function,包括:系统 function 和用户定义的 function,或者仅仅列出当前 catalog 和当前 database 中用户定义的 function,或者列出当前环境所有激活的 module,或者列出当前环境所有加载的 module 及激活状态。 +SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出创建指定表或视图的语句,或者列出当前 catalog 和当前 database 中所有的 function,包括:系统 function 和用户定义的 function,或者仅仅列出当前 catalog 和当前 database 中用户定义的 function,或者列出当前环境所有激活的 module,或者列出当前环境所有加载的 module 及激活状态。 目前 Flink SQL 支持下列 SHOW 语句: - SHOW CATALOGS @@ -36,6 +36,7 @@ SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有 - SHOW DATABASES - SHOW CURRENT DATABASE - SHOW TABLES +- SHOW CREATE TABLE - SHOW VIEWS - SHOW FUNCTIONS - SHOW MODULES @@ -120,6 +121,18 @@ tEnv.executeSql("SHOW TABLES").print(); // | my_table | // +------------+ +// show create table +tEnv.executeSql("SHOW CREATE TABLE my_table").print(); +// +----------------------------------------------------------+ +// | create table | +// +----------------------------------------------------------+ +// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +// | ... | +// | ) WITH ( | +// | ... | +// | ) | +// +----------------------------------------------------------+ + // create a view tEnv.executeSql("CREATE VIEW my_view AS ..."); // show views @@ -129,6 +142,15 @@ tEnv.executeSql("SHOW VIEWS").print(); // +-----------+ // | my_view | // +-----------+ +// show create table for view +tEnv.executeSql("SHOW CREATE TABLE my_view").print(); +// +---------------------------------------------------------+ +// | create table | +// +---------------------------------------------------------+ +// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +// | ... | +// | | +// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print(); @@ -201,6 +223,17 @@ tEnv.executeSql("SHOW TABLES").print() // | my_table | // +------------+ +// show create table +tEnv.executeSql("SHOW CREATE TABLE my_table").print() +// +----------------------------------------------------------+ +// | create table | +// +----------------------------------------------------------+ +// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +// | ... | +// | ) WITH ( | +// | ... | +// | ) | +// +----------------------------------------------------------+ // create a view tEnv.executeSql("CREATE VIEW my_view AS ...") // show views @@ -210,6 +243,15 @@ tEnv.executeSql("SHOW VIEWS").print() // +-----------+ // | my_view | // +-----------+ +// show create table for view +tEnv.executeSql("SHOW CREATE TABLE my_view").print() +// +---------------------------------------------------------+ +// | create table | +// +---------------------------------------------------------+ +// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +// | ... 
| +// | | +// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print() @@ -281,6 +323,17 @@ table_env.execute_sql("SHOW TABLES").print() # +------------+ # | my_table | # +------------+ +# show create table +table_env.executeSql("SHOW CREATE TABLE my_table").print() +# +----------------------------------------------------------+ +# | create table | +# +----------------------------------------------------------+ +# | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +# | ... | +# | ) WITH ( | +# | ... | +# | ) | +# +----------------------------------------------------------+ # create a view table_env.execute_sql("CREATE VIEW my_view AS ...") @@ -291,6 +344,15 @@ table_env.execute_sql("SHOW VIEWS").print() # +-----------+ # | my_view | # +-----------+ +# show create table for view +table_env.execute_sql("SHOW CREATE TABLE my_view").print() +# +---------------------------------------------------------+ +# | create table | +# +---------------------------------------------------------+ +# | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +# | ... | +# | | +# +---------------------------------------------------------+ # show functions table_env.execute_sql("SHOW FUNCTIONS").print() @@ -346,12 +408,34 @@ Flink SQL> CREATE TABLE my_table (...) WITH (...); Flink SQL> SHOW TABLES; my_table +Flink SQL> SHOW CREATE TABLE my_table; ++----------------------------------------------------------+ +| create table | ++----------------------------------------------------------+ +| CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +| ... | +| ) WITH ( | +| ... | +| ) | ++----------------------------------------------------------+ +1 row in set + Flink SQL> CREATE VIEW my_view AS ...; [INFO] View has been created. Flink SQL> SHOW VIEWS; my_view +Flink SQL> SHOW CREATE TABLE my_view; ++---------------------------------------------------------+ +| create table | ++---------------------------------------------------------+ +| CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +| ... | +| | ++---------------------------------------------------------+ +1 row in set + Flink SQL> SHOW FUNCTIONS; mod sha256 @@ -428,6 +512,14 @@ SHOW TABLES 展示当前 catalog 和当前 database 中所有的表。 +## SHOW CREATE TABLE + +```sql +SHOW CREATE TABLE [catalog_name.][db_name.]table_name +``` + +展示创建指定表或视图的 create 语句。 + ## SHOW VIEWS ```sql diff --git a/docs/content/docs/connectors/table/hive/hive_dialect.md b/docs/content/docs/connectors/table/hive/hive_dialect.md index a1c71fdeff0d6..ad4ebf062f036 100644 --- a/docs/content/docs/connectors/table/hive/hive_dialect.md +++ b/docs/content/docs/connectors/table/hive/hive_dialect.md @@ -170,6 +170,7 @@ USE database_name; ```sql SHOW TABLES; +SHOW CREATE TABLE [catalog_name.][db_name.]table_name; ``` #### Create diff --git a/docs/content/docs/dev/table/sql/show.md b/docs/content/docs/dev/table/sql/show.md index 5314281eca9ff..56544a4a47451 100644 --- a/docs/content/docs/dev/table/sql/show.md +++ b/docs/content/docs/dev/table/sql/show.md @@ -26,7 +26,7 @@ under the License. 
# SHOW Statements -SHOW statements are used to list all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and the current database, or show current catalog and database, or list all functions including system functions and user-defined functions in the current catalog and current database, or list only user-defined functions in the current catalog and current database, or list enabled module names, or list all loaded modules with enabled status in the current session. +SHOW statements are used to list all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and the current database, or show current catalog and database, or show create statement for specified table/view, or list all functions including system functions and user-defined functions in the current catalog and current database, or list only user-defined functions in the current catalog and current database, or list enabled module names, or list all loaded modules with enabled status in the current session. Flink SQL supports the following SHOW statements for now: - SHOW CATALOGS @@ -34,6 +34,7 @@ Flink SQL supports the following SHOW statements for now: - SHOW DATABASES - SHOW CURRENT DATABASE - SHOW TABLES +- SHOW CREATE TABLE - SHOW VIEWS - SHOW FUNCTIONS - SHOW MODULES @@ -119,6 +120,18 @@ tEnv.executeSql("SHOW TABLES").print(); // | my_table | // +------------+ +// show create table +tEnv.executeSql("SHOW CREATE TABLE my_table").print(); +// +----------------------------------------------------------+ +// | create table | +// +----------------------------------------------------------+ +// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +// | ... | +// | ) WITH ( | +// | ... | +// | ) | +// +----------------------------------------------------------+ + // create a view tEnv.executeSql("CREATE VIEW my_view AS ..."); // show views @@ -128,6 +141,15 @@ tEnv.executeSql("SHOW VIEWS").print(); // +-----------+ // | my_view | // +-----------+ +// show create table for view +tEnv.executeSql("SHOW CREATE TABLE my_view").print(); +// +---------------------------------------------------------+ +// | create table | +// +---------------------------------------------------------+ +// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +// | ... | +// | | +// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print(); @@ -200,6 +222,17 @@ tEnv.executeSql("SHOW TABLES").print() // | my_table | // +------------+ +// show create table +tEnv.executeSql("SHOW CREATE TABLE my_table").print() +// +----------------------------------------------------------+ +// | create table | +// +----------------------------------------------------------+ +// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +// | ... | +// | ) WITH ( | +// | ... | +// | ) | +// +----------------------------------------------------------+ // create a view tEnv.executeSql("CREATE VIEW my_view AS ...") // show views @@ -209,6 +242,15 @@ tEnv.executeSql("SHOW VIEWS").print() // +-----------+ // | my_view | // +-----------+ +// show create table for view +tEnv.executeSql("SHOW CREATE TABLE my_view").print() +// +---------------------------------------------------------+ +// | create table | +// +---------------------------------------------------------+ +// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +// | ... 
| +// | | +// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print() @@ -280,6 +322,17 @@ table_env.execute_sql("SHOW TABLES").print() # +------------+ # | my_table | # +------------+ +# show create table +table_env.executeSql("SHOW CREATE TABLE my_table").print() +# +----------------------------------------------------------+ +# | create table | +# +----------------------------------------------------------+ +# | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +# | ... | +# | ) WITH ( | +# | ... | +# | ) | +# +----------------------------------------------------------+ # create a view table_env.execute_sql("CREATE VIEW my_view AS ...") @@ -290,6 +343,15 @@ table_env.execute_sql("SHOW VIEWS").print() # +-----------+ # | my_view | # +-----------+ +# show create table for view +table_env.execute_sql("SHOW CREATE TABLE my_view").print() +# +---------------------------------------------------------+ +# | create table | +# +---------------------------------------------------------+ +# | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +# | ... | +# | | +# +---------------------------------------------------------+ # show functions table_env.execute_sql("SHOW FUNCTIONS").print() @@ -345,12 +407,34 @@ Flink SQL> CREATE TABLE my_table (...) WITH (...); Flink SQL> SHOW TABLES; my_table +Flink SQL> SHOW CREATE TABLE my_table; ++----------------------------------------------------------+ +| create table | ++----------------------------------------------------------+ +| CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | +| ... | +| ) WITH ( | +| ... | +| ) | ++----------------------------------------------------------+ +1 row in set + Flink SQL> CREATE VIEW my_view AS ...; [INFO] View has been created. Flink SQL> SHOW VIEWS; my_view +Flink SQL> SHOW CREATE TABLE my_view; ++---------------------------------------------------------+ +| create table | ++---------------------------------------------------------+ +| CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | +| ... | +| | ++---------------------------------------------------------+ +1 row in set + Flink SQL> SHOW FUNCTIONS; mod sha256 @@ -427,6 +511,15 @@ SHOW TABLES Show all tables in the current catalog and the current database. + +## SHOW CREATE TABLE + +```sql +SHOW CREATE TABLE +``` + +Show create table statement for specified table or view. 
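+
+For example, with the optional qualifiers spelled out (the identifier grammar matches the Hive dialect page above; `my_table` is only an illustrative name):
+
+```sql
+SHOW CREATE TABLE [catalog_name.][db_name.]table_name;
+
+-- e.g. print the DDL of a table registered in the current catalog and database
+SHOW CREATE TABLE my_table;
+```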
+ ## SHOW VIEWS ```sql diff --git a/docs/dev/table/hive/hive_dialect.md b/docs/dev/table/hive/hive_dialect.md deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/docs/dev/table/hive/hive_dialect.zh.md b/docs/dev/table/hive/hive_dialect.zh.md deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/docs/dev/table/sql/show.md b/docs/dev/table/sql/show.md deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/docs/dev/table/sql/show.zh.md b/docs/dev/table/sql/show.zh.md deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q index b89c746b28e2c..a871d7ea41597 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q @@ -73,6 +73,27 @@ show tables; 1 row in set !ok +# test SHOW CREATE TABLE +show create table orders; ++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| create table | ++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| CREATE TABLE `default_catalog`.`default_database`.`orders` ( + `user` BIGINT NOT NULL, + `product` VARCHAR(32), + `amount` INT, + `ts` TIMESTAMP(3), + `ptime` AS PROCTIME(), + WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND, + CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED +) WITH ( + 'connector' = 'datagen' +) + | ++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +1 row in set +!ok + # ========================================================================== # test alter table # ========================================================================== @@ -117,6 +138,27 @@ desc orders2; 5 rows in set !ok +# test SHOW CREATE TABLE +show create table orders2; ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| create table | 
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| CREATE TABLE `default_catalog`.`default_database`.`orders2` ( + `user` BIGINT NOT NULL, + `product` VARCHAR(32), + `amount` INT, + `ts` TIMESTAMP(3), + `ptime` AS PROCTIME(), + WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND, + CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED +) WITH ( + 'connector' = 'kafka' +) + | ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +1 row in set +!ok + # ========================================================================== # test drop table # ========================================================================== @@ -165,6 +207,23 @@ show tables; 1 row in set !ok +# SHOW CREATE TABLE for temporary table +show create table tbl1; ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| create table | ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`tbl1` ( + `user` BIGINT NOT NULL, + `product` VARCHAR(32), + `amount` INT +) WITH ( + 'connector' = 'datagen' +) + | ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +1 row in set +!ok + drop temporary table tbl1; [INFO] Execute statement succeed. 
!info diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q b/flink-table/flink-sql-client/src/test/resources/sql/view.q index e8e9d624830f2..5342cf3b5531f 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/view.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q @@ -72,6 +72,19 @@ show views; 2 rows in set !ok +# test SHOW CREATE TABLE for views +show create table v1; ++------------------------------------------------------------------------------------------------------------------------+ +| create table | ++------------------------------------------------------------------------------------------------------------------------+ +| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` AS +SELECT * +FROM `default_catalog`.`default_database`.`orders` + | ++------------------------------------------------------------------------------------------------------------------------+ +1 row in set +!ok + # ==== test permanent view ===== # register a permanent view with the duplicate name with temporary view diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java index 5d053b12e96a6..4b9cb9212ca6a 100644 --- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java +++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java @@ -112,10 +112,10 @@ public void testShowTables() { } @Test - public void testShowCreateTable() { - sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); - sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); - } + public void testShowCreateTable() { + sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); + sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); + } @Test public void testDescribeTable() { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java index e6be0e67152b2..447a044c7e37c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateTable.java @@ -30,40 +30,39 @@ import java.util.Collections; import java.util.List; -/** - * SHOW CREATE TABLE sql call. - * */ +/** SHOW CREATE TABLE sql call. 
*/ public class SqlShowCreateTable extends SqlCall { - public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW CREATE TABLE", SqlKind.OTHER_DDL); - private final SqlIdentifier tableName; + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CREATE TABLE", SqlKind.OTHER_DDL); + private final SqlIdentifier tableName; - public SqlShowCreateTable(SqlParserPos pos, SqlIdentifier tableName) { - super(pos); - this.tableName = tableName; - } + public SqlShowCreateTable(SqlParserPos pos, SqlIdentifier tableName) { + super(pos); + this.tableName = tableName; + } - public SqlIdentifier getTableName() { - return tableName; - } + public SqlIdentifier getTableName() { + return tableName; + } - public String[] getFullTableName() { - return tableName.names.toArray(new String[0]); - } + public String[] getFullTableName() { + return tableName.names.toArray(new String[0]); + } - @Override - public SqlOperator getOperator() { - return OPERATOR; - } + @Override + public SqlOperator getOperator() { + return OPERATOR; + } - @Override - public List getOperandList() { - return Collections.singletonList(tableName); - } + @Override + public List getOperandList() { + return Collections.singletonList(tableName); + } - @Override - public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("SHOW CREATE TABLE"); - tableName.unparse(writer, leftPrec, rightPrec); - } + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("SHOW CREATE TABLE"); + tableName.unparse(writer, leftPrec, rightPrec); + } } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index e3aaba262450e..53dc861a5efc3 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -192,10 +192,10 @@ public void testShowTables() { } @Test - public void testShowCreateTable() { - sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); - sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); - } + public void testShowCreateTable() { + sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); + sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); + } @Test public void testDescribeTable() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index c40fbabe5543b..37c50bf4b932f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -39,7 +39,6 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogFunction; @@ -55,6 +54,9 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import 
org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.QueryOperationCatalogView; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.WatermarkSpec; @@ -146,11 +148,15 @@ import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.commons.lang3.StringUtils; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1125,17 +1131,24 @@ public TableResult executeInternal(Operation operation) { } else if (operation instanceof ShowCatalogsOperation) { return buildShowResult("catalog name", listCatalogs()); } else if (operation instanceof ShowCreateTableOperation) { - ShowCreateTableOperation showCreateTableOperation = (ShowCreateTableOperation) operation; - Optional result = - catalogManager.getTable(showCreateTableOperation.getSqlIdentifier()); - if (result.isPresent()) { - return buildShowCreateTableResult(result.get().getTable(), ((ShowCreateTableOperation) operation).getSqlIdentifier()); - } else { - throw new ValidationException(String.format( - "Table with identifier '%s' does not exist.", - showCreateTableOperation.getSqlIdentifier().asSummaryString())); - } - } else if (operation instanceof ShowCurrentCatalogOperation) { + ShowCreateTableOperation showCreateTableOperation = + (ShowCreateTableOperation) operation; + Optional result = + catalogManager.getTable(showCreateTableOperation.getSqlIdentifier()); + if (result.isPresent()) { + return buildShowResult( + "create table", + buildShowCreateTableRow( + result.get().getResolvedTable(), + ((ShowCreateTableOperation) operation).getSqlIdentifier(), + result.get().isTemporary())); + } else { + throw new ValidationException( + String.format( + "Table with identifier '%s' does not exist.", + showCreateTableOperation.getSqlIdentifier().asSummaryString())); + } + } else if (operation instanceof ShowCurrentCatalogOperation) { return buildShowResult( "current catalog name", new String[] {catalogManager.getCurrentCatalog()}); } else if (operation instanceof ShowDatabasesOperation) { @@ -1307,74 +1320,124 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } - private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) { - StringBuilder sb = new StringBuilder("CREATE TABLE "); - TableSchema schema = table.getSchema(); - String comment = table.getComment(); - Map options = table.getOptions(); - - sb.append(String.format("`%s` (\n", sqlIdentifier.getObjectName())); - // append columns - sb.append(String.join(",\n", - schema - .getTableColumns() - .stream() - .map(col -> { - if (col.getExpr().isPresent()) { - return String.format("%s`%s` AS %s", printIndent, col.getName(), col.getExpr().get()); - } else { - return String.format("%s`%s` %s", printIndent, col.getName(), 
col.getType()); - } - }).collect(Collectors.toList()))); - - // append watermark spec - if (!schema.getWatermarkSpecs().isEmpty()) { - sb.append(",\n") // add delimiter for last line - .append(String.join(",\n", schema.getWatermarkSpecs().stream().map( - sepc -> String.format("%sWATERMARK FOR `%s` AS %s", printIndent, sepc.getRowtimeAttribute(), sepc.getWatermarkExpr()) - ).collect(Collectors.toList()))); - } - // append constraint - if (schema.getPrimaryKey().isPresent()) { - UniqueConstraint constraint = schema.getPrimaryKey().get(); - sb.append(",\n") // add delimiter for last line - .append(String.format("%s%s", printIndent, constraint.asCanonicalString())); - } - sb.append("\n) "); - // append comment - if (comment != null) { - sb.append(String.format("COMMENT '%s'\n", comment)); - } - // append partitions - if (table instanceof CatalogTable) { - CatalogTable catalogTable = (CatalogTable) table; - if (catalogTable.isPartitioned()) { - sb.append("PARTITIONED BY (") - .append(String.join(", ", - catalogTable - .getPartitionKeys() - .stream() - .map(key -> String.format("`%s`", key)) - .collect(Collectors.toList()))) - .append(")\n"); - } - } - // append `with` properties - sb.append("WITH (\n") - .append(String.join(",\n", - options - .entrySet() - .stream() - .map(entry -> String.format("%s'%s' = '%s'", printIndent, entry.getKey(), entry.getValue())) - .collect(Collectors.toList()))) - .append("\n)\n"); - - Object[][] rows = new Object[][]{new Object[]{sb.toString()}}; - return buildResult( - new String[]{"create table"}, - new DataType[]{DataTypes.STRING()}, - rows); - } + private String[] buildShowCreateTableRow( + ResolvedCatalogBaseTable table, + ObjectIdentifier sqlIdentifier, + boolean isTemporary) { + CatalogBaseTable.TableKind kind = table.getTableKind(); + StringBuilder sb = + new StringBuilder( + String.format( + "CREATE%s%s%s", + isTemporary ? " TEMPORARY" : "", + kind == CatalogBaseTable.TableKind.TABLE ? 
" TABLE " : " VIEW ", + sqlIdentifier.asSerializableString())); + if (kind == CatalogBaseTable.TableKind.TABLE) { + sb.append(" (\n"); + ResolvedSchema schema = table.getResolvedSchema(); + // append columns + sb.append( + schema.getColumns().stream() + .map( + column -> + String.format( + "%s%s", printIndent, getColumnString(column))) + .collect(Collectors.joining(",\n"))); + // append watermark spec + if (!schema.getWatermarkSpecs().isEmpty()) { + sb.append(",\n"); + sb.append( + schema.getWatermarkSpecs().stream() + .map( + watermarkSpec -> + String.format( + "%sWATERMARK FOR %s AS %s", + printIndent, + String.join( + ".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), + watermarkSpec + .getWatermarkExpression() + .asSummaryString())) + .collect(Collectors.joining("\n"))); + } + // append constraint + if (schema.getPrimaryKey().isPresent()) { + sb.append(",\n"); + sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); + } + sb.append("\n) "); + // append comment + String comment = table.getComment(); + if (StringUtils.isNotEmpty(comment)) { + sb.append(String.format("COMMENT '%s'\n", comment)); + } + + // append partitions + ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; + if (catalogTable.isPartitioned()) { + sb.append("PARTITIONED BY (") + .append( + catalogTable.getPartitionKeys().stream() + .map(key -> String.format("`%s`", key)) + .collect(Collectors.joining(", "))) + .append(")\n"); + } + // append `with` properties + Map options = table.getOptions(); + sb.append("WITH (\n") + .append( + options.entrySet().stream() + .map( + entry -> + String.format( + "%s'%s' = '%s'", + printIndent, + entry.getKey(), + entry.getValue())) + .collect(Collectors.joining(",\n"))) + .append("\n)\n"); + } else { + sb.append(" AS\n"); + sb.append(((ResolvedCatalogView) table).getExpandedQuery()).append("\n"); + } + return new String[] {sb.toString()}; + } + + private String getColumnString(Column column) { + final StringBuilder sb = new StringBuilder(); + sb.append(EncodingUtils.escapeIdentifier(column.getName())); + sb.append(" "); + // skip data type for computed column + if (column instanceof Column.ComputedColumn) { + sb.append( + column.explainExtras() + .orElseThrow( + () -> + new TableException( + String.format( + "Column expression can not be null for computed column '%s'", + column.getName())))); + } else { + DataType dataType = column.getDataType(); + String type = dataType.toString(); + LogicalType logicalType = dataType.getLogicalType(); + // skip internal timestamp kind + if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { + type = logicalType.asSerializableString(); + } + sb.append(type); + column.explainExtras() + .ifPresent( + e -> { + sb.append(" "); + sb.append(e); + }); + } + return sb.toString(); + } private TableResult buildShowFullModulesResult(ModuleEntry[] moduleEntries) { Object[][] rows = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java index 83374dca0d516..1008289fcd6cb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java @@ -20,23 +20,21 @@ import org.apache.flink.table.catalog.ObjectIdentifier; -/** - * 
Operation to describe a SHOW CREATE TABLE statement. - * */ -public class ShowCreateTableOperation implements ShowOperation{ +/** Operation to describe a SHOW CREATE TABLE statement. */ +public class ShowCreateTableOperation implements ShowOperation { - private final ObjectIdentifier sqlIdentifier; + private final ObjectIdentifier sqlIdentifier; - public ShowCreateTableOperation(ObjectIdentifier sqlIdentifier) { - this.sqlIdentifier = sqlIdentifier; - } + public ShowCreateTableOperation(ObjectIdentifier sqlIdentifier) { + this.sqlIdentifier = sqlIdentifier; + } - public ObjectIdentifier getSqlIdentifier() { - return sqlIdentifier; - } + public ObjectIdentifier getSqlIdentifier() { + return sqlIdentifier; + } - @Override - public String asSummaryString() { - return String.format("SHOW CREATE TABLE %s", sqlIdentifier.asSummaryString()); - } + @Override + public String asSummaryString() { + return String.format("SHOW CREATE TABLE %s", sqlIdentifier.asSummaryString()); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java index a60f0d2f831be..08654dc35835d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java @@ -56,6 +56,17 @@ public List getColumns() { return columns; } + private final String getTypeString() { + switch (getType()) { + case PRIMARY_KEY: + return "PRIMARY KEY"; + case UNIQUE_KEY: + return "UNIQUE"; + default: + throw new IllegalStateException("Unknown key type: " + getType()); + } + } + @Override public ConstraintType getType() { return type; @@ -67,27 +78,15 @@ public ConstraintType getType() { *
      * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition])
      *
-     * E.g CONSTRAINT pk PRIMARY KEY (f0, f1) NOT ENFORCED
+     * E.g. CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
      * </pre>
*/ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - return String.format( "CONSTRAINT %s %s (%s)%s", EncodingUtils.escapeIdentifier(getName()), - typeString, + getTypeString(), columns.stream() .map(EncodingUtils::escapeIdentifier) .collect(Collectors.joining(", ")), diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 3fe4a1259f10f..8d55d63adb408 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -259,8 +259,8 @@ public static Optional convert( } else if (validated instanceof SqlAlterFunction) { return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated)); } else if (validated instanceof SqlShowCreateTable) { - return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated)); - } else if (validated instanceof SqlShowFunctions) { + return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated)); + } else if (validated instanceof SqlShowFunctions) { return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated)); } else if (validated instanceof SqlShowPartitions) { return Optional.of(converter.convertShowPartitions((SqlShowPartitions) validated)); @@ -786,11 +786,12 @@ private Operation convertShowTables(SqlShowTables sqlShowTables) { } /** Convert SHOW CREATE TABLE statement. */ - private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) { - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlShowCreateTable.getFullTableName()); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - return new ShowCreateTableOperation(identifier); - } + private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlShowCreateTable.getFullTableName()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + return new ShowCreateTableOperation(identifier); + } /** Convert SHOW FUNCTIONS statement. 
*/ private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index 3dc5159d8361e..232fec0a1de63 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row import org.apache.flink.util.FileUtils -import org.junit.Assert.{assertEquals, assertTrue, fail} +import org.junit.Assert.{assertEquals, fail} import org.junit.rules.ExpectedException import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -987,26 +987,76 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { @Test def testCreateTableAndShowCreateTable(): Unit = { - val createDDL = - """ |CREATE TABLE `TBL1` ( - | `A` BIGINT NOT NULL, - | `H` STRING, - | `G` AS 2 * (`A` + 1), - | `B` STRING NOT NULL, - | `TS` TIMESTAMP(3), - | `PROC` AS PROCTIME(), - | WATERMARK FOR `TS` AS `TS` - INTERVAL '5' SECOND, - | CONSTRAINT test_constraint PRIMARY KEY (`A`, `B`) NOT ENFORCED - |) COMMENT 'test show create table statement' - |PARTITIONED BY (`B`, `H`) - |WITH ( + val executedDDL = + """ + |create temporary table TBL1 ( + | a bigint not null, + | h string, + | g as 2*(a+1), + | b string not null, + | c bigint metadata virtual, + | ts timestamp(3), + | proc as proctime(), + | watermark for ts as ts - interval '5' second, + | constraint test_constraint primary key (a, b) not enforced + |) comment 'test show create table statement' + |partitioned by (b,h) + |with ( | 'connector' = 'kafka', | 'kafka.topic' = 'log.test' |) |""".stripMargin - tableEnv.executeSql(createDDL) - val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next(); - assertEquals(createDDL, row.getField(0)) + + val expectedDDL = + """ |CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`TBL1` ( + | `a` BIGINT NOT NULL, + | `h` STRING, + | `g` AS 2 * (`a` + 1), + | `b` STRING NOT NULL, + | `c` BIGINT METADATA VIRTUAL, + | `ts` TIMESTAMP(3), + | `proc` AS PROCTIME(), + | WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND, + | CONSTRAINT `test_constraint` PRIMARY KEY (`a`, `b`) NOT ENFORCED + |) COMMENT 'test show create table statement' + |PARTITIONED BY (`b`, `h`) + |WITH ( + | 'connector' = 'kafka', + | 'kafka.topic' = 'log.test' + |) + |""".stripMargin + tableEnv.executeSql(executedDDL) + val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next() + assertEquals(expectedDDL, row.getField(0)) + } + + @Test + def testCreateViewAndShowCreateTable(): Unit = { + val createTableDDL = + """ |create table `source` ( + | `id` bigint not null, + | `group` string not null, + | `score` double + |) with ( + | 'connector' = 'source-only' + |) + |""".stripMargin + val createViewDDL = + """ |create view `tmp` as + |select `group`, avg(`score`) as avg_score + |from `source` + |group by `group` + |""".stripMargin + val expectedDDL = + """ |CREATE VIEW `default_catalog`.`default_database`.`tmp` AS + |SELECT `source`.`group`, AVG(`source`.`score`) AS `avg_score` + |FROM 
`default_catalog`.`default_database`.`source` + |GROUP BY `source`.`group` + |""".stripMargin + tableEnv.executeSql(createTableDDL) + tableEnv.executeSql(createViewDDL) + val row = tableEnv.executeSql("SHOW CREATE TABLE `tmp`").collect().next() + assertEquals(expectedDDL, row.getField(0)) } @Test diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index d4178aa0658d1..70acb49ec6347 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -39,8 +39,6 @@ import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlRichExplain; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; -import org.apache.flink.sql.parser.dql.SqlShowCreateTable; -import org.apache.flink.sql.parser.dql.SqlShowCreateTable; import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog; import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase; import org.apache.flink.sql.parser.dql.SqlShowDatabases; @@ -73,8 +71,6 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.PlannerQueryOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; -import org.apache.flink.table.operations.ShowCreateTableOperation; -import org.apache.flink.table.operations.ShowCreateTableOperation; import org.apache.flink.table.operations.ShowCurrentCatalogOperation; import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; @@ -184,9 +180,7 @@ public static Optional convert( return Optional.of(converter.convertAlterTable((SqlAlterTable) validated)); } else if (validated instanceof SqlShowTables) { return Optional.of(converter.convertShowTables((SqlShowTables) validated)); - } else if (validated instanceof SqlShowCreateTable) { - return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated)); - } else if (validated instanceof SqlCreateView) { + } else if (validated instanceof SqlCreateView) { return Optional.of(converter.convertCreateView((SqlCreateView) validated)); } else if (validated instanceof SqlDropView) { return Optional.of(converter.convertDropView((SqlDropView) validated)); @@ -577,14 +571,7 @@ private Operation convertShowTables(SqlShowTables sqlShowTables) { return new ShowTablesOperation(); } - /** Convert SHOW CREATE TABLE statement. */ - private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) { - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlShowCreateTable.getFullTableName()); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - return new ShowCreateTableOperation(identifier); - } - - /** Convert SHOW FUNCTIONS statement. */ + /** Convert SHOW FUNCTIONS statement. */ private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) { return new ShowFunctionsOperation( sqlShowFunctions.requireUser() ? 
FunctionScope.USER : FunctionScope.ALL); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index d7ed5bd06e54a..91846baaf359c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -158,8 +158,6 @@ abstract class TableEnvImpl( "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, " + "SHOW VIEWS, INSERT, DESCRIBE." - private val PRINT_INDENT = " " - private def isStreamingMode: Boolean = this match { case _: BatchTableEnvImpl => false case _ => true @@ -786,15 +784,6 @@ abstract class TableEnvImpl( buildShowResult("current database name", Array(catalogManager.getCurrentDatabase)) case _: ShowTablesOperation => buildShowResult("table name", listTables()) - case showCreateTableOperation: ShowCreateTableOperation => - val result = catalogManager.getTable(showCreateTableOperation.getSqlIdentifier) - if (result.isPresent) { - buildShowCreateResult(result.get().getTable, showCreateTableOperation.getSqlIdentifier) - } else { - throw new ValidationException(String.format( - "Table or view with identifier '%s' doesn't exist", - showCreateTableOperation.getSqlIdentifier.asSummaryString())) - } case showFunctionsOperation: ShowFunctionsOperation => val functionScope = showFunctionsOperation.getFunctionScope() val functionNames = functionScope match { @@ -864,79 +853,6 @@ abstract class TableEnvImpl( buildResult(Array(columnName), Array(DataTypes.STRING), rows) } - private def buildShowCreateResult( - table: CatalogBaseTable, - sqlIdentifier: ObjectIdentifier): TableResult = { - val schema = table.getSchema - val fieldCount = schema.getFieldCount - val fieldDecls = - new Array[String](fieldCount + - schema.getWatermarkSpecs.length + - (if (schema.getPrimaryKey.isPresent) 1 else 0)) - - schema.getTableColumns().asScala.zipWithIndex.foreach { - case (col, ind) => { - fieldDecls(ind) = - if (col.getExpr.isPresent) { - String.format("%s`%s` AS %s", PRINT_INDENT, col.getName, col.getExpr.get) - } else { - String.format("%s`%s` %s", PRINT_INDENT, col.getName, col.getType) - } - } - } - if (!schema.getWatermarkSpecs.isEmpty) { - schema.getWatermarkSpecs.asScala.zipWithIndex.foreach { - case (sepc, ind) => { - fieldDecls(fieldCount + ind) = - String.format("%sWATERMARK FOR `%s` AS `%s`", - PRINT_INDENT, sepc.getRowtimeAttribute, sepc.getWatermarkExpr) - } - } - } - if (schema.getPrimaryKey.isPresent) { - fieldDecls(fieldCount + schema.getWatermarkSpecs.length) = - String.format("%s%s", PRINT_INDENT, schema.getPrimaryKey.get.asCanonicalString()) - } - - val withOptions = table.getOptions.map( - option => String.format("%s'%s' = '%s'", PRINT_INDENT, option._1, option._2) - ).toArray - - // assemble all components - val sb = StringBuilder.newBuilder - sb.append(String.format("CREATE TABLE `%s` (\n", sqlIdentifier.getObjectName)) - .append(fieldDecls.mkString(",\n")) - .append("\n) ") - - if (table.getComment != null) { - sb.append(s"COMMENT '${table.getComment}'\n") - } - - table match { - case partitionedTable: CatalogTable => { - if (partitionedTable.isPartitioned) { - val keys = partitionedTable.getPartitionKeys - if (keys.length > 0) { - sb.append("PARTITIONED BY (") - .append(partitionedTable - .getPartitionKeys - .map(key => 
String.format("`%s`", key)) - .toArray - .mkString(", ")) - .append(")\n") - } - } - } - } - sb.append(s"WITH (\n${withOptions.mkString(",\n")}\n)\n") - - buildResult( - Array("create table"), - Array(DataTypes.STRING), - Array(Array(sb.toString)) - ) - } - private def buildDescribeResult(schema: TableSchema): TableResult = { val fieldToWatermark = schema diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala index 20689f3145a8b..de3c69fc8923e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala @@ -750,26 +750,6 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase { assertEquals(expectedProperty, database.getProperties) } - @Test - def testExecuteSqlWithCreateAndShowCreateTable(): Unit = { - val createDDL = - """|CREATE TABLE `TBL1` ( - | `A` BIGINT, - | `H` STRING, - | `B` STRING, - | `TS` TIMESTAMP(3) - |) COMMENT 'test show create table statement' - |PARTITIONED BY (`B`, `H`) - |WITH ( - | 'connector' = 'kafka', - | 'kafka.topic' = 'log.test' - |) - |""".stripMargin - tableEnv.executeSql(createDDL) - val showResult = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next(); - assertEquals(createDDL, showResult.getField(0)) - } - @Test(expected = classOf[ValidationException]) def testCreateViewTwice(): Unit = { val sourceData = List( From e3cbbbdf8de0cf09c03f8881347675afef31ed7f Mon Sep 17 00:00:00 2001 From: Jane Chan Date: Tue, 13 Apr 2021 16:39:54 +0800 Subject: [PATCH 3/7] Fix indention --- docs/content.zh/docs/dev/table/sql/show.md | 6 +++--- docs/content/docs/dev/table/sql/show.md | 6 +++--- .../flink-sql-client/src/test/resources/sql/view.q | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/show.md b/docs/content.zh/docs/dev/table/sql/show.md index a4b5a15291aa4..ae68acebccace 100644 --- a/docs/content.zh/docs/dev/table/sql/show.md +++ b/docs/content.zh/docs/dev/table/sql/show.md @@ -148,7 +148,7 @@ tEnv.executeSql("SHOW CREATE TABLE my_view").print(); // | create table | // +---------------------------------------------------------+ // | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... | +// | ... | // | | // +---------------------------------------------------------+ @@ -249,7 +249,7 @@ tEnv.executeSql("SHOW CREATE TABLE my_view").print() // | create table | // +---------------------------------------------------------+ // | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... | +// | ... | // | | // +---------------------------------------------------------+ @@ -350,7 +350,7 @@ table_env.execute_sql("SHOW CREATE TABLE my_view").print() # | create table | # +---------------------------------------------------------+ # | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -# | ... | +# | ... 
| # | | # +---------------------------------------------------------+ diff --git a/docs/content/docs/dev/table/sql/show.md b/docs/content/docs/dev/table/sql/show.md index 56544a4a47451..348d0a7410488 100644 --- a/docs/content/docs/dev/table/sql/show.md +++ b/docs/content/docs/dev/table/sql/show.md @@ -147,7 +147,7 @@ tEnv.executeSql("SHOW CREATE TABLE my_view").print(); // | create table | // +---------------------------------------------------------+ // | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... | +// | ... | // | | // +---------------------------------------------------------+ @@ -248,7 +248,7 @@ tEnv.executeSql("SHOW CREATE TABLE my_view").print() // | create table | // +---------------------------------------------------------+ // | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... | +// | ... | // | | // +---------------------------------------------------------+ @@ -349,7 +349,7 @@ table_env.execute_sql("SHOW CREATE TABLE my_view").print() # | create table | # +---------------------------------------------------------+ # | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -# | ... | +# | ... | # | | # +---------------------------------------------------------+ diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q b/flink-table/flink-sql-client/src/test/resources/sql/view.q index 5342cf3b5531f..2e20a6052871e 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/view.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q @@ -74,14 +74,14 @@ show views; # test SHOW CREATE TABLE for views show create table v1; -+------------------------------------------------------------------------------------------------------------------------+ -| create table | -+------------------------------------------------------------------------------------------------------------------------+ ++---------------------------------------------------------------------------------------------------------------------------------+ +| create table | ++---------------------------------------------------------------------------------------------------------------------------------+ | CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` AS SELECT * FROM `default_catalog`.`default_database`.`orders` | -+------------------------------------------------------------------------------------------------------------------------+ ++---------------------------------------------------------------------------------------------------------------------------------+ 1 row in set !ok From 3c8d492cdc3496847971cd07e95af849332e49e1 Mon Sep 17 00:00:00 2001 From: Jane Chan Date: Fri, 16 Apr 2021 02:29:45 +0800 Subject: [PATCH 4/7] Address comments. 1. Revert changes on Hive parser and remove unnecessary whitespace in parserImpls.ftl 2. Change the output format from tableau form to raw content 3. Add TODO to track missing column comment 4. Only support showing catalog table 5. 
Update docs accordingly --- .../connectors/table/hive/hive_dialect.md | 1 - docs/content.zh/docs/dev/table/sql/show.md | 104 +++-------- .../connectors/table/hive/hive_dialect.md | 1 - docs/content/docs/dev/table/sql/show.md | 104 +++-------- .../flink/table/client/cli/CliClient.java | 12 ++ .../src/test/resources/sql/table.q | 27 +-- .../src/test/resources/sql/view.q | 13 +- .../src/main/codegen/data/Parser.tdd | 2 - .../src/main/codegen/includes/parserImpls.ftl | 16 -- .../hive/FlinkHiveSqlParserImplTest.java | 6 - .../src/main/codegen/includes/parserImpls.ftl | 2 +- .../api/internal/TableEnvironmentImpl.java | 162 +++++++++--------- .../planner/catalog/CatalogTableITCase.scala | 22 +-- 13 files changed, 169 insertions(+), 303 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/hive/hive_dialect.md b/docs/content.zh/docs/connectors/table/hive/hive_dialect.md index 3d6815f3914d0..c70554429a4f2 100644 --- a/docs/content.zh/docs/connectors/table/hive/hive_dialect.md +++ b/docs/content.zh/docs/connectors/table/hive/hive_dialect.md @@ -162,7 +162,6 @@ USE database_name; ```sql SHOW TABLES; -SHOW CREATE TABLE [catalog_name.][db_name.]table_name; ``` #### Create diff --git a/docs/content.zh/docs/dev/table/sql/show.md b/docs/content.zh/docs/dev/table/sql/show.md index ae68acebccace..8289651a1742d 100644 --- a/docs/content.zh/docs/dev/table/sql/show.md +++ b/docs/content.zh/docs/dev/table/sql/show.md @@ -28,7 +28,7 @@ under the License. -SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出创建指定表或视图的语句,或者列出当前 catalog 和当前 database 中所有的 function,包括:系统 function 和用户定义的 function,或者仅仅列出当前 catalog 和当前 database 中用户定义的 function,或者列出当前环境所有激活的 module,或者列出当前环境所有加载的 module 及激活状态。 +SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出创建指定表的语句,或者列出当前 catalog 和当前 database 中所有的 function,包括:系统 function 和用户定义的 function,或者仅仅列出当前 catalog 和当前 database 中用户定义的 function,或者列出当前环境所有激活的 module,或者列出当前环境所有加载的 module 及激活状态。 目前 Flink SQL 支持下列 SHOW 语句: - SHOW CATALOGS @@ -123,16 +123,13 @@ tEnv.executeSql("SHOW TABLES").print(); // show create table tEnv.executeSql("SHOW CREATE TABLE my_table").print(); -// +----------------------------------------------------------+ -// | create table | -// +----------------------------------------------------------+ -// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -// | ... | -// | ) WITH ( | -// | ... | -// | ) | -// +----------------------------------------------------------+ - +// CREATE TABLE `default_catalog`.`default_db`.`my_table` ( +// ... +// ) WITH ( +// ... +// ) + + // create a view tEnv.executeSql("CREATE VIEW my_view AS ..."); // show views @@ -142,15 +139,6 @@ tEnv.executeSql("SHOW VIEWS").print(); // +-----------+ // | my_view | // +-----------+ -// show create table for view -tEnv.executeSql("SHOW CREATE TABLE my_view").print(); -// +---------------------------------------------------------+ -// | create table | -// +---------------------------------------------------------+ -// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... 
| -// | | -// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print(); @@ -225,15 +213,11 @@ tEnv.executeSql("SHOW TABLES").print() // show create table tEnv.executeSql("SHOW CREATE TABLE my_table").print() -// +----------------------------------------------------------+ -// | create table | -// +----------------------------------------------------------+ -// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -// | ... | -// | ) WITH ( | -// | ... | -// | ) | -// +----------------------------------------------------------+ +// CREATE TABLE `default_catalog`.`default_db`.`my_table` ( +// ... +// ) WITH ( +// ... +// ) // create a view tEnv.executeSql("CREATE VIEW my_view AS ...") // show views @@ -243,15 +227,6 @@ tEnv.executeSql("SHOW VIEWS").print() // +-----------+ // | my_view | // +-----------+ -// show create table for view -tEnv.executeSql("SHOW CREATE TABLE my_view").print() -// +---------------------------------------------------------+ -// | create table | -// +---------------------------------------------------------+ -// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... | -// | | -// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print() @@ -325,15 +300,11 @@ table_env.execute_sql("SHOW TABLES").print() # +------------+ # show create table table_env.executeSql("SHOW CREATE TABLE my_table").print() -# +----------------------------------------------------------+ -# | create table | -# +----------------------------------------------------------+ -# | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -# | ... | -# | ) WITH ( | -# | ... | -# | ) | -# +----------------------------------------------------------+ +# CREATE TABLE `default_catalog`.`default_db`.`my_table` ( +# ... +# ) WITH ( +# ... +# ) # create a view table_env.execute_sql("CREATE VIEW my_view AS ...") @@ -344,15 +315,6 @@ table_env.execute_sql("SHOW VIEWS").print() # +-----------+ # | my_view | # +-----------+ -# show create table for view -table_env.execute_sql("SHOW CREATE TABLE my_view").print() -# +---------------------------------------------------------+ -# | create table | -# +---------------------------------------------------------+ -# | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -# | ... | -# | | -# +---------------------------------------------------------+ # show functions table_env.execute_sql("SHOW FUNCTIONS").print() @@ -409,33 +371,17 @@ Flink SQL> SHOW TABLES; my_table Flink SQL> SHOW CREATE TABLE my_table; -+----------------------------------------------------------+ -| create table | -+----------------------------------------------------------+ -| CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -| ... | -| ) WITH ( | -| ... | -| ) | -+----------------------------------------------------------+ -1 row in set +CREATE TABLE `default_catalog`.`default_db`.`my_table` ( + ... +) WITH ( + ... +) Flink SQL> CREATE VIEW my_view AS ...; [INFO] View has been created. Flink SQL> SHOW VIEWS; my_view - -Flink SQL> SHOW CREATE TABLE my_view; -+---------------------------------------------------------+ -| create table | -+---------------------------------------------------------+ -| CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -| ... 
| -| | -+---------------------------------------------------------+ -1 row in set - Flink SQL> SHOW FUNCTIONS; mod sha256 @@ -518,7 +464,9 @@ SHOW TABLES SHOW CREATE TABLE [catalog_name.][db_name.]table_name ``` -展示创建指定表或视图的 create 语句。 +展示创建指定表的 create 语句。 + +Attention 目前 `SHOW CREATE TABLE` 只支持通过 Flink SQL 创建的表。 ## SHOW VIEWS diff --git a/docs/content/docs/connectors/table/hive/hive_dialect.md b/docs/content/docs/connectors/table/hive/hive_dialect.md index ad4ebf062f036..a1c71fdeff0d6 100644 --- a/docs/content/docs/connectors/table/hive/hive_dialect.md +++ b/docs/content/docs/connectors/table/hive/hive_dialect.md @@ -170,7 +170,6 @@ USE database_name; ```sql SHOW TABLES; -SHOW CREATE TABLE [catalog_name.][db_name.]table_name; ``` #### Create diff --git a/docs/content/docs/dev/table/sql/show.md b/docs/content/docs/dev/table/sql/show.md index 348d0a7410488..80ff53bc48a16 100644 --- a/docs/content/docs/dev/table/sql/show.md +++ b/docs/content/docs/dev/table/sql/show.md @@ -26,7 +26,7 @@ under the License. # SHOW Statements -SHOW statements are used to list all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and the current database, or show current catalog and database, or show create statement for specified table/view, or list all functions including system functions and user-defined functions in the current catalog and current database, or list only user-defined functions in the current catalog and current database, or list enabled module names, or list all loaded modules with enabled status in the current session. +SHOW statements are used to list all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and the current database, or show current catalog and database, or show create statement for specified table, or list all functions including system functions and user-defined functions in the current catalog and current database, or list only user-defined functions in the current catalog and current database, or list enabled module names, or list all loaded modules with enabled status in the current session. Flink SQL supports the following SHOW statements for now: - SHOW CATALOGS @@ -122,16 +122,13 @@ tEnv.executeSql("SHOW TABLES").print(); // show create table tEnv.executeSql("SHOW CREATE TABLE my_table").print(); -// +----------------------------------------------------------+ -// | create table | -// +----------------------------------------------------------+ -// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -// | ... | -// | ) WITH ( | -// | ... | -// | ) | -// +----------------------------------------------------------+ - +// CREATE TABLE `default_catalog`.`default_db`.`my_table` ( +// ... +// ) WITH ( +// ... +// ) + + // create a view tEnv.executeSql("CREATE VIEW my_view AS ..."); // show views @@ -141,15 +138,6 @@ tEnv.executeSql("SHOW VIEWS").print(); // +-----------+ // | my_view | // +-----------+ -// show create table for view -tEnv.executeSql("SHOW CREATE TABLE my_view").print(); -// +---------------------------------------------------------+ -// | create table | -// +---------------------------------------------------------+ -// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... 
| -// | | -// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print(); @@ -224,15 +212,11 @@ tEnv.executeSql("SHOW TABLES").print() // show create table tEnv.executeSql("SHOW CREATE TABLE my_table").print() -// +----------------------------------------------------------+ -// | create table | -// +----------------------------------------------------------+ -// | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -// | ... | -// | ) WITH ( | -// | ... | -// | ) | -// +----------------------------------------------------------+ +// CREATE TABLE `default_catalog`.`default_db`.`my_table` ( +// ... +// ) WITH ( +// ... +// ) // create a view tEnv.executeSql("CREATE VIEW my_view AS ...") // show views @@ -242,15 +226,6 @@ tEnv.executeSql("SHOW VIEWS").print() // +-----------+ // | my_view | // +-----------+ -// show create table for view -tEnv.executeSql("SHOW CREATE TABLE my_view").print() -// +---------------------------------------------------------+ -// | create table | -// +---------------------------------------------------------+ -// | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -// | ... | -// | | -// +---------------------------------------------------------+ // show functions tEnv.executeSql("SHOW FUNCTIONS").print() @@ -324,15 +299,11 @@ table_env.execute_sql("SHOW TABLES").print() # +------------+ # show create table table_env.executeSql("SHOW CREATE TABLE my_table").print() -# +----------------------------------------------------------+ -# | create table | -# +----------------------------------------------------------+ -# | CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -# | ... | -# | ) WITH ( | -# | ... | -# | ) | -# +----------------------------------------------------------+ +# CREATE TABLE `default_catalog`.`default_db`.`my_table` ( +# ... +# ) WITH ( +# ... +# ) # create a view table_env.execute_sql("CREATE VIEW my_view AS ...") @@ -343,15 +314,6 @@ table_env.execute_sql("SHOW VIEWS").print() # +-----------+ # | my_view | # +-----------+ -# show create table for view -table_env.execute_sql("SHOW CREATE TABLE my_view").print() -# +---------------------------------------------------------+ -# | create table | -# +---------------------------------------------------------+ -# | CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -# | ... | -# | | -# +---------------------------------------------------------+ # show functions table_env.execute_sql("SHOW FUNCTIONS").print() @@ -408,33 +370,17 @@ Flink SQL> SHOW TABLES; my_table Flink SQL> SHOW CREATE TABLE my_table; -+----------------------------------------------------------+ -| create table | -+----------------------------------------------------------+ -| CREATE TABLE `default_catalog`.`default_db`.`my_table` ( | -| ... | -| ) WITH ( | -| ... | -| ) | -+----------------------------------------------------------+ -1 row in set +CREATE TABLE `default_catalog`.`default_db`.`my_table` ( + ... +) WITH ( + ... +) Flink SQL> CREATE VIEW my_view AS ...; [INFO] View has been created. Flink SQL> SHOW VIEWS; my_view - -Flink SQL> SHOW CREATE TABLE my_view; -+---------------------------------------------------------+ -| create table | -+---------------------------------------------------------+ -| CREATE VIEW `default_catalog`.`default_db`.`my_view` AS | -| ... 
| -| | -+---------------------------------------------------------+ -1 row in set - Flink SQL> SHOW FUNCTIONS; mod sha256 @@ -518,7 +464,9 @@ Show all tables in the current catalog and the current database. SHOW CREATE TABLE ``` -Show create table statement for specified table or view. +Show create table statement for specified table. + +Attention Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL. ## SHOW VIEWS diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index a738c977d46e1..e3a526a9fd091 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -35,6 +35,7 @@ import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.ShowCreateTableOperation; import org.apache.flink.table.operations.UnloadModuleOperation; import org.apache.flink.table.operations.UseOperation; import org.apache.flink.table.operations.command.ClearOperation; @@ -414,6 +415,9 @@ private void callOperation(Operation operation, ExecutionMode mode) { } else if (operation instanceof EndStatementSetOperation) { // END callEndStatementSet(); + } else if (operation instanceof ShowCreateTableOperation) { + // SHOW CREATE TABLE + callShowCreateTable((ShowCreateTableOperation) operation); } else { // fallback to default implementation executeOperation(operation); @@ -527,6 +531,14 @@ private void callInserts(List operations) { } public void callExplain(ExplainOperation operation) { + printRawContent(operation); + } + + public void callShowCreateTable(ShowCreateTableOperation operation) { + printRawContent(operation); + } + + public void printRawContent(Operation operation) { TableResult tableResult = executor.executeOperation(sessionId, operation); // show raw content instead of tableau style final String explanation = diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q index a871d7ea41597..44957b82a17dd 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q @@ -75,10 +75,7 @@ show tables; # test SHOW CREATE TABLE show create table orders; -+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| create table | -+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| CREATE TABLE `default_catalog`.`default_database`.`orders` ( +CREATE TABLE `default_catalog`.`default_database`.`orders` ( `user` BIGINT NOT NULL, `product` VARCHAR(32), `amount` INT, @@ -89,9 +86,7 @@ show create table orders; ) WITH ( 'connector' = 'datagen' ) - | 
-+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -1 row in set + !ok # ========================================================================== @@ -140,10 +135,7 @@ desc orders2; # test SHOW CREATE TABLE show create table orders2; -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| create table | -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| CREATE TABLE `default_catalog`.`default_database`.`orders2` ( +CREATE TABLE `default_catalog`.`default_database`.`orders2` ( `user` BIGINT NOT NULL, `product` VARCHAR(32), `amount` INT, @@ -154,9 +146,7 @@ show create table orders2; ) WITH ( 'connector' = 'kafka' ) - | -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -1 row in set + !ok # ========================================================================== @@ -209,19 +199,14 @@ show tables; # SHOW CREATE TABLE for temporary table show create table tbl1; -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| create table | -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`tbl1` ( +CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`tbl1` ( `user` BIGINT NOT NULL, `product` VARCHAR(32), `amount` INT ) WITH ( 'connector' = 'datagen' ) - | -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -1 row in set + !ok drop temporary table tbl1; diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q b/flink-table/flink-sql-client/src/test/resources/sql/view.q index 2e20a6052871e..e9821d7d1e745 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/view.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q @@ -74,16 +74,9 @@ show views; # test SHOW CREATE TABLE for views show create table v1; -+---------------------------------------------------------------------------------------------------------------------------------+ -| create table | -+---------------------------------------------------------------------------------------------------------------------------------+ -| CREATE TEMPORARY VIEW 
`default_catalog`.`default_database`.`v1` AS -SELECT * -FROM `default_catalog`.`default_database`.`orders` - | -+---------------------------------------------------------------------------------------------------------------------------------+ -1 row in set -!ok +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.TableException: Could not execute SHOW CREATE TABLE. View with identifier `default_catalog`.`default_database`.`v1` is not supported. +!error # ==== test permanent view ===== diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd index 920145476b2ee..a778bf7e9f1d2 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd @@ -87,7 +87,6 @@ "org.apache.flink.sql.parser.dql.SqlShowFunctions" "org.apache.flink.sql.parser.dql.SqlShowModules" "org.apache.flink.sql.parser.dql.SqlShowTables" - "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowPartitions" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" "org.apache.flink.sql.parser.dql.SqlRichExplain" @@ -531,7 +530,6 @@ "SqlAlterDatabase()" "SqlDescribeDatabase()" "SqlShowTables()" - "SqlShowCreateTable()" "SqlRichDescribeTable()" "SqlShowFunctions()" "SqlAlterTable()" diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl index d1324704992c2..6457f32a144e4 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl @@ -252,22 +252,6 @@ SqlShowTables SqlShowTables() : } } -/** -* Parse a "Show Create Table" query command. -*/ -SqlShowCreateTable SqlShowCreateTable() : -{ - SqlIdentifier tableName; - SqlParserPos pos; -} -{ -
{ pos = getPos();} - tableName = CompoundIdentifier() - { - return new SqlShowCreateTable(pos, tableName); - } -} - /** * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. */ diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java index 4b9cb9212ca6a..0bdf11b619178 100644 --- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java +++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java @@ -111,12 +111,6 @@ public void testShowTables() { sql("show tables").ok("SHOW TABLES"); } - @Test - public void testShowCreateTable() { - sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); - sql("show create table catalog1.db1.tbl").ok("SHOW CREATE TABLE `CATALOG1`.`DB1`.`TBL`"); - } - @Test public void testDescribeTable() { // TODO: support describe partition and columns diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index f65b8fe1c5876..81cf8fba592e1 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -425,7 +425,7 @@ SqlShowTables SqlShowTables() : /** * Parse a "Show Create Table" query command. */ -SqlShowCreateTable SqlShowCreateTable() : +SqlShowCreateTable SqlShowCreateTable() : { SqlIdentifier tableName; SqlParserPos pos; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 37c50bf4b932f..9873c5f4d0e73 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -56,7 +56,6 @@ import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; -import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.WatermarkSpec; @@ -1136,17 +1135,26 @@ public TableResult executeInternal(Operation operation) { Optional result = catalogManager.getTable(showCreateTableOperation.getSqlIdentifier()); if (result.isPresent()) { - return buildShowResult( - "create table", - buildShowCreateTableRow( - result.get().getResolvedTable(), - ((ShowCreateTableOperation) operation).getSqlIdentifier(), - result.get().isTemporary())); + return TableResultImpl.builder() + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) + .data( + Collections.singletonList( + Row.of( + buildShowCreateTableRow( + result.get().getResolvedTable(), + ((ShowCreateTableOperation) operation) + .getSqlIdentifier(), + result.get().isTemporary())))) + .setPrintStyle(TableResultImpl.PrintStyle.rawContent()) + .build(); } else { throw new ValidationException( String.format( - "Table with 
identifier '%s' does not exist.", - showCreateTableOperation.getSqlIdentifier().asSummaryString())); + "Could not execute SHOW CREATE TABLE. Table with identifier %s does not exist.", + showCreateTableOperation + .getSqlIdentifier() + .asSerializableString())); } } else if (operation instanceof ShowCurrentCatalogOperation) { return buildShowResult( @@ -1320,90 +1328,85 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } - private String[] buildShowCreateTableRow( + private String buildShowCreateTableRow( ResolvedCatalogBaseTable table, ObjectIdentifier sqlIdentifier, boolean isTemporary) { CatalogBaseTable.TableKind kind = table.getTableKind(); + if (kind == CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "Could not execute SHOW CREATE TABLE. View with identifier %s is not supported.", + sqlIdentifier.asSerializableString())); + } StringBuilder sb = new StringBuilder( String.format( - "CREATE%s%s%s", - isTemporary ? " TEMPORARY" : "", - kind == CatalogBaseTable.TableKind.TABLE ? " TABLE " : " VIEW ", + "CREATE %sTABLE %s (\n", + isTemporary ? "TEMPORARY " : "", sqlIdentifier.asSerializableString())); - if (kind == CatalogBaseTable.TableKind.TABLE) { - sb.append(" (\n"); - ResolvedSchema schema = table.getResolvedSchema(); - // append columns + ResolvedSchema schema = table.getResolvedSchema(); + // append columns + sb.append( + schema.getColumns().stream() + .map(column -> String.format("%s%s", printIndent, getColumnString(column))) + .collect(Collectors.joining(",\n"))); + // append watermark spec + if (!schema.getWatermarkSpecs().isEmpty()) { + sb.append(",\n"); sb.append( - schema.getColumns().stream() + schema.getWatermarkSpecs().stream() .map( - column -> + watermarkSpec -> String.format( - "%s%s", printIndent, getColumnString(column))) - .collect(Collectors.joining(",\n"))); - // append watermark spec - if (!schema.getWatermarkSpecs().isEmpty()) { - sb.append(",\n"); - sb.append( - schema.getWatermarkSpecs().stream() + "%sWATERMARK FOR %s AS %s", + printIndent, + String.join( + ".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), + watermarkSpec + .getWatermarkExpression() + .asSummaryString())) + .collect(Collectors.joining("\n"))); + } + // append constraint + if (schema.getPrimaryKey().isPresent()) { + sb.append(",\n"); + sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); + } + sb.append("\n) "); + // append comment + String comment = table.getComment(); + if (StringUtils.isNotEmpty(comment)) { + sb.append(String.format("COMMENT '%s'\n", comment)); + } + // append partitions + ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; + if (catalogTable.isPartitioned()) { + sb.append("PARTITIONED BY (") + .append( + catalogTable.getPartitionKeys().stream() + .map(key -> String.format("`%s`", key)) + .collect(Collectors.joining(", "))) + .append(")\n"); + } + // append `with` properties + Map options = table.getOptions(); + sb.append("WITH (\n") + .append( + options.entrySet().stream() .map( - watermarkSpec -> + entry -> String.format( - "%sWATERMARK FOR %s AS %s", + "%s'%s' = '%s'", printIndent, - String.join( - ".", - EncodingUtils.escapeIdentifier( - watermarkSpec - .getRowtimeAttribute())), - watermarkSpec - .getWatermarkExpression() - .asSummaryString())) - .collect(Collectors.joining("\n"))); - } - // append constraint - if (schema.getPrimaryKey().isPresent()) { - 
sb.append(",\n"); - sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); - } - sb.append("\n) "); - // append comment - String comment = table.getComment(); - if (StringUtils.isNotEmpty(comment)) { - sb.append(String.format("COMMENT '%s'\n", comment)); - } - - // append partitions - ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; - if (catalogTable.isPartitioned()) { - sb.append("PARTITIONED BY (") - .append( - catalogTable.getPartitionKeys().stream() - .map(key -> String.format("`%s`", key)) - .collect(Collectors.joining(", "))) - .append(")\n"); - } - // append `with` properties - Map<String, String> options = table.getOptions(); - sb.append("WITH (\n") - .append( - options.entrySet().stream() - .map( - entry -> - String.format( - "%s'%s' = '%s'", - printIndent, - entry.getKey(), - entry.getValue())) - .collect(Collectors.joining(",\n"))) - .append("\n)\n"); - } else { - sb.append(" AS\n"); - sb.append(((ResolvedCatalogView) table).getExpandedQuery()).append("\n"); - } - return new String[] {sb.toString()}; + entry.getKey(), + entry.getValue())) + .collect(Collectors.joining(",\n"))) + .append("\n)\n"); + return sb.toString(); } private String getColumnString(Column column) { @@ -1436,6 +1439,7 @@ private String getColumnString(Column column) { sb.append(e); }); } + // TODO: print the column comment once FLINK-18958 is fixed return sb.toString(); } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index 232fec0a1de63..d69c3ce2c1b98 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.catalog import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions} import org.apache.flink.table.api.internal.TableEnvironmentImpl -import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException} +import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableException, ValidationException} import org.apache.flink.table.catalog.{CatalogDatabaseImpl, CatalogFunctionImpl, GenericInMemoryCatalog, ObjectPath} import org.apache.flink.table.planner.expressions.utils.Func0 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory @@ -39,7 +39,6 @@ import java.io.File import java.math.{BigDecimal => JBigDecimal} import java.net.URI import java.util - import scala.collection.JavaConversions._ /** Test cases for catalog table. */ @@ -1028,6 +1027,12 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { tableEnv.executeSql(executedDDL) val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next() assertEquals(expectedDDL, row.getField(0)) + + expectedEx.expect(classOf[ValidationException]) + expectedEx.expectMessage( + "Could not execute SHOW CREATE TABLE. 
" + + "Table with identifier `default_catalog`.`default_database`.`tmp` does not exist.") + tableEnv.executeSql("SHOW CREATE TABLE `tmp`") } @Test @@ -1047,16 +1052,13 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { |from `source` |group by `group` |""".stripMargin - val expectedDDL = - """ |CREATE VIEW `default_catalog`.`default_database`.`tmp` AS - |SELECT `source`.`group`, AVG(`source`.`score`) AS `avg_score` - |FROM `default_catalog`.`default_database`.`source` - |GROUP BY `source`.`group` - |""".stripMargin tableEnv.executeSql(createTableDDL) tableEnv.executeSql(createViewDDL) - val row = tableEnv.executeSql("SHOW CREATE TABLE `tmp`").collect().next() - assertEquals(expectedDDL, row.getField(0)) + expectedEx.expect(classOf[TableException]) + expectedEx.expectMessage( + "Could not execute SHOW CREATE TABLE. " + + "View with identifier `default_catalog`.`default_database`.`tmp` is not supported.") + tableEnv.executeSql("SHOW CREATE TABLE `tmp`") } @Test From 730d1127a5b96ef6862ee9c8c600ff463c8ee9f5 Mon Sep 17 00:00:00 2001 From: Jane Chan Date: Fri, 16 Apr 2021 15:25:21 +0800 Subject: [PATCH 5/7] Address comments. 1. Change exception message 2. Revert newline 3. Rebase master --- docs/content.zh/docs/dev/table/sql/show.md | 1 + docs/content/docs/dev/table/sql/show.md | 1 + flink-table/flink-sql-client/src/test/resources/sql/view.q | 2 +- .../apache/flink/table/api/internal/TableEnvironmentImpl.java | 2 +- .../flink/table/planner/catalog/CatalogTableITCase.scala | 4 ++-- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/show.md b/docs/content.zh/docs/dev/table/sql/show.md index 8289651a1742d..c93eb88edfc0f 100644 --- a/docs/content.zh/docs/dev/table/sql/show.md +++ b/docs/content.zh/docs/dev/table/sql/show.md @@ -382,6 +382,7 @@ Flink SQL> CREATE VIEW my_view AS ...; Flink SQL> SHOW VIEWS; my_view + Flink SQL> SHOW FUNCTIONS; mod sha256 diff --git a/docs/content/docs/dev/table/sql/show.md b/docs/content/docs/dev/table/sql/show.md index 80ff53bc48a16..f48035864fcd6 100644 --- a/docs/content/docs/dev/table/sql/show.md +++ b/docs/content/docs/dev/table/sql/show.md @@ -381,6 +381,7 @@ Flink SQL> CREATE VIEW my_view AS ...; Flink SQL> SHOW VIEWS; my_view + Flink SQL> SHOW FUNCTIONS; mod sha256 diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q b/flink-table/flink-sql-client/src/test/resources/sql/view.q index e9821d7d1e745..d2e115ceb4715 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/view.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q @@ -75,7 +75,7 @@ show views; # test SHOW CREATE TABLE for views show create table v1; [ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.TableException: Could not execute SHOW CREATE TABLE. View with identifier `default_catalog`.`default_database`.`v1` is not supported. +org.apache.flink.table.api.TableException: SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier `default_catalog`.`default_database`.`v1`. 
!error # ==== test permanent view ===== diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 9873c5f4d0e73..7320f1dc169f9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -1336,7 +1336,7 @@ private String buildShowCreateTableRow( if (kind == CatalogBaseTable.TableKind.VIEW) { throw new TableException( String.format( - "Could not execute SHOW CREATE TABLE. View with identifier %s is not supported.", + "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", sqlIdentifier.asSerializableString())); } StringBuilder sb = diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index d69c3ce2c1b98..bf05996fcd1f1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -1056,8 +1056,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { tableEnv.executeSql(createViewDDL) expectedEx.expect(classOf[TableException]) expectedEx.expectMessage( - "Could not execute SHOW CREATE TABLE. " + - "View with identifier `default_catalog`.`default_database`.`tmp` is not supported.") + "SHOW CREATE TABLE does not support showing CREATE VIEW statement with " + + "identifier `default_catalog`.`default_database`.`tmp`.") tableEnv.executeSql("SHOW CREATE TABLE `tmp`") } From aad5947f15be862099e4358e6cbd7473e1f8ab7c Mon Sep 17 00:00:00 2001 From: Jane Chan Date: Tue, 20 Apr 2021 17:52:49 +0800 Subject: [PATCH 6/7] Address comments. 1. Remove final modifier in UniqueConstraint#getTypeString 2. Rename sqlIdentifier to tableIdentifier both in ShowCreateTableOperation and TableEnvironmentImpl 3. Use asSerializableString and EncodingUtils#escapeIdentifier 4. Add more data types in CatalogTableITCase 5. Remove unnecessary String#join in WatermarkSpec 6. 
Update documentation --- docs/content.zh/docs/dev/table/sql/show.md | 2 +- docs/content/docs/dev/table/sql/show.md | 2 +- .../api/internal/TableEnvironmentImpl.java | 37 +++++++------------ .../operations/ShowCreateTableOperation.java | 10 ++--- .../flink/table/catalog/UniqueConstraint.java | 2 +- .../flink/table/catalog/WatermarkSpec.java | 2 +- .../planner/catalog/CatalogTableITCase.scala | 20 +++++++--- 7 files changed, 36 insertions(+), 39 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/show.md b/docs/content.zh/docs/dev/table/sql/show.md index c93eb88edfc0f..0297cdd0b0162 100644 --- a/docs/content.zh/docs/dev/table/sql/show.md +++ b/docs/content.zh/docs/dev/table/sql/show.md @@ -467,7 +467,7 @@ SHOW CREATE TABLE [catalog_name.][db_name.]table_name 展示创建指定表的 create 语句。 -Attention 目前 `SHOW CREATE TABLE` 只支持通过 Flink SQL 创建的表。 +Attention 目前 `SHOW CREATE TABLE` 只支持通过 Flink SQL DDL 创建的表。 ## SHOW VIEWS diff --git a/docs/content/docs/dev/table/sql/show.md b/docs/content/docs/dev/table/sql/show.md index f48035864fcd6..9ea474b893f51 100644 --- a/docs/content/docs/dev/table/sql/show.md +++ b/docs/content/docs/dev/table/sql/show.md @@ -467,7 +467,7 @@ SHOW CREATE TABLE Show create table statement for specified table. -Attention Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL. +Attention Currently `SHOW CREATE TABLE` only supports tables created by Flink SQL DDL. ## SHOW VIEWS diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 7320f1dc169f9..19720212ad12c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -147,7 +147,6 @@ import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.table.utils.TableSchemaUtils; @@ -185,7 +184,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { private final ModuleManager moduleManager; private final OperationTreeBuilder operationTreeBuilder; private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>(); - private final String printIndent = " "; protected final TableConfig tableConfig; protected final Executor execEnv; @@ -1133,7 +1131,7 @@ public TableResult executeInternal(Operation operation) { ShowCreateTableOperation showCreateTableOperation = (ShowCreateTableOperation) operation; Optional<CatalogManager.TableLookupResult> result = - catalogManager.getTable(showCreateTableOperation.getSqlIdentifier()); + catalogManager.getTable(showCreateTableOperation.getTableIdentifier()); if (result.isPresent()) { return TableResultImpl.builder() .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) .data( Collections.singletonList( Row.of( buildShowCreateTableRow( result.get().getResolvedTable(), - ((ShowCreateTableOperation) operation) - .getSqlIdentifier(), + showCreateTableOperation + .getTableIdentifier(), result.get().isTemporary())))) .setPrintStyle(TableResultImpl.PrintStyle.rawContent()) .build(); @@ -1153,7 +1151,7 @@ public TableResult 
executeInternal(Operation operation) { String.format( "Could not execute SHOW CREATE TABLE. Table with identifier %s does not exist.", showCreateTableOperation - .getSqlIdentifier() + .getTableIdentifier() .asSerializableString())); } } else if (operation instanceof ShowCurrentCatalogOperation) { return buildShowResult( @@ -1330,21 +1328,22 @@ private TableResult buildShowResult(String columnName, String[] objects) { private String buildShowCreateTableRow( ResolvedCatalogBaseTable<?> table, - ObjectIdentifier sqlIdentifier, + ObjectIdentifier tableIdentifier, boolean isTemporary) { + final String printIndent = " "; CatalogBaseTable.TableKind kind = table.getTableKind(); if (kind == CatalogBaseTable.TableKind.VIEW) { throw new TableException( String.format( "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", - sqlIdentifier.asSerializableString())); + tableIdentifier.asSerializableString())); } StringBuilder sb = new StringBuilder( String.format( "CREATE %sTABLE %s (\n", isTemporary ? "TEMPORARY " : "", - sqlIdentifier.asSerializableString())); + tableIdentifier.asSerializableString())); ResolvedSchema schema = table.getResolvedSchema(); // append columns sb.append( @@ -1361,14 +1360,11 @@ private String buildShowCreateTableRow( String.format( "%sWATERMARK FOR %s AS %s", printIndent, - String.join( - ".", - EncodingUtils.escapeIdentifier( - watermarkSpec - .getRowtimeAttribute())), + EncodingUtils.escapeIdentifier( + watermarkSpec.getRowtimeAttribute()), watermarkSpec .getWatermarkExpression() - .asSummaryString())) + .asSerializableString())) .collect(Collectors.joining("\n"))); } // append constraint @@ -1388,7 +1384,7 @@ private String buildShowCreateTableRow( sb.append("PARTITIONED BY (") .append( catalogTable.getPartitionKeys().stream() - .map(key -> String.format("`%s`", key)) + .map(EncodingUtils::escapeIdentifier) .collect(Collectors.joining(", "))) .append(")\n"); } @@ -1424,14 +1420,7 @@ private String getColumnString(Column column) { "Column expression can not be null for computed column '%s'", column.getName())))); } else { - DataType dataType = column.getDataType(); - String type = dataType.toString(); - LogicalType logicalType = dataType.getLogicalType(); - // skip internal timestamp kind - if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { - type = logicalType.asSerializableString(); - } - sb.append(type); + sb.append(column.getDataType().getLogicalType().asSerializableString()); column.explainExtras() .ifPresent( e -> { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java index 1008289fcd6cb..47f6b3eed7ddf 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java @@ -23,18 +23,18 @@ /** Operation to describe a SHOW CREATE TABLE statement. 
*/ public class ShowCreateTableOperation implements ShowOperation { - private final ObjectIdentifier sqlIdentifier; + private final ObjectIdentifier tableIdentifier; public ShowCreateTableOperation(ObjectIdentifier sqlIdentifier) { - this.sqlIdentifier = sqlIdentifier; + this.tableIdentifier = sqlIdentifier; } - public ObjectIdentifier getSqlIdentifier() { - return sqlIdentifier; + public ObjectIdentifier getTableIdentifier() { + return tableIdentifier; } @Override public String asSummaryString() { - return String.format("SHOW CREATE TABLE %s", sqlIdentifier.asSummaryString()); + return String.format("SHOW CREATE TABLE %s", tableIdentifier.asSummaryString()); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java index 08654dc35835d..722a51d3b9f16 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java @@ -56,7 +56,7 @@ public List<String> getColumns() { return columns; } - private final String getTypeString() { + private String getTypeString() { switch (getType()) { case PRIMARY_KEY: return "PRIMARY KEY"; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java index 5179684e91d79..f1c6342223ae6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java @@ -68,7 +68,7 @@ public ResolvedExpression getWatermarkExpression() { public String asSummaryString() { return "WATERMARK FOR " - + String.join(".", EncodingUtils.escapeIdentifier(rowtimeAttribute)) + + EncodingUtils.escapeIdentifier(rowtimeAttribute) + ": " + watermarkExpression.getOutputDataType() + " AS " diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index bf05996fcd1f1..e6bea72262e27 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -994,9 +994,13 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { | g as 2*(a+1), | b string not null, | c bigint metadata virtual, - | ts timestamp(3), + | e row<name string, age int, flag boolean>, + | f as myfunc(a), + | ts1 timestamp(3), + | ts2 timestamp_ltz(3) metadata from 'timestamp', + | `__source__` varchar(255), | proc as proctime(), - | watermark for ts as ts - interval '5' second, + | watermark for ts1 as cast(timestampadd(hour, 8, ts1) as timestamp(3)) - interval '5' second, | constraint test_constraint primary key (a, b) not enforced |) comment 'test show create table statement' |partitioned by (b,h) @@ -1009,13 +1013,17 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { val expectedDDL = """ |CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`TBL1` ( | `a` BIGINT NOT NULL, - | `h` STRING, + | `h` VARCHAR(2147483647), | `g` AS 2 * (`a` + 1), - | `b` STRING NOT 
NULL, + | `b` VARCHAR(2147483647) NOT NULL, | `c` BIGINT METADATA VIRTUAL, - | `ts` TIMESTAMP(3), + | `e` ROW<`name` VARCHAR(2147483647), `age` INT, `flag` BOOLEAN>, + | `f` AS `default_catalog`.`default_database`.`myfunc`(`a`), + | `ts1` TIMESTAMP(3), + | `ts2` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', + | `__source__` VARCHAR(255), | `proc` AS PROCTIME(), - | WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND, + | WATERMARK FOR `ts1` AS CAST(TIMESTAMPADD(HOUR, 8, `ts1`) AS TIMESTAMP(3)) - INTERVAL '5' SECOND, | CONSTRAINT `test_constraint` PRIMARY KEY (`a`, `b`) NOT ENFORCED |) COMMENT 'test show create table statement' |PARTITIONED BY (`b`, `h`) From 353393ce49d6df894f38c059a01ce2b7283fce01 Mon Sep 17 00:00:00 2001 From: Jane Chan Date: Tue, 20 Apr 2021 22:32:23 +0800 Subject: [PATCH 7/7] Simplify test case content to fix a Scala checkstyle failure (max line length cannot exceed 100) --- .../flink/table/planner/catalog/CatalogTableITCase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index e6bea72262e27..4f122e0cc4ca0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -1000,7 +1000,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { | ts2 timestamp_ltz(3) metadata from 'timestamp', | `__source__` varchar(255), | proc as proctime(), - | watermark for ts1 as cast(timestampadd(hour, 8, ts1) as timestamp(3)) - interval '5' second, + | watermark for ts1 as cast(timestampadd(hour, 8, ts1) as timestamp(3)), | constraint test_constraint primary key (a, b) not enforced |) comment 'test show create table statement' |partitioned by (b,h) @@ -1023,7 +1023,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { | `ts2` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', | `__source__` VARCHAR(255), | `proc` AS PROCTIME(), - | WATERMARK FOR `ts1` AS CAST(TIMESTAMPADD(HOUR, 8, `ts1`) AS TIMESTAMP(3)) - INTERVAL '5' SECOND, + | WATERMARK FOR `ts1` AS CAST(TIMESTAMPADD(HOUR, 8, `ts1`) AS TIMESTAMP(3)), | CONSTRAINT `test_constraint` PRIMARY KEY (`a`, `b`) NOT ENFORCED |) COMMENT 'test show create table statement' |PARTITIONED BY (`b`, `h`)
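
For reference, the end-to-end behavior introduced by this series can be sketched with a short Table API program. This is an illustrative sketch only: the `orders` table, its schema, and the built-in `datagen` connector are assumptions made for the example and do not appear in the patches; the `executeSql(...).collect().next()` pattern mirrors the assertion style used in CatalogTableITCase above.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;

    public class ShowCreateTableExample {
        public static void main(String[] args) {
            // Create a streaming TableEnvironment (any build containing this series).
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());

            // Register a table through DDL; 'datagen' keeps the example self-contained.
            tEnv.executeSql(
                    "CREATE TABLE orders ("
                            + " order_id BIGINT,"
                            + " ts TIMESTAMP(3),"
                            + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                            + ") WITH ('connector' = 'datagen')");

            // SHOW CREATE TABLE yields SUCCESS_WITH_CONTENT with a single STRING
            // column named 'result' that holds the reconstructed DDL.
            TableResult result = tEnv.executeSql("SHOW CREATE TABLE orders");
            Row row = result.collect().next();
            System.out.println(row.getField(0));
        }
    }

Because patch 6 switches every type and expression to its serializable form (asSerializableString, EncodingUtils.escapeIdentifier), the returned string is itself valid DDL (for example, STRING is printed as VARCHAR(2147483647)) and can be replayed with another executeSql call.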