From 5290e42d416897bea4914c7ef2472c7b8190ea0f Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sat, 9 Jul 2016 12:18:13 +0800
Subject: [PATCH 1/6] init commit

---
 .../spark/sql/execution/SparkPlan.scala       |  5 +-
 .../apache/spark/sql/execution/subquery.scala | 57 +++++++++++++++++--
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 27 +++++++++
 .../spark/sql/execution/PlannerSuite.scala    | 36 +++++++++++-
 4 files changed, 119 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 045ccc7bd6eae..a94ab9086c450 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -149,8 +149,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * The list of subqueries are added to [[subqueryResults]].
    */
   protected def prepareSubqueries(): Unit = {
-    val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
+    val allSubqueries = expressions.flatMap(_.collect {
+      case e: ScalarSubquery if !e.isExecuted => e
+    }).distinct
     allSubqueries.asInstanceOf[Seq[ScalarSubquery]].foreach { e =>
+      e.updateExecutedState()
       val futureResult = Future {
         // Each subquery should return only one row (and one column). We take two here and throws
         // an exception later if the number of rows is greater than one.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 461d3010ada7e..6cbc0b0a38f8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
 
 /**
  * A subquery that will return only one row and one column.
@@ -50,6 +53,13 @@ case class ScalarSubquery(
   // the first column in first row from `query`.
   @volatile private var result: Any = null
   @volatile private var updated: Boolean = false
+  @volatile private var executed: Boolean = false
+
+  def isExecuted: Boolean = executed
+
+  def updateExecutedState() : Unit = {
+    executed = true
+  }
 
   def updateResult(v: Any): Unit = {
     result = v
@@ -65,6 +75,31 @@ case class ScalarSubquery(
     require(updated, s"$this has not finished")
     Literal.create(result, dataType).doGenCode(ctx, ev)
   }
+
+  override def equals(o: Any): Boolean = o match {
+    case other: ScalarSubquery => this.eq(other)
+    case _ => false
+  }
+
+  override def hashCode: Int = exprId.hashCode()
+}
+
+/**
+ * A wrapper for a reused uncorrelated ScalarSubquery, to avoid re-computing subqueries with
+ * the same "canonical" logical plan in a query, because uncorrelated subqueries with the same
+ * "canonical" logical plan always produce the same results.
+ */
+case class ReusedScalarSubquery(
+    exprId: ExprId,
+    child: ScalarSubquery) extends UnaryExpression {
+
+  override def dataType: DataType = child.dataType
+  override def toString: String = s"ReusedSubquery#${exprId.id} ($child)"
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    defineCodeGen(ctx, ev, c => c)
+
+  protected override def nullSafeEval(input: Any): Any = input
 }
 
 /**
@@ -72,10 +107,24 @@ case class ScalarSubquery(
  */
 case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
+    // Build a hash map using schema of subquery's logical plan to avoid O(N*N) sameResult calls.
+    val subqueryMap = mutable.HashMap[StructType, ArrayBuffer[(LogicalPlan, ScalarSubquery)]]()
     plan.transformAllExpressions {
       case subquery: expressions.ScalarSubquery =>
-        val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
-        ScalarSubquery(executedPlan, subquery.exprId)
+        val sameSchema = subqueryMap.getOrElseUpdate(
+          subquery.query.schema, ArrayBuffer[(LogicalPlan, ScalarSubquery)]())
+        val samePlan = sameSchema.find { case (e, _) =>
+          subquery.query.sameResult(e)
+        }
+        if (samePlan.isDefined) {
+          // Subqueries that have the same logical plan can reuse the same result.
+          ReusedScalarSubquery(subquery.exprId, samePlan.get._2)
+        } else {
+          val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
+          val physicalSubquery = new ScalarSubquery(executedPlan, subquery.exprId)
+          sameSchema += ((subquery.plan, physicalSubquery))
+          physicalSubquery
+        }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index dca9e5e503c72..4ee4af7a3e283 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2896,4 +2896,31 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       sql(s"SELECT '$literal' AS DUMMY"),
       Row(s"$expected") :: Nil)
   }
+
+  test("SPARK-16456: Reuse the uncorrelated scalar subqueries with the same logical plan") {
+    withTempTable("t1", "t2", "t3") {
+      val df = (1 to 3).map(i => (i, i)).toDF("key", "value")
+      df.createOrReplaceTempView("t1")
+      df.createOrReplaceTempView("t2")
+      df.createOrReplaceTempView("t3")
+      checkAnswer(
+        sql(
+          """
+            |WITH max_test AS
+            |(
+            |  SELECT max(key) as max_key FROM t1
+            |),
+            |max_test2 AS
+            |(
+            |  SELECT max(key) as max_key FROM t1
+            |)
+            |SELECT key FROM t2
+            |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test)
+            |UNION ALL
+            |SELECT key FROM t3
+            |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test2)
+          """.stripMargin
+        ), Row(3) :: Row(3) :: Nil)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index c96239e682018..543a3bacf5ca2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -518,6 +518,40 @@ class PlannerSuite extends SharedSQLContext {
       fail(s"Should have only two shuffles:\n$outputPlan")
     }
   }
+
+  test("SPARK-16456: Reuse the uncorrelated scalar subqueries with the same logical plan") {
+    withTempTable("t1", "t2", "t3") {
+      val df = (1 to 3).map(i => (i, i)).toDF("key", "value")
+      df.createOrReplaceTempView("t1")
+      df.createOrReplaceTempView("t2")
+      df.createOrReplaceTempView("t3")
+      val planned = sql(
+        """
+          |WITH max_test AS
+          |(
+          |  SELECT max(key) as max_key FROM t1
+          |),
+          |max_test2 AS
+          |(
+          |  SELECT max(key) as max_key FROM t1
+          |)
+          |SELECT key FROM t2
+          |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test)
+          |UNION ALL
+          |SELECT key FROM t3
+          |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test2)
+        """.stripMargin
+      ).queryExecution.executedPlan
+      val numExecutedSubqueries = planned.flatMap {
+        case plan => plan.expressions.flatMap(_.collect { case e: ScalarSubquery => e })
+      }.distinct.size
+      assert(numExecutedSubqueries === 1)
+      val numReusedSubqueries = planned.flatMap {
+        case plan =>
+          plan.expressions.flatMap(_.collect { case e: ReusedScalarSubquery => e })
+      }.size
+      assert(numReusedSubqueries === 3)
+    }
+  }
 }
 
 // Used for unit-testing EnsureRequirements
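The core idea in the PlanSubqueries change above is a two-level dedup: bucket subqueries by their output schema first, so the expensive pairwise sameResult check only runs within a bucket instead of across every pair of subqueries. A minimal self-contained sketch of that pattern follows; Plan and its fingerprint field are illustrative stand-ins, not Spark APIs:

import scala.collection.mutable

// Stand-in for LogicalPlan; `fingerprint` models "computes the same result".
case class Plan(schema: String, fingerprint: Long) {
  def sameResult(other: Plan): Boolean = fingerprint == other.fingerprint
}

object DedupSketch {
  // Keep one representative per equivalence class of plans.
  def dedup(plans: Seq[Plan]): Seq[Plan] = {
    val buckets = mutable.HashMap.empty[String, mutable.ArrayBuffer[Plan]]
    val kept = mutable.ArrayBuffer.empty[Plan]
    for (p <- plans) {
      val bucket = buckets.getOrElseUpdate(p.schema, mutable.ArrayBuffer.empty[Plan])
      // The quadratic sameResult check only runs against plans sharing a schema.
      if (!bucket.exists(p.sameResult)) {
        bucket += p
        kept += p
      }
    }
    kept.toSeq
  }
}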
From b1914de9001c3b66a1200876327e39d2aec2453b Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sat, 9 Jul 2016 13:28:17 +0800
Subject: [PATCH 2/6] fix explain

---
 .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 4 +++-
 .../main/scala/org/apache/spark/sql/execution/subquery.scala  | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index cf34f4b30d8d8..9965d6e39c01a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -263,7 +263,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * All the subqueries of current plan.
    */
   def subqueries: Seq[PlanType] = {
-    expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
+    expressions.flatMap(_.collect {
+      case e: SubqueryExpression => e
+    }).distinct.map(_.plan.asInstanceOf[PlanType])
   }
 
   override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 6cbc0b0a38f8b..79b98059f2166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -94,7 +94,7 @@ case class ReusedScalarSubquery(
     child: ScalarSubquery) extends UnaryExpression {
 
   override def dataType: DataType = child.dataType
-  override def toString: String = s"ReusedSubquery#${exprId.id} ($child)"
+  override def toString: String = s"ReusedSubquery#${exprId.id}($child)"
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
     defineCodeGen(ctx, ev, c => c)
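The explain fix in patch 2 leans on the equals override from patch 1: because ScalarSubquery compares by reference (this.eq), calling .distinct on the collected expressions collapses every occurrence of a shared instance into one entry, so EXPLAIN prints each reused subquery plan only once. A small illustration of that interaction, using a throwaway class rather than the real expression type:

// Why `.distinct` dedups reused subqueries: equality is identity (eq), so a
// shared instance collapses to one entry no matter how often it occurs.
class Subquery(val id: Int) {
  override def equals(o: Any): Boolean = o match {
    case other: Subquery => this.eq(other) // reference equality, as in the patch
    case _ => false
  }
  override def hashCode: Int = id.hashCode
}

object DistinctSketch extends App {
  val shared = new Subquery(1)
  val occurrences = Seq(shared, shared, new Subquery(1), shared)
  // Two entries survive: the shared instance once, plus the separate instance.
  assert(occurrences.distinct.size == 2)
  println(occurrences.distinct.size) // 2
}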
From 77ea002b310e08a1994af5a6182bb01bb75d7fd2 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sat, 9 Jul 2016 17:07:48 +0800
Subject: [PATCH 3/6] fix bug

---
 .../spark/sql/execution/SparkPlan.scala       | 47 ++++-----------
 .../apache/spark/sql/execution/subquery.scala | 53 ++++++++++++++---
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 27 ----------
 .../org/apache/spark/sql/SubquerySuite.scala  | 27 ++++++++++
 4 files changed, 82 insertions(+), 72 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a94ab9086c450..2dd4a8c7b66e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext
 
 import org.apache.spark.{broadcast, SparkEnv}
 import org.apache.spark.internal.Logging
@@ -138,51 +137,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * List of (uncorrelated scalar subquery, future holding the subquery result) for this plan
-   * node.
+   * List of uncorrelated scalar subqueries for this plan node.
    * This list is populated by [[prepareSubqueries]], which is called in [[prepare]].
    */
   @transient
-  private val subqueryResults = new ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]
+  private var allSubqueries = new ArrayBuffer[ScalarSubquery]
 
   /**
    * Finds scalar subquery expressions in this plan node and starts evaluating them.
-   * The list of subqueries are added to [[subqueryResults]].
+   * The list of subqueries is added to [[allSubqueries]].
    */
   protected def prepareSubqueries(): Unit = {
-    val allSubqueries = expressions.flatMap(_.collect {
-      case e: ScalarSubquery if !e.isExecuted => e
-    }).distinct
-    allSubqueries.asInstanceOf[Seq[ScalarSubquery]].foreach { e =>
-      e.updateExecutedState()
-      val futureResult = Future {
-        // Each subquery should return only one row (and one column). We take two here and throws
-        // an exception later if the number of rows is greater than one.
-        e.executedPlan.executeTake(2)
-      }(SparkPlan.subqueryExecutionContext)
-      subqueryResults += e -> futureResult
+    expressions.flatMap(_.collect { case e: ScalarSubquery => e }).distinct.foreach { e =>
+      e.submitSubqueryEvaluated()
+      allSubqueries += e
     }
   }
 
   /**
-   * Blocks the thread until all subqueries finish evaluation and update the results.
+   * Blocks the thread until all subqueries finish evaluation.
    */
   protected def waitForSubqueries(): Unit = synchronized {
-    // fill in the result of subqueries
-    subqueryResults.foreach { case (e, futureResult) =>
-      val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf)
-      if (rows.length > 1) {
-        sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
-      }
-      if (rows.length == 1) {
-        assert(rows(0).numFields == 1,
-          s"Expects 1 field, but got ${rows(0).numFields}; something went wrong in analysis")
-        e.updateResult(rows(0).get(0, e.dataType))
-      } else {
-        // If there is no rows returned, the result should be null.
-        e.updateResult(null)
-      }
+    allSubqueries.foreach { e =>
+      e.awaitSubqueryResult()
     }
-    subqueryResults.clear()
   }
 
   /**
@@ -393,11 +371,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 }
 
-object SparkPlan {
-  private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
-}
-
 private[sql] trait LeafExecNode extends SparkPlan {
   override def children: Seq[SparkPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 79b98059f2166..14fd76347440c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions
@@ -28,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.ThreadUtils
 
 /**
  * A subquery that will return only one row and one column.
@@ -53,15 +56,10 @@ case class ScalarSubquery(
   // the first column in first row from `query`.
   @volatile private var result: Any = null
   @volatile private var updated: Boolean = false
-  @volatile private var executed: Boolean = false
+  @volatile private var evaluated: Boolean = false
+  @volatile private var futureResult: Future[Array[InternalRow]] = _
 
-  def isExecuted: Boolean = executed
-
-  def updateExecutedState() : Unit = {
-    executed = true
-  }
-
-  def updateResult(v: Any): Unit = {
+  private def updateResult(v: Any): Unit = {
     result = v
     updated = true
   }
@@ -76,6 +74,40 @@ case class ScalarSubquery(
     Literal.create(result, dataType).doGenCode(ctx, ev)
   }
 
+  /**
+   * Submits the subquery to be evaluated. This is a no-op if it has already been evaluated.
+   */
+  def submitSubqueryEvaluated(): Unit = synchronized {
+    if (!evaluated) {
+      futureResult = Future {
+        // Each subquery should return only one row (and one column). We take two here and throw
+        // an exception later if the number of rows is greater than one.
+        executedPlan.executeTake(2)
+      }(ScalarSubquery.subqueryExecutionContext)
+      evaluated = true
+    }
+  }
+
+  /**
+   * Blocks the thread until the evaluation of the subquery has finished.
+   */
+  def awaitSubqueryResult(): Unit = synchronized {
+    if (!updated) {
+      val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf)
+      if (rows.length > 1) {
+        sys.error(s"more than one row returned by a subquery used as an expression:\n${plan}")
+      }
+      if (rows.length == 1) {
+        assert(rows(0).numFields == 1,
+          s"Expects 1 field, but got ${rows(0).numFields}; something went wrong in analysis")
+        updateResult(rows(0).get(0, dataType))
+      } else {
+        // If no rows are returned, the result should be null.
+        updateResult(null)
+      }
+    }
+  }
+
   override def equals(o: Any): Boolean = o match {
     case other: ScalarSubquery => this.eq(other)
     case _ => false
@@ -84,6 +116,11 @@ case class ScalarSubquery(
   override def hashCode: Int = exprId.hashCode()
 }
 
+object ScalarSubquery {
+  private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+}
+
 /**
  * A wrapper for a reused uncorrelated ScalarSubquery, to avoid re-computing subqueries with
  * the same "canonical" logical plan in a query, because uncorrelated subqueries with the same
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4ee4af7a3e283..dca9e5e503c72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2896,31 +2896,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       sql(s"SELECT '$literal' AS DUMMY"),
       Row(s"$expected") :: Nil)
   }
-
-  test("SPARK-16456: Reuse the uncorrelated scalar subqueries with the same logical plan") {
-    withTempTable("t1", "t2", "t3") {
-      val df = (1 to 3).map(i => (i, i)).toDF("key", "value")
-      df.createOrReplaceTempView("t1")
-      df.createOrReplaceTempView("t2")
-      df.createOrReplaceTempView("t3")
-      checkAnswer(
-        sql(
-          """
-            |WITH max_test AS
-            |(
-            |  SELECT max(key) as max_key FROM t1
-            |),
-            |max_test2 AS
-            |(
-            |  SELECT max(key) as max_key FROM t1
-            |)
-            |SELECT key FROM t2
-            |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test)
-            |UNION ALL
-            |SELECT key FROM t3
-            |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test2)
-          """.stripMargin
-        ), Row(3) :: Row(3) :: Nil)
-    }
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 1d9ff21dbf5d9..3919b1c38dc79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -571,4 +571,31 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) ::
       Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil)
   }
+
+  test("SPARK-16456: Reuse the uncorrelated scalar subqueries with the same logical plan") {
+    withTempTable("t1", "t2", "t3") {
+      val df = (1 to 3).map(i => (i, i)).toDF("key", "value")
+      df.createOrReplaceTempView("t1")
+      df.createOrReplaceTempView("t2")
+      df.createOrReplaceTempView("t3")
+      checkAnswer(
+        sql(
+          """
+            |WITH max_test AS
+            |(
+            |  SELECT max(key) as max_key FROM t1
+            |),
+            |max_test2 AS
+            |(
+            |  SELECT max(key) as max_key FROM t1
+            |)
+            |SELECT key FROM t2
+            |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test)
+            |UNION ALL
+            |SELECT key FROM t3
+            |WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test2)
+          """.stripMargin
+        ), Row(3) :: Row(3) :: Nil)
+    }
+  }
 }
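Patch 3 moves evaluation into the expression itself so a subquery shared by several plan nodes is submitted exactly once and awaited by every consumer. A minimal sketch of that submit-once / await-many protocol, illustrative only: the real code uses Spark's ThreadUtils daemon pool and executeTake(2), while OnceEvaluated and its members below are invented for the demo:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

class OnceEvaluated[T](compute: () => T)(implicit ec: ExecutionContext) {
  @volatile private[this] var result: Option[T] = None
  private[this] var future: Future[T] = _
  private[this] var submitted = false

  // First caller kicks off the computation; later calls are no-ops.
  def submit(): Unit = synchronized {
    if (!submitted) { future = Future(compute()); submitted = true }
  }

  // Every consumer blocks on the same Future and caches the result.
  def await(): T = synchronized {
    if (result.isEmpty) { result = Some(Await.result(future, Duration.Inf)) }
    result.get
  }
}

object OnceSketch extends App {
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  val shared = new OnceEvaluated(() => { Thread.sleep(100); 42 })
  shared.submit(); shared.submit()         // second submit is a no-op
  println(shared.await() + shared.await()) // 84: computed once, read twice
  sys.exit(0)                              // shut down the non-daemon pool
}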
From 03115424302ad61e85aab22ae6c239f129b43342 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sat, 9 Jul 2016 17:13:22 +0800
Subject: [PATCH 4/6] fix minor

---
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2dd4a8c7b66e8..5f9355ee397a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -158,9 +158,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Blocks the thread until all subqueries finish evaluation.
    */
   protected def waitForSubqueries(): Unit = synchronized {
-    allSubqueries.foreach { e =>
-      e.awaitSubqueryResult()
-    }
+    allSubqueries.foreach(_.awaitSubqueryResult())
+    allSubqueries.clear()
   }
 
   /**

From c4bb2737ae7067d385247558b81e593eb7252c68 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sat, 9 Jul 2016 18:15:59 +0800
Subject: [PATCH 5/6] fix transient

---
 .../main/scala/org/apache/spark/sql/execution/subquery.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 14fd76347440c..ae29a440c6f17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -56,8 +56,8 @@ case class ScalarSubquery(
   // the first column in first row from `query`.
   @volatile private var result: Any = null
   @volatile private var updated: Boolean = false
-  @volatile private var evaluated: Boolean = false
-  @volatile private var futureResult: Future[Array[InternalRow]] = _
+  @transient private var evaluated: Boolean = false
+  @transient private var futureResult: Future[Array[InternalRow]] = _
 
   private def updateResult(v: Any): Unit = {
     result = v

From 1d7bd3c50b57c517888ebbaea4d0db9eadcf78e5 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sun, 10 Jul 2016 21:03:43 +0800
Subject: [PATCH 6/6] fix style

---
 .../scala/org/apache/spark/sql/execution/subquery.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index ae29a440c6f17..1e9873e2a2a1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -54,10 +54,10 @@ case class ScalarSubquery(
   override def toString: String = s"subquery#${exprId.id}"
 
   // the first column in first row from `query`.
-  @volatile private var result: Any = null
-  @volatile private var updated: Boolean = false
-  @transient private var evaluated: Boolean = false
-  @transient private var futureResult: Future[Array[InternalRow]] = _
+  @volatile private[this] var result: Any = null
+  @volatile private[this] var updated: Boolean = false
+  @transient private[this] var evaluated: Boolean = false
+  @transient private[this] var futureResult: Future[Array[InternalRow]] = _
 
   private def updateResult(v: Any): Unit = {
     result = v
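Patches 5 and 6 hinge on two small Scala serialization facts worth spelling out: an @transient field is skipped during Java serialization and comes back as its JVM default (null or false) on the deserializing side, which is the right behavior for driver-only bookkeeping like futureResult; and private[this] makes the annotations apply directly to the object-private field, with no accessor indirection. A hedged illustration of the first point; the Carrier class here is invented for the demo:

import java.io._

// Case classes are Serializable; `scratch` is marked driver-only via @transient.
case class Carrier(payload: Int) {
  @transient var scratch: String = "driver-only"
}

object TransientSketch extends App {
  val out = new ByteArrayOutputStream()
  new ObjectOutputStream(out).writeObject(Carrier(1))
  val back = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
    .readObject().asInstanceOf[Carrier]
  println(back.payload) // 1: ordinary state round-trips
  println(back.scratch) // null: transient state did not travel
}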