
Commit 291ce3a

Author: Nan Zhu
Commit message: address the comments
1 parent 4b0a85b

2 files changed: +42 -1 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
@@ -264,7 +264,7 @@ object SQLConf {
     .createWithDefault(false)
 
   val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
-    "org.apache.spark.sql.execution.datasources.fileDataSizeFactor")
+    "spark.sql.sources.compressionFactor")
     .internal()
     .doc("The result of multiplying this factor with the size of data source files is propagated " +
       "to serve as the stats to choose the best execution plan. In the case where the " +

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala

Lines changed: 41 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{File, FilenameFilter}
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
     }
   }
+
+  test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
+    import testImplicits._
+    Seq(1.0, 0.5).foreach { compressionFactor =>
+      withSQLConf("spark.sql.sources.compressionFactor" -> compressionFactor.toString,
+        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        withTempPath { workDir =>
+          // the file size is 740 bytes
+          val workDirPath = workDir.getAbsolutePath
+          val data1 = Seq(100, 200, 300, 400).toDF("count")
+          data1.write.parquet(workDirPath + "/data1")
+          val df1FromFile = spark.read.parquet(workDirPath + "/data1")
+          val data2 = Seq(100, 200, 300, 400).toDF("count")
+          data2.write.parquet(workDirPath + "/data2")
+          val df2FromFile = spark.read.parquet(workDirPath + "/data2")
+          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
+          if (compressionFactor == 0.5) {
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.nonEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.isEmpty)
+          } else {
+            // compressionFactor is 1.0
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.isEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.nonEmpty)
+          }
+        }
+      }
+    }
+  }
 }
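Reading the numbers in this test (the 740-byte figure comes from the comment in the diff; the rest is inferred intent): with compressionFactor = 0.5 each 740-byte Parquet file should be estimated at about 370 bytes, which is below the 400-byte autoBroadcastJoinThreshold, so the planner is expected to pick BroadcastHashJoinExec; with compressionFactor = 1.0 the estimate stays at 740 bytes, above the threshold, so SortMergeJoinExec is expected instead.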
