From f733f90325b975973e60272ba6708dff5059f9dd Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 6 Jan 2017 16:18:23 -0800
Subject: [PATCH 1/3] [SPARK-19093] Cached tables are not used in SubqueryExpression

---
 .../spark/sql/execution/CacheManager.scala    |  7 ++
 .../apache/spark/sql/CachedTableSuite.scala   | 80 +++++++++++++++++++
 2 files changed, 87 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 526623a36d2a..99aeadc712cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -131,6 +132,12 @@ class CacheManager extends Logging {
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
+    useCachedDataInternal(plan) transformAllExpressions {
+      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
+    }
+  }
+
+  private def useCachedDataInternal(plan: LogicalPlan): LogicalPlan = {
     plan transformDown {
       case currentFragment =>
         lookupCachedData(currentFragment)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index f42402e1cc7d..1dc398029a4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -24,6 +24,8 @@ import scala.language.postfixOps
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.CleanerListener
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -565,4 +567,82 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       case i: InMemoryRelation => i
     }.size == 1)
   }
+
+  test("SPARK-19093 Caching in side subquery") {
+    withTempView("t1") {
+      Seq(1).toDF("c1").createOrReplaceTempView("t1")
+      spark.catalog.cacheTable("t1")
+      val cachedPlan =
+        sql(
+          """
+            |SELECT * FROM t1
+            |WHERE
+            |NOT EXISTS (SELECT * FROM t1)
+          """.stripMargin).queryExecution.optimizedPlan
+      assert(
+        cachedPlan.collect {
+          case i: InMemoryRelation => i
+        }.size == 2)
+      spark.catalog.uncacheTable("t1")
+    }
+  }
+
+  test("SPARK-19093 scalar and nested predicate query") {
+    def getCachedPlans(plan: LogicalPlan): Seq[LogicalPlan] = {
+      plan collect {
+        case i: InMemoryRelation => i
+      }
+    }
+    withTempView("t1", "t2", "t3", "t4") {
+      Seq(1).toDF("c1").createOrReplaceTempView("t1")
+      Seq(2).toDF("c1").createOrReplaceTempView("t2")
+      Seq(1).toDF("c1").createOrReplaceTempView("t3")
+      Seq(1).toDF("c1").createOrReplaceTempView("t4")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+      spark.catalog.cacheTable("t3")
+      spark.catalog.cacheTable("t4")
+
+      // Nested predicate subquery
+      val cachedPlan =
+        sql(
+          """
+            |SELECT * FROM t1
+            |WHERE
+            |c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
+          """.stripMargin).queryExecution.optimizedPlan
+
+      assert(
+        cachedPlan.collect {
+          case i: InMemoryRelation => i
+        }.size == 3)
+
+      // Scalar subquery and predicate subquery
+      val cachedPlan2 =
+        sql(
+          """
+            |SELECT * FROM (SELECT max(c1) FROM t1 GROUP BY c1)
+            |WHERE
+            |c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
+            |OR
+            |EXISTS (SELECT c1 FROM t3)
+            |OR
+            |c1 IN (SELECT c1 FROM t4)
+          """.stripMargin).queryExecution.optimizedPlan
+
+
+      val cachedRelations = scala.collection.mutable.MutableList.empty[Seq[LogicalPlan]]
+      cachedRelations += getCachedPlans(cachedPlan2)
+      cachedPlan2 transformAllExpressions {
+        case e: SubqueryExpression => cachedRelations += getCachedPlans(e.plan)
+          e
+      }
+      assert(cachedRelations.flatten.size == 4)
+
+      spark.catalog.uncacheTable("t1")
+      spark.catalog.uncacheTable("t2")
+      spark.catalog.uncacheTable("t3")
+      spark.catalog.uncacheTable("t4")
+    }
+  }
 }

From f9f0b01e5cf6e8a6a212324686e82b0c4bf1b5fc Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 6 Jan 2017 19:18:48 -0800
Subject: [PATCH 2/3] Review comments

---
 .../spark/sql/execution/CacheManager.scala    | 12 +++----
 .../apache/spark/sql/CachedTableSuite.scala   | 33 ++++++++-----------
 2 files changed, 19 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 99aeadc712cf..4ca134700857 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -132,18 +132,16 @@ class CacheManager extends Logging {
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
-    useCachedDataInternal(plan) transformAllExpressions {
-      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
-    }
-  }
-
-  private def useCachedDataInternal(plan: LogicalPlan): LogicalPlan = {
-    plan transformDown {
+    val newPlan = plan transformDown {
       case currentFragment =>
         lookupCachedData(currentFragment)
           .map(_.cachedRepresentation.withOutput(currentFragment.output))
           .getOrElse(currentFragment)
     }
+
+    newPlan transformAllExpressions {
+      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1dc398029a4e..5ef63d1c8564 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -55,6 +55,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     maybeBlock.nonEmpty
   }
 
+  private def getNumInMemoryRelations(plan: LogicalPlan): Int = {
+    var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
+    plan.transformAllExpressions {
+      case e: SubqueryExpression =>
+        sum += getNumInMemoryRelations(e.plan)
+        e
+    }
+    sum
+  }
+
   test("withColumn doesn't invalidate cached dataframe") {
     var evalCount = 0
     val myUDF = udf((x: String) => { evalCount += 1; "result" })
@@ -588,11 +598,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("SPARK-19093 scalar and nested predicate query") {
-    def getCachedPlans(plan: LogicalPlan): Seq[LogicalPlan] = {
-      plan collect {
-        case i: InMemoryRelation => i
-      }
-    }
+
+
     withTempView("t1", "t2", "t3", "t4") {
       Seq(1).toDF("c1").createOrReplaceTempView("t1")
       Seq(2).toDF("c1").createOrReplaceTempView("t2")
@@ -611,11 +618,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         |WHERE
         |c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
       """.stripMargin).queryExecution.optimizedPlan
-
-      assert(
-        cachedPlan.collect {
-          case i: InMemoryRelation => i
-        }.size == 3)
+      assert (getNumInMemoryRelations(cachedPlan) == 3)
 
       // Scalar subquery and predicate subquery
       val cachedPlan2 =
@@ -629,15 +632,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         |OR
         |c1 IN (SELECT c1 FROM t4)
       """.stripMargin).queryExecution.optimizedPlan
-
-
-      val cachedRelations = scala.collection.mutable.MutableList.empty[Seq[LogicalPlan]]
-      cachedRelations += getCachedPlans(cachedPlan2)
-      cachedPlan2 transformAllExpressions {
-        case e: SubqueryExpression => cachedRelations += getCachedPlans(e.plan)
-          e
-      }
-      assert(cachedRelations.flatten.size == 4)
+      assert (getNumInMemoryRelations(cachedPlan2) == 4)
 
       spark.catalog.uncacheTable("t1")
       spark.catalog.uncacheTable("t2")

From 3c779d59fa54f9ed62a3ebef260b097695c0eff1 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 6 Jan 2017 21:22:48 -0800
Subject: [PATCH 3/3] Review comments

---
 .../apache/spark/sql/CachedTableSuite.scala   | 25 ++++++++-----------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 5ef63d1c8564..fb4812adf108 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -39,6 +39,14 @@ private case class BigData(s: String)
 class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
   import testImplicits._
 
+  override def afterEach(): Unit = {
+    try {
+      spark.catalog.clearCache()
+    } finally {
+      super.afterEach()
+    }
+  }
+
   def rddIdOf(tableName: String): Int = {
     val plan = spark.table(tableName).queryExecution.sparkPlan
     plan.collect {
@@ -589,17 +597,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         |WHERE
         |NOT EXISTS (SELECT * FROM t1)
       """.stripMargin).queryExecution.optimizedPlan
-      assert(
-        cachedPlan.collect {
-          case i: InMemoryRelation => i
-        }.size == 2)
-      spark.catalog.uncacheTable("t1")
+      assert(getNumInMemoryRelations(cachedPlan) == 2)
     }
   }
 
   test("SPARK-19093 scalar and nested predicate query") {
-
-
     withTempView("t1", "t2", "t3", "t4") {
       Seq(1).toDF("c1").createOrReplaceTempView("t1")
       Seq(2).toDF("c1").createOrReplaceTempView("t2")
@@ -618,7 +620,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         |WHERE
         |c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
       """.stripMargin).queryExecution.optimizedPlan
-      assert (getNumInMemoryRelations(cachedPlan) == 3)
+      assert(getNumInMemoryRelations(cachedPlan) == 3)
 
       // Scalar subquery and predicate subquery
       val cachedPlan2 =
@@ -632,12 +634,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         |OR
         |c1 IN (SELECT c1 FROM t4)
       """.stripMargin).queryExecution.optimizedPlan
-      assert (getNumInMemoryRelations(cachedPlan2) == 4)
-
-      spark.catalog.uncacheTable("t1")
-      spark.catalog.uncacheTable("t2")
-      spark.catalog.uncacheTable("t3")
-      spark.catalog.uncacheTable("t4")
+      assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
 }
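
Editor's note: the following is a minimal standalone sketch, not part of the patch series above. It assumes a local SparkSession and the Spark APIs as they exist in the branch these patches target; the object name, the numInMemoryRelations helper (which mirrors getNumInMemoryRelations from CachedTableSuite), and the printed output are illustrative only.

// Minimal sketch (assumption: local SparkSession, Spark APIs as used in the patches above).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.columnar.InMemoryRelation

object Spark19093Demo {

  // Counts InMemoryRelation nodes in a plan, descending into subquery expressions.
  // This mirrors the getNumInMemoryRelations helper added to CachedTableSuite in patch 2.
  def numInMemoryRelations(plan: LogicalPlan): Int = {
    var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
    plan.transformAllExpressions {
      case e: SubqueryExpression =>
        sum += numInMemoryRelations(e.plan)
        e
    }
    sum
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SPARK-19093 demo").getOrCreate()
    import spark.implicits._

    Seq(1, 2, 3).toDF("c1").createOrReplaceTempView("t1")
    spark.catalog.cacheTable("t1")

    val plan = spark.sql(
      "SELECT * FROM t1 WHERE NOT EXISTS (SELECT * FROM t1)").queryExecution.optimizedPlan

    // With the CacheManager change, both the outer scan and the scan inside NOT EXISTS
    // read from the cache (count == 2); without it, only the outer scan was replaced.
    println(s"InMemoryRelations in plan: ${numInMemoryRelations(plan)}")

    spark.stop()
  }
}

Because useCachedData calls itself on each subquery plan via withNewPlan, nested subqueries (as exercised by the "scalar and nested predicate query" test) are rewritten to use cached data as well.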