From fbe8253ed278f9b7bc017bec10138579ee2f9aa4 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 01/13] improve the doc for "spark.memory.offHeap.size" --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1189aea2aa71..1b858a6b610e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1093,7 +1093,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory in bytes which can be used for off-heap allocation. + The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From f40dde4b86f7555febd3151a956ef238ebc5cd3f Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 14:00:16 -0500 Subject: [PATCH 02/13] fix --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1b858a6b610e..1189aea2aa71 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1093,7 +1093,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. + The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From e868b6f11520463653278b2e40c5e708ce44209d Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 03/13] improve the doc for "spark.memory.offHeap.size" --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1189aea2aa71..1b858a6b610e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1093,7 +1093,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory in bytes which can be used for off-heap allocation. + The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. 
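The three doc-only patches above (and PATCH 04 below, which reverts the wording once more) touch a single sentence describing spark.memory.offHeap.size in docs/configuration.md. For context on the setting itself, here is a minimal, illustrative sketch of how the two off-heap properties quoted in that sentence are set together; the application name and the 2 GiB figure are arbitrary assumptions, not values taken from the patches.

// Minimal sketch (illustrative values): per the doc text above,
// spark.memory.offHeap.size must be a positive number of bytes whenever
// spark.memory.offHeap.enabled=true, and the memory is allocated outside the
// JVM heap, so spark.executor.memory may need to shrink to stay within a hard limit.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("offheap-sketch") // hypothetical application name
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", (2L * 1024 * 1024 * 1024).toString) // 2 GiB, expressed in bytes
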
From 400f71eb5aa1595559665198eaa03cc33ae0d47c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 14:00:16 -0500 Subject: [PATCH 04/13] fix --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1b858a6b610e..1189aea2aa71 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1093,7 +1093,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. + The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 2ba0c26ee00f76f6eb51f45c860996cdac1308d0 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 24 Dec 2017 19:21:02 -0800 Subject: [PATCH 05/13] add a configurable factor to describe HadoopFsRelation's size --- .../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++++++ .../sql/execution/datasources/HadoopFsRelation.scala | 12 +++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5c61f10bb71a..b01fba99a3e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -263,6 +263,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val HADOOPFSRELATION_SIZE_FACTOR = buildConf( + "org.apache.spark.sql.execution.datasources.sizeFactor") + .internal() + .doc("The result of multiplying this factor with the size of data source files is propagated" + + " to serve as the stats to choose the best execution plan. In the case where the " + + " the in-disk and in-memory size of data is significantly different, users can adjust this" + + " factor for a better choice of the execution plan. 
The default value is 1.0.") + .doubleConf + .createWithDefault(1.0) + val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema") .doc("When true, the Parquet data source merges schemas collected from all data files, " + "otherwise the schema is picked from the summary file or a random data file " + @@ -1241,6 +1251,8 @@ class SQLConf extends Serializable with Logging { def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS) + def hadoopFSSizeFactor: Double = getConf(HADOOPFSRELATION_SIZE_FACTOR) + def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index 89d8a85a9cbd..bd0d912a1596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -60,6 +60,8 @@ case class HadoopFsRelation( } } + private val hadoopFSSizeFactor = sqlContext.conf.hadoopFSSizeFactor + val overlappedPartCols = mutable.Map.empty[String, StructField] partitionSchema.foreach { partitionField => if (dataSchema.exists(getColName(_) == getColName(partitionField))) { @@ -82,7 +84,15 @@ case class HadoopFsRelation( } } - override def sizeInBytes: Long = location.sizeInBytes + override def sizeInBytes: Long = { + val size = location.sizeInBytes * hadoopFSSizeFactor + if (size > Long.MaxValue) { + Long.MaxValue + } else { + size.toLong + } + } + override def inputFiles: Array[String] = location.inputFiles } From 669704cdcb208c00ab202e8aef27ffc61271502e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 25 Dec 2017 09:26:26 -0800 Subject: [PATCH 06/13] check value > 0 --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b01fba99a3e5..dc08915b087a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -271,6 +271,7 @@ object SQLConf { " the in-disk and in-memory size of data is significantly different, users can adjust this" + " factor for a better choice of the execution plan. 
The default value is 1.0.") .doubleConf + .checkValue(_ > 0, "the value of sizeFactor must be larger than 0") .createWithDefault(1.0) val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema") From e25aff59a2ef27747d53c27e498ef7b8d0f74f27 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 30 Dec 2017 10:05:30 -0800 Subject: [PATCH 07/13] address the comments --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dc08915b087a..0bb7500d2a2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -249,7 +249,7 @@ object SQLConf { val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled") .internal() .doc("When true, the query optimizer will infer and propagate data constraints in the query " + - "plan to optimize them. Constraint propagation can sometimes be computationally expensive" + + "plan to optimize them. Constraint propagation can sometimes be computationally expensive " + "for certain kinds of query plans (such as those with a large number of predicates and " + "aliases) which might negatively impact overall runtime.") .booleanConf @@ -266,10 +266,10 @@ object SQLConf { val HADOOPFSRELATION_SIZE_FACTOR = buildConf( "org.apache.spark.sql.execution.datasources.sizeFactor") .internal() - .doc("The result of multiplying this factor with the size of data source files is propagated" + - " to serve as the stats to choose the best execution plan. In the case where the " + - " the in-disk and in-memory size of data is significantly different, users can adjust this" + - " factor for a better choice of the execution plan. The default value is 1.0.") + .doc("The result of multiplying this factor with the size of data source files is propagated " + + "to serve as the stats to choose the best execution plan. In the case where the " + + "in-disk and in-memory size of data is significantly different, users can adjust this " + + "factor for a better choice of the execution plan. 
The default value is 1.0.") .doubleConf .checkValue(_ > 0, "the value of sizeFactor must be larger than 0") .createWithDefault(1.0) From 36e8ac4e2d37ab6c31a88e53689d6edb58b3b085 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Mon, 1 Jan 2018 19:40:18 -0800 Subject: [PATCH 08/13] address the comments --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../sql/execution/datasources/HadoopFsRelation.scala | 10 ++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0bb7500d2a2c..4c5234416b20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -263,7 +263,7 @@ object SQLConf { .booleanConf .createWithDefault(false) - val HADOOPFSRELATION_SIZE_FACTOR = buildConf( + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( "org.apache.spark.sql.execution.datasources.sizeFactor") .internal() .doc("The result of multiplying this factor with the size of data source files is propagated " + @@ -1252,7 +1252,7 @@ class SQLConf extends Serializable with Logging { def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS) - def hadoopFSSizeFactor: Double = getConf(HADOOPFSRELATION_SIZE_FACTOR) + def sizeToMemorySizeFactor: Double = getConf(DISK_TO_MEMORY_SIZE_FACTOR) def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index bd0d912a1596..c4fee742ffbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -60,8 +60,6 @@ case class HadoopFsRelation( } } - private val hadoopFSSizeFactor = sqlContext.conf.hadoopFSSizeFactor - val overlappedPartCols = mutable.Map.empty[String, StructField] partitionSchema.foreach { partitionField => if (dataSchema.exists(getColName(_) == getColName(partitionField))) { @@ -85,12 +83,8 @@ case class HadoopFsRelation( } override def sizeInBytes: Long = { - val size = location.sizeInBytes * hadoopFSSizeFactor - if (size > Long.MaxValue) { - Long.MaxValue - } else { - size.toLong - } + val sizeFactor = sqlContext.conf.sizeToMemorySizeFactor + (location.sizeInBytes * sizeFactor).toLong } From 4b0a85b940406068dfca778bd6279faec932a430 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Wed, 3 Jan 2018 09:00:28 -0800 Subject: [PATCH 09/13] address more comments --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../spark/sql/execution/datasources/HadoopFsRelation.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4c5234416b20..1836e174fdc0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -264,14 +264,14 @@ object SQLConf { .createWithDefault(false) val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( - "org.apache.spark.sql.execution.datasources.sizeFactor") + "org.apache.spark.sql.execution.datasources.fileDataSizeFactor") 
.internal() .doc("The result of multiplying this factor with the size of data source files is propagated " + "to serve as the stats to choose the best execution plan. In the case where the " + "in-disk and in-memory size of data is significantly different, users can adjust this " + "factor for a better choice of the execution plan. The default value is 1.0.") .doubleConf - .checkValue(_ > 0, "the value of sizeFactor must be larger than 0") + .checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0") .createWithDefault(1.0) val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema") @@ -1252,7 +1252,7 @@ class SQLConf extends Serializable with Logging { def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS) - def sizeToMemorySizeFactor: Double = getConf(DISK_TO_MEMORY_SIZE_FACTOR) + def diskToMemorySizeFactor: Double = getConf(DISK_TO_MEMORY_SIZE_FACTOR) def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index c4fee742ffbf..1d1aaa6289cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -83,7 +83,7 @@ case class HadoopFsRelation( } override def sizeInBytes: Long = { - val sizeFactor = sqlContext.conf.sizeToMemorySizeFactor + val sizeFactor = sqlContext.conf.diskToMemorySizeFactor (location.sizeInBytes * sizeFactor).toLong } From 291ce3a70d903c6d4608c0a1b6f0a27f5526a79f Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Sat, 6 Jan 2018 08:10:37 -0800 Subject: [PATCH 10/13] address the comments --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../datasources/HadoopFsRelationSuite.scala | 41 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1836e174fdc0..bdba01919f40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -264,7 +264,7 @@ object SQLConf { .createWithDefault(false) val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( - "org.apache.spark.sql.execution.datasources.fileDataSizeFactor") + "spark.sql.sources.compressionFactor") .internal() .doc("The result of multiplying this factor with the size of data source files is propagated " + "to serve as the stats to choose the best execution plan. 
In the case where the " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala index caf03885e387..d9d9a353ae58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources import java.io.{File, FilenameFilter} import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.test.SharedSQLContext class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { @@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize)) } } + + test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") { + import testImplicits._ + Seq(1.0, 0.5).foreach { compressionFactor => + withSQLConf("spark.sql.sources.compressionFactor" -> compressionFactor.toString, + "spark.sql.autoBroadcastJoinThreshold" -> "400") { + withTempPath { workDir => + // the file size is 740 bytes + val workDirPath = workDir.getAbsolutePath + val data1 = Seq(100, 200, 300, 400).toDF("count") + data1.write.parquet(workDirPath + "/data1") + val df1FromFile = spark.read.parquet(workDirPath + "/data1") + val data2 = Seq(100, 200, 300, 400).toDF("count") + data2.write.parquet(workDirPath + "/data2") + val df2FromFile = spark.read.parquet(workDirPath + "/data2") + val joinedDF = df1FromFile.join(df2FromFile, Seq("count")) + if (compressionFactor == 0.5) { + val bJoinExec = joinedDF.queryExecution.executedPlan.collect { + case bJoin: BroadcastHashJoinExec => bJoin + } + assert(bJoinExec.nonEmpty) + val smJoinExec = joinedDF.queryExecution.executedPlan.collect { + case smJoin: SortMergeJoinExec => smJoin + } + assert(smJoinExec.isEmpty) + } else { + // compressionFactor is 1.0 + val bJoinExec = joinedDF.queryExecution.executedPlan.collect { + case bJoin: BroadcastHashJoinExec => bJoin + } + assert(bJoinExec.isEmpty) + val smJoinExec = joinedDF.queryExecution.executedPlan.collect { + case smJoin: SortMergeJoinExec => smJoin + } + assert(smJoinExec.nonEmpty) + } + } + } + } + } } From 523008192cb1476a68cf3808f46574598fcc6d2d Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Wed, 10 Jan 2018 20:35:04 -0800 Subject: [PATCH 11/13] address the comments --- .../org/apache/spark/sql/internal/SQLConf.scala | 12 +++++------- .../sql/execution/datasources/HadoopFsRelation.scala | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bdba01919f40..7432897f4ae5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -263,13 +263,11 @@ object SQLConf { .booleanConf .createWithDefault(false) - val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( - "spark.sql.sources.compressionFactor") + val FILE_COMRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor") .internal() - .doc("The result of multiplying this factor with the size of data source files is propagated " + - "to serve as the stats to choose the best execution plan. 
In the case where the " + - "in-disk and in-memory size of data is significantly different, users can adjust this " + - "factor for a better choice of the execution plan. The default value is 1.0.") + .doc("When estimating the output data size of a table scan, multiply the file size with this " + + "factor as the estimated data size, in case the data is compressed in the file and lead to" + + " a heavily underestimated result.") .doubleConf .checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0") .createWithDefault(1.0) @@ -1252,7 +1250,7 @@ class SQLConf extends Serializable with Logging { def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS) - def diskToMemorySizeFactor: Double = getConf(DISK_TO_MEMORY_SIZE_FACTOR) + def compressionFactor: Double = getConf(FILE_COMRESSION_FACTOR) def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index 1d1aaa6289cb..1eed6b2b9a8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -83,7 +83,7 @@ case class HadoopFsRelation( } override def sizeInBytes: Long = { - val sizeFactor = sqlContext.conf.diskToMemorySizeFactor + val sizeFactor = sqlContext.conf.compressionFactor (location.sizeInBytes * sizeFactor).toLong } From 6fe85892ca6c53b3aa965951dc7ef51df9d068e2 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Thu, 11 Jan 2018 06:44:22 -0800 Subject: [PATCH 12/13] fix the test --- .../spark/sql/execution/datasources/HadoopFsRelationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala index d9d9a353ae58..c1f2c18d1417 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala @@ -44,7 +44,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") { import testImplicits._ Seq(1.0, 0.5).foreach { compressionFactor => - withSQLConf("spark.sql.sources.compressionFactor" -> compressionFactor.toString, + withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString, "spark.sql.autoBroadcastJoinThreshold" -> "400") { withTempPath { workDir => // the file size is 740 bytes From c584c61fb90851fd5cb74869c62f0935c93fed14 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Thu, 11 Jan 2018 11:53:14 -0800 Subject: [PATCH 13/13] renaming --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/execution/datasources/HadoopFsRelation.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7432897f4ae5..d0839669adc4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1250,7 +1250,7 @@ class SQLConf 
extends Serializable with Logging { def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS) - def compressionFactor: Double = getConf(FILE_COMRESSION_FACTOR) + def fileCompressionFactor: Double = getConf(FILE_COMRESSION_FACTOR) def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index 1eed6b2b9a8f..6b3463852977 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -83,8 +83,8 @@ case class HadoopFsRelation( } override def sizeInBytes: Long = { - val sizeFactor = sqlContext.conf.compressionFactor - (location.sizeInBytes * sizeFactor).toLong + val compressionFactor = sqlContext.conf.fileCompressionFactor + (location.sizeInBytes * compressionFactor).toLong }
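
A note on one simplification in the middle of the series: in PATCH 05, HadoopFsRelation.sizeInBytes explicitly clamped the scaled estimate at Long.MaxValue, and PATCH 08 drops that guard in favor of a bare (location.sizeInBytes * sizeFactor).toLong. The commit messages only say "address the comments", but the simpler form is safe on its own because JVM double-to-long narrowing saturates at Long.MaxValue rather than wrapping. The following standalone snippet (not part of the patches) demonstrates that behaviour:

// Standalone check of the saturation behaviour the post-PATCH-08 code relies on:
// a Double larger than Long.MaxValue converts to exactly Long.MaxValue, so the
// scaled size estimate can never wrap around to a negative value.
object ToLongSaturation {
  def main(args: Array[String]): Unit = {
    val hugeEstimate: Double = Long.MaxValue.toDouble * 10.0
    val clamped: Long = hugeEstimate.toLong
    assert(clamped == Long.MaxValue)
    println(s"saturated to Long.MaxValue: ${clamped == Long.MaxValue}")
  }
}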
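
The end state of the series is a single internal option, spark.sql.sources.fileCompressionFactor, read through SQLConf.fileCompressionFactor and multiplied into HadoopFsRelation.sizeInBytes. The sketch below mirrors the scenario exercised by the SPARK-22790 test added in PATCH 10: with a deliberately tiny spark.sql.autoBroadcastJoinThreshold, lowering the factor shrinks the estimated scan size enough to switch the join from sort-merge to broadcast. It is an illustrative sketch rather than code from the patches; the local master, the temp path, and the 400-byte threshold are assumptions (the threshold and data values are borrowed from the test).

// Illustrative end-to-end sketch (not part of the patches): with a tiny broadcast
// threshold, fileCompressionFactor = 0.5 halves the estimated scan size and the
// planner is expected to pick a broadcast hash join, as asserted by the PATCH 10 test.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

object FileCompressionFactorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")                      // assumption: local run for illustration
      .appName("fileCompressionFactor-sketch")
      .getOrCreate()
    import spark.implicits._

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "400")      // as in the PATCH 10 test
    spark.conf.set("spark.sql.sources.fileCompressionFactor", "0.5")   // internal conf added by this series

    val dir = "/tmp/fileCompressionFactor-sketch"                      // illustrative path
    Seq(100, 200, 300, 400).toDF("count").write.mode("overwrite").parquet(s"$dir/data1")
    Seq(100, 200, 300, 400).toDF("count").write.mode("overwrite").parquet(s"$dir/data2")

    val joined = spark.read.parquet(s"$dir/data1")
      .join(spark.read.parquet(s"$dir/data2"), Seq("count"))

    // In the PATCH 10 test the written parquet data is about 740 bytes, so with a
    // factor of 0.5 the estimate falls under the 400-byte threshold and a
    // BroadcastHashJoinExec is expected in the physical plan.
    val usedBroadcast = joined.queryExecution.executedPlan.collect {
      case b: BroadcastHashJoinExec => b
    }.nonEmpty
    println(s"broadcast hash join chosen: $usedBroadcast")

    spark.stop()
  }
}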