Skip to content

Commit 18c6755

Browse files
aokolnychyi authored and dongjoon-hyun committed
rdar://72811621 Control distribution and ordering during table creation (apache#939)
### What changes were proposed in this pull request?

This PR adds support for defining distribution and ordering during table creation.

### Why are the changes needed?

This change is needed for feature parity with ADT in Spark 2.

### Does this PR introduce any user-facing change?

This PR adds new optional table creation clauses, but it should only affect Iceberg users.

### How was this patch tested?

This PR comes with new tests. More tests are in Iceberg.
1 parent f740116 commit 18c6755

File tree

20 files changed

+651
-66
lines changed

20 files changed

+651
-66
lines changed

pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@
194194
<selenium.version>3.141.59</selenium.version>
195195
<htmlunit.version>2.40.0</htmlunit.version>
196196
<acispark.callhome.version>0.2.0</acispark.callhome.version>
197-
<iceberg.version>0.12.0-apple-preview</iceberg.version>
197+
<iceberg.version>0.12.0-apple-preview-2-SNAPSHOT</iceberg.version>
198198
<!--
199199
Managed up from older version from Avro; sync with jackson-module-paranamer dependency version
200200
-->
@@ -294,6 +294,10 @@
294294
<id>apple</id>
295295
<url>https://artifacts.apple.com/libs-release</url>
296296
</repository>
297+
<repository>
298+
<id>apple-snapshot</id>
299+
<url>https://artifacts.apple.com/libs-snapshot</url>
300+
</repository>
297301
</repositories>
298302
<pluginRepositories>
299303
<pluginRepository>

project/SparkBuild.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ object SparkBuild extends PomBuild {
270270
"gcs-maven-central-mirror" at "https://maven-central.storage-download.googleapis.com/maven2/",
271271
DefaultMavenRepository,
272272
"apple" at "https://artifacts.apple.com/libs-release",
273+
"apple-snapshot" at "https://artifacts.apple.com/libs-snapshot",
273274
Resolver.mavenLocal,
274275
Resolver.file("ivyLocal", file(Path.userHome.absolutePath + "/.ivy2/local"))(Resolver.ivyStylePatterns)
275276
),

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ createTableClauses
396396
(PARTITIONED BY partitioning=partitionFieldList) |
397397
skewSpec |
398398
bucketSpec |
399+
writeSpec |
399400
rowFormat |
400401
createFileFormat |
401402
locationSpec |

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import java.util.Map;
2121

22+
import com.google.common.base.Preconditions;
23+
2224
import org.apache.spark.annotation.Evolving;
25+
import org.apache.spark.sql.connector.expressions.SortOrder;
2326
import org.apache.spark.sql.connector.expressions.Transform;
2427
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
2528
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -78,6 +81,30 @@ StagedTable stageCreate(
7881
Transform[] partitions,
7982
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
8083

84+
/**
85+
* Stage the creation of a table with a specific distribution and ordering.
86+
*/
87+
default StagedTable stageCreate(
88+
Identifier ident,
89+
StructType schema,
90+
Transform[] partitions,
91+
Map<String, String> properties,
92+
String distributionMode,
93+
SortOrder[] ordering) throws TableAlreadyExistsException, NoSuchNamespaceException {
94+
95+
Preconditions.checkArgument(
96+
distributionMode.equals("none"),
97+
"%s does not support tables with a specific distribution",
98+
this.getClass().getName());
99+
100+
Preconditions.checkArgument(
101+
ordering.length == 0,
102+
"%s does not support tables with a specific ordering",
103+
this.getClass().getName());
104+
105+
return stageCreate(ident, schema, partitions, properties);
106+
}
107+
81108
/**
82109
* Stage the replacement of a table, preparing it to be committed into the metastore when the
83110
* returned table's {@link StagedTable#commitStagedChanges()} is called.
@@ -111,6 +138,30 @@ StagedTable stageReplace(
111138
Transform[] partitions,
112139
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
113140

141+
/**
142+
* Stage the replacement of a table with a specific distribution and ordering.
143+
*/
144+
default StagedTable stageReplace(
145+
Identifier ident,
146+
StructType schema,
147+
Transform[] partitions,
148+
Map<String, String> properties,
149+
String distributionMode,
150+
SortOrder[] ordering) throws NoSuchNamespaceException, NoSuchTableException {
151+
152+
Preconditions.checkArgument(
153+
distributionMode.equals("none"),
154+
"%s does not support tables with a specific distribution",
155+
this.getClass().getName());
156+
157+
Preconditions.checkArgument(
158+
ordering.length == 0,
159+
"%s does not support tables with a specific ordering",
160+
this.getClass().getName());
161+
162+
return stageReplace(ident, schema, partitions, properties);
163+
}
164+
114165
/**
115166
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
116167
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
@@ -141,4 +192,28 @@ StagedTable stageCreateOrReplace(
141192
StructType schema,
142193
Transform[] partitions,
143194
Map<String, String> properties) throws NoSuchNamespaceException;
195+
196+
/**
197+
* Stage the creation or replacement of a table with a specific distribution and ordering.
198+
*/
199+
default StagedTable stageCreateOrReplace(
200+
Identifier ident,
201+
StructType schema,
202+
Transform[] partitions,
203+
Map<String, String> properties,
204+
String distributionMode,
205+
SortOrder[] ordering) throws NoSuchNamespaceException {
206+
207+
Preconditions.checkArgument(
208+
distributionMode.equals("none"),
209+
"%s does not support tables with a specific distribution",
210+
this.getClass().getName());
211+
212+
Preconditions.checkArgument(
213+
ordering.length == 0,
214+
"%s does not support tables with a specific ordering",
215+
this.getClass().getName());
216+
217+
return stageCreateOrReplace(ident, schema, partitions, properties);
218+
}
144219
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.connector.catalog;
1919

2020
import org.apache.spark.annotation.Evolving;
21+
import org.apache.spark.sql.connector.expressions.SortOrder;
2122
import org.apache.spark.sql.connector.expressions.Transform;
2223
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
2324
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -26,6 +27,8 @@
2627

2728
import java.util.Map;
2829

30+
import com.google.common.base.Preconditions;
31+
2932
/**
3033
* Catalog methods for working with Tables.
3134
* <p>
@@ -140,6 +143,30 @@ Table createTable(
140143
Transform[] partitions,
141144
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
142145

146+
/**
147+
* Create a table with a specific distribution and ordering.
148+
*/
149+
default Table createTable(
150+
Identifier ident,
151+
StructType schema,
152+
Transform[] partitions,
153+
Map<String, String> properties,
154+
String distributionMode,
155+
SortOrder[] ordering) throws TableAlreadyExistsException, NoSuchNamespaceException {
156+
157+
Preconditions.checkArgument(
158+
distributionMode.equals("none"),
159+
"%s does not support tables with a specific distribution",
160+
this.getClass().getName());
161+
162+
Preconditions.checkArgument(
163+
ordering.length == 0,
164+
"%s does not support tables with a specific ordering",
165+
this.getClass().getName());
166+
167+
return createTable(ident, schema, partitions, properties);
168+
}
169+
143170
/**
144171
* Apply a set of {@link TableChange changes} to a table in the catalog.
145172
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
155155
RenameTable(catalog.asTableCatalog, oldName.asIdentifier, newNameParts.asIdentifier)
156156

157157
case c @ CreateTableStatement(
158-
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
158+
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _, _) =>
159159
assertNoNullTypeInSchema(c.tableSchema)
160160
CreateV2Table(
161161
catalog.asTableCatalog,
@@ -164,10 +164,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
164164
// convert the bucket spec and add it as a transform
165165
c.partitioning ++ c.bucketSpec.map(_.asTransform),
166166
convertTableProperties(c),
167-
ignoreIfExists = c.ifNotExists)
167+
ignoreIfExists = c.ifNotExists,
168+
c.distributionMode,
169+
c.ordering)
168170

169171
case c @ CreateTableAsSelectStatement(
170-
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
172+
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _, _, _) =>
171173
if (c.asSelect.resolved) {
172174
assertNoNullTypeInSchema(c.asSelect.schema)
173175
}
@@ -179,7 +181,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
179181
c.asSelect,
180182
convertTableProperties(c),
181183
writeOptions = c.writeOptions,
182-
ignoreIfExists = c.ifNotExists)
184+
ignoreIfExists = c.ifNotExists,
185+
c.distributionMode,
186+
c.ordering)
183187

184188
case m @ MigrateTableStatement(NonSessionCatalogAndTable(catalog, tbl), _, _) =>
185189
if (!catalog.isInstanceOf[SupportsMigrate]) {
@@ -207,7 +211,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
207211
)
208212

209213
case c @ ReplaceTableStatement(
210-
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
214+
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
211215
assertNoNullTypeInSchema(c.tableSchema)
212216
ReplaceTable(
213217
catalog.asTableCatalog,
@@ -216,10 +220,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
216220
// convert the bucket spec and add it as a transform
217221
c.partitioning ++ c.bucketSpec.map(_.asTransform),
218222
convertTableProperties(c),
219-
orCreate = c.orCreate)
223+
orCreate = c.orCreate,
224+
distributionMode = c.distributionMode,
225+
ordering = c.ordering)
220226

221227
case c @ ReplaceTableAsSelectStatement(
222-
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
228+
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _, _) =>
223229
if (c.asSelect.resolved) {
224230
assertNoNullTypeInSchema(c.asSelect.schema)
225231
}
@@ -231,7 +237,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
231237
c.asSelect,
232238
convertTableProperties(c),
233239
writeOptions = c.writeOptions,
234-
orCreate = c.orCreate)
240+
orCreate = c.orCreate,
241+
distributionMode = c.distributionMode,
242+
ordering = c.ordering)
235243

236244
case DropViewStatement(NonSessionCatalogAndTable(catalog, viewName), _) =>
237245
throw new AnalysisException(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2517,6 +2517,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
25172517
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
25182518
Map[String, String], Option[String], Option[String], Option[SerdeInfo])
25192519

2520+
/**
2521+
* Type to keep track of custom table clauses:
2522+
* - distribution mode
2523+
* - ordering
2524+
*/
2525+
type CustomTableClauses = (String, Seq[V2SortOrder])
2526+
25202527
/**
25212528
* Validate a create table statement and return the [[TableIdentifier]].
25222529
*/
@@ -2968,6 +2975,21 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
29682975
}
29692976
}
29702977

2978+
private def visitCustomCreateTableClauses(ctx: CreateTableClausesContext): CustomTableClauses = {
2979+
checkDuplicateClauses(ctx.writeSpec, "DISTRIBUTED/ORDERED BY", ctx)
2980+
2981+
ctx.writeSpec.asScala.headOption match {
2982+
case Some(writeSpec) =>
2983+
val (distributionSpec, orderingSpec) = toDistributionAndOrderingSpec(writeSpec)
2984+
val distributionMode = toDistributionMode(distributionSpec, orderingSpec)
2985+
val ordering = toOrdering(orderingSpec)
2986+
(distributionMode, ordering)
2987+
2988+
case None =>
2989+
("none", Array.empty[V2SortOrder])
2990+
}
2991+
}
2992+
29712993
override def visitCreateTableClauses(ctx: CreateTableClausesContext): TableClauses = {
29722994
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
29732995
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
@@ -3080,6 +3102,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
30803102
}
30813103

30823104
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
3105+
val (distributionMode, ordering) = visitCustomCreateTableClauses(ctx.createTableClauses)
3106+
3107+
if (distributionMode == "hash" && partitioning.isEmpty) {
3108+
operationNotAllowed("DISTRIBUTED BY PARTITION is supported only for partitioned tables", ctx)
3109+
}
30833110

30843111
Option(ctx.query).map(plan) match {
30853112
case Some(_) if columns.nonEmpty =>
@@ -3096,14 +3123,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
30963123
case Some(query) =>
30973124
CreateTableAsSelectStatement(
30983125
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
3099-
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
3126+
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists,
3127+
distributionMode, ordering)
31003128

31013129
case _ =>
31023130
// Note: table schema includes both the table columns list and the partition columns
31033131
// with data type.
31043132
val schema = StructType(columns ++ partCols)
31053133
CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
3106-
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
3134+
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists,
3135+
distributionMode, ordering)
31073136
}
31083137
}
31093138

@@ -3160,6 +3189,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
31603189
}
31613190

31623191
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
3192+
val (distributionMode, ordering) = visitCustomCreateTableClauses(ctx.createTableClauses)
3193+
3194+
if (distributionMode == "hash" && partitioning.isEmpty) {
3195+
operationNotAllowed("DISTRIBUTED BY PARTITION is supported only for partitioned tables", ctx)
3196+
}
31633197

31643198
Option(ctx.query).map(plan) match {
31653199
case Some(_) if columns.nonEmpty =>
@@ -3176,14 +3210,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
31763210
case Some(query) =>
31773211
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
31783212
provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
3179-
orCreate = orCreate)
3213+
orCreate = orCreate, distributionMode, ordering)
31803214

31813215
case _ =>
31823216
// Note: table schema includes both the table columns list and the partition columns
31833217
// with data type.
31843218
val schema = StructType(columns ++ partCols)
31853219
ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
3186-
options, location, comment, serdeInfo, orCreate = orCreate)
3220+
options, location, comment, serdeInfo, orCreate = orCreate, distributionMode, ordering)
31873221
}
31883222
}
31893223

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ case class CreateTableStatement(
145145
comment: Option[String],
146146
serde: Option[SerdeInfo],
147147
external: Boolean,
148-
ifNotExists: Boolean) extends ParsedStatement
148+
ifNotExists: Boolean,
149+
distributionMode: String = "none",
150+
ordering: Seq[SortOrder] = Seq.empty) extends ParsedStatement
149151

150152
/**
151153
* A CREATE TABLE AS SELECT command, as parsed from SQL.
@@ -163,7 +165,9 @@ case class CreateTableAsSelectStatement(
163165
writeOptions: Map[String, String],
164166
serde: Option[SerdeInfo],
165167
external: Boolean,
166-
ifNotExists: Boolean) extends ParsedStatement {
168+
ifNotExists: Boolean,
169+
distributionMode: String = "none",
170+
ordering: Seq[SortOrder] = Seq.empty) extends ParsedStatement {
167171

168172
override def children: Seq[LogicalPlan] = Seq(asSelect)
169173
}
@@ -199,7 +203,9 @@ case class ReplaceTableStatement(
199203
location: Option[String],
200204
comment: Option[String],
201205
serde: Option[SerdeInfo],
202-
orCreate: Boolean) extends ParsedStatement
206+
orCreate: Boolean,
207+
distributionMode: String = "none",
208+
ordering: Seq[SortOrder] = Seq.empty) extends ParsedStatement
203209

204210
/**
205211
* A REPLACE TABLE AS SELECT command, as parsed from SQL.
@@ -216,7 +222,9 @@ case class ReplaceTableAsSelectStatement(
216222
comment: Option[String],
217223
writeOptions: Map[String, String],
218224
serde: Option[SerdeInfo],
219-
orCreate: Boolean) extends ParsedStatement {
225+
orCreate: Boolean,
226+
distributionMode: String = "none",
227+
ordering: Seq[SortOrder] = Seq.empty) extends ParsedStatement {
220228

221229
override def children: Seq[LogicalPlan] = Seq(asSelect)
222230
}

0 commit comments

Comments
 (0)