Skip to content

Commit 51bae90

Browse files
committed
Made cacheTable idempotent
1 parent b88238f commit 51bae90

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.types._
3232
import org.apache.spark.sql.catalyst.optimizer.Optimizer
3333
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
3434
import org.apache.spark.sql.catalyst.rules.RuleExecutor
35-
import org.apache.spark.sql.columnar.InMemoryRelation
35+
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
3636
import org.apache.spark.sql.execution._
3737
import org.apache.spark.sql.execution.SparkStrategies
3838
import org.apache.spark.sql.json._
@@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
187187
/** Caches the specified table in-memory. */
188188
def cacheTable(tableName: String): Unit = {
189189
val currentTable = catalog.lookupRelation(None, tableName)
190-
val useCompression =
191-
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
192-
val asInMemoryRelation =
193-
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
190+
val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match {
191+
case _: InMemoryRelation =>
192+
currentTable.logicalPlan
193+
194+
case _ =>
195+
val useCompression =
196+
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
197+
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
198+
}
194199

195200
catalog.registerTable(None, tableName, asInMemoryRelation)
196201
}

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
2021
import org.apache.spark.sql.catalyst.expressions._
22+
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
2123
import org.apache.spark.sql.test._
2224

2325
/* Implicits */
@@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest {
405407
clear()
406408
}
407409

410+
test("SPARK-1669: cacheTable should be idempotent") {
411+
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
412+
413+
cacheTable("testData")
414+
EliminateAnalysisOperators(table("testData").logicalPlan) match {
415+
case _: InMemoryRelation =>
416+
case _ =>
417+
fail("testData should be cached")
418+
}
419+
420+
cacheTable("testData")
421+
EliminateAnalysisOperators(table("testData").logicalPlan) match {
422+
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
423+
fail("cacheTable is not idempotent")
424+
425+
case _ =>
426+
}
427+
}
408428
}

0 commit comments

Comments
 (0)