
Commit c91dbde

panbingkun authored and cloud-fan committed
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
### What changes were proposed in this pull request?
This PR implements the v2 version of SHOW TABLE EXTENDED as `ShowTableExec`.

### Why are the changes needed?
To reach feature parity with data source v1.

### Does this PR introduce _any_ user-facing change?
Yes, SHOW TABLE EXTENDED is now supported in v2.

### How was this patch tested?
New unit tests were added, and the unified test suites were run for the v2 implementation:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *ShowTablesSuite"
$ build/sbt "test:testOnly *ShowTablesSuite"
```

Closes #37588 from panbingkun/v2_SHOW_TABLE_EXTENDED.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent bd14136 commit c91dbde
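
For a quick sense of the user-facing change, here is a minimal usage sketch of the two statement shapes that now run against v2 catalogs. The catalog, namespace, and table names (`testcat`, `ns`, `tbl`) are hypothetical placeholders, assuming a v2 catalog registered under `spark.sql.catalog.testcat`:

```scala
// Pattern form: lists matching tables in the namespace with extended information.
spark.sql("SHOW TABLE EXTENDED IN testcat.ns LIKE 'tbl*'").show(truncate = false)

// Partition form: shows extended information for one partition of one table.
spark.sql("SHOW TABLE EXTENDED IN testcat.ns LIKE 'tbl' PARTITION (id = 1)")
  .show(truncate = false)
```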

File tree

18 files changed: +815 -125 lines

common/utils/src/main/resources/error/error-classes.json

Lines changed: 0 additions & 5 deletions
```diff
@@ -4776,11 +4776,6 @@
       "Invalid bound function '<bound>: there are <argsLen> arguments but <inputTypesLen> parameters returned from 'inputTypes()'."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1200" : {
-    "message" : [
-      "<name> is not supported for v2 tables."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1201" : {
     "message" : [
       "Cannot resolve column name \"<colName>\" among (<fieldNames>)."
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -271,10 +271,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
         case _ =>
       }
 
-      // `ShowTableExtended` should have been converted to the v1 command if the table is v1.
-      case _: ShowTableExtended =>
-        throw QueryCompilationErrors.commandUnsupportedInV2TableError("SHOW TABLE EXTENDED")
-
       case operator: LogicalPlan =>
         operator transformExpressionsDown {
           // Check argument data types of higher-order functions downwards first.
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -50,7 +50,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
       s.copy(namespace = ResolvedNamespace(currentCatalog,
         catalogManager.currentNamespace.toImmutableArraySeq))
-    case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
+    case s @ ShowTablesExtended(UnresolvedNamespace(Seq()), _, _) =>
       s.copy(namespace = ResolvedNamespace(currentCatalog,
         catalogManager.currentNamespace.toImmutableArraySeq))
     case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -1090,6 +1090,25 @@ class SessionCatalog(
     dbViews ++ listLocalTempViews(pattern)
   }
 
+  /**
+   * List all matching temp views in the specified database, including global/local temporary views.
+   */
+  def listTempViews(db: String, pattern: String): Seq[CatalogTable] = {
+    val globalTempViews = if (format(db) == globalTempViewManager.database) {
+      globalTempViewManager.listViewNames(pattern).flatMap { viewName =>
+        globalTempViewManager.get(viewName).map(_.tableMeta)
+      }
+    } else {
+      Seq.empty
+    }
+
+    val localTempViews = listLocalTempViews(pattern).flatMap { viewIndent =>
+      tempViews.get(viewIndent.table).map(_.tableMeta)
+    }
+
+    globalTempViews ++ localTempViews
+  }
+
   /**
    * List all matching local temporary views.
    */
```
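
A sketch of how the new `listTempViews` helper can be exercised; the database and pattern values are illustrative, and global temp views are matched only when `db` equals `spark.sql.globalTempDatabase` (default `global_temp`):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Illustrative only: global temp views are included because `db` names the
// global-temp database; local temp views matching the pattern are always included.
val sessionCatalog = spark.sessionState.catalog
val views: Seq[CatalogTable] = sessionCatalog.listTempViews("global_temp", "tmp*")
views.foreach(v => println(v.identifier))
```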

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 21 additions & 9 deletions
```diff
@@ -4086,19 +4086,31 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
   }
 
   /**
-   * Create a [[ShowTableExtended]] command.
+   * Create a [[ShowTablesExtended]] or [[ShowTablePartition]] command.
    */
   override def visitShowTableExtended(
       ctx: ShowTableExtendedContext): LogicalPlan = withOrigin(ctx) {
-    val partitionKeys = Option(ctx.partitionSpec).map { specCtx =>
-      UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(specCtx), None)
-    }
-    val ns = if (ctx.identifierReference() != null) {
-      withIdentClause(ctx.identifierReference, UnresolvedNamespace(_))
-    } else {
-      UnresolvedNamespace(Seq.empty[String])
+    Option(ctx.partitionSpec).map { spec =>
+      val table = withOrigin(ctx.pattern) {
+        if (ctx.identifierReference() != null) {
+          withIdentClause(ctx.identifierReference(), ns => {
+            val names = ns :+ string(visitStringLit(ctx.pattern))
+            UnresolvedTable(names, "SHOW TABLE EXTENDED ... PARTITION ...")
+          })
+        } else {
+          val names = Seq.empty[String] :+ string(visitStringLit(ctx.pattern))
+          UnresolvedTable(names, "SHOW TABLE EXTENDED ... PARTITION ...")
+        }
+      }
+      ShowTablePartition(table, UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(spec)))
+    }.getOrElse {
+      val ns = if (ctx.identifierReference() != null) {
+        withIdentClause(ctx.identifierReference, UnresolvedNamespace)
+      } else {
+        UnresolvedNamespace(Seq.empty[String])
+      }
+      ShowTablesExtended(ns, string(visitStringLit(ctx.pattern)))
     }
-    ShowTableExtended(ns, string(visitStringLit(ctx.pattern)), partitionKeys)
   }
 
   /**
```
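
The behavioral split in the parser: with a PARTITION clause the LIKE string is folded into an `UnresolvedTable` name, while without one it remains a pattern over a namespace. A sketch using the Catalyst parser (the plans in the comments are approximate renderings):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Without PARTITION: the string stays a LIKE pattern over the namespace.
val plain = CatalystSqlParser.parsePlan("SHOW TABLE EXTENDED IN ns LIKE 'tbl*'")
// ~> ShowTablesExtended(UnresolvedNamespace([ns]), "tbl*")

// With PARTITION: the string becomes part of a concrete table identifier.
val withPartition = CatalystSqlParser.parsePlan(
  "SHOW TABLE EXTENDED IN ns LIKE 'tbl' PARTITION (p = 1)")
// ~> ShowTablePartition(UnresolvedTable([ns, tbl], ...), UnresolvedPartitionSpec(...))
```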

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 17 additions & 6 deletions
```diff
@@ -886,26 +886,37 @@ object ShowTables {
 }
 
 /**
- * The logical plan of the SHOW TABLE EXTENDED command.
+ * The logical plan of the SHOW TABLE EXTENDED (without PARTITION) command.
  */
-case class ShowTableExtended(
+case class ShowTablesExtended(
     namespace: LogicalPlan,
     pattern: String,
-    partitionSpec: Option[PartitionSpec],
-    override val output: Seq[Attribute] = ShowTableExtended.getOutputAttrs) extends UnaryCommand {
+    override val output: Seq[Attribute] = ShowTablesUtils.getOutputAttrs) extends UnaryCommand {
   override def child: LogicalPlan = namespace
-  override protected def withNewChildInternal(newChild: LogicalPlan): ShowTableExtended =
+  override protected def withNewChildInternal(newChild: LogicalPlan): ShowTablesExtended =
     copy(namespace = newChild)
 }
 
-object ShowTableExtended {
+object ShowTablesUtils {
   def getOutputAttrs: Seq[Attribute] = Seq(
     AttributeReference("namespace", StringType, nullable = false)(),
     AttributeReference("tableName", StringType, nullable = false)(),
     AttributeReference("isTemporary", BooleanType, nullable = false)(),
     AttributeReference("information", StringType, nullable = false)())
 }
 
+/**
+ * The logical plan of the SHOW TABLE EXTENDED ... PARTITION ... command.
+ */
+case class ShowTablePartition(
+    table: LogicalPlan,
+    partitionSpec: PartitionSpec,
+    override val output: Seq[Attribute] = ShowTablesUtils.getOutputAttrs)
+  extends V2PartitionCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): ShowTablePartition =
+    copy(table = newChild)
+}
+
 /**
  * The logical plan of the SHOW VIEWS command.
  *
```

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 10 additions & 9 deletions
```diff
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkThrowableHelper, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -2145,12 +2145,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "inputTypesLen" -> bound.inputTypes().length.toString))
   }
 
-  def commandUnsupportedInV2TableError(name: String): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1200",
-      messageParameters = Map("name" -> name))
-  }
-
   def cannotResolveColumnNameAmongAttributesError(
       colName: String, fieldNames: String): Throwable = {
     new AnalysisException(
@@ -2477,7 +2471,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       errorClass = "_LEGACY_ERROR_TEMP_1231",
       messageParameters = Map(
         "key" -> key,
-        "tblName" -> tblName))
+        "tblName" -> toSQLId(tblName)))
   }
 
   def invalidPartitionSpecError(
@@ -2489,7 +2483,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map(
         "specKeys" -> specKeys,
         "partitionColumnNames" -> partitionColumnNames.mkString(", "),
-        "tableName" -> tableName))
+        "tableName" -> toSQLId(tableName)))
   }
 
   def columnAlreadyExistsError(columnName: String): Throwable = {
@@ -2547,6 +2541,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     new NoSuchPartitionException(db, table, partition)
   }
 
+  def notExistPartitionError(
+      table: Identifier,
+      partitionIdent: InternalRow,
+      partitionSchema: StructType): Throwable = {
+    new NoSuchPartitionException(table.toString, partitionIdent, partitionSchema)
+  }
+
   def analyzingColumnStatisticsNotSupportedForColumnTypeError(
       name: String,
       dataType: DataType): Throwable = {
```

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -65,6 +65,11 @@ object DataSourceV2Implicits {
       }
     }
 
+    def supportsPartitions: Boolean = table match {
+      case _: SupportsPartitionManagement => true
+      case _ => false
+    }
+
     def asPartitionable: SupportsPartitionManagement = {
       table match {
         case support: SupportsPartitionManagement =>
```
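
A sketch of the intended call pattern for the new `supportsPartitions` predicate, checking the capability up front instead of letting `asPartitionable` fail on a non-partitioned table; `describePartitions` is a hypothetical helper:

```scala
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

// Hypothetical helper: only tables implementing SupportsPartitionManagement
// can answer partition-level queries.
def describePartitions(table: Table): Unit = {
  if (table.supportsPartitions) {
    println(table.asPartitionable.partitionSchema())
  } else {
    println(s"${table.name} does not support partition management")
  }
}
```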

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 17 additions & 5 deletions
```diff
@@ -241,19 +241,31 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
       ShowTablesCommand(Some(db), pattern, output)
 
-    case ShowTableExtended(
+    case ShowTablesExtended(
         DatabaseInSessionCatalog(db),
         pattern,
-        partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
         output) =>
       val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
-        assert(output.length == 4)
         output.head.withName("database") +: output.tail
       } else {
         output
       }
-      val tablePartitionSpec = partitionSpec.map(_.asInstanceOf[UnresolvedPartitionSpec].spec)
-      ShowTablesCommand(Some(db), Some(pattern), newOutput, true, tablePartitionSpec)
+      ShowTablesCommand(Some(db), Some(pattern), newOutput, isExtended = true)
+
+    case ShowTablePartition(
+        ResolvedTable(catalog, _, table: V1Table, _),
+        partitionSpec,
+        output) if isSessionCatalog(catalog) =>
+      val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
+        output.head.withName("database") +: output.tail
+      } else {
+        output
+      }
+      val tablePartitionSpec = Option(partitionSpec).map(
+        _.asInstanceOf[UnresolvedPartitionSpec].spec)
+      ShowTablesCommand(table.catalogTable.identifier.database,
+        Some(table.catalogTable.identifier.table), newOutput,
+        isExtended = true, tablePartitionSpec)
 
     // ANALYZE TABLE works on permanent views if the views are cached.
     case AnalyzeTable(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, noScan) =>
```
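
Both branches keep the legacy column rename behind `SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA`. A sketch of the observable difference, assuming a session-catalog table `t`:

```scala
// With the legacy flag set, the first output column is reported as "database"
// rather than "namespace", preserving the old v1 output schema.
spark.conf.set("spark.sql.legacy.keepCommandOutputSchema", "true")
spark.sql("SHOW TABLE EXTENDED LIKE 't'").printSchema()
// root
//  |-- database: string ...   ("namespace" when the flag is unset)
```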

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -406,6 +406,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
       ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
 
+    case ShowTablesExtended(
+        ResolvedNamespace(catalog, ns),
+        pattern,
+        output) =>
+      ShowTablesExtendedExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
+
+    case ShowTablePartition(r: ResolvedTable, part, output) =>
+      ShowTablePartitionExec(output, r.catalog, r.identifier,
+        r.table.asPartitionable, Seq(part).asResolvedPartitionSpecs.head) :: Nil
+
     case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns)) =>
       val catalogManager = session.sessionState.catalogManager
       val namespace = if (ns.nonEmpty) Some(ns) else None
```
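
With these strategy rules in place, the physical plan for a v2 catalog should surface the new exec nodes; a quick way to eyeball this (catalog and namespace names hypothetical):

```scala
// Expectation sketch: the EXPLAIN output should contain ShowTablesExtendedExec
// for the pattern form and ShowTablePartitionExec for the PARTITION form.
spark.sql("EXPLAIN SHOW TABLE EXTENDED IN testcat.ns LIKE '*'").show(truncate = false)
```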
