@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.columnar
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.Utils

class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
  import testImplicits._
@@ -480,4 +482,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
      }
    }
  }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
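+      // CBO is turned on so that table stats computed by ANALYZE TABLE can flow into the plan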
+      // scalastyle:off
+      val workDir = s"${Utils.createTempDir()}/table1"
+      val data = Seq(100, 200, 300, 400).toDF("count")
+      data.write.parquet(workDir)
+      val dfFromFile = spark.read.parquet(workDir).cache()
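+      // .cache() makes the optimized plan contain an InMemoryRelation, which we collect here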
+      val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+        case plan: InMemoryRelation => plan
+      }.head
+      // InMemoryRelation's stats are Long.MaxValue before the underlying RDD is materialized
+      assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
+      // InMemoryRelation's stats are updated once the RDD is materialized
+      dfFromFile.collect()
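+      // the cached data is four Int values at 4 bytes each, hence 16 bytes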
+      assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+      // the same check for a catalog table backed by the same data
+      val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
+      val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan
+        .collect { case plan: InMemoryRelation => plan }.head
+      // Even with CBO enabled, InMemoryRelation's stats stay at the default value until the
+      // table's stats have been calculated
+      assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue)
+      // InMemoryRelation's stats should be updated after computing stats of the table
+      spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+      assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
+    }
+  }
}