
Commit 3b95b51

colinmjj authored and GitHub Enterprise committed
[HADP-56688] Add configuration for hive bucket definition support (apache#701)
1 parent 544a31a commit 3b95b51

File tree

5 files changed: +21 −4 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
9 additions, 0 deletions

@@ -1522,6 +1522,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val HIVE_BUCKET_WRITE_COMPATIBLE =
+    buildConf("spark.sql.hive.bucketWriteCompatible")
+      .doc("When true, bucket number for hive table will be checked during the writing process")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
     buildConf("spark.sql.hive.filesourcePartitionFileCacheSize")
       .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
@@ -5761,6 +5768,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
 
+  def bucketWriteCompatible: Boolean = getConf(HIVE_BUCKET_WRITE_COMPATIBLE)
+
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
   def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
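Once merged, the new flag behaves like any other runtime SQL conf. A minimal sketch of flipping it on a Hive-enabled session; the session setup and app name are illustrative, and only the key spark.sql.hive.bucketWriteCompatible and its false default come from the diff above:

import org.apache.spark.sql.SparkSession

// Illustrative session setup; any Hive-enabled SparkSession works.
val spark = SparkSession.builder()
  .appName("bucket-write-compatible-demo")
  .enableHiveSupport()
  .getOrCreate()

// The conf defaults to false, so Hive-compatible bucketed writes stay off
// unless a job opts in explicitly.
spark.conf.set("spark.sql.hive.bucketWriteCompatible", "true")
assert(spark.conf.get("spark.sql.hive.bucketWriteCompatible") == "true")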

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
2 additions, 1 deletion

@@ -196,11 +196,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)
 
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
+    val bucketWriteCompatible = sparkSession.sqlContext.conf.bucketWriteCompatible
     val tablePath = new Path(relation.tableMeta.location)
     val fileFormat = fileFormatClass.getConstructor().newInstance()
     val bucketSpec = relation.tableMeta.bucketSpec
     val (hiveOptions, hiveBucketSpec) =
-      if (isWrite) {
+      if (isWrite && bucketWriteCompatible) {
         (options.updated(BucketingUtils.optionForHiveCompatibleBucketWrite, "true"),
           bucketSpec)
       } else {
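The guard above boils down to conditionally attaching one option to the write path. A standalone sketch of the same shape, with a placeholder option key and an invented function name for illustration (the real key comes from BucketingUtils.optionForHiveCompatibleBucketWrite):

// Sketch: attach the Hive-compatible bucket-write marker only when this is
// a write AND the new conf is on; otherwise leave the options untouched.
// "hive.compatible.bucket.write" is a placeholder key, not the Spark-internal one.
def hiveWriteOptions(
    isWrite: Boolean,
    bucketWriteCompatible: Boolean,
    options: Map[String, String]): Map[String, String] = {
  if (isWrite && bucketWriteCompatible) {
    options.updated("hive.compatible.bucket.write", "true")
  } else {
    options
  }
}

With the flag left at its false default, writes to bucketed Hive tables keep the pre-change behavior even when isWrite is true.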

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
6 additions, 1 deletion

@@ -338,7 +338,12 @@ object InsertIntoHiveTable extends V1WritesHiveUtils {
 
     val partitionColumns = getDynamicPartitionColumns(table, partition, query)
     val bucketSpec = table.bucketSpec
-    val options = getOptionsWithHiveBucketWrite(bucketSpec)
+    val bucketWriteCompatible = sparkSession.sqlContext.conf.bucketWriteCompatible
+    val options = if (bucketWriteCompatible) {
+      getOptionsWithHiveBucketWrite(bucketSpec)
+    } else {
+      Map.empty[String, String]
+    }
 
     new InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists,
       outputColumnNames, partitionColumns, bucketSpec, options, fileFormat, hiveTempPath)
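On the INSERT path the same conf decides whether getOptionsWithHiveBucketWrite contributes anything at all; with the flag off, the command is built with an empty options map. A hypothetical end-to-end run with the flag on (table name, schema, and data are made up for the example):

// Assumes the `spark` session from the earlier sketch.
spark.sql("SET spark.sql.hive.bucketWriteCompatible=true")

spark.sql(
  """CREATE TABLE bucketed_demo (id INT, name STRING)
    |CLUSTERED BY (id) INTO 8 BUCKETS
    |STORED AS PARQUET""".stripMargin)

// This INSERT now takes the guarded branch above and carries the
// Hive-compatible bucket-write option down to the file writer.
spark.sql(
  "INSERT INTO bucketed_demo SELECT CAST(id AS INT), concat('n', id) FROM range(100)")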

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
2 additions, 1 deletion

@@ -999,7 +999,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
         dataSource <- Seq("USING PARQUET", "STORED AS PARQUET")) {
       withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
           SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "2",
-          SQLConf.AUTO_REPARTITION_BEFORE_WRITING_ENABLED.key -> enabled) {
+          SQLConf.AUTO_REPARTITION_BEFORE_WRITING_ENABLED.key -> enabled,
+          SQLConf.HIVE_BUCKET_WRITE_COMPATIBLE.key -> "true") {
         withTable("part_tab") {
           sql(s"""CREATE TABLE part_tab(a int, b int, c int)
              |$dataSource

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
2 additions, 1 deletion

@@ -53,7 +53,8 @@ class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHive
     Seq("true", "false").foreach { enableConvertMetastore =>
       withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> enableConvertMetastore,
           HiveUtils.CONVERT_METASTORE_ORC.key -> enableConvertMetastore,
-          SQLConf.DYNAMIC_PARTITION_OVERWRITE_LOCK_ENABLED.key -> "true") {
+          SQLConf.DYNAMIC_PARTITION_OVERWRITE_LOCK_ENABLED.key -> "true",
+          SQLConf.HIVE_BUCKET_WRITE_COMPATIBLE.key -> "true") {
         withTable(table) {
           sql(
             s"""
