@@ -249,7 +249,7 @@ object SQLConf {
   val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
     .internal()
     .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
-      "plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
+      "plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
       "for certain kinds of query plans (such as those with a large number of predicates and " +
       "aliases) which might negatively impact overall runtime.")
     .booleanConf
@@ -263,6 +263,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val FILE_COMPRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
+    .internal()
+    .doc("When estimating the output data size of a table scan, multiply the file size by this " +
+      "factor to get the estimated data size, since data compressed within the file would " +
+      "otherwise lead to a heavily underestimated result.")
+    .doubleConf
+    .checkValue(_ > 0, "the value of fileCompressionFactor must be larger than 0")
+    .createWithDefault(1.0)
+
   val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
     .doc("When true, the Parquet data source merges schemas collected from all data files, " +
       "otherwise the schema is picked from the summary file or a random data file " +
@@ -1241,6 +1250,8 @@ class SQLConf extends Serializable with Logging {

   def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)
 
+  def fileCompressionFactor: Double = getConf(FILE_COMPRESSION_FACTOR)
+
   def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)

   /**
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
     }
   }
 
-  override def sizeInBytes: Long = location.sizeInBytes
+  override def sizeInBytes: Long = {
+    val compressionFactor = sqlContext.conf.fileCompressionFactor
+    (location.sizeInBytes * compressionFactor).toLong
+  }
+
 
   override def inputFiles: Array[String] = location.inputFiles
 }
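The override above scales the summed on-disk file size by the configured factor before it reaches plan statistics. A worked sketch of the arithmetic, with illustrative byte values:

// Illustrative numbers only: 1 MiB of Parquet on disk with an assumed
// 4:1 compression ratio yields a 4 MiB estimate rather than 1 MiB.
val onDiskBytes = 1L << 20                  // 1048576
val estimated = (onDiskBytes * 4.0).toLong  // 4194304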
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{File, FilenameFilter}
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
     }
   }
+
test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
import testImplicits._
Seq(1.0, 0.5).foreach { compressionFactor =>
withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
"spark.sql.autoBroadcastJoinThreshold" -> "400") {
withTempPath { workDir =>
// the file size is 740 bytes
val workDirPath = workDir.getAbsolutePath
val data1 = Seq(100, 200, 300, 400).toDF("count")
data1.write.parquet(workDirPath + "/data1")
val df1FromFile = spark.read.parquet(workDirPath + "/data1")
val data2 = Seq(100, 200, 300, 400).toDF("count")
data2.write.parquet(workDirPath + "/data2")
val df2FromFile = spark.read.parquet(workDirPath + "/data2")
val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
if (compressionFactor == 0.5) {
val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
case bJoin: BroadcastHashJoinExec => bJoin
}
assert(bJoinExec.nonEmpty)
val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
case smJoin: SortMergeJoinExec => smJoin
}
assert(smJoinExec.isEmpty)
} else {
// compressionFactor is 1.0
val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
case bJoin: BroadcastHashJoinExec => bJoin
}
assert(bJoinExec.isEmpty)
val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
case smJoin: SortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
}
}
}
}
}
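For intuition on the numbers in this test: each 740-byte input scales to (740 * 0.5).toLong = 370 bytes under a factor of 0.5, which falls below the 400-byte autoBroadcastJoinThreshold and yields a broadcast hash join, while the default factor of 1.0 leaves the estimate at 740 bytes, above the threshold, so a sort-merge join is planned instead. The adjusted estimate can also be observed directly on the logical plan; a hedged sketch, assuming a SparkSession `spark` and a hypothetical Parquet path:

// Hypothetical check (the path is illustrative): the size estimate
// should scale with the configured factor.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "1.0")
val base = spark.read.parquet("/tmp/data1").queryExecution.logical.stats.sizeInBytes
spark.conf.set("spark.sql.sources.fileCompressionFactor", "2.0")
val scaled = spark.read.parquet("/tmp/data1").queryExecution.logical.stats.sizeInBytes
assert(scaled > base)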