From 9f234e9c3bb7f7756b456b197fd31bde90dd396f Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 23 Oct 2019 18:39:26 -0700 Subject: [PATCH 1/3] initial commit --- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../apache/spark/sql/catalyst/parser/AstBuilder.scala | 7 +++++++ .../spark/sql/catalyst/plans/logical/statements.scala | 7 +++++++ .../spark/sql/catalyst/parser/DDLParserSuite.scala | 10 ++++++++++ .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 6 +++++- .../apache/spark/sql/execution/SparkSqlParser.scala | 7 ------- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 10 ++++++++++ 7 files changed, 40 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 970d244071e0d..86e09d4dbe663 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -207,7 +207,7 @@ statement | REFRESH (STRING | .*?) #refreshResource | CACHE LAZY? TABLE tableIdentifier (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable - | UNCACHE TABLE (IF EXISTS)? tableIdentifier #uncacheTable + | UNCACHE TABLE (IF EXISTS)? multipartIdentifier #uncacheTable | CLEAR CACHE #clearCache | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE tableIdentifier partitionSpec? #loadData diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 940dfd0fc333d..bd82d29b77d45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2811,4 +2811,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) { RefreshTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier())) } + + /** + * Create an [[UncacheTableStatement]] logical plan. + */ + override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) { + UncacheTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 127d9026f802e..8a796df82586e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -348,3 +348,10 @@ case class ShowPartitionsStatement( * A REFRESH TABLE statement, as parsed from SQL */ case class RefreshTableStatement(tableName: Seq[String]) extends ParsedStatement + +/** + * A UNCACHE TABLE statement, as parsed from SQL + */ +case class UncacheTableStatement( + tableName: Seq[String], + ifExists: Boolean) extends ParsedStatement diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 8e605bd15f696..8df8864be38ef 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1087,6 +1087,16 @@ class DDLParserSuite extends AnalysisTest { RefreshTableStatement(Seq("a", "b", "c"))) } + test("UNCACHE TABLE table") { + comparePlans( + parsePlan("UNCACHE TABLE a.b.c"), + UncacheTableStatement(Seq("a", "b", "c"), ifExists = false)) + + comparePlans( + parsePlan("UNCACHE TABLE IF EXISTS a.b.c"), + UncacheTableStatement(Seq("a", "b", "c"), ifExists = true)) + } + private case class TableSpec( name: Seq[String], schema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 230b8f3906bd2..56206e01d8976 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.internal.SQLConf @@ -310,6 +310,10 @@ class ResolveSessionCatalog( ShowPartitionsCommand( v1TableName.asTableIdentifier, partitionSpec) + + case UncacheTableStatement(tableName, ifExists) => + val v1TableName = parseV1Table(tableName, "UNCACHE TABLE") + UncacheTableCommand(v1TableName.asTableIdentifier, ifExists) } private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 2439621f7725a..0b71241d0ac40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -180,13 +180,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { CacheTableCommand(tableIdent, query, ctx.LAZY != null, options) } - /** - * Create an [[UncacheTableCommand]] logical plan. - */ - override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) { - UncacheTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXISTS != null) - } - /** * Create a [[ClearCacheCommand]] logical plan. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 463147903c923..9f085f0d5864e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1268,6 +1268,16 @@ class DataSourceV2SQLSuite } } + test("UNCACHE TABLE") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string) USING foo") + + testV1Command("UNCACHE TABLE", t) + testV1Command("UNCACHE TABLE", s"IF EXISTS $t") + } + } + private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams") From 7481684d5ae762d94d6c88b7c930db5820d9bd00 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 24 Oct 2019 06:45:15 -0700 Subject: [PATCH 2/3] Change the order. --- .../sql/catalyst/parser/DDLParserSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 37526bc39fbd0..bc8a06a8cbc7f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1062,6 +1062,16 @@ class DDLParserSuite extends AnalysisTest { "It is not allowed to add catalog/namespace prefix a.b") } + test("UNCACHE TABLE table") { + comparePlans( + parsePlan("UNCACHE TABLE a.b.c"), + UncacheTableStatement(Seq("a", "b", "c"), ifExists = false)) + + comparePlans( + parsePlan("UNCACHE TABLE IF EXISTS a.b.c"), + UncacheTableStatement(Seq("a", "b", "c"), ifExists = true)) + } + test("TRUNCATE table") { comparePlans( parsePlan("TRUNCATE TABLE a.b.c"), @@ -1104,16 +1114,6 @@ class DDLParserSuite extends AnalysisTest { RefreshTableStatement(Seq("a", "b", "c"))) } - test("UNCACHE TABLE table") { - comparePlans( - parsePlan("UNCACHE TABLE a.b.c"), - UncacheTableStatement(Seq("a", "b", "c"), ifExists = false)) - - comparePlans( - parsePlan("UNCACHE TABLE IF EXISTS a.b.c"), - UncacheTableStatement(Seq("a", "b", "c"), ifExists = true)) - } - private case class TableSpec( name: Seq[String], schema: Option[StructType], From 53448b217ac5b471b1125b8c2bc5336408e1f48c Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 24 Oct 2019 14:24:29 -0700 Subject: [PATCH 3/3] address comments --- .../spark/sql/catalyst/plans/logical/statements.scala | 2 +- .../apache/spark/sql/catalyst/parser/DDLParserSuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index fbb5d206fbae2..ef8c922694347 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -340,7 +340,7 @@ case class CacheTableStatement( options: Map[String, String]) extends ParsedStatement /** - * A UNCACHE TABLE statement, as parsed from SQL + * An UNCACHE TABLE statement, as parsed from SQL */ case class UncacheTableStatement( tableName: Seq[String], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index bc8a06a8cbc7f..f4375956f0af6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1039,13 +1039,13 @@ class DDLParserSuite extends AnalysisTest { "missing 'COLUMNS' at ''") } - test("MSCK REPAIR table") { + test("MSCK REPAIR TABLE") { comparePlans( parsePlan("MSCK REPAIR TABLE a.b.c"), RepairTableStatement(Seq("a", "b", "c"))) } - test("CACHE table") { + test("CACHE TABLE") { comparePlans( parsePlan("CACHE TABLE a.b.c"), CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty)) @@ -1062,7 +1062,7 @@ class DDLParserSuite extends AnalysisTest { "It is not allowed to add catalog/namespace prefix a.b") } - test("UNCACHE TABLE table") { + test("UNCACHE TABLE") { comparePlans( parsePlan("UNCACHE TABLE a.b.c"), UncacheTableStatement(Seq("a", "b", "c"), ifExists = false)) @@ -1108,7 +1108,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans(parsed5, expected5) } - test("REFRESH TABLE table") { + test("REFRESH TABLE") { comparePlans( parsePlan("REFRESH TABLE a.b.c"), RefreshTableStatement(Seq("a", "b", "c")))