Skip to content

Commit 8ca5942

Browse files
cloud-fan
authored and rdblue committed
address review comments (#8)
1 parent e88dabb commit 8ca5942

File tree

7 files changed

+273
-306
lines changed

7 files changed

+273
-306
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 63 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -2457,7 +2457,18 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
24572457

24582458
/**
24592459
* Type to keep track of table clauses:
2460-
* (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
2460+
* - partition transforms
2461+
* - partition columns
2462+
* - bucketSpec
2463+
* - properties
2464+
* - options
2465+
* - location
2466+
* - comment
2467+
* - serde
2468+
*
2469+
* Note: Partition transforms are based on existing table schema definition. It can be simple
2470+
* column names, or functions like `year(date_col)`. Partition columns are column names with data
2471+
* types like `i INT`, which should be appended to the existing table schema.
24612472
*/
24622473
type TableClauses = (
24632474
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
@@ -2784,7 +2795,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
27842795
(ctx.fileFormat, ctx.storageHandler) match {
27852796
// Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
27862797
case (c: TableFileFormatContext, null) =>
2787-
SerdeInfo(formatClasses = Some((string(c.inFmt), string(c.outFmt))))
2798+
SerdeInfo(formatClasses = Some(FormatClasses(string(c.inFmt), string(c.outFmt))))
27882799
// Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
27892800
case (c: GenericFileFormatContext, null) =>
27902801
SerdeInfo(storedAs = Some(c.identifier.getText))
@@ -2813,8 +2824,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
28132824
* [NULL DEFINED AS char]
28142825
* }}}
28152826
*/
2816-
def visitRowFormat(
2817-
ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
2827+
def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
28182828
ctx match {
28192829
case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
28202830
case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
@@ -2934,16 +2944,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
29342944
val location = visitLocationSpecList(ctx.locationSpec())
29352945
val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
29362946
val comment = visitCommentSpecList(ctx.commentSpec())
2937-
2938-
validateRowFormatFileFormat(
2939-
ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
2940-
val fileFormatSerdeInfo = ctx.createFileFormat.asScala.map(visitCreateFileFormat)
2941-
val rowFormatSerdeInfo = ctx.rowFormat.asScala.map(visitRowFormat)
2942-
val serdeInfo =
2943-
(fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))
2944-
2947+
val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
29452948
(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
2946-
serdeInfo)
2949+
serdeInfo)
2950+
}
2951+
2952+
protected def getSerdeInfo(
2953+
rowFormatCtx: Seq[RowFormatContext],
2954+
createFileFormatCtx: Seq[CreateFileFormatContext],
2955+
ctx: ParserRuleContext): Option[SerdeInfo] = {
2956+
validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
2957+
val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
2958+
val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
2959+
(fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r))
29472960
}
29482961

29492962
private def partitionExpressions(
@@ -2954,8 +2967,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
29542967
if (partCols.nonEmpty) {
29552968
val references = partTransforms.map(_.describe()).mkString(", ")
29562969
val columns = partCols
2957-
.map(field => s"${field.name} ${field.dataType.simpleString}")
2958-
.mkString(", ")
2970+
.map(field => s"${field.name} ${field.dataType.simpleString}")
2971+
.mkString(", ")
29592972
operationNotAllowed(
29602973
s"""PARTITION BY: Cannot mix partition expressions and partition columns:
29612974
|Expressions: $references
@@ -2977,12 +2990,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
29772990
* Expected format:
29782991
* {{{
29792992
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
2980-
* USING table_provider
2993+
* [USING table_provider]
29812994
* create_table_clauses
29822995
* [[AS] select_statement];
29832996
*
29842997
* create_table_clauses (order insensitive):
2985-
* partition_clauses
2998+
* [PARTITIONED BY (partition_fields)]
29862999
* [OPTIONS table_property_list]
29873000
* [ROW FORMAT row_format]
29883001
* [STORED AS file_format]
@@ -2993,15 +3006,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
29933006
* [LOCATION path]
29943007
* [COMMENT table_comment]
29953008
* [TBLPROPERTIES (property_name=property_value, ...)]
2996-
* partition_clauses:
2997-
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] |
2998-
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
3009+
*
3010+
* partition_fields:
3011+
* col_name, transform(col_name), transform(constant, col_name), ... |
3012+
* col_name data_type [NOT NULL] [COMMENT col_comment], ...
29993013
* }}}
30003014
*/
30013015
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
30023016
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
30033017

3004-
val columns = Option(ctx.colTypeList()).map(visitColTypeList)
3018+
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
30053019
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
30063020
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
30073021
visitCreateTableClauses(ctx.createTableClauses())
@@ -3010,18 +3024,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
30103024
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
30113025
}
30123026

3013-
val schema = columns
3014-
.map(dataCols => StructType(dataCols ++ partCols))
3015-
.getOrElse(StructType(partCols))
3027+
if (temp) {
3028+
val asSelect = if (ctx.query == null) "" else " AS ..."
3029+
operationNotAllowed(
3030+
s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
3031+
}
3032+
30163033
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
30173034

30183035
Option(ctx.query).map(plan) match {
3019-
case Some(_) if temp =>
3020-
operationNotAllowed(
3021-
"CREATE TEMPORARY TABLE ... AS ..., use CREATE TEMPORARY VIEW instead",
3022-
ctx)
3023-
3024-
case Some(_) if columns.isDefined =>
3036+
case Some(_) if columns.nonEmpty =>
30253037
operationNotAllowed(
30263038
"Schema may not be specified in a Create Table As Select (CTAS) statement",
30273039
ctx)
@@ -3037,10 +3049,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
30373049
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
30383050
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
30393051

3040-
case None if temp =>
3041-
operationNotAllowed("CREATE TEMPORARY TABLE", ctx)
3042-
30433052
case _ =>
3053+
// Note: table schema includes both the table columns list and the partition columns
3054+
// with data type.
3055+
val schema = StructType(columns ++ partCols)
30443056
CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
30453057
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
30463058
}
@@ -3052,28 +3064,33 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
30523064
* Expected format:
30533065
* {{{
30543066
* [CREATE OR] REPLACE TABLE [db_name.]table_name
3055-
* USING table_provider
3067+
* [USING table_provider]
30563068
* replace_table_clauses
30573069
* [[AS] select_statement];
30583070
*
30593071
* replace_table_clauses (order insensitive):
30603072
* [OPTIONS table_property_list]
3061-
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
3073+
* [PARTITIONED BY (partition_fields)]
30623074
* [CLUSTERED BY (col_name, col_name, ...)
30633075
* [SORTED BY (col_name [ASC|DESC], ...)]
30643076
* INTO num_buckets BUCKETS
30653077
* ]
30663078
* [LOCATION path]
30673079
* [COMMENT table_comment]
30683080
* [TBLPROPERTIES (property_name=property_value, ...)]
3081+
*
3082+
* partition_fields:
3083+
* col_name, transform(col_name), transform(constant, col_name), ... |
3084+
* col_name data_type [NOT NULL] [COMMENT col_comment], ...
30693085
* }}}
30703086
*/
30713087
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
30723088
val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
3089+
val orCreate = ctx.replaceTableHeader().CREATE() != null
3090+
30733091
if (temp) {
3074-
operationNotAllowed(
3075-
"CREATE OR REPLACE TEMPORARY TABLE ..., use CREATE TEMPORARY VIEW instead",
3076-
ctx)
3092+
val action = if (orCreate) "CREATE OR REPLACE" else "REPLACE"
3093+
operationNotAllowed(s"$action TEMPORARY TABLE ..., use $action TEMPORARY VIEW instead.", ctx)
30773094
}
30783095

30793096
if (external) {
@@ -3086,24 +3103,23 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
30863103

30873104
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
30883105
visitCreateTableClauses(ctx.createTableClauses())
3089-
val columns = Option(ctx.colTypeList()).map(visitColTypeList)
3106+
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
30903107
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
30913108

30923109
if (provider.isDefined && serdeInfo.isDefined) {
30933110
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
30943111
}
30953112

3096-
val schema = columns.map(dataCols => StructType(dataCols ++ partCols))
30973113
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
3098-
val orCreate = ctx.replaceTableHeader().CREATE() != null
30993114

31003115
Option(ctx.query).map(plan) match {
3101-
case Some(_) if schema.isDefined =>
3116+
case Some(_) if columns.nonEmpty =>
31023117
operationNotAllowed(
31033118
"Schema may not be specified in a Replace Table As Select (RTAS) statement",
31043119
ctx)
31053120

31063121
case Some(_) if partCols.nonEmpty =>
3122+
// non-reference partition columns are not allowed because schema can't be specified
31073123
operationNotAllowed(
31083124
"Partition column types may not be specified in Replace Table As Select (RTAS)",
31093125
ctx)
@@ -3114,9 +3130,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
31143130
orCreate = orCreate)
31153131

31163132
case _ =>
3117-
ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
3118-
bucketSpec, properties, provider, options, location, comment, serdeInfo,
3119-
orCreate = orCreate)
3133+
// Note: table schema includes both the table columns list and the partition columns
3134+
// with data type.
3135+
val schema = StructType(columns ++ partCols)
3136+
ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
3137+
options, location, comment, serdeInfo, orCreate = orCreate)
31203138
}
31213139
}
31223140

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 28 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.plans.logical
1919

20+
import org.apache.spark.sql.AnalysisException
2021
import org.apache.spark.sql.catalyst.analysis.ViewType
2122
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
2223
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -58,25 +59,25 @@ abstract class ParsedStatement extends LogicalPlan {
5859
*/
5960
case class SerdeInfo(
6061
storedAs: Option[String] = None,
61-
formatClasses: Option[(String, String)] = None,
62+
formatClasses: Option[FormatClasses] = None,
6263
serde: Option[String] = None,
6364
serdeProperties: Map[String, String] = Map.empty) {
6465
// this uses assertions because validation is done in validateRowFormatFileFormat etc.
6566
assert(storedAs.isEmpty || formatClasses.isEmpty,
66-
s"Conflicting STORED AS $storedAs and INPUTFORMAT/OUTPUTFORMAT $formatClasses values")
67+
"Cannot specify both STORED AS and INPUTFORMAT/OUTPUTFORMAT")
6768

6869
def describe: String = {
6970
val serdeString = if (serde.isDefined || serdeProperties.nonEmpty) {
70-
"ROW FORMAT" + serde.map(sd => s" SERDE $sd").getOrElse(" DELIMITED")
71+
"ROW FORMAT " + serde.map(sd => s"SERDE $sd").getOrElse("DELIMITED")
7172
} else {
7273
""
7374
}
7475

7576
this match {
76-
case SerdeInfo(Some(format), _, _, _) =>
77-
s"STORED AS $format $serdeString"
78-
case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
79-
s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
77+
case SerdeInfo(Some(storedAs), _, _, _) =>
78+
s"STORED AS $storedAs $serdeString"
79+
case SerdeInfo(_, Some(formatClasses), _, _) =>
80+
s"STORED AS $formatClasses $serdeString"
8081
case _ =>
8182
serdeString
8283
}
@@ -85,7 +86,7 @@ case class SerdeInfo(
8586
def merge(other: SerdeInfo): SerdeInfo = {
8687
def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = {
8788
(left, right) match {
88-
case (Some(l), Some(r)) if l != r =>
89+
case (Some(l), Some(r)) =>
8990
assert(l == r, s"Conflicting $desc values: $l != $r")
9091
left
9192
case (Some(_), _) =>
@@ -97,6 +98,7 @@ case class SerdeInfo(
9798
}
9899
}
99100

101+
SerdeInfo.checkSerdePropMerging(serdeProperties, other.serdeProperties)
100102
SerdeInfo(
101103
getOnly("STORED AS", storedAs, other.storedAs),
102104
getOnly("INPUTFORMAT/OUTPUTFORMAT", formatClasses, other.formatClasses),
@@ -105,9 +107,25 @@ case class SerdeInfo(
105107
}
106108
}
107109

110+
case class FormatClasses(input: String, output: String) {
111+
override def toString: String = s"INPUTFORMAT $input OUTPUTFORMAT $output"
112+
}
113+
108114
object SerdeInfo {
109-
val empty: SerdeInfo = {
110-
SerdeInfo(None, None, None, Map.empty)
115+
val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty)
116+
117+
def checkSerdePropMerging(
118+
props1: Map[String, String], props2: Map[String, String]): Unit = {
119+
val conflictKeys = props1.keySet.intersect(props2.keySet)
120+
if (conflictKeys.nonEmpty) {
121+
throw new UnsupportedOperationException(
122+
s"""
123+
|Cannot safely merge SERDEPROPERTIES:
124+
|${props1.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
125+
|${props2.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
126+
|The conflict keys: ${conflictKeys.mkString(", ")}
127+
|""".stripMargin)
128+
}
111129
}
112130
}
113131

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -325,26 +325,30 @@ private[sql] object CatalogV2Util {
325325
options ++ // to make the transition to the "option." prefix easier, add both
326326
options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++
327327
convertToProperties(serdeInfo) ++
328-
(if (external) Map(TableCatalog.PROP_EXTERNAL -> "true") else Map.empty) ++
328+
(if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
329329
provider.map(TableCatalog.PROP_PROVIDER -> _) ++
330330
comment.map(TableCatalog.PROP_COMMENT -> _) ++
331331
location.map(TableCatalog.PROP_LOCATION -> _)
332332
}
333333

334+
/**
335+
* Converts Hive Serde info to table properties. The mapped property keys are:
336+
* - INPUTFORMAT/OUTPUTFORMAT: hive.input/output-format
337+
* - STORED AS: hive.stored-as
338+
* - ROW FORMAT SERDE: hive.serde
339+
* - SERDEPROPERTIES: add "option." prefix
340+
*/
334341
private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = {
335342
serdeInfo match {
336343
case Some(s) =>
337-
((s.formatClasses match {
338-
case Some((inputFormat, outputFormat)) =>
339-
Map("hive.input-format" -> inputFormat, "hive.output-format" -> outputFormat)
340-
case _ =>
341-
Map.empty
342-
}) ++
344+
s.formatClasses.map { f =>
345+
Map("hive.input-format" -> f.input, "hive.output-format" -> f.output)
346+
}.getOrElse(Map.empty) ++
343347
s.storedAs.map("hive.stored-as" -> _) ++
344348
s.serde.map("hive.serde" -> _) ++
345349
s.serdeProperties.map {
346350
case (key, value) => TableCatalog.OPTION_PREFIX + key -> value
347-
}).toMap
351+
}
348352
case None =>
349353
Map.empty
350354
}

0 commit comments

Comments (0)