Closed

Changes from all commits

46 commits
65daf5d  Add ShowPartitionsExec (MaxGekk, Nov 17, 2020)
ef84a73  Add logical node ShowPartitions (MaxGekk, Nov 17, 2020)
6934fa1  ShowPartitionsStatement -> ShowPartitions (MaxGekk, Nov 17, 2020)
c62b30b  Resolve ShowPartitions -> ShowPartitionsExec (MaxGekk, Nov 17, 2020)
d1190c7  Remove `verify(s"SHOW PARTITIONS $t")` (MaxGekk, Nov 17, 2020)
e086c32  Pass `UnresolvedTableOrView` to `ShowPartitions` (MaxGekk, Nov 17, 2020)
7e06381  Throw an exception for non-partitioned table (MaxGekk, Nov 17, 2020)
9cbbbe1  Fix ShowPartitionsParserSuite (MaxGekk, Nov 17, 2020)
04c9317  Ignore test in PartitionedTablePerfStatsSuite (MaxGekk, Nov 17, 2020)
ed915c9  Check partition spec (MaxGekk, Nov 17, 2020)
ade84ba  Remove a comment (MaxGekk, Nov 17, 2020)
3a457f9  pass Table to ShowPartitionsExec (MaxGekk, Nov 18, 2020)
8700ed5  Change type of partition spec in ShowPartitions (MaxGekk, Nov 18, 2020)
dfbad25  Revert a test in v1/ShowPartitionsSuite (MaxGekk, Nov 18, 2020)
d9e4bd2  Draft implementation of ShowPartitionsExec (MaxGekk, Nov 18, 2020)
761966b  Handle None as partitionSpec (MaxGekk, Nov 19, 2020)
db45677  Move view check to CheckAnalysis (MaxGekk, Nov 19, 2020)
5b15dc3  Check that a table supports partitioning (MaxGekk, Nov 19, 2020)
f6b6814  Remove wrong check from ShowPartitionsExec (MaxGekk, Nov 19, 2020)
8c59e8a  Add v2 test for non-partitioning columns (MaxGekk, Nov 19, 2020)
b62ed58  Move the test "non-partitioning columns" to the common trait (MaxGekk, Nov 19, 2020)
6d14c56  Move a positive test to the common trait (MaxGekk, Nov 19, 2020)
b800229  Use runShowPartitionsSql() (MaxGekk, Nov 19, 2020)
ac0d2f1  Move checking by all partitioned columns to the common trait (MaxGekk, Nov 19, 2020)
18cdaf6  Move the test "show everything more than 5 part keys" to the common t… (MaxGekk, Nov 19, 2020)
35af49f  Merge remote-tracking branch 'origin/master' into show-partitions-exe… (MaxGekk, Nov 19, 2020)
a08177e  Remove unused imports (MaxGekk, Nov 19, 2020)
797da17  Remove an unused import (MaxGekk, Nov 19, 2020)
c1faacb  Fix indentations (MaxGekk, Nov 19, 2020)
0d8738c  Fix PartitionedTablePerfStatsSuite (MaxGekk, Nov 19, 2020)
c34995f  Pass time zone id to Cast (MaxGekk, Nov 20, 2020)
c6ea9d9  Merge remote-tracking branch 'origin/master' into show-partitions-exe… (MaxGekk, Nov 21, 2020)
feffb90  Move table creation to the common trait (MaxGekk, Nov 21, 2020)
bf5154a  Refactoring: remove unused `defaultNamespace` (MaxGekk, Nov 21, 2020)
69cfa0b  Merge remote-tracking branch 'remotes/origin/master' into show-partit… (MaxGekk, Nov 25, 2020)
0a1f972  fix error msg (MaxGekk, Nov 25, 2020)
a4acf40  fix merge conflicts (MaxGekk, Nov 25, 2020)
9db9923  List by partition names (MaxGekk, Nov 25, 2020)
1cd1e34  Fix build (MaxGekk, Nov 25, 2020)
555ef3f  Merge remote-tracking branch 'origin/master' into show-partitions-exe… (MaxGekk, Nov 28, 2020)
0fe4a61  Fix merge (MaxGekk, Nov 28, 2020)
d3c344a  UnresolvedTableOrView -> UnresolvedTable (MaxGekk, Nov 28, 2020)
37d7801  Merge remote-tracking branch 'origin/master' into show-partitions-exe… (MaxGekk, Nov 29, 2020)
fbc5820  Address Wenchen's review comment (MaxGekk, Nov 30, 2020)
d16aec7  Merge remote-tracking branch 'remotes/origin/master' into show-partit… (MaxGekk, Nov 30, 2020)
dc211e6  Remove unused param `tableName` from convertToPartIdent() (MaxGekk, Nov 30, 2020)
@@ -1085,7 +1085,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     lookupTableOrView(identifier).map {
       case v: ResolvedView =>
         val viewStr = if (v.isTemp) "temp view" else "view"
-        u.failAnalysis(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.'")
+        u.failAnalysis(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.")
      case table => table
    }.getOrElse(u)
@@ -577,6 +577,8 @@ trait CheckAnalysis extends PredicateHelper {
       case AlterTableDropPartition(ResolvedTable(_, _, table), parts, _, _, _) =>
         checkAlterTablePartition(table, parts)

+      case showPartitions: ShowPartitions => checkShowPartitions(showPartitions)
+
       case _ => // Fallbacks to the following checks
     }

@@ -1009,4 +1011,16 @@ trait CheckAnalysis extends PredicateHelper {
       case _ =>
     }
   }
+
+  // Make sure that the `SHOW PARTITIONS` command is allowed for the table
+  private def checkShowPartitions(showPartitions: ShowPartitions): Unit = showPartitions match {
+    case ShowPartitions(rt: ResolvedTable, _)
+        if !rt.table.isInstanceOf[SupportsPartitionManagement] =>
+      failAnalysis(s"SHOW PARTITIONS cannot run for a table which does not support partitioning")
+    case ShowPartitions(ResolvedTable(_, _, partTable: SupportsPartitionManagement), _)
+        if partTable.partitionSchema().isEmpty =>
+      failAnalysis(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${partTable.name()}")
+    case _ =>
+  }
 }
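For reference, a minimal standalone sketch of the two analysis-time rejections introduced above, written against plain Scala types rather than Spark's internals (the trait and error type below are illustrative assumptions, not Spark API):

  // A table must support partition management AND have a non-empty
  // partition schema before SHOW PARTITIONS is allowed.
  trait SupportsPartitions { def name: String; def partitionColumns: Seq[String] }

  def checkShowPartitions(table: AnyRef): Unit = table match {
    case t: SupportsPartitions if t.partitionColumns.isEmpty =>
      throw new IllegalArgumentException(
        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${t.name}")
    case _: SupportsPartitions => // partitioned table: OK
    case _ =>
      throw new IllegalArgumentException(
        "SHOW PARTITIONS cannot run for a table which does not support partitioning")
  }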
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan, ShowPartitions}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
@@ -40,6 +40,12 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
     case r @ AlterTableDropPartition(
         ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) =>
       r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+
+    case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
+      r.copy(pattern = resolvePartitionSpecs(
+        table.name,
+        partSpecs.toSeq,
+        table.partitionSchema()).headOption)
   }

   private def resolvePartitionSpecs(
@@ -48,25 +54,26 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
       partSchema: StructType): Seq[ResolvedPartitionSpec] =
     partSpecs.map {
       case unresolvedPartSpec: UnresolvedPartitionSpec =>
+        val normalizedSpec = normalizePartitionSpec(
+          unresolvedPartSpec.spec,
+          partSchema.map(_.name),
+          tableName,
+          conf.resolver)
+        val partitionNames = normalizedSpec.keySet
+        val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
         ResolvedPartitionSpec(
-          convertToPartIdent(tableName, unresolvedPartSpec.spec, partSchema),
+          requestedFields.map(_.name),
+          convertToPartIdent(normalizedSpec, requestedFields),
           unresolvedPartSpec.location)
       case resolvedPartitionSpec: ResolvedPartitionSpec =>
         resolvedPartitionSpec
     }

   private def convertToPartIdent(
-      tableName: String,
       partitionSpec: TablePartitionSpec,
-      partSchema: StructType): InternalRow = {
-    val normalizedSpec = normalizePartitionSpec(
-      partitionSpec,
-      partSchema.map(_.name),
-      tableName,
-      conf.resolver)
-
-    val partValues = partSchema.map { part =>
-      val raw = normalizedSpec.get(part.name).orNull
+      schema: Seq[StructField]): InternalRow = {
+    val partValues = schema.map { part =>
+      val raw = partitionSpec.get(part.name).orNull
       val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType)
       Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval()
     }
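As a plain-Scala sketch of what the resolution above computes (no Spark dependencies; the names here are illustrative): the user's spec is normalized against the partition schema, and only the requested columns are kept, in schema order:

  case class PartField(name: String)

  // Resolve a user-supplied PARTITION (...) spec against the table's
  // partition schema: keep only the columns the user named, in schema order.
  def resolveSpec(
      spec: Map[String, String],
      partSchema: Seq[PartField]): (Seq[String], Seq[String]) = {
    val requested = partSchema.filter(f => spec.contains(f.name))
    (requested.map(_.name), requested.map(f => spec(f.name)))
  }

  // For a table partitioned by (year, month):
  // resolveSpec(Map("month" -> "4"), Seq(PartField("year"), PartField("month")))
  // returns (Seq("month"), Seq("4")), mirroring ResolvedPartitionSpec(names, ident).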
@@ -89,7 +89,8 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
 }

 case class ResolvedPartitionSpec(
-    spec: InternalRow,
+    names: Seq[String],
+    ident: InternalRow,
     location: Option[String] = None) extends PartitionSpec

 /**
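Splitting the old `spec` field into `names` plus `ident` keeps the requested column names positionally aligned with the identifier values. A hypothetical instance for PARTITION (month = 4) on a table partitioned by (year, month), assuming spark-catalyst on the classpath:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec

  // names(i) labels the value at ident position i; the unrequested
  // column `year` does not appear at all.
  val resolved = ResolvedPartitionSpec(names = Seq("month"), ident = InternalRow(4))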
@@ -3611,9 +3611,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    * }}}
    */
   override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) {
-    val table = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
-    ShowPartitionsStatement(table, partitionKeys)
+    val partitionKeys = Option(ctx.partitionSpec).map { specCtx =>
+      UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(specCtx), None)
+    }
+    ShowPartitions(
+      UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier()), "SHOW PARTITIONS"),
+      partitionKeys)
   }

   /**
@@ -419,13 +419,6 @@ case class TruncateTableStatement(
     tableName: Seq[String],
     partitionSpec: Option[TablePartitionSpec]) extends ParsedStatement

-/**
- * A SHOW PARTITIONS statement, as parsed from SQL
- */
-case class ShowPartitionsStatement(
-    tableName: Seq[String],
-    partitionSpec: Option[TablePartitionSpec]) extends ParsedStatement
-
 /**
  * A SHOW CURRENT NAMESPACE statement, as parsed from SQL
  */
@@ -691,3 +691,18 @@ case class TruncateTable(
   override def children: Seq[LogicalPlan] = child :: Nil
 }

+
+/**
+ * The logical plan of the SHOW PARTITIONS command.
+ */
+case class ShowPartitions(
+    child: LogicalPlan,
+    pattern: Option[PartitionSpec]) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+
+  override lazy val resolved: Boolean =
+    childrenResolved && pattern.forall(_.isInstanceOf[ResolvedPartitionSpec])
+
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("partition", StringType, nullable = false)())
+}
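One detail worth calling out: `pattern.forall(...)` treats an absent PARTITION clause as vacuously resolved, because `forall` on `None` is true. A quick self-contained illustration:

  val specs: Seq[Option[Any]] = Seq(None, Some("resolved"), Some(42))
  // forall on None is vacuously true, so "no PARTITION clause" counts as
  // resolved; only a present-but-wrong-typed spec blocks resolution.
  assert(specs.map(_.forall(_.isInstanceOf[String])) == Seq(true, true, false))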
@@ -430,11 +430,12 @@ class ResolveSessionCatalog(
         ident.asTableIdentifier,
         partitionSpec)

-    case ShowPartitionsStatement(tbl, partitionSpec) =>
-      val v1TableName = parseV1Table(tbl, "SHOW PARTITIONS")
+    case ShowPartitions(
+        ResolvedV1TableOrViewIdentifier(ident),
+        pattern @ (None | Some(UnresolvedPartitionSpec(_, _)))) =>
       ShowPartitionsCommand(
-        v1TableName.asTableIdentifier,
-        partitionSpec)
+        ident.asTableIdentifier,
+        pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))

     case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns) =>
       val v1TableName = ident.asTableIdentifier
@@ -37,20 +37,20 @@ case class AlterTableAddPartitionExec(

   override protected def run(): Seq[InternalRow] = {
     val (existsParts, notExistsParts) =
-      partSpecs.partition(p => table.partitionExists(p.spec))
+      partSpecs.partition(p => table.partitionExists(p.ident))

     if (existsParts.nonEmpty && !ignoreIfExists) {
       throw new PartitionsAlreadyExistException(
-        table.name(), existsParts.map(_.spec), table.partitionSchema())
+        table.name(), existsParts.map(_.ident), table.partitionSchema())
     }

     notExistsParts match {
       case Seq() => // Nothing will be done
       case Seq(partitionSpec) =>
         val partProp = partitionSpec.location.map(loc => "location" -> loc).toMap
-        table.createPartition(partitionSpec.spec, partProp.asJava)
+        table.createPartition(partitionSpec.ident, partProp.asJava)
       case _ if table.isInstanceOf[SupportsAtomicPartitionManagement] =>
-        val partIdents = notExistsParts.map(_.spec)
+        val partIdents = notExistsParts.map(_.ident)
         val partProps = notExistsParts.map(_.location.map(loc => "location" -> loc).toMap)
         table.asAtomicPartitionable
           .createPartitions(
@@ -35,7 +35,7 @@ case class AlterTableDropPartitionExec(

   override protected def run(): Seq[InternalRow] = {
     val (existsPartIdents, notExistsPartIdents) =
-      partSpecs.map(_.spec).partition(table.partitionExists)
+      partSpecs.map(_.ident).partition(table.partitionExists)

     if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
       throw new NoSuchPartitionsException(
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._

 import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -318,6 +318,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowColumns(_: ResolvedTable, _) =>
       throw new AnalysisException("SHOW COLUMNS is not supported for v2 tables.")

+    case r @ ShowPartitions(
+        ResolvedTable(catalog, _, table: SupportsPartitionManagement),
+        pattern @ (None | Some(_: ResolvedPartitionSpec))) =>
+      ShowPartitionsExec(
+        r.output,
+        catalog,
+        table,
+        pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil
+
     case _ => Nil
   }
 }
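The binder pattern `pattern @ (None | Some(_: ResolvedPartitionSpec))` accepts both an absent spec and a fully resolved one while still binding the whole Option for reuse. A self-contained sketch of the same pattern shape (the types here are stand-ins, not Spark's):

  def plan(pattern: Option[Any]): String = pattern match {
    // Matches None as well as Some of the expected resolved type,
    // binding the whole Option as `p` for later use.
    case p @ (None | Some(_: String)) => s"planned with $p"
    case _ => "not planned: spec still unresolved"
  }

  assert(plan(None) == "planned with None")
  assert(plan(Some("resolved")) == "planned with Some(resolved)")
  assert(plan(Some(42)) == "not planned: spec still unresolved")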
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, TableCatalog}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

/**
* Physical plan node for showing partitions.
*/
case class ShowPartitionsExec(
output: Seq[Attribute],
catalog: TableCatalog,
table: SupportsPartitionManagement,
partitionSpec: Option[ResolvedPartitionSpec]) extends V2CommandExec with LeafExecNode {
override protected def run(): Seq[InternalRow] = {
val (names, ident) = partitionSpec
.map(spec => (spec.names, spec.ident))
// listPartitionByNames() should return all partitions if the partition spec
// does not specify any partition names.
.getOrElse((Seq.empty[String], InternalRow.empty))
val partitionIdentifiers = table.listPartitionByNames(names.toArray, ident)
// Convert partition identifiers, i.e. `InternalRow`s of partition values
// such as InternalRow(value0, value1, ..., valueN), into `InternalRow`s that
// each hold one string of the form "col0=value0/col1=value1/.../colN=valueN".
val schema = table.partitionSchema()
val len = schema.length
val partitions = new Array[String](len)
val timeZoneId = SQLConf.get.sessionLocalTimeZone
partitionIdentifiers.map { row =>
var i = 0
while (i < len) {
val dataType = schema(i).dataType
val partValue = row.get(i, dataType)
val partValueStr = Cast(Literal(partValue, dataType), StringType, Some(timeZoneId))
.eval().toString
partitions(i) = escapePathName(schema(i).name) + "=" + escapePathName(partValueStr)
i += 1
}
InternalRow(UTF8String.fromString(partitions.mkString("/")))
}
}
}
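A standalone sketch of the string formatting `run()` performs per partition row (escaping omitted here; the real code above escapes both names and values with escapePathName):

  // Build "col0=value0/col1=value1/..." from aligned names and values.
  def formatPartition(names: Seq[String], values: Seq[Any]): String =
    names.zip(values).map { case (n, v) => s"$n=$v" }.mkString("/")

  assert(formatPartition(Seq("year", "month"), Seq(2020, 11)) == "year=2020/month=11")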
@@ -2284,7 +2284,6 @@ class DataSourceV2SQLSuite
       verify(s"CACHE TABLE $t")
       verify(s"UNCACHE TABLE $t")
       verify(s"TRUNCATE TABLE $t")
-      verify(s"SHOW PARTITIONS $t")
       verify(s"SHOW COLUMNS FROM $t")
     }
   }
@@ -17,25 +17,30 @@

 package org.apache.spark.sql.execution.command

-import org.apache.spark.sql.catalyst.analysis.AnalysisTest
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.ShowPartitionsStatement
+import org.apache.spark.sql.catalyst.plans.logical.ShowPartitions
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.test.SharedSparkSession

 class ShowPartitionsParserSuite extends AnalysisTest with SharedSparkSession {
   test("SHOW PARTITIONS") {
+    val commandName = "SHOW PARTITIONS"
     Seq(
-      "SHOW PARTITIONS t1" -> ShowPartitionsStatement(Seq("t1"), None),
-      "SHOW PARTITIONS db1.t1" -> ShowPartitionsStatement(Seq("db1", "t1"), None),
+      "SHOW PARTITIONS t1" -> ShowPartitions(UnresolvedTable(Seq("t1"), commandName), None),
+      "SHOW PARTITIONS db1.t1" -> ShowPartitions(
+        UnresolvedTable(Seq("db1", "t1"), commandName), None),
       "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')" ->
-        ShowPartitionsStatement(
-          Seq("t1"),
-          Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue"))),
-      "SHOW PARTITIONS a.b.c" -> ShowPartitionsStatement(Seq("a", "b", "c"), None),
+        ShowPartitions(
+          UnresolvedTable(Seq("t1"), commandName),
+          Some(UnresolvedPartitionSpec(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue")))),
+      "SHOW PARTITIONS a.b.c" -> ShowPartitions(
+        UnresolvedTable(Seq("a", "b", "c"), commandName), None),
       "SHOW PARTITIONS a.b.c PARTITION(ds='2017-06-10')" ->
-        ShowPartitionsStatement(Seq("a", "b", "c"), Some(Map("ds" -> "2017-06-10")))
+        ShowPartitions(
+          UnresolvedTable(Seq("a", "b", "c"), commandName),
+          Some(UnresolvedPartitionSpec(Map("ds" -> "2017-06-10"))))
     ).foreach { case (sql, expected) =>
       val parsed = parsePlan(sql)
       comparePlans(parsed, expected)
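For completeness, a hypothetical end-to-end run against a v2 catalog in spark-shell (the catalog name `testcat`, the table, and the data below are assumptions for illustration, not part of this PR):

  // Assumes spark.sql.catalog.testcat is configured with a catalog whose
  // tables implement SupportsPartitionManagement.
  spark.sql("CREATE TABLE testcat.tbl (id INT, part STRING) PARTITIONED BY (part)")
  spark.sql("INSERT INTO testcat.tbl PARTITION (part = 'a') SELECT 0")
  // Per ShowPartitionsExec above, the result has a single `partition`
  // column with rows like "part=a".
  spark.sql("SHOW PARTITIONS testcat.tbl").show()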