Commit d3200cf

simplification
1 parent b4985d9 commit d3200cf

9 files changed, +212 -299 lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 4 additions & 4 deletions
@@ -129,8 +129,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
-   * Buckets the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's bucketing scheme.
+   * Buckets the output by the given columns. If specified, the output is laid out on the file
+   * system similar to Hive's bucketing scheme.
    *
    * This is applicable for Parquet, JSON and ORC.
    *
@@ -144,7 +144,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
-   * Sorts the bucketed output by the given columns.
+   * Sorts the output in each bucket by the given columns.
    *
    * This is applicable for Parquet, JSON and ORC.
    *
@@ -239,7 +239,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     for {
       n <- numBuckets
     } yield {
-      require(n > 0, "Bucket number must be greater than 0.")
+      require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
       BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
     }
   }
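
The doc-comment changes above cover the bucketBy and sortBy options on DataFrameWriter, and the require() now also caps the bucket count below 100000. A minimal usage sketch of the API as documented here, assuming the bucketBy(numBuckets, col, cols*) / sortBy(col, cols*) signatures and a saveAsTable-based write; the source and target table names and column names are illustrative only:

    import org.apache.spark.sql.SQLContext

    // Sketch: write a bucketed, sorted Parquet table using the options documented above.
    // Assumptions: bucketBy/sortBy signatures as in the eventual public API, and that the
    // bucketed write goes through saveAsTable; names below are made up for illustration.
    def writeBucketed(sqlContext: SQLContext): Unit = {
      val df = sqlContext.table("events")       // hypothetical source table
      df.write
        .format("parquet")                      // bucketing applies to Parquet, JSON and ORC
        .bucketBy(8, "user_id")                 // 0 < numBuckets < 100000 per the new require()
        .sortBy("event_time")                   // sorts rows within each bucket
        .saveAsTable("events_bucketed")
    }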

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala

Lines changed: 10 additions & 27 deletions
@@ -125,39 +125,22 @@ private[sql] case class InsertIntoHadoopFsRelation(
           |Actual: ${partitionColumns.mkString(", ")}
         """.stripMargin)
 
-      val bucketSpec = relation match {
-        case relation: BucketedHadoopFsRelation => relation.bucketSpec
-        case _ => None
-      }
-
-      val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
+      val writerContainer = if (partitionColumns.isEmpty && relation.bucketSpec.isEmpty) {
         new DefaultWriterContainer(relation, job, isAppend)
       } else {
         val output = df.queryExecution.executedPlan.output
         val (partitionOutput, dataOutput) =
           output.partition(a => partitionColumns.contains(a.name))
 
-        if (bucketSpec.isEmpty) {
-          new DynamicPartitionWriterContainer(
-            relation,
-            job,
-            partitionOutput,
-            dataOutput,
-            output,
-            PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
-            isAppend)
-        } else {
-          new BucketedPartitionWriterContainer(
-            relation.asInstanceOf[BucketedHadoopFsRelation],
-            job,
-            partitionOutput,
-            bucketSpec.get,
-            dataOutput,
-            output,
-            PartitioningUtils.DEFAULT_PARTITION_NAME,
-            isAppend)
-        }
+        new DynamicPartitionWriterContainer(
+          relation,
+          job,
+          partitionOutput,
+          dataOutput,
+          output,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+          isAppend)
       }
 
       // This call shouldn't be put into the `try` block below because it only initializes and
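
The simplification above reads relation.bucketSpec directly and drops the separate BucketedPartitionWriterContainer branch, which implies the bucket spec now lives on the base relation type and DynamicPartitionWriterContainer handles bucketing itself. A rough sketch of the shape this appears to assume; only BucketSpec's constructor arguments are taken from the diff, the rest is guessed:

    // Sketch only: the minimal type shape under which `relation.bucketSpec` compiles for
    // every relation. Field names mirror the BucketSpec(...) call in DataFrameWriter above.
    case class BucketSpec(
        numBuckets: Int,
        bucketColumnNames: Seq[String],
        sortColumnNames: Seq[String])

    abstract class HadoopFsRelation {
      // Defaults to None so non-bucketed relations need no special-casing at the call site.
      def bucketSpec: Option[BucketSpec] = None
    }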

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala

Lines changed: 7 additions & 15 deletions
@@ -240,21 +240,13 @@ object ResolvedDataSource extends Logging {
         val equality = columnNameEquality(caseSensitive)
         val dataSchema = StructType(
           data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
-        val r = dataSource match {
-          case provider: BucketedHadoopFsRelationProvider => provider.createRelation(
-            sqlContext,
-            Array(outputPath.toString),
-            Option(dataSchema.asNullable),
-            Option(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)),
-            bucketSpec,
-            caseInsensitiveOptions)
-          case provider: HadoopFsRelationProvider => provider.createRelation(
-            sqlContext,
-            Array(outputPath.toString),
-            Option(dataSchema.asNullable),
-            Option(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)),
-            caseInsensitiveOptions)
-        }
+        val r = dataSource.createRelation(
+          sqlContext,
+          Array(outputPath.toString),
+          Some(dataSchema.asNullable),
+          Some(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)),
+          bucketSpec,
+          caseInsensitiveOptions)
 
         // For partitioned relation r, r.schema's column ordering can be different from the column
         // ordering of data.logicalPlan (partition columns are all moved after data column). This
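
With the provider match removed, every data source is driven through a single createRelation call that always receives the bucket spec. A sketch of the provider signature this call site implies, reusing the BucketSpec and HadoopFsRelation shapes sketched above; parameter names are guesses, only the argument order comes from the diff:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types.StructType

    // Sketch of the interface implied by the unified call above; not the exact trait
    // from this commit. Argument order mirrors dataSource.createRelation(...).
    trait HadoopFsRelationProvider {
      def createRelation(
          sqlContext: SQLContext,
          paths: Array[String],
          dataSchema: Option[StructType],
          partitionColumns: Option[StructType],
          bucketSpec: Option[BucketSpec],       // now passed for every provider
          parameters: Map[String, String]): HadoopFsRelation
    }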
