Commit b13a8e2

Implement v2 CreateTableAsSelect.

1 parent 09422f5

File tree: 14 files changed, +487 −32 lines

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion

@@ -238,7 +238,7 @@ unsupportedHiveNativeCommands
     ;
 
 createTableHeader
-    : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? tableIdentifier
+    : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier
     ;
 
 bucketSpec
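
Switching the header to multipartIdentifier lets CREATE TABLE name a table through a catalog and namespace path. A hypothetical end-to-end example, assuming an active SparkSession named spark and a v2 catalog registered as "testcat" (neither is defined by this commit):

    // Hypothetical usage: "testcat" stands in for a configured v2 catalog.
    spark.sql("CREATE TABLE testcat.ns1.table_name AS SELECT 1 AS id, 'a' AS data")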

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 17 additions & 0 deletions

@@ -33,6 +33,8 @@ import org.apache.spark.sql.types._
  */
 trait CheckAnalysis extends PredicateHelper {
 
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
   /**
    * Override to provide additional checks for correct analysis.
    * These rules will be evaluated after our built-in check rules.
@@ -296,6 +298,21 @@ trait CheckAnalysis extends PredicateHelper {
           }
         }
 
+      case CreateTableAsSelect(_, _, partitioning, query, _, _, _) =>
+        val references = partitioning.flatMap(_.references).toSet
+        val badReferences = references.map(_.fieldNames).flatMap { column =>
+          query.schema.findNestedField(column).map(_.dataType) match {
+            case Some(_) =>
+              None
+            case _ =>
+              Some(s"${column.quoted} is missing or is in a map or array")
+          }
+        }
+
+        if (badReferences.nonEmpty) {
+          failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
+        }
+
       case _ => // Fallbacks to the following checks
     }
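
To make the check concrete, here is a self-contained sketch of the same validation logic over a plain schema. The object and helper names are illustrative, not part of the commit; the helper mirrors StructType.findNestedField, which is private[sql]:

    import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

    object PartitioningCheckSketch {
      // Resolve a dotted column reference by walking nested structs,
      // the same way StructType.findNestedField does.
      def find(schema: StructType, names: Seq[String]): Option[StructField] =
        names.headOption
          .flatMap(name => schema.fields.find(_.name == name))
          .flatMap { field =>
            if (names.tail.isEmpty) {
              Some(field)
            } else {
              field.dataType match {
                case struct: StructType => find(struct, names.tail)
                case _ => None // fields nested in maps or arrays are not supported
              }
            }
          }

      def main(args: Array[String]): Unit = {
        val schema = new StructType()
          .add("id", LongType)
          .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))

        // Partition references, written as dotted paths split into name parts.
        val references = Seq(Seq("id"), Seq("point", "z"), Seq("does_not_exist"))
        val badReferences = references
          .filter(find(schema, _).isEmpty)
          .map(_.mkString(".") + " is missing or is in a map or array")

        if (badReferences.nonEmpty) {
          // Same message format as the new CheckAnalysis rule.
          println(s"Invalid partitioning: ${badReferences.mkString(", ")}")
        }
      }
    }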

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 3 additions & 2 deletions

@@ -2019,7 +2019,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   /**
    * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
    */
-  type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
+  type TableHeader = (Seq[String], Boolean, Boolean, Boolean)
 
   /**
    * Validate a create table statement and return the [[TableIdentifier]].
@@ -2031,7 +2031,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     if (temporary && ifNotExists) {
       operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
     }
-    (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null)
+    val multipartIdentifier: Seq[String] = ctx.multipartIdentifier.parts.asScala.map(_.getText)
+    (multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
   }
 
   /**
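
Illustrative only: for a header such as CREATE TABLE testcat.db.tbl, the visitor now returns the raw name parts, deferring catalog and namespace resolution to later analysis (the values below are assumed, not from the diff):

    // Assumed shape of the resulting TableHeader for "CREATE TABLE testcat.db.tbl":
    val header: (Seq[String], Boolean, Boolean, Boolean) =
      (Seq("testcat", "db", "tbl"), false, false, false)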

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 31 additions & 0 deletions

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.AliasIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
@@ -402,6 +404,35 @@ trait V2WriteCommand extends Command {
   }
 }
 
+/**
+ * Create a new table from a select query with a v2 catalog.
+ */
+case class CreateTableAsSelect(
+    catalog: TableCatalog,
+    tableName: Identifier,
+    partitioning: Seq[Transform],
+    query: LogicalPlan,
+    properties: Map[String, String],
+    writeOptions: Map[String, String],
+    ignoreIfExists: Boolean) extends Command {
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override lazy val resolved: Boolean = {
+    // the table schema is created from the query schema, so the only resolution needed is to check
+    // that the columns referenced by the table's partitioning exist in the query schema
+    val references = partitioning.flatMap(_.references).toSet
+    references.map(_.fieldNames).forall { column =>
+      query.schema.findNestedField(column).map(_.dataType) match {
+        case Some(_) =>
+          true
+        case _ =>
+          false
+      }
+    }
+  }
+}
+
 /**
  * Append data to an existing table.
  */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala

Lines changed: 2 additions & 3 deletions

@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.plans.logical.sql
 
 import org.apache.spark.sql.catalog.v2.expressions.Transform
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -30,7 +29,7 @@ import org.apache.spark.sql.types.StructType
  * This is a metadata-only command and is not used to write data to the created table.
  */
 case class CreateTableStatement(
-    table: TableIdentifier,
+    tableName: Seq[String],
     tableSchema: StructType,
     partitioning: Seq[Transform],
     bucketSpec: Option[BucketSpec],
@@ -50,7 +49,7 @@ case class CreateTableStatement(
  * A CREATE TABLE AS SELECT command, as parsed from SQL.
  */
 case class CreateTableAsSelectStatement(
-    table: TableIdentifier,
+    tableName: Seq[String],
     asSelect: LogicalPlan,
     partitioning: Seq[Transform],
     bucketSpec: Option[BucketSpec],

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 23 additions & 0 deletions

@@ -307,6 +307,29 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
     nameToIndex.get(name)
   }
 
+  /**
+   * Returns a field in this struct and its child structs.
+   *
+   * This does not support finding fields nested in maps or arrays.
+   */
+  private[sql] def findNestedField(fieldNames: Seq[String]): Option[StructField] = {
+    fieldNames.headOption.flatMap(nameToField.get) match {
+      case Some(field) =>
+        if (fieldNames.tail.isEmpty) {
+          Some(field)
+        } else {
+          field.dataType match {
+            case struct: StructType =>
+              struct.findNestedField(fieldNames.tail)
+            case _ =>
+              None
+          }
+        }
+      case _ =>
+        None
+    }
+  }
+
   protected[sql] def toAttributes: Seq[AttributeReference] =
     map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
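
A small sketch of the lookup behavior (illustrative; findNestedField is private[sql], so this only compiles from code inside the org.apache.spark.sql package tree):

    import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

    val schema = new StructType()
      .add("id", LongType)
      .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))

    schema.findNestedField(Seq("point", "x")) // Some(x: DoubleType)
    schema.findNestedField(Seq("point", "z")) // None: "point" has no field "z"
    schema.findNestedField(Seq("id", "x"))    // None: "id" is not a struct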

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala

Lines changed: 153 additions & 0 deletions

@@ -0,0 +1,153 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TestTableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CreateTablePartitioningValidationSuite extends AnalysisTest {
  import CreateTablePartitioningValidationSuite._

  test("CreateTableAsSelect: fail missing top-level column") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "does_not_exist") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assert(!plan.resolved)
    assertAnalysisError(plan, Seq(
      "Invalid partitioning",
      "does_not_exist is missing or is in a map or array"))
  }

  test("CreateTableAsSelect: fail missing top-level column nested reference") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "does_not_exist.z") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assert(!plan.resolved)
    assertAnalysisError(plan, Seq(
      "Invalid partitioning",
      "does_not_exist.z is missing or is in a map or array"))
  }

  test("CreateTableAsSelect: fail missing nested column") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "point.z") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assert(!plan.resolved)
    assertAnalysisError(plan, Seq(
      "Invalid partitioning",
      "point.z is missing or is in a map or array"))
  }

  test("CreateTableAsSelect: fail with multiple errors") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "does_not_exist", "point.z") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assert(!plan.resolved)
    assertAnalysisError(plan, Seq(
      "Invalid partitioning",
      "point.z is missing or is in a map or array",
      "does_not_exist is missing or is in a map or array"))
  }

  test("CreateTableAsSelect: success with top-level column") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "id") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assertAnalysisSuccess(plan)
  }

  test("CreateTableAsSelect: success using nested column") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "point.x") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assertAnalysisSuccess(plan)
  }

  test("CreateTableAsSelect: success using complex column") {
    val plan = CreateTableAsSelect(
      catalog,
      Identifier.of(Array(), "table_name"),
      LogicalExpressions.bucket(4, "point") :: Nil,
      TestRelation2,
      Map.empty,
      Map.empty,
      ignoreIfExists = false)

    assertAnalysisSuccess(plan)
  }
}

private object CreateTablePartitioningValidationSuite {
  val catalog: TableCatalog = {
    val cat = new TestTableCatalog()
    cat.initialize("test", CaseInsensitiveStringMap.empty())
    cat
  }

  val schema: StructType = new StructType()
    .add("id", LongType)
    .add("data", StringType)
    .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))
}

private case object TestRelation2 extends LeafNode with NamedRelation {
  override def name: String = "source_relation"
  override def output: Seq[AttributeReference] =
    CreateTablePartitioningValidationSuite.schema.toAttributes
}
