diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 23a1b7bdde93c..abd38f2f9d940 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1085,7 +1085,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       lookupTableOrView(identifier).map {
         case v: ResolvedView =>
           val viewStr = if (v.isTemp) "temp view" else "view"
-          u.failAnalysis(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.'")
+          u.failAnalysis(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.")
         case table => table
       }.getOrElse(u)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2818ba58075cd..61ac6346ff944 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -577,6 +577,8 @@ trait CheckAnalysis extends PredicateHelper {
       case AlterTableDropPartition(ResolvedTable(_, _, table), parts, _, _, _) =>
         checkAlterTablePartition(table, parts)
 
+      case showPartitions: ShowPartitions => checkShowPartitions(showPartitions)
+
       case _ => // Fallbacks to the following checks
     }
 
@@ -1009,4 +1011,16 @@ trait CheckAnalysis extends PredicateHelper {
       case _ =>
     }
   }
+
+  // Make sure that the `SHOW PARTITIONS` command is allowed for the table
+  private def checkShowPartitions(showPartitions: ShowPartitions): Unit = showPartitions match {
+    case ShowPartitions(rt: ResolvedTable, _)
+        if !rt.table.isInstanceOf[SupportsPartitionManagement] =>
+      failAnalysis(s"SHOW PARTITIONS cannot run for a table which does not support partitioning")
+    case ShowPartitions(ResolvedTable(_, _, partTable: SupportsPartitionManagement), _)
+        if partTable.partitionSchema().isEmpty =>
+      failAnalysis(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${partTable.name()}")
+    case _ =>
+  }
 }
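In practice the two guards in `checkShowPartitions` surface as `AnalysisException`s during analysis. A minimal sketch of what a caller sees, assuming a ScalaTest suite with an active `SparkSession`; the catalog and table names are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.AnalysisException

// Table whose implementation does not mix in SupportsPartitionManagement:
val e1 = intercept[AnalysisException] {
  spark.sql("SHOW PARTITIONS non_part_cat.ns.tbl")
}
assert(e1.getMessage.contains("does not support partitioning"))

// Partition-managing table whose partitionSchema() is empty:
val e2 = intercept[AnalysisException] {
  spark.sql("SHOW PARTITIONS part_cat.ns.unpartitioned_tbl")
}
assert(e2.getMessage.contains("not allowed on a table that is not partitioned"))
```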
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 98c6872a47cc6..38991a9e24fa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan, ShowPartitions}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
@@ -40,6 +40,12 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
     case r @ AlterTableDropPartition(
         ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) =>
       r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+
+    case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
+      r.copy(pattern = resolvePartitionSpecs(
+        table.name,
+        partSpecs.toSeq,
+        table.partitionSchema()).headOption)
   }
 
   private def resolvePartitionSpecs(
@@ -48,25 +54,26 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
       partSchema: StructType): Seq[ResolvedPartitionSpec] =
     partSpecs.map {
       case unresolvedPartSpec: UnresolvedPartitionSpec =>
+        val normalizedSpec = normalizePartitionSpec(
+          unresolvedPartSpec.spec,
+          partSchema.map(_.name),
+          tableName,
+          conf.resolver)
+        val partitionNames = normalizedSpec.keySet
+        val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
         ResolvedPartitionSpec(
-          convertToPartIdent(tableName, unresolvedPartSpec.spec, partSchema),
+          requestedFields.map(_.name),
+          convertToPartIdent(normalizedSpec, requestedFields),
           unresolvedPartSpec.location)
       case resolvedPartitionSpec: ResolvedPartitionSpec =>
         resolvedPartitionSpec
     }
 
   private def convertToPartIdent(
-      tableName: String,
       partitionSpec: TablePartitionSpec,
-      partSchema: StructType): InternalRow = {
-    val normalizedSpec = normalizePartitionSpec(
-      partitionSpec,
-      partSchema.map(_.name),
-      tableName,
-      conf.resolver)
-
-    val partValues = partSchema.map { part =>
-      val raw = normalizedSpec.get(part.name).orNull
+      schema: Seq[StructField]): InternalRow = {
+    val partValues = schema.map { part =>
+      val raw = partitionSpec.get(part.name).orNull
       val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType)
       Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval()
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 95fc4f47dec7f..1518f064d78db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -89,7 +89,8 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
 }
 
 case class ResolvedPartitionSpec(
-    spec: InternalRow,
+    names: Seq[String],
+    ident: InternalRow,
     location: Option[String] = None) extends PartitionSpec
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index ce95ea4b41def..ff8b56f0b724b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3611,9 +3611,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    * }}}
    */
   override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) {
-    val table = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
-    ShowPartitionsStatement(table, partitionKeys)
+    val partitionKeys = Option(ctx.partitionSpec).map { specCtx =>
+      UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(specCtx), None)
+    }
+    ShowPartitions(
+      UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier()), "SHOW PARTITIONS"),
+      partitionKeys)
   }
 
   /**
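With the `AstBuilder` change above, `SHOW PARTITIONS` now parses directly to the `ShowPartitions` logical plan instead of a `ShowPartitionsStatement`. A quick sketch of the expected parse result, mirroring the parser tests further down:

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedPartitionSpec, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.ShowPartitions

// "SHOW PARTITIONS db1.t1 PARTITION(year='2015')" becomes roughly:
val expected = ShowPartitions(
  UnresolvedTable(Seq("db1", "t1"), "SHOW PARTITIONS"),
  Some(UnresolvedPartitionSpec(Map("year" -> "2015"), None)))
assert(parsePlan("SHOW PARTITIONS db1.t1 PARTITION(year='2015')") == expected)
```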
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index effb4cff75930..1763547792e35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -419,13 +419,6 @@ case class TruncateTableStatement(
     tableName: Seq[String],
     partitionSpec: Option[TablePartitionSpec]) extends ParsedStatement
 
-/**
- * A SHOW PARTITIONS statement, as parsed from SQL
- */
-case class ShowPartitionsStatement(
-    tableName: Seq[String],
-    partitionSpec: Option[TablePartitionSpec]) extends ParsedStatement
-
 /**
  * A SHOW CURRENT NAMESPACE statement, as parsed from SQL
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 4931f0eb2c007..67056470418fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -691,3 +691,18 @@ case class TruncateTable(
 
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the SHOW PARTITIONS command.
+ */
+case class ShowPartitions(
+    child: LogicalPlan,
+    pattern: Option[PartitionSpec]) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+
+  override lazy val resolved: Boolean =
+    childrenResolved && pattern.forall(_.isInstanceOf[ResolvedPartitionSpec])
+
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("partition", StringType, nullable = false)())
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 53edd4fca7794..f6005f4b413a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -430,11 +430,12 @@ class ResolveSessionCatalog(
         ident.asTableIdentifier,
         partitionSpec)
 
-    case ShowPartitionsStatement(tbl, partitionSpec) =>
-      val v1TableName = parseV1Table(tbl, "SHOW PARTITIONS")
+    case ShowPartitions(
+        ResolvedV1TableOrViewIdentifier(ident),
+        pattern @ (None | Some(UnresolvedPartitionSpec(_, _)))) =>
       ShowPartitionsCommand(
-        v1TableName.asTableIdentifier,
-        partitionSpec)
+        ident.asTableIdentifier,
+        pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
 
     case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns) =>
       val v1TableName = ident.asTableIdentifier
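For reference, the reshaped `ResolvedPartitionSpec` pairs the partition column names the user actually supplied with their casted values in `ident`, which is what allows `SHOW PARTITIONS` to filter by a partial spec. A hedged sketch with assumed values:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec

// PARTITION(year=2015) resolved against partition schema (year INT, month INT):
// only the requested column survives, matched positionally with its value.
val spec = ResolvedPartitionSpec(names = Seq("year"), ident = InternalRow(2015))
```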
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableAddPartitionExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableAddPartitionExec.scala
index 0171cdd9ca41a..d7fe25cff2064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableAddPartitionExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableAddPartitionExec.scala
@@ -37,20 +37,20 @@ case class AlterTableAddPartitionExec(
 
   override protected def run(): Seq[InternalRow] = {
     val (existsParts, notExistsParts) =
-      partSpecs.partition(p => table.partitionExists(p.spec))
+      partSpecs.partition(p => table.partitionExists(p.ident))
 
     if (existsParts.nonEmpty && !ignoreIfExists) {
       throw new PartitionsAlreadyExistException(
-        table.name(), existsParts.map(_.spec), table.partitionSchema())
+        table.name(), existsParts.map(_.ident), table.partitionSchema())
     }
 
     notExistsParts match {
       case Seq() => // Nothing will be done
       case Seq(partitionSpec) =>
         val partProp = partitionSpec.location.map(loc => "location" -> loc).toMap
-        table.createPartition(partitionSpec.spec, partProp.asJava)
+        table.createPartition(partitionSpec.ident, partProp.asJava)
       case _ if table.isInstanceOf[SupportsAtomicPartitionManagement] =>
-        val partIdents = notExistsParts.map(_.spec)
+        val partIdents = notExistsParts.map(_.ident)
         val partProps = notExistsParts.map(_.location.map(loc => "location" -> loc).toMap)
         table.asAtomicPartitionable
           .createPartitions(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableDropPartitionExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableDropPartitionExec.scala
index 09a65804a05eb..c7a68ecb2bbee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableDropPartitionExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableDropPartitionExec.scala
@@ -35,7 +35,7 @@ case class AlterTableDropPartitionExec(
 
   override protected def run(): Seq[InternalRow] = {
     val (existsPartIdents, notExistsPartIdents) =
-      partSpecs.map(_.spec).partition(table.partitionExists)
+      partSpecs.map(_.ident).partition(table.partitionExists)
 
     if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
       throw new NoSuchPartitionsException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1fae8d937e90c..0c7bc19ad054e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -318,6 +318,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowColumns(_: ResolvedTable, _) =>
       throw new AnalysisException("SHOW COLUMNS is not supported for v2 tables.")
 
+    case r @ ShowPartitions(
+        ResolvedTable(catalog, _, table: SupportsPartitionManagement),
+        pattern @ (None | Some(_: ResolvedPartitionSpec))) =>
+      ShowPartitionsExec(
+        r.output,
+        catalog,
+        table,
+        pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil
+
     case _ => Nil
   }
 }
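The strategy case above only fires once the optional spec is fully resolved, so `ShowPartitionsExec` (next file) can hand it straight to `SupportsPartitionManagement.listPartitionByNames`. A sketch of that listing contract, assuming an illustrative `table: SupportsPartitionManagement` handle:

```scala
import org.apache.spark.sql.catalyst.InternalRow

// No names and an empty ident row list every partition of the table.
val all = table.listPartitionByNames(Array.empty[String], InternalRow.empty)
// A partial spec lists only partitions whose `year` column equals 2015.
val filtered = table.listPartitionByNames(Array("year"), InternalRow(2015))
```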
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowPartitionsExec.scala
new file mode 100644
index 0000000000000..44d6f4495f552
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowPartitionsExec.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
+import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, TableCatalog}
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Physical plan node for showing partitions.
+ */
+case class ShowPartitionsExec(
+    output: Seq[Attribute],
+    catalog: TableCatalog,
+    table: SupportsPartitionManagement,
+    partitionSpec: Option[ResolvedPartitionSpec]) extends V2CommandExec with LeafExecNode {
+  override protected def run(): Seq[InternalRow] = {
+    val (names, ident) = partitionSpec
+      .map(spec => (spec.names, spec.ident))
+      // listPartitionByNames() should return all partitions if the partition spec
+      // does not specify any partition names.
+      .getOrElse((Seq.empty[String], InternalRow.empty))
+    val partitionIdentifiers = table.listPartitionByNames(names.toArray, ident)
+    // Convert partition identifiers, i.e. `InternalRow`s of partition values such as
+    // InternalRow(value0, value1, ..., valueN), to `InternalRow`s each holding a single
+    // string of the form "col0=value0/col1=value1/.../colN=valueN".
+    val schema = table.partitionSchema()
+    val len = schema.length
+    val partitions = new Array[String](len)
+    val timeZoneId = SQLConf.get.sessionLocalTimeZone
+    partitionIdentifiers.map { row =>
+      var i = 0
+      while (i < len) {
+        val dataType = schema(i).dataType
+        val partValue = row.get(i, dataType)
+        val partValueStr = Cast(Literal(partValue, dataType), StringType, Some(timeZoneId))
+          .eval().toString
+        partitions(i) = escapePathName(schema(i).name) + "=" + escapePathName(partValueStr)
+        i += 1
+      }
+      InternalRow(UTF8String.fromString(partitions.mkString("/")))
+    }
+  }
+}
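A worked example of the row-to-string conversion in `run()` above, restated in plain Scala with stand-in values rather than Spark's internal types:

```scala
// Partition schema (year INT, month INT) with ident values (2015, 1):
val names = Seq("year", "month")
val values = Seq(2015, 1)
val rendered = names.zip(values).map { case (n, v) => s"$n=$v" }.mkString("/")
assert(rendered == "year=2015/month=1") // one output row per partition
```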
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ffbc2287d81ad..583bc694dc3be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2284,7 +2284,6 @@ class DataSourceV2SQLSuite
       verify(s"CACHE TABLE $t")
       verify(s"UNCACHE TABLE $t")
       verify(s"TRUNCATE TABLE $t")
-      verify(s"SHOW PARTITIONS $t")
       verify(s"SHOW COLUMNS FROM $t")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsParserSuite.scala
index bc75528b9644c..7b5cf8af4eead 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsParserSuite.scala
@@ -17,25 +17,30 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.analysis.AnalysisTest
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.ShowPartitionsStatement
+import org.apache.spark.sql.catalyst.plans.logical.ShowPartitions
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.test.SharedSparkSession
 
 class ShowPartitionsParserSuite extends AnalysisTest with SharedSparkSession {
   test("SHOW PARTITIONS") {
+    val commandName = "SHOW PARTITIONS"
     Seq(
-      "SHOW PARTITIONS t1" -> ShowPartitionsStatement(Seq("t1"), None),
-      "SHOW PARTITIONS db1.t1" -> ShowPartitionsStatement(Seq("db1", "t1"), None),
+      "SHOW PARTITIONS t1" -> ShowPartitions(UnresolvedTable(Seq("t1"), commandName), None),
+      "SHOW PARTITIONS db1.t1" -> ShowPartitions(
+        UnresolvedTable(Seq("db1", "t1"), commandName), None),
       "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')" ->
-        ShowPartitionsStatement(
-          Seq("t1"),
-          Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue"))),
-      "SHOW PARTITIONS a.b.c" -> ShowPartitionsStatement(Seq("a", "b", "c"), None),
+        ShowPartitions(
+          UnresolvedTable(Seq("t1"), commandName),
+          Some(UnresolvedPartitionSpec(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue")))),
+      "SHOW PARTITIONS a.b.c" -> ShowPartitions(
+        UnresolvedTable(Seq("a", "b", "c"), commandName), None),
       "SHOW PARTITIONS a.b.c PARTITION(ds='2017-06-10')" ->
-        ShowPartitionsStatement(Seq("a", "b", "c"), Some(Map("ds" -> "2017-06-10")))
+        ShowPartitions(
+          UnresolvedTable(Seq("a", "b", "c"), commandName),
+          Some(UnresolvedPartitionSpec(Map("ds" -> "2017-06-10"))))
     ).foreach { case (sql, expected) =>
       val parsed = parsePlan(sql)
       comparePlans(parsed, expected)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
index 413e170326eea..82457f96a3003 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
@@ -20,17 +20,133 @@ package org.apache.spark.sql.execution.command
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{StringType, StructType}
 
 trait ShowPartitionsSuiteBase extends QueryTest with SQLTestUtils {
   protected def version: String
   protected def catalog: String
-  protected def defaultNamespace: Seq[String]
   protected def defaultUsing: String
+  protected def wrongPartitionColumnsError(columns: String*): String
+  // The expected output schema of the `SHOW PARTITIONS` command
+  private val showSchema: StructType = new StructType().add("partition", StringType, false)
+  protected def runShowPartitionsSql(sqlText: String, expected: Seq[Row]): Unit = {
+    val df = spark.sql(sqlText)
+    assert(df.schema === showSchema)
+    checkAnswer(df, expected)
+  }
 
   override def test(testName: String, testTags: Tag*)(testFun: => Any)
     (implicit pos: Position): Unit = {
     super.test(s"SHOW PARTITIONS $version: " + testName, testTags: _*)(testFun)
   }
+
+  protected def createDateTable(table: String): Unit = {
+    sql(s"""
+      |CREATE TABLE $table (price int, qty int, year int, month int)
+      |$defaultUsing
+      |partitioned by (year, month)""".stripMargin)
+    sql(s"INSERT INTO $table PARTITION(year = 2015, month = 1) SELECT 1, 1")
+    sql(s"INSERT INTO $table PARTITION(year = 2015, month = 2) SELECT 2, 2")
+    sql(s"ALTER TABLE $table ADD PARTITION(year = 2016, month = 2)")
+    sql(s"ALTER TABLE $table ADD PARTITION(year = 2016, month = 3)")
+  }
+
+  protected def createWideTable(table: String): Unit = {
+    sql(s"""
+      |CREATE TABLE $table (
+      |  price int, qty int,
+      |  year int, month int, hour int, minute int, sec int, extra int)
+      |$defaultUsing
+      |PARTITIONED BY (year, month, hour, minute, sec, extra)
+      |""".stripMargin)
+    sql(s"""
+      |INSERT INTO $table
+      |PARTITION(year = 2016, month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3
+      |""".stripMargin)
+    sql(s"""
+      |ALTER TABLE $table
+      |ADD PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1)
+      |""".stripMargin)
+  }
+
+  test("show partitions of non-partitioned table") {
+    withNamespace(s"$catalog.ns") {
+      sql(s"CREATE NAMESPACE $catalog.ns")
+      val table = s"$catalog.ns.not_partitioned_table"
+      withTable(table) {
+        sql(s"CREATE TABLE $table (col1 int) $defaultUsing")
+        val errMsg = intercept[AnalysisException] {
+          sql(s"SHOW PARTITIONS $table")
+        }.getMessage
+        assert(errMsg.contains("not allowed on a table that is not partitioned"))
+      }
+    }
+  }
+
+  test("non-partitioning columns") {
+    withNamespace(s"$catalog.ns") {
+      sql(s"CREATE NAMESPACE $catalog.ns")
+      val table = s"$catalog.ns.dateTable"
+      withTable(table) {
+        createDateTable(table)
+        val errMsg = intercept[AnalysisException] {
+          sql(s"SHOW PARTITIONS $table PARTITION(abcd=2015, xyz=1)")
+        }.getMessage
+        assert(errMsg.contains(wrongPartitionColumnsError("abcd", "xyz")))
+      }
+    }
+  }
+
+  test("show everything") {
+    withNamespace(s"$catalog.ns") {
+      sql(s"CREATE NAMESPACE $catalog.ns")
+      val table = s"$catalog.ns.dateTable"
+      withTable(table) {
+        createDateTable(table)
+        runShowPartitionsSql(
+          s"show partitions $table",
+          Row("year=2015/month=1") ::
+          Row("year=2015/month=2") ::
+          Row("year=2016/month=2") ::
+          Row("year=2016/month=3") :: Nil)
+      }
+    }
+  }
+
+  test("filter by partitions") {
+    withNamespace(s"$catalog.ns") {
+      sql(s"CREATE NAMESPACE $catalog.ns")
+      val table = s"$catalog.ns.dateTable"
+      withTable(table) {
+        createDateTable(table)
+        runShowPartitionsSql(
+          s"show partitions $table PARTITION(year=2015)",
+          Row("year=2015/month=1") ::
+          Row("year=2015/month=2") :: Nil)
+        runShowPartitionsSql(
+          s"show partitions $table PARTITION(year=2015, month=1)",
+          Row("year=2015/month=1") :: Nil)
+        runShowPartitionsSql(
+          s"show partitions $table PARTITION(month=2)",
+          Row("year=2015/month=2") ::
+          Row("year=2016/month=2") :: Nil)
+      }
+    }
+  }
+
+  test("show everything more than 5 part keys") {
+    withNamespace(s"$catalog.ns") {
+      sql(s"CREATE NAMESPACE $catalog.ns")
+      val table = s"$catalog.ns.wideTable"
+      withTable(table) {
+        createWideTable(table)
+        runShowPartitionsSql(
+          s"show partitions $table",
+          Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") ::
+          Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index bcc71e9b7241c..2b2bc9e63dc82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.command.v1
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.test.SharedSparkSession
@@ -26,104 +25,27 @@ import org.apache.spark.sql.test.SharedSparkSession
 trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
   override def version: String = "V1"
   override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
-  override def defaultNamespace: Seq[String] = Seq("default")
   override def defaultUsing: String = "USING parquet"
 
-  private def createDateTable(table: String): Unit = {
-    sql(s"""
-      |CREATE TABLE $table (price int, qty int, year int, month int)
-      |$defaultUsing
-      |partitioned by (year, month)""".stripMargin)
-    sql(s"INSERT INTO $table PARTITION(year = 2015, month = 1) SELECT 1, 1")
-    sql(s"INSERT INTO $table PARTITION(year = 2015, month = 2) SELECT 2, 2")
-    sql(s"INSERT INTO $table PARTITION(year = 2016, month = 2) SELECT 3, 3")
-    sql(s"INSERT INTO $table PARTITION(year = 2016, month = 3) SELECT 3, 3")
+  override protected def wrongPartitionColumnsError(columns: String*): String = {
+    s"Non-partitioning column(s) ${columns.mkString("[", ", ", "]")} are specified"
   }
 
-  test("show everything") {
+  test("show everything in the default database") {
     val table = "dateTable"
     withTable(table) {
      createDateTable(table)
-      checkAnswer(
-        sql(s"show partitions $table"),
+      runShowPartitionsSql(
+        s"show partitions default.$table",
         Row("year=2015/month=1") ::
-        Row("year=2015/month=2") ::
-        Row("year=2016/month=2") ::
-        Row("year=2016/month=3") :: Nil)
-
-      checkAnswer(
-        sql(s"show partitions default.$table"),
-        Row("year=2015/month=1") ::
-        Row("year=2015/month=2") ::
-        Row("year=2016/month=2") ::
-        Row("year=2016/month=3") :: Nil)
-    }
-  }
-
-  test("filter by partitions") {
-    val table = "dateTable"
-    withTable(table) {
-      createDateTable(table)
-      checkAnswer(
-        sql(s"show partitions default.$table PARTITION(year=2015)"),
-        Row("year=2015/month=1") ::
-        Row("year=2015/month=2") :: Nil)
-      checkAnswer(
-        sql(s"show partitions default.$table PARTITION(year=2015, month=1)"),
-        Row("year=2015/month=1") :: Nil)
-      checkAnswer(
-        sql(s"show partitions default.$table PARTITION(month=2)"),
         Row("year=2015/month=2") ::
-        Row("year=2016/month=2") :: Nil)
-    }
-  }
-
-  test("show everything more than 5 part keys") {
-    val table = "wideTable"
-    withTable(table) {
-      sql(s"""
-        |CREATE TABLE $table (
-        |  price int, qty int,
-        |  year int, month int, hour int, minute int, sec int, extra int)
-        |$defaultUsing
-        |PARTITIONED BY (year, month, hour, minute, sec, extra)""".stripMargin)
-      sql(s"""
-        |INSERT INTO $table
-        |PARTITION(year = 2016, month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3
-        """.stripMargin)
-      sql(s"""
-        |INSERT INTO $table
-        |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3
-        """.stripMargin)
-      checkAnswer(
-        sql(s"show partitions $table"),
-        Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") ::
-        Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil)
-    }
-  }
-
-  test("non-partitioning columns") {
-    val table = "dateTable"
-    withTable(table) {
-      createDateTable(table)
-      val errMsg = intercept[AnalysisException] {
-        sql(s"SHOW PARTITIONS $table PARTITION(abcd=2015, xyz=1)")
-      }.getMessage
-      assert(errMsg.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
-    }
-  }
-
-  test("show partitions of non-partitioned table") {
-    val table = "not_partitioned_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (col1 int) $defaultUsing")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"SHOW PARTITIONS $table")
-      }.getMessage
-      assert(errMsg.contains("not allowed on a table that is not partitioned"))
+        Row("year=2016/month=2") ::
+        Row("year=2016/month=3") :: Nil)
     }
   }
 
+  // The test fails for V2 Table Catalogs with the exception:
+  // org.apache.spark.sql.AnalysisException: CREATE VIEW is only supported with v1 tables.
test("show partitions of a view") { val table = "dateTable" withTable(table) { @@ -134,7 +56,7 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase { val errMsg = intercept[AnalysisException] { sql(s"SHOW PARTITIONS $view") }.getMessage - assert(errMsg.contains("is not allowed on a view")) + assert(errMsg.contains("'SHOW PARTITIONS' expects a table")) } } } @@ -143,10 +65,10 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase { val viewName = "test_view" withTempView(viewName) { spark.range(10).createTempView(viewName) - val errMsg = intercept[NoSuchTableException] { + val errMsg = intercept[AnalysisException] { sql(s"SHOW PARTITIONS $viewName") }.getMessage - assert(errMsg.contains(s"Table or view '$viewName' not found")) + assert(errMsg.contains("'SHOW PARTITIONS' expects a table")) } } } @@ -159,12 +81,12 @@ class ShowPartitionsSuite extends ShowPartitionsSuiteBase with SharedSparkSessio val viewName = "test_view" withTempView(viewName) { sql(s""" - |CREATE TEMPORARY VIEW $viewName (c1 INT, c2 STRING) - |$defaultUsing""".stripMargin) - val errMsg = intercept[NoSuchTableException] { + |CREATE TEMPORARY VIEW $viewName (c1 INT, c2 STRING) + |$defaultUsing""".stripMargin) + val errMsg = intercept[AnalysisException] { sql(s"SHOW PARTITIONS $viewName") }.getMessage - assert(errMsg.contains(s"Table or view '$viewName' not found")) + assert(errMsg.contains("'SHOW PARTITIONS' expects a table")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala index 8a63cd49e89e9..ca47a713ad604 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala @@ -19,38 +19,34 @@ package org.apache.spark.sql.execution.command.v2 import org.apache.spark.SparkConf import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.connector.InMemoryTableCatalog +import org.apache.spark.sql.connector.{InMemoryPartitionTableCatalog, InMemoryTableCatalog} import org.apache.spark.sql.execution.command import org.apache.spark.sql.test.SharedSparkSession class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with SharedSparkSession { override def version: String = "V2" override def catalog: String = "test_catalog" - override def defaultNamespace: Seq[String] = Nil override def defaultUsing: String = "USING _" override def sparkConf: SparkConf = super.sparkConf - .set(s"spark.sql.catalog.$catalog", classOf[InMemoryTableCatalog].getName) + .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName) + .set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName) - // TODO(SPARK-33452): Create a V2 SHOW PARTITIONS execution node - test("not supported SHOW PARTITIONS") { - def testV1Command(sqlCommand: String, sqlParams: String): Unit = { - val e = intercept[AnalysisException] { - sql(s"$sqlCommand $sqlParams") - } - assert(e.message.contains(s"$sqlCommand is only supported with v1 tables")) - } - val t = s"$catalog.ns1.ns2.tbl" - withTable(t) { - sql( - s""" - |CREATE TABLE $t (id bigint, data string) - |$defaultUsing - |PARTITIONED BY (id) - """.stripMargin) + override protected def wrongPartitionColumnsError(columns: String*): String = { + s"${columns.head} is not a valid partition column" + } - testV1Command("SHOW PARTITIONS", t) - 
-      testV1Command("SHOW PARTITIONS", s"$t PARTITION(id='1')")
+  test("a table does not support partitioning") {
+    val table = s"non_part_$catalog.tab1"
+    withTable(table) {
+      sql(s"""
+        |CREATE TABLE $table (price int, qty int, year int, month int)
+        |$defaultUsing""".stripMargin)
+      val errMsg = intercept[AnalysisException] {
+        sql(s"SHOW PARTITIONS $table")
+      }.getMessage
+      assert(errMsg.contains(
+        "SHOW PARTITIONS cannot run for a table which does not support partitioning"))
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 3af163af0968c..49e26614e13c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -300,7 +300,7 @@ class PartitionedTablePerfStatsSuite
           HiveCatalogMetrics.reset()
           assert(spark.sql("show partitions test").count() == 100)
-          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() <= 10)
         }
       }
     }
@@ -323,7 +323,7 @@ class PartitionedTablePerfStatsSuite
           HiveCatalogMetrics.reset()
           assert(spark.sql("show partitions test").count() == 100)
-          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() <= 10)
         }
       }
     }
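Finally, a hedged end-to-end sketch of the v2 behavior the suites above exercise. The catalog name, the runtime catalog registration, and `USING _` are assumptions borrowed from the in-memory test catalogs, not production settings:

```scala
spark.conf.set(
  "spark.sql.catalog.testcat",
  "org.apache.spark.sql.connector.InMemoryPartitionTableCatalog")
spark.sql("CREATE NAMESPACE testcat.ns")
spark.sql(
  """CREATE TABLE testcat.ns.dateTable (price INT, qty INT, year INT, month INT)
    |USING _
    |PARTITIONED BY (year, month)""".stripMargin)
spark.sql("ALTER TABLE testcat.ns.dateTable ADD PARTITION (year = 2015, month = 1)")
// Prints a single `partition` column, e.g. "year=2015/month=1".
spark.sql("SHOW PARTITIONS testcat.ns.dateTable").show()
```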