From b2fb1d25804b7bdbe1a767306a319dc748965bce Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 01/18] improve the doc for "spark.memory.offHeap.size" --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index ef061dd39dcba..c44c1d7cd95f8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1073,7 +1073,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory in bytes which can be used for off-heap allocation. + The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 0971900d562cb1a18af6f7de02bb8eb95637a640 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 14:00:16 -0500 Subject: [PATCH 02/18] fix --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index c44c1d7cd95f8..ef061dd39dcba 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1073,7 +1073,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. + The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 32f7c74a9b5cf4f19e7d14357bb87064383e11b3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 1 Dec 2017 15:05:35 -0800 Subject: [PATCH 03/18] use cbo stats in inmemoryrelation --- .../execution/columnar/InMemoryRelation.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index a1c62a729900e..ae35e2a443a24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.storage.StorageLevel import org.apache.spark.util.LongAccumulator @@ -71,9 +73,17 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, no useful statistics information - // available, return the default statistics. 
- Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + children.filter(_.isInstanceOf[LogicalRelation]) match { + case Seq(c @ LogicalRelation(_, _, _, _), _) if SQLConf.CBO_ENABLED => + val stats = c.computeStats() + if (stats.rowCount.isDefined) { + stats + } else { + Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + } + case _ => + Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + } } else { Statistics(sizeInBytes = batchStats.value.longValue) } From 2082c0ed776206db4f064b936d9f560a4633b6a0 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 1 Dec 2017 17:19:52 -0800 Subject: [PATCH 04/18] fix compilation issue --- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index ae35e2a443a24..761d00cb82647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -74,7 +74,7 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { children.filter(_.isInstanceOf[LogicalRelation]) match { - case Seq(c @ LogicalRelation(_, _, _, _), _) if SQLConf.CBO_ENABLED => + case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled => val stats = c.computeStats() if (stats.rowCount.isDefined) { stats From bc708178eb84cb21343efa5eda60a625ddf80799 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 2 Dec 2017 09:48:28 -0800 Subject: [PATCH 05/18] set stats in cachemanager --- .../spark/sql/execution/CacheManager.scala | 18 +++++++------- .../execution/columnar/InMemoryRelation.scala | 24 +++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 5a1d680c99f66..d1d9861db5c65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -94,14 +94,16 @@ class CacheManager extends Logging { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession - cachedData.add(CachedData( - planToCache, - InMemoryRelation( - sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, - sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName))) + val inMemoryRelation = InMemoryRelation( + sparkSession.sessionState.conf.useCompression, + sparkSession.sessionState.conf.columnBatchSize, + storageLevel, + sparkSession.sessionState.executePlan(planToCache).executedPlan, + tableName) + if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { + inMemoryRelation.setStatsFromCachedPlan(planToCache) + } + cachedData.add(CachedData(planToCache, inMemoryRelation)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 761d00cb82647..25cda8027824d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -25,15 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.storage.StorageLevel import org.apache.spark.util.LongAccumulator object InMemoryRelation { + def apply( useCompression: Boolean, batchSize: Int, @@ -73,22 +73,20 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { - children.filter(_.isInstanceOf[LogicalRelation]) match { - case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled => - val stats = c.computeStats() - if (stats.rowCount.isDefined) { - stats - } else { - Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) - } - case _ => - Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) - } + inheritedStats.getOrElse(Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)) } else { Statistics(sizeInBytes = batchStats.value.longValue) } } + private var inheritedStats: Option[Statistics] = _ + + private[execution] def setStatsFromCachedPlan(planToCache: LogicalPlan): Unit = { + require(planToCache.conf.cboEnabled, "you cannot use the stats of cached plan in" + + " InMemoryRelation without cbo enabled") + inheritedStats = Some(planToCache.stats) + } + // If the cached column buffers were not passed in, we calculate them in the constructor. // As in Spark, the actual work of caching is lazy. 
if (_cachedColumnBuffers == null) { From 4650307e2f6fbd5bffdef5f2fdf0b7f39f9fd0ef Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 2 Dec 2017 11:33:32 -0800 Subject: [PATCH 06/18] fix the typo --- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 25cda8027824d..3507009edddce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -79,7 +79,7 @@ case class InMemoryRelation( } } - private var inheritedStats: Option[Statistics] = _ + private var inheritedStats: Option[Statistics] = None private[execution] def setStatsFromCachedPlan(planToCache: LogicalPlan): Unit = { require(planToCache.conf.cboEnabled, "you cannot use the stats of cached plan in" + From b6a36be7854e6510f0f2e013f8e46c76053bccf7 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 5 Dec 2017 09:16:05 -0800 Subject: [PATCH 07/18] address the comments --- .../spark/sql/execution/CacheManager.scala | 23 +++++++++++------- .../execution/columnar/InMemoryRelation.scala | 24 +++++++------------ 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index d1d9861db5c65..1bbf315244ed4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -22,11 +22,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileSystem, Path} - import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.storage.StorageLevel @@ -80,6 +79,14 @@ class CacheManager extends Logging { cachedData.isEmpty } + private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = { + if (plan.conf.cboEnabled && plan.stats.rowCount.isDefined) { + Some(plan.stats) + } else { + None + } + } + /** * Caches the data produced by the logical representation of the given [[Dataset]]. 
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because @@ -96,13 +103,10 @@ class CacheManager extends Logging { val sparkSession = query.sparkSession val inMemoryRelation = InMemoryRelation( sparkSession.sessionState.conf.useCompression, - sparkSession.sessionState.conf.columnBatchSize, - storageLevel, + sparkSession.sessionState.conf.columnBatchSize, storageLevel, sparkSession.sessionState.executePlan(planToCache).executedPlan, - tableName) - if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) { - inMemoryRelation.setStatsFromCachedPlan(planToCache) - } + tableName, + extractStatsOfPlanForCache(planToCache)) cachedData.add(CachedData(planToCache, inMemoryRelation)) } } @@ -150,7 +154,8 @@ class CacheManager extends Logging { batchSize = cd.cachedRepresentation.batchSize, storageLevel = cd.cachedRepresentation.storageLevel, child = spark.sessionState.executePlan(cd.plan).executedPlan, - tableName = cd.cachedRepresentation.tableName) + tableName = cd.cachedRepresentation.tableName, + stats = extractStatsOfPlanForCache(cd.plan)) needToRecache += cd.copy(cachedRepresentation = newCache) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 3507009edddce..b1e0ffafb90fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -25,22 +25,22 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.storage.StorageLevel import org.apache.spark.util.LongAccumulator object InMemoryRelation { - def apply( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, child: SparkPlan, - tableName: Option[String]): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() + tableName: Option[String], + stats: Option[Statistics]): InMemoryRelation = + new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)( + statsOfPlanToCache = stats) } @@ -62,7 +62,8 @@ case class InMemoryRelation( @transient child: SparkPlan, tableName: Option[String])( @transient var _cachedColumnBuffers: RDD[CachedBatch] = null, - val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) + val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, + statsOfPlanToCache: Option[Statistics] = None) extends logical.LeafNode with MultiInstanceRelation { override protected def innerChildren: Seq[SparkPlan] = Seq(child) @@ -73,20 +74,13 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { - inheritedStats.getOrElse(Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)) + statsOfPlanToCache.getOrElse(Statistics(sizeInBytes = + child.sqlContext.conf.defaultSizeInBytes)) } else { Statistics(sizeInBytes = batchStats.value.longValue) } } - private var 
inheritedStats: Option[Statistics] = None - - private[execution] def setStatsFromCachedPlan(planToCache: LogicalPlan): Unit = { - require(planToCache.conf.cboEnabled, "you cannot use the stats of cached plan in" + - " InMemoryRelation without cbo enabled") - inheritedStats = Some(planToCache.stats) - } - // If the cached column buffers were not passed in, we calculate them in the constructor. // As in Spark, the actual work of caching is lazy. if (_cachedColumnBuffers == null) { From 898f1b5606745cd9b8bf90cc77c452440c534a55 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 5 Dec 2017 09:35:49 -0800 Subject: [PATCH 08/18] fix the stylistic issue --- .../main/scala/org/apache/spark/sql/execution/CacheManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 1bbf315244ed4..c34f65234a489 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.expressions.SubqueryExpression From 4c347014f32aa3c3088f8a18b08786fb074a1790 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 5 Dec 2017 10:17:06 -0800 Subject: [PATCH 09/18] fix compilation issue --- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index b1e0ffafb90fa..9b684e2a914ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -38,7 +38,7 @@ object InMemoryRelation { storageLevel: StorageLevel, child: SparkPlan, tableName: Option[String], - stats: Option[Statistics]): InMemoryRelation = + stats: Option[Statistics] = None): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)( statsOfPlanToCache = stats) } From 8f912a3ba18ec5b07eab25df7b84500db2531566 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 5 Dec 2017 10:57:56 -0800 Subject: [PATCH 10/18] fix the stylistic issue --- .../sql/execution/columnar/InMemoryRelation.scala | 2 +- .../columnar/InMemoryColumnarQuerySuite.scala | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 9b684e2a914ac..b1e0ffafb90fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -38,7 +38,7 @@ object InMemoryRelation { storageLevel: StorageLevel, child: SparkPlan, tableName: Option[String], - stats: Option[Statistics] = None): InMemoryRelation = + stats: Option[Statistics]): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, 
storageLevel, child, tableName)( statsOfPlanToCache = stats) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index e662e294228db..3a08d77f6b982 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -40,7 +40,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { data.createOrReplaceTempView(s"testData$dataType") val storageLevel = MEMORY_ONLY val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan - val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None) + val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None, + None) assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel) inMemoryRelation.cachedColumnBuffers.collect().head match { @@ -116,7 +117,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("simple columnar query") { val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None) checkAnswer(scan, testData.collect().toSeq) } @@ -133,7 +134,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("projection") { val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None) checkAnswer(scan, testData.collect().map { case Row(key: Int, value: String) => value -> key @@ -149,7 +150,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None) checkAnswer(scan, testData.collect().toSeq) checkAnswer(scan, testData.collect().toSeq) @@ -323,7 +324,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-17549: cached table size should be correctly calculated") { val data = spark.sparkContext.parallelize(1 to 10, 5).toDF() val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan - val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None) + val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, None) // Materialize the data. 
val expectedAnswer = data.collect() @@ -449,7 +450,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") { val attribute = AttributeReference("a", IntegerType)() val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, - LocalTableScanExec(Seq(attribute), Nil), None) + LocalTableScanExec(Seq(attribute), Nil), None, None) val tableScanExec = InMemoryTableScanExec(Seq(attribute), Seq(In(attribute, Nil)), testRelation) assert(tableScanExec.partitionFilters.isEmpty) From 064f6d1c36fdf96ab4f4a0eb7b7c911f4eff0b81 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 5 Dec 2017 13:44:56 -0800 Subject: [PATCH 11/18] added missing args --- .../spark/sql/execution/columnar/InMemoryRelation.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index b1e0ffafb90fa..6750599a9bb0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -144,7 +144,7 @@ case class InMemoryRelation( def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { InMemoryRelation( newOutput, useCompression, batchSize, storageLevel, child, tableName)( - _cachedColumnBuffers, batchStats) + _cachedColumnBuffers, batchStats, statsOfPlanToCache) } override def newInstance(): this.type = { @@ -156,11 +156,12 @@ case class InMemoryRelation( child, tableName)( _cachedColumnBuffers, - batchStats).asInstanceOf[this.type] + batchStats, + statsOfPlanToCache).asInstanceOf[this.type] } def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers override protected def otherCopyArgs: Seq[AnyRef] = - Seq(_cachedColumnBuffers, batchStats) + Seq(_cachedColumnBuffers, batchStats, statsOfPlanToCache) } From 385edc0fb0f792522a89006db5e4e5a381fd9cbc Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 6 Dec 2017 09:07:18 -0800 Subject: [PATCH 12/18] address comments and added tests --- .../spark/sql/execution/CacheManager.scala | 4 +-- .../execution/columnar/InMemoryRelation.scala | 7 ++-- .../columnar/InMemoryColumnarQuerySuite.scala | 32 ++++++++++++++++++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index c34f65234a489..9f02d0e3d0e0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -81,7 +81,7 @@ class CacheManager extends Logging { } private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = { - if (plan.conf.cboEnabled && plan.stats.rowCount.isDefined) { + if (plan.stats.rowCount.isDefined) { Some(plan.stats) } else { None @@ -156,7 +156,7 @@ class CacheManager extends Logging { storageLevel = cd.cachedRepresentation.storageLevel, child = spark.sessionState.executePlan(cd.plan).executedPlan, tableName = cd.cachedRepresentation.tableName, - stats = extractStatsOfPlanForCache(cd.plan)) + statsOfPlanToCache = extractStatsOfPlanForCache(cd.plan)) needToRecache += cd.copy(cachedRepresentation = newCache) } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 6750599a9bb0a..fb32d984bce62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -38,9 +38,9 @@ object InMemoryRelation { storageLevel: StorageLevel, child: SparkPlan, tableName: Option[String], - stats: Option[Statistics]): InMemoryRelation = + statsOfPlanToCache: Option[Statistics]): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)( - statsOfPlanToCache = stats) + statsOfPlanToCache = statsOfPlanToCache) } @@ -73,7 +73,10 @@ case class InMemoryRelation( @transient val partitionStatistics = new PartitionStatistics(output) override def computeStats(): Statistics = { + // scalastyle:off if (batchStats.value == 0L) { + // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when + // applicable statsOfPlanToCache.getOrElse(Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 3a08d77f6b982..e1fd1ad99e45f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec} @@ -30,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel._ +import org.apache.spark.util.Utils class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -480,4 +482,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") { + withSQLConf("spark.sql.cbo.enabled" -> "true") { + // scalastyle:off + val workDir = s"${Utils.createTempDir()}/table1" + val data = Seq(100, 200, 300, 400).toDF("count") + data.write.parquet(workDir) + val dfFromFile = spark.read.parquet(workDir).cache() + val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect { + case plan: InMemoryRelation => plan + }.head + // InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized + assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue) + // InMemoryRelation's stats is updated after materializing RDD + dfFromFile.collect() + assert(inMemoryRelation.computeStats().sizeInBytes === 16) + // test of catalog table + val dfFromTable = 
spark.catalog.createTable("table1", workDir).cache() + val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. + collect { case plan: InMemoryRelation => plan }.head + // Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats + // is calculated + assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue) + // InMemoryRelation's stats should be updated after calculating stats of the table + spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") + assert(inMemoryRelation2.computeStats().sizeInBytes === 16) + } + } } From fbb372909e18cc75b8e04896779f78b79f612c84 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 6 Dec 2017 14:42:55 -0800 Subject: [PATCH 13/18] cleaning --- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 1 - .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index fb32d984bce62..48787d0f8fd4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -73,7 +73,6 @@ case class InMemoryRelation( @transient val partitionStatistics = new PartitionStatistics(output) override def computeStats(): Statistics = { - // scalastyle:off if (batchStats.value == 0L) { // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when // applicable diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index e1fd1ad99e45f..a5925e3172605 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession} -import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec} From 6299f3de8496d7c8eb5d11f484010392a1712395 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 7 Dec 2017 09:23:22 -0800 Subject: [PATCH 14/18] address the comments --- .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index a5925e3172605..5d365f663a7ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -484,7 +484,6 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-22673: 
InMemoryRelation should utilize existing stats whenever possible") { withSQLConf("spark.sql.cbo.enabled" -> "true") { - // scalastyle:off val workDir = s"${Utils.createTempDir()}/table1" val data = Seq(100, 200, 300, 400).toDF("count") data.write.parquet(workDir) @@ -494,16 +493,20 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { }.head // InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue) + // InMemoryRelation's stats is updated after materializing RDD dfFromFile.collect() assert(inMemoryRelation.computeStats().sizeInBytes === 16) + // test of catalog table val dfFromTable = spark.catalog.createTable("table1", workDir).cache() val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. collect { case plan: InMemoryRelation => plan }.head + // Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats // is calculated assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue) + // InMemoryRelation's stats should be updated after calculating stats of the table spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") assert(inMemoryRelation2.computeStats().sizeInBytes === 16) From 0fde46e3b272264ae2875793233b207bd531ca0b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 7 Dec 2017 10:07:05 -0800 Subject: [PATCH 15/18] stylistic fix --- .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 5d365f663a7ca..a1fd41900db59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -506,7 +506,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { // Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats // is calculated assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue) - + // InMemoryRelation's stats should be updated after calculating stats of the table spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") assert(inMemoryRelation2.computeStats().sizeInBytes === 16) From 4b2fcb6a9fd61fe771ab323f41935541b0223bdf Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 14 Dec 2017 19:27:04 -0800 Subject: [PATCH 16/18] use file size as the stats --- .../spark/sql/execution/CacheManager.scala | 12 ++------ .../execution/columnar/InMemoryRelation.scala | 10 +++---- .../columnar/InMemoryColumnarQuerySuite.scala | 30 +++++++++++-------- 3 files changed, 23 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 9f02d0e3d0e0b..b05fe49a6ac3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -80,14 +80,6 @@ class CacheManager extends Logging { cachedData.isEmpty } - private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = { - if (plan.stats.rowCount.isDefined) { - Some(plan.stats) - } else { - None - } - } - /** * Caches the data produced by the logical 
representation of the given [[Dataset]]. * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because @@ -107,7 +99,7 @@ class CacheManager extends Logging { sparkSession.sessionState.conf.columnBatchSize, storageLevel, sparkSession.sessionState.executePlan(planToCache).executedPlan, tableName, - extractStatsOfPlanForCache(planToCache)) + planToCache.stats) cachedData.add(CachedData(planToCache, inMemoryRelation)) } } @@ -156,7 +148,7 @@ class CacheManager extends Logging { storageLevel = cd.cachedRepresentation.storageLevel, child = spark.sessionState.executePlan(cd.plan).executedPlan, tableName = cd.cachedRepresentation.tableName, - statsOfPlanToCache = extractStatsOfPlanForCache(cd.plan)) + statsOfPlanToCache = cd.plan.stats) needToRecache += cd.copy(cachedRepresentation = newCache) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 48787d0f8fd4e..51928d914841e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -38,7 +38,7 @@ object InMemoryRelation { storageLevel: StorageLevel, child: SparkPlan, tableName: Option[String], - statsOfPlanToCache: Option[Statistics]): InMemoryRelation = + statsOfPlanToCache: Statistics): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)( statsOfPlanToCache = statsOfPlanToCache) } @@ -63,7 +63,7 @@ case class InMemoryRelation( tableName: Option[String])( @transient var _cachedColumnBuffers: RDD[CachedBatch] = null, val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, - statsOfPlanToCache: Option[Statistics] = None) + statsOfPlanToCache: Statistics = null) extends logical.LeafNode with MultiInstanceRelation { override protected def innerChildren: Seq[SparkPlan] = Seq(child) @@ -74,10 +74,8 @@ case class InMemoryRelation( override def computeStats(): Statistics = { if (batchStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when - // applicable - statsOfPlanToCache.getOrElse(Statistics(sizeInBytes = - child.sqlContext.conf.defaultSizeInBytes)) + // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache + statsOfPlanToCache } else { Statistics(sizeInBytes = batchStats.value.longValue) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index a1fd41900db59..ac8d398df90e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -42,7 +42,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val storageLevel = MEMORY_ONLY val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None, - None) + data.logicalPlan.stats) assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel) inMemoryRelation.cachedColumnBuffers.collect().head match { @@ -118,7 +118,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext { test("simple columnar query") { val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, + testData.logicalPlan.stats) checkAnswer(scan, testData.collect().toSeq) } @@ -134,8 +135,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } test("projection") { - val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None) + val logicalPlan = testData.select('value, 'key).logicalPlan + val plan = spark.sessionState.executePlan(logicalPlan).sparkPlan + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, + logicalPlan.stats) checkAnswer(scan, testData.collect().map { case Row(key: Int, value: String) => value -> key @@ -151,7 +154,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, + testData.logicalPlan.stats) checkAnswer(scan, testData.collect().toSeq) checkAnswer(scan, testData.collect().toSeq) @@ -325,7 +329,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-17549: cached table size should be correctly calculated") { val data = spark.sparkContext.parallelize(1 to 10, 5).toDF() val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan - val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, None) + val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan.stats) // Materialize the data. 
val expectedAnswer = data.collect() @@ -450,8 +454,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") { val attribute = AttributeReference("a", IntegerType)() - val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, - LocalTableScanExec(Seq(attribute), Nil), None, None) + val localTableScanExec = LocalTableScanExec(Seq(attribute), Nil) + val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, localTableScanExec, None, null) val tableScanExec = InMemoryTableScanExec(Seq(attribute), Seq(In(attribute, Nil)), testRelation) assert(tableScanExec.partitionFilters.isEmpty) @@ -482,7 +486,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { } } - test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") { + test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") { withSQLConf("spark.sql.cbo.enabled" -> "true") { val workDir = s"${Utils.createTempDir()}/table1" val data = Seq(100, 200, 300, 400).toDF("count") @@ -491,8 +495,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect { case plan: InMemoryRelation => plan }.head - // InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized - assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue) + // InMemoryRelation's stats is file size before the underlying RDD is materialized + assert(inMemoryRelation.computeStats().sizeInBytes === 740) // InMemoryRelation's stats is updated after materializing RDD dfFromFile.collect() @@ -503,9 +507,9 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. 
collect { case plan: InMemoryRelation => plan }.head - // Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats + // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats // is calculated - assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue) + assert(inMemoryRelation2.computeStats().sizeInBytes === 740) // InMemoryRelation's stats should be updated after calculating stats of the table spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") From b2d829b2ff30d03af681931260a78da579915624 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 17 Dec 2017 14:18:41 -0800 Subject: [PATCH 17/18] address the issues in test --- .../columnar/InMemoryColumnarQuerySuite.scala | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index ac8d398df90e5..4bf986a0e9a5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -488,32 +488,40 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") { withSQLConf("spark.sql.cbo.enabled" -> "true") { - val workDir = s"${Utils.createTempDir()}/table1" - val data = Seq(100, 200, 300, 400).toDF("count") - data.write.parquet(workDir) - val dfFromFile = spark.read.parquet(workDir).cache() - val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect { - case plan: InMemoryRelation => plan - }.head - // InMemoryRelation's stats is file size before the underlying RDD is materialized - assert(inMemoryRelation.computeStats().sizeInBytes === 740) - - // InMemoryRelation's stats is updated after materializing RDD - dfFromFile.collect() - assert(inMemoryRelation.computeStats().sizeInBytes === 16) - - // test of catalog table - val dfFromTable = spark.catalog.createTable("table1", workDir).cache() - val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. 
- collect { case plan: InMemoryRelation => plan }.head - - // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats - // is calculated - assert(inMemoryRelation2.computeStats().sizeInBytes === 740) - - // InMemoryRelation's stats should be updated after calculating stats of the table - spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") - assert(inMemoryRelation2.computeStats().sizeInBytes === 16) + withTempDir { workDir => + withTable("table1") { + val workDirPath = workDir.getAbsolutePath + "/table1" + val data = Seq(100, 200, 300, 400).toDF("count") + data.write.parquet(workDirPath) + val dfFromFile = spark.read.parquet(workDirPath).cache() + val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect { + case plan: InMemoryRelation => plan + }.head + // InMemoryRelation's stats is file size before the underlying RDD is materialized + assert(inMemoryRelation.computeStats().sizeInBytes === 740) + + // InMemoryRelation's stats is updated after materializing RDD + dfFromFile.collect() + assert(inMemoryRelation.computeStats().sizeInBytes === 16) + + // test of catalog table + val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache() + val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. + collect { case plan: InMemoryRelation => plan }.head + + // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats + // is calculated + assert(inMemoryRelation2.computeStats().sizeInBytes === 740) + + // InMemoryRelation's stats should be updated after calculating stats of the table + // clear cache to simulate a fresh environment + dfFromTable.unpersist(blocking = true) + spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") + val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan. + collect { case plan: InMemoryRelation => plan }.head + assert(inMemoryRelation3.computeStats().sizeInBytes === 48) + } + } } } } From de2905c36f933ccf896e80ef6e5b76f9cea9f4f2 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Mon, 18 Dec 2017 08:55:22 -0800 Subject: [PATCH 18/18] change to withTempPath --- .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 4bf986a0e9a5b..ff7c5e58e9863 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -488,9 +488,9 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") { withSQLConf("spark.sql.cbo.enabled" -> "true") { - withTempDir { workDir => + withTempPath { workDir => withTable("table1") { - val workDirPath = workDir.getAbsolutePath + "/table1" + val workDirPath = workDir.getAbsolutePath val data = Seq(100, 200, 300, 400).toDF("count") data.write.parquet(workDirPath) val dfFromFile = spark.read.parquet(workDirPath).cache()