From 51bae90c74bb211bd9ecaa5bf2c5f3dc6c193249 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 23 Jun 2014 10:48:59 -0700 Subject: [PATCH 1/2] Made cacheTable idempotent --- .../org/apache/spark/sql/SQLContext.scala | 15 +++++++++----- .../org/apache/spark/sql/SQLQuerySuite.scala | 20 +++++++++++++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index c60af28b2a1f3..ca39eaf17a533 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.SparkStrategies import org.apache.spark.sql.json._ @@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Caches the specified table in-memory. */ def cacheTable(tableName: String): Unit = { val currentTable = catalog.lookupRelation(None, tableName) - val useCompression = - sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false) - val asInMemoryRelation = - InMemoryRelation(useCompression, executePlan(currentTable).executedPlan) + val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match { + case _: InMemoryRelation => + currentTable.logicalPlan + + case _ => + val useCompression = + sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false) + InMemoryRelation(useCompression, executePlan(currentTable).executedPlan) + } catalog.registerTable(None, tableName, asInMemoryRelation) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e9360b0fc7910..cca58c0063b38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.test._ /* Implicits */ @@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest { clear() } + test("SPARK-1669: cacheTable should be idempotent") { + assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) + + cacheTable("testData") + EliminateAnalysisOperators(table("testData").logicalPlan) match { + case _: InMemoryRelation => + case _ => + fail("testData should be cached") + } + + cacheTable("testData") + EliminateAnalysisOperators(table("testData").logicalPlan) match { + case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => + fail("cacheTable is not idempotent") + + case _ => + } + } } From 68f8a2032c9848798ced1dd9a8326d18c1a79be5 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 23 Jun 2014 10:57:27 -0700 Subject: [PATCH 2/2] Removed an unused import --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ca39eaf17a533..0bcfbf6f849e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.SparkStrategies import org.apache.spark.sql.json._