4 files changed: +20 −12 lines

File tree:
catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions
core/src/main/scala/org/apache/spark/sql
hive/src/test/scala/org/apache/spark/sql/sources

catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions
@@ -23,8 +23,6 @@ import java.util.zip.CRC32
 import org.apache.commons.codec.digest.DigestUtils
 
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{MapData, ArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
core/src/main/scala/org/apache/spark/sql
@@ -119,8 +119,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * Partitions the output by the given columns on the file system. If specified, the output is
    * laid out on the file system similar to Hive's partitioning scheme.
    *
-   * This is only applicable for Parquet at the moment.
-   *
    * @since 1.4.0
    */
   @scala.annotation.varargs
@@ -129,13 +127,24 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     this
   }
 
+  /**
+   * Buckets the output by the given columns on the file system. If specified, the output is
+   * laid out on the file system similar to Hive's bucketing scheme.
+   *
+   * @since 2.0
+   */
   @scala.annotation.varargs
   def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter = {
     this.numBuckets = Option(numBuckets)
     this.bucketingColumns = Option(colName +: colNames)
     this
   }
 
+  /**
+   * Sorts the bucketed output by the given columns.
+   *
+   * @since 2.0
+   */
   @scala.annotation.varargs
   def sortBy(colName: String, colNames: String*): DataFrameWriter = {
     this.sortingColumns = Option(colName +: colNames)
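
For reference, a minimal usage sketch of the writer calls documented above; the DataFrame df, the column and table names, and the saveAsTable sink are assumptions for illustration, not part of this diff:

// Hypothetical example: partition by date, hash userId into 8 buckets per
// partition, and sort each bucket file by userId.
df.write
  .partitionBy("date")
  .bucketBy(8, "userId")
  .sortBy("userId")
  .saveAsTable("events")  // assumption: bucketed output targets a table sink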
core/src/main/scala/org/apache/spark/sql
@@ -327,9 +327,10 @@ private[sql] class DynamicPartitionWriterContainer(
     val getKey: InternalRow => UnsafeRow = if (bucketSpec.isEmpty) {
       val projection = UnsafeProjection.create(partitionColumns, inputSchema)
       row => projection(row)
-    } else {
+    } else { // If it's bucketed, we should also consider the bucket id as part of the key.
       val bucketColumns = bucketSpec.get.resolvedBucketingColumns(inputSchema)
       val getBucketKey = UnsafeProjection.create(bucketColumns, inputSchema)
+      // Leave an empty int slot at the end of the result row, so that we can set the bucket id later.
       val getResultRow = UnsafeProjection.create(partitionColumns :+ Literal(-1), inputSchema)
       row => {
         val bucketId = math.abs(getBucketKey(row).hashCode()) % bucketSpec.get.numBuckets
@@ -341,7 +342,7 @@ private[sql] class DynamicPartitionWriterContainer(
 
     val keySchema = if (bucketSpec.isEmpty) {
       StructType.fromAttributes(partitionColumns)
-    } else {
+    } else { // If it's bucketed, we should also consider the bucket id as part of the key.
       StructType.fromAttributes(partitionColumns).add("bucketId", IntegerType, nullable = false)
     }
 
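
To make the keying logic concrete, here is a small self-contained sketch that models the bucket-id computation with plain Scala values standing in for UnsafeProjection/UnsafeRow; the case class and names are invented for illustration:

// Simplified model of the key construction above: keep the partition
// columns, hash the bucketing columns, take a non-negative modulo to get
// the bucket id, and place it in the trailing slot that the real code
// reserves with Literal(-1).
case class InputRow(partition: String, bucketCol: Int)

def makeKey(row: InputRow, numBuckets: Int): (String, Int) = {
  val bucketId = math.abs(row.bucketCol.hashCode()) % numBuckets
  (row.partition, bucketId)
}

// makeKey(InputRow("2015-12-01", 42), 8) == ("2015-12-01", 2)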
hive/src/test/scala/org/apache/spark/sql/sources
@@ -120,8 +120,8 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
     for (row <- rows) {
       assert(row.isInstanceOf[UnsafeRow])
-      val actuaBucketId = math.abs(row.hashCode()) % 8
-      assert(actuaBucketId == bucketId)
+      val actualBucketId = math.abs(row.hashCode()) % 8
+      assert(actualBucketId == bucketId)
     }
   }
 }
@@ -151,8 +151,8 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
     for (row <- rows) {
       assert(row.isInstanceOf[UnsafeRow])
-      val actuaBucketId = math.abs(row.hashCode()) % 8
-      assert(actuaBucketId == bucketId)
+      val actualBucketId = math.abs(row.hashCode()) % 8
+      assert(actualBucketId == bucketId)
     }
   }
 }
@@ -183,8 +183,8 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
     for (row <- rows) {
       assert(row.isInstanceOf[UnsafeRow])
-      val actuaBucketId = math.abs(row.hashCode()) % 8
-      assert(actuaBucketId == bucketId)
+      val actualBucketId = math.abs(row.hashCode()) % 8
+      assert(actualBucketId == bucketId)
     }
   }
 }
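
As a standalone illustration of what the corrected assertions verify, this snippet groups values by math.abs(hashCode) % numBuckets and re-checks membership the same way; the values and bucket count are made up:

// Every value assigned to a bucket must hash back to that bucket's id.
val numBuckets = 8
val values = Seq(1, 42, 99, -7, 1234)
values.groupBy(v => math.abs(v.hashCode()) % numBuckets).foreach { case (id, vs) =>
  vs.foreach(v => assert(math.abs(v.hashCode()) % numBuckets == id))
}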