Skip to content

Commit 0170070

Browse files
authored
Merge pull request #5 from CodingCat/SPARK-22673_2
Spark 22673 2
2 parents 064f6d1 + fbb3729 commit 0170070

File tree

3 files changed

+35
-4
lines changed

3 files changed

+35
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class CacheManager extends Logging {
8181
}
8282

8383
private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
84-
if (plan.conf.cboEnabled && plan.stats.rowCount.isDefined) {
84+
if (plan.stats.rowCount.isDefined) {
8585
Some(plan.stats)
8686
} else {
8787
None
@@ -156,7 +156,7 @@ class CacheManager extends Logging {
156156
storageLevel = cd.cachedRepresentation.storageLevel,
157157
child = spark.sessionState.executePlan(cd.plan).executedPlan,
158158
tableName = cd.cachedRepresentation.tableName,
159-
stats = extractStatsOfPlanForCache(cd.plan))
159+
statsOfPlanToCache = extractStatsOfPlanForCache(cd.plan))
160160
needToRecache += cd.copy(cachedRepresentation = newCache)
161161
}
162162
}

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ object InMemoryRelation {
3838
storageLevel: StorageLevel,
3939
child: SparkPlan,
4040
tableName: Option[String],
41-
stats: Option[Statistics]): InMemoryRelation =
41+
statsOfPlanToCache: Option[Statistics]): InMemoryRelation =
4242
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
43-
statsOfPlanToCache = stats)
43+
statsOfPlanToCache = statsOfPlanToCache)
4444
}
4545

4646

@@ -74,6 +74,8 @@ case class InMemoryRelation(
7474

7575
override def computeStats(): Statistics = {
7676
if (batchStats.value == 0L) {
77+
// The underlying columnar RDD hasn't been materialized yet; use the stats from the plan to cache
78+
// when applicable
7779
statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
7880
child.sqlContext.conf.defaultSizeInBytes))
7981
} else {

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
3030
import org.apache.spark.sql.test.SQLTestData._
3131
import org.apache.spark.sql.types._
3232
import org.apache.spark.storage.StorageLevel._
33+
import org.apache.spark.util.Utils
3334

3435
class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
3536
import testImplicits._
@@ -480,4 +481,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
480481
}
481482
}
482483
}
484+
485+
test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
486+
withSQLConf("spark.sql.cbo.enabled" -> "true") {
487+
// scalastyle:off
488+
val workDir = s"${Utils.createTempDir()}/table1"
489+
val data = Seq(100, 200, 300, 400).toDF("count")
490+
data.write.parquet(workDir)
491+
val dfFromFile = spark.read.parquet(workDir).cache()
492+
val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
493+
case plan: InMemoryRelation => plan
494+
}.head
495+
// InMemoryRelation's sizeInBytes is Long.MaxValue before the underlying RDD is materialized
496+
assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
497+
// InMemoryRelation's stats are updated once the underlying RDD is materialized
498+
dfFromFile.collect()
499+
assert(inMemoryRelation.computeStats().sizeInBytes === 16)
500+
// test of catalog table
501+
val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
502+
val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
503+
collect { case plan: InMemoryRelation => plan }.head
504+
// Even with CBO enabled, InMemoryRelation's stats remain the default until the table's stats
505+
// are calculated
506+
assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue)
507+
// InMemoryRelation's stats should be updated after calculating stats of the table
508+
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
509+
assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
510+
}
511+
}
483512
}

0 commit comments

Comments
 (0)