[SPARK-16958] [SQL] Reuse subqueries within the same query #14548
**sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala**

```diff
@@ -17,42 +17,78 @@
 package org.apache.spark.sql.execution

+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{BooleanType, DataType, StructType}

+/**
+ * The base class for subqueries that are used in a SparkPlan.
+ */
+trait ExecSubqueryExpression extends SubqueryExpression {
+
+  val executedPlan: SubqueryExec
+  def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression
+
+  // does not have a logical plan
+  override def query: LogicalPlan = throw new UnsupportedOperationException
```
> **Contributor:** Maybe we should remove this from the interface, if we are going to use a type parameter anyway? (See the comment on `withNewPlan`.)
```diff
+  override def withNewPlan(plan: LogicalPlan): SubqueryExpression =
+    throw new UnsupportedOperationException
```
> **Contributor:** Should we try to combine this with …
>
> **Author:** Tried this multiple times; it does not work that well. We could try again later.
>
> **Contributor:** OK, let's address this in a (potential) follow-up.
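For readers following the thread: the type-parameter alternative would look roughly like the sketch below. This is not the PR's code, just a hypothetical F-bounded variant, with Spark's `SubqueryExec` and `ExprId` as the only external types.

```scala
import org.apache.spark.sql.catalyst.expressions.ExprId
import org.apache.spark.sql.execution.SubqueryExec

// Hypothetical sketch only: parameterize the trait by its concrete type so
// withExecutedPlan can return that type instead of the base trait, and the
// throwing withNewPlan override could be dropped from the interface.
trait TypedExecSubquery[T <: TypedExecSubquery[T]] { self: T =>
  val executedPlan: SubqueryExec
  def withExecutedPlan(plan: SubqueryExec): T
}

case class TypedScalarSubquery(executedPlan: SubqueryExec, exprId: ExprId)
  extends TypedExecSubquery[TypedScalarSubquery] {
  // Returns the concrete type, so no downcast is needed at call sites.
  override def withExecutedPlan(plan: SubqueryExec): TypedScalarSubquery =
    copy(executedPlan = plan)
}
```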
```diff
+
+  override def plan: SparkPlan = executedPlan
+
+  /**
+   * Fill the expression with the collected result from the executed plan.
+   */
+  def updateResult(): Unit
+}

 /**
  * A subquery that will return only one row and one column.
+ *
+ * This is the physical copy of ScalarSubquery, to be used inside SparkPlan.
  */
 case class ScalarSubquery(
-    executedPlan: SparkPlan,
+    executedPlan: SubqueryExec,
     exprId: ExprId)
-  extends SubqueryExpression {
-
-  override def query: LogicalPlan = throw new UnsupportedOperationException
-  override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
-    throw new UnsupportedOperationException
-  }
-  override def plan: SparkPlan = SubqueryExec(simpleString, executedPlan)
+  extends ExecSubqueryExpression {

   override def dataType: DataType = executedPlan.schema.fields.head.dataType
   override def children: Seq[Expression] = Nil
   override def nullable: Boolean = true
-  override def toString: String = s"subquery#${exprId.id}"
+  override def toString: String = executedPlan.simpleString
+
+  def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan)
+
+  override def semanticEquals(other: Expression): Boolean = other match {
+    case s: ScalarSubquery => executedPlan.sameResult(s.executedPlan)
+    case _ => false
+  }

   // the first column in the first row from `query`.
   @volatile private var result: Any = null
   @volatile private var updated: Boolean = false

-  def updateResult(v: Any): Unit = {
-    result = v
+  def updateResult(): Unit = {
+    val rows = plan.executeCollect()
+    if (rows.length > 1) {
+      sys.error(s"more than one row returned by a subquery used as an expression:\n${plan}")
+    }
+    if (rows.length == 1) {
+      assert(rows(0).numFields == 1,
+        s"Expects 1 field, but got ${rows(0).numFields}; something went wrong in analysis")
+      result = rows(0).get(0, dataType)
+    } else {
+      // If there are no rows returned, the result should be null.
+      result = null
+    }
     updated = true
   }
```
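The contract that `updateResult()` enforces shows up directly in user-facing SQL behavior. A self-contained sketch using the public API (the views and data are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object ScalarSubqueryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    spark.range(10).createOrReplaceTempView("t1")
    spark.range(5).createOrReplaceTempView("t2")

    // Planned as a ScalarSubquery: executed once via updateResult() before
    // the enclosing query runs, then treated like a literal.
    spark.sql("SELECT id FROM t1 WHERE id > (SELECT max(id) FROM t2)").show()

    // A scalar subquery returning more than one row fails at runtime with
    // "more than one row returned by a subquery used as an expression";
    // one returning zero rows evaluates to NULL.
    spark.stop()
  }
}
```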
```diff
@@ -67,6 +103,51 @@ case class ScalarSubquery(
     }
   }
 }

+/**
+ * A subquery that checks whether the value of `child` is in the result of a query.
+ */
+case class InSubquery(
+    child: Expression,
+    executedPlan: SubqueryExec,
+    exprId: ExprId,
+    private var result: Array[Any] = null,
+    private var updated: Boolean = false) extends ExecSubqueryExpression {
+
+  override def dataType: DataType = BooleanType
+  override def children: Seq[Expression] = child :: Nil
+  override def nullable: Boolean = child.nullable
+  override def toString: String = s"$child IN ${executedPlan.name}"
+
+  def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan)
+
+  override def semanticEquals(other: Expression): Boolean = other match {
+    case in: InSubquery => child.semanticEquals(in.child) &&
+      executedPlan.sameResult(in.executedPlan)
+    case _ => false
+  }
+
+  def updateResult(): Unit = {
+    val rows = plan.executeCollect()
+    result = rows.map(_.get(0, child.dataType)).asInstanceOf[Array[Any]]
+    updated = true
+  }
+
+  override def eval(input: InternalRow): Any = {
+    require(updated, s"$this has not finished")
+    val v = child.eval(input)
+    if (v == null) {
+      null
+    } else {
+      result.contains(v)
+    }
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    require(updated, s"$this has not finished")
+    InSet(child, result.toSet).doGenCode(ctx, ev)
+  }
+}
```
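`InSubquery` evaluates in two phases: materialize the subquery output once, then probe it per input row. The sketch below isolates that pattern outside Spark; the `MaterializedIn` name and the `Option`-based null handling are illustrative (the real expression returns SQL NULL and delegates code generation to the existing `InSet`).

```scala
// Two-phase evaluation, as in InSubquery: update() runs once before the
// enclosing plan starts; contains() is called once per input row.
final class MaterializedIn[T <: AnyRef](runSubquery: () => Array[T]) {
  @volatile private var result: Set[T] = _

  // Phase 1: collect and cache the subquery result (cf. updateResult).
  def update(): Unit = { result = runSubquery().toSet }

  // Phase 2: membership probe. None models SQL NULL for a null input.
  def contains(v: T): Option[Boolean] = {
    require(result != null, "subquery has not finished")
    if (v == null) None else Some(result.contains(v))
  }
}
```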
```diff
 /**
  * Plans subqueries that are present in the given [[SparkPlan]].
  */
@@ -75,7 +156,39 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
     plan.transformAllExpressions {
       case subquery: expressions.ScalarSubquery =>
         val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
-        ScalarSubquery(executedPlan, subquery.exprId)
+        ScalarSubquery(
+          SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan),
+          subquery.exprId)
+      case expressions.PredicateSubquery(plan, Seq(e: Expression), _, exprId) =>
```
> **Contributor:** Shouldn't we add a size check here? This might as well materialize a billion rows.
>
> **Contributor:** Under what circumstance is this triggered? A predicate subquery in a Project?
>
> **Author:** All `PredicateSubquery` expressions in the original SQL query are rewritten as joins. This case can only come from other optimization rules, and those rules should make sure there are not billions of rows.
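The author's point can be checked from the public API; a sketch reusing the hypothetical session and views from the earlier example:

```scala
// A top-level IN subquery is rewritten to a join by the optimizer, so it
// normally never reaches PlanSubqueries as a PredicateSubquery.
spark.sql("SELECT id FROM t1 WHERE id IN (SELECT id FROM t2)").explain()
// Expect a semi join over t2 in the printed plan, not a Subquery node.
```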
```diff
+        val executedPlan = new QueryExecution(sparkSession, plan).executedPlan
+        InSubquery(e, SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
     }
   }
 }
```
```diff
+
+/**
+ * Find out duplicated subqueries in the spark plan, then use the same subquery for all the
+ * references.
+ */
+case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.exchangeReuseEnabled) {
```
> **Contributor:** Nit: we could also add this to the planner conditionally.
>
> **Author:** Moving this to QueryExecution actually will make it ugly :(
>
> **Contributor:** OK, let's leave it then.
>
> **Member:** Why share the same conf …
```diff
+      return plan
+    }
+    // Build a hash map using the schema of subqueries to avoid O(N*N) sameResult calls.
+    val subqueries = mutable.HashMap[StructType, ArrayBuffer[SubqueryExec]]()
+    plan transformAllExpressions {
+      case sub: ExecSubqueryExpression =>
+        val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
+        val sameResult = sameSchema.find(_.sameResult(sub.plan))
+        if (sameResult.isDefined) {
+          sub.withExecutedPlan(sameResult.get)
+        } else {
+          sameSchema += sub.executedPlan
+          sub
+        }
+    }
+  }
+}
```
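What the rule buys: if the same subquery occurs several times in one query, only one `SubqueryExec` is executed and the other occurrences share its result. A sketch, again with the hypothetical `t1`/`t2` views and `spark.sql.exchange.reuse` left at its default (true):

```scala
// The identical scalar subquery appears twice; after ReuseSubquery the
// physical plan should contain one Subquery node referenced from both sides.
spark.sql("""
  SELECT id FROM t1
  WHERE id > (SELECT avg(id) FROM t2)
     OR id < (SELECT avg(id) FROM t2) / 2
""").explain()
```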
**sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala**

```diff
@@ -99,7 +99,11 @@ object SparkPlanGraph {
       case "Subquery" if subgraph != null =>
         // Subquery should not be included in WholeStageCodegen
         buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges)
-      case "ReusedExchange" =>
+      case "Subquery" if exchanges.contains(planInfo) =>
```
> **Contributor:** Could you post a screenshot of this?
```diff
+        // Point to the re-used subquery
+        val node = exchanges(planInfo)
+        edges += SparkPlanGraphEdge(node.id, parent.id)
+      case "ReusedExchange" if exchanges.contains(planInfo.children.head) =>
         // Point to the re-used exchange
         val node = exchanges(planInfo.children.head)
         edges += SparkPlanGraphEdge(node.id, parent.id)
@@ -115,7 +119,7 @@ object SparkPlanGraph {
       } else {
         subgraph.nodes += node
       }
-      if (name.contains("Exchange")) {
+      if (name.contains("Exchange") || name == "Subquery") {
         exchanges += planInfo -> node
       }
```
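The UI change follows a small general pattern: register each node under a key and, when the same key reappears, add an edge pointing to the registered node instead of creating a duplicate. A Spark-free sketch (all names hypothetical):

```scala
import scala.collection.mutable

case class Node(id: Long, name: String)
case class Edge(fromId: Long, toId: Long)

// Register `info` once; later occurrences only contribute an edge that
// points back to the node created for the first occurrence, which is how
// "Subquery" and "ReusedExchange" nodes are collapsed in the graph above.
def linkNode(
    info: String,
    parent: Node,
    registry: mutable.Map[String, Node],
    nodes: mutable.ArrayBuffer[Node]): Edge = {
  val node = registry.getOrElseUpdate(info, {
    val fresh = Node(nodes.length, info)
    nodes += fresh
    fresh
  })
  Edge(node.id, parent.id)
}
```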
> A large part of this class is shared with BroadcastExchangeExec. Should we try to factor out common functionality?
>
> I think it's OK to have some duplicated code here; over-abstracted code is actually harder to read.