
Commit c11a64d

cloud-fan authored and cmonkey committed
[SPARK-19587][SQL] bucket sorting columns should not be picked from partition columns
## What changes were proposed in this pull request?

We throw an exception if bucket columns are part of partition columns; this should also apply to sort columns. This PR also moves the checking logic from `DataFrameWriter` to `PreprocessTableCreation`, which is the central place for checking and normalization.

## How was this patch tested?

updated test

Author: Wenchen Fan <[email protected]>

Closes apache#16931 from cloud-fan/bucket.
1 parent c52bcf7 commit c11a64d
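
For illustration, a minimal sketch of the new user-facing behavior (assuming a spark-shell session with `spark` in scope; the DataFrame and table name are hypothetical, with columns i, j, k mirroring the updated test below):

    import org.apache.spark.sql.AnalysisException

    // Hypothetical DataFrame with columns i, j, k.
    val df = spark.range(10).selectExpr("id as i", "id * 2 as j", "id * 3 as k")

    try {
      df.write
        .partitionBy("i", "j")  // "i" is a partition column...
        .bucketBy(8, "k")
        .sortBy("i")            // ...so it may no longer be used as a bucket sorting column.
        .saveAsTable("bucketed_table")
    } catch {
      case e: AnalysisException =>
        // Expected: bucket sorting column 'i' should not be part of partition columns 'i, j'
        println(e.getMessage)
    }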

3 files changed (+25, -48 lines)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 2 additions & 38 deletions
@@ -215,7 +215,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       df.sparkSession,
       className = source,
       partitionColumns = partitioningColumns.getOrElse(Nil),
-      bucketSpec = getBucketSpec,
       options = extraOptions.toMap)
 
     dataSource.write(mode, df)
@@ -270,52 +269,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         ifNotExists = false)).toRdd
   }
 
-  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
-    cols.map(normalize(_, "Partition"))
-  }
-
-  private def normalizedBucketColNames: Option[Seq[String]] = bucketColumnNames.map { cols =>
-    cols.map(normalize(_, "Bucketing"))
-  }
-
-  private def normalizedSortColNames: Option[Seq[String]] = sortColumnNames.map { cols =>
-    cols.map(normalize(_, "Sorting"))
-  }
-
   private def getBucketSpec: Option[BucketSpec] = {
     if (sortColumnNames.isDefined) {
       require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
     }
 
-    for {
-      n <- numBuckets
-    } yield {
+    numBuckets.map { n =>
       require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
-
-      // partitionBy columns cannot be used in bucketBy
-      if (normalizedParCols.nonEmpty &&
-        normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) {
-        throw new AnalysisException(
-          s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " +
-            s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
-      }
-
-      BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
+      BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil))
     }
   }
 
-  /**
-   * The given column name may not be equal to any of the existing column names if we were in
-   * case-insensitive context. Normalize the given column name to the real one so that we don't
-   * need to care about case sensitivity afterwards.
-   */
-  private def normalize(columnName: String, columnType: String): String = {
-    val validColumnNames = df.logicalPlan.output.map(_.name)
-    validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName))
-      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
-        s"existing columns (${validColumnNames.mkString(", ")})"))
-  }
-
   private def assertNotBucketed(operation: String): Unit = {
     if (numBuckets.isDefined || sortColumnNames.isDefined) {
       throw new AnalysisException(s"'$operation' does not support bucketing right now")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 15 additions & 3 deletions
@@ -226,9 +226,21 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       }
       checkDuplication(columnNames, "table definition of " + table.identifier)
 
-      table.copy(
-        partitionColumnNames = normalizePartitionColumns(schema, table),
-        bucketSpec = normalizeBucketSpec(schema, table))
+      val normalizedPartCols = normalizePartitionColumns(schema, table)
+      val normalizedBucketSpec = normalizeBucketSpec(schema, table)
+
+      normalizedBucketSpec.foreach { spec =>
+        for (bucketCol <- spec.bucketColumnNames if normalizedPartCols.contains(bucketCol)) {
+          throw new AnalysisException(s"bucketing column '$bucketCol' should not be part of " +
+            s"partition columns '${normalizedPartCols.mkString(", ")}'")
+        }
+        for (sortCol <- spec.sortColumnNames if normalizedPartCols.contains(sortCol)) {
+          throw new AnalysisException(s"bucket sorting column '$sortCol' should not be part of " +
+            s"partition columns '${normalizedPartCols.mkString(", ")}'")
+        }
+      }
+
+      table.copy(partitionColumnNames = normalizedPartCols, bucketSpec = normalizedBucketSpec)
     }
 
   private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala

Lines changed: 8 additions & 7 deletions
@@ -169,19 +169,20 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     }
   }
 
-  test("write bucketed data with the overlapping bucketBy and partitionBy columns") {
-    intercept[AnalysisException](df.write
+  test("write bucketed data with the overlapping bucketBy/sortBy and partitionBy columns") {
+    val e1 = intercept[AnalysisException](df.write
       .partitionBy("i", "j")
       .bucketBy(8, "j", "k")
       .sortBy("k")
       .saveAsTable("bucketed_table"))
-  }
+    assert(e1.message.contains("bucketing column 'j' should not be part of partition columns"))
 
-  test("write bucketed data with the identical bucketBy and partitionBy columns") {
-    intercept[AnalysisException](df.write
-      .partitionBy("i")
-      .bucketBy(8, "i")
+    val e2 = intercept[AnalysisException](df.write
+      .partitionBy("i", "j")
+      .bucketBy(8, "k")
+      .sortBy("i")
       .saveAsTable("bucketed_table"))
+    assert(e2.message.contains("bucket sorting column 'i' should not be part of partition columns"))
   }
 
   test("write bucketed data without partitionBy") {
