From 402d8e495d0fec01c3b7bb7fc8dcdf4efa56d1d2 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Tue, 27 Oct 2015 23:26:19 -0700 Subject: [PATCH 1/6] [SPARK-11246] Table cache for Parquet broken in 1.5 --- .../sql/catalyst/plans/logical/basicOperators.scala | 6 ++++++ .../org/apache/spark/sql/hive/CachedTableSuite.scala | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d2d3db0a44484..9056c0af73296 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Utils @@ -388,6 +389,11 @@ case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output.map(_.withQualifiers(alias :: Nil)) + + override def sameResult(plan: LogicalPlan): Boolean = { + child sameResult(EliminateSubQueries(plan)) + } + } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 9adb3780a2c55..5c2fc7d82ffbd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +import 
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} import org.apache.spark.storage.RDDBlockId @@ -203,4 +204,14 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { sql("DROP TABLE refreshTable") Utils.deleteRecursively(tempPath) } + + test("SPARK-11246 cache parquet table") { + sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1") + + cacheTable("cachedTable") + val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan + assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 1) + + sql("DROP TABLE cachedTable") + } } From 5ed1429dc598bb929d98a7447db05d73c6356ffe Mon Sep 17 00:00:00 2001 From: xin Wu Date: Wed, 28 Oct 2015 18:45:19 -0700 Subject: [PATCH 2/6] SPARK-11246 implement code review comments --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 5 ----- .../spark/sql/execution/datasources/LogicalRelation.scala | 6 +++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 9056c0af73296..43f471233dc6a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -389,11 +389,6 @@ case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output.map(_.withQualifiers(alias :: Nil)) - - override def sameResult(plan: LogicalPlan): Boolean = { - child sameResult(EliminateSubQueries(plan)) - } - } /** diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 783252e0a297f..00e56e66a6fd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.sources.BaseRelation +import scala.collection.mutable.ArrayBuffer + /** * Used to link a [[BaseRelation]] in to a logical query plan. 
* @@ -61,6 +64,7 @@ case class LogicalRelation( case LogicalRelation(otherRelation, _) => relation == otherRelation case _ => false } + override lazy val cleanArgs: Seq[Any] = Seq(relation) @transient override lazy val statistics: Statistics = Statistics( sizeInBytes = BigInt(relation.sizeInBytes) From be82cb701334a5bdce0cf814a85d4d666f60b917 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Wed, 28 Oct 2015 21:33:55 -0700 Subject: [PATCH 3/6] SPARK-11246 remove unnecessary import from LogicalRelation --- .../spark/sql/execution/datasources/LogicalRelation.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 00e56e66a6fd7..5ef184a219f11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -19,11 +19,8 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.sources.BaseRelation -import scala.collection.mutable.ArrayBuffer - /** * Used to link a [[BaseRelation]] in to a logical query plan. 
* From fb709542a42791fa1c617bb2c5e034459123ff2d Mon Sep 17 00:00:00 2001 From: xin Wu Date: Wed, 28 Oct 2015 21:46:14 -0700 Subject: [PATCH 4/6] SPARK-11246 implement code review comment --- .../spark/sql/execution/datasources/LogicalRelation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 5ef184a219f11..e25bbba5c21b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.BaseRelation @@ -61,6 +61,7 @@ case class LogicalRelation( case LogicalRelation(otherRelation, _) => relation == otherRelation case _ => false } + override lazy val cleanArgs: Seq[Any] = Seq(relation) @transient override lazy val statistics: Statistics = Statistics( From 3f25d6a1b330ea2c1d0db4c9e9aeb09ce3c61311 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Wed, 28 Oct 2015 22:17:00 -0700 Subject: [PATCH 5/6] SPARK-11246 implement code review comment --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 1 - .../spark/sql/execution/datasources/LogicalRelation.scala | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 
43f471233dc6a..d2d3db0a44484 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Utils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index e25bbba5c21b6..160fc2e29a75c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -62,6 +62,9 @@ case class LogicalRelation( case _ => false } + // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need + // LogicalRelation.cleanArgs to return Seq(relation), since expectedOutputAttribute's expId can be different + // but the relation is still the same. 
override lazy val cleanArgs: Seq[Any] = Seq(relation) @transient override lazy val statistics: Statistics = Statistics( From 88edd14e6cb6f25d15e83bcb8e24c3c3e5306ed8 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Wed, 28 Oct 2015 22:37:33 -0700 Subject: [PATCH 6/6] SPARK-11246 implement code review comment --- .../spark/sql/execution/datasources/LogicalRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 160fc2e29a75c..219dae88e515d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -63,8 +63,8 @@ case class LogicalRelation( } // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need - // LogicalRelation.cleanArgs to return Seq(relation), since expectedOutputAttribute's expId can be different - // but the relation is still the same. + // LogicalRelation.cleanArgs to return Seq(relation), since the exprIds in + // expectedOutputAttributes can be different but the relation is still the same. override lazy val cleanArgs: Seq[Any] = Seq(relation) @transient override lazy val statistics: Statistics = Statistics(