diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index d205547698c5b..a2d4e3a38905c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -172,6 +172,17 @@ public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
     this.sizeInBytes = sizeInBytes;
   }
 
+  /**
+   * Update this UnsafeRow to point to a different UnsafeRow.
+   *
+   * @param other the UnsafeRow to point to
+   */
+  public void pointTo(UnsafeRow other) {
+    this.baseObject = other.baseObject;
+    this.baseOffset = other.baseOffset;
+    this.sizeInBytes = other.sizeInBytes;
+  }
+
   /**
    * Update this UnsafeRow to point to the underlying byte array.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 58f98d529ab58..e76a1d80ea5b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2219,7 +2219,7 @@ class Analyzer(
  */
 object EliminateSubqueryAliases extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case SubqueryAlias(_, child, _) => child
+    case SubqueryAlias(_, child, _, _) => child
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 75d9997582aa6..4a807df492dca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,6 +21,7 @@ import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
 
 import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.AnalysisException
@@ -51,7 +52,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     // However, because we also use the analyzer to canonicalized queries (for view definition),
     // we do not eliminate subqueries or compute current time in the analyzer.
     Batch("Finish Analysis", Once,
-      EliminateSubqueryAliases,
+      EliminateNonDuplicatedSubqueryAliases,
       ReplaceExpressions,
       ComputeCurrentTime,
       GetCurrentDatabase(sessionCatalog),
@@ -92,6 +93,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineFilters,
       CombineLimits,
       CombineUnions,
+      // Push down Filters again after they have been combined
+      PushDownPredicate,
       // Constant folding and strength reduction
       NullPropagation,
       FoldablePropagation,
@@ -421,6 +424,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
     // Can't prune the columns on LeafNode
     case p @ Project(_, _: LeafNode) => p
 
+    // Don't prune the columns of a common subquery.
+    case p @ Project(_, SubqueryAlias(_, _, _, true)) => p
+
     // for all other logical plans that inherits the output from it's children
     case p @ Project(_, child) =>
       val required = child.references ++ p.references
@@ -1167,3 +1173,211 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
     a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Optimizes the logical plans wrapped in SubqueryAlias together with the operators on top of
+ * them. The SubqueryAlias operators that remain in the optimization phase are common subqueries,
+ * i.e., they are duplicated in the whole query plan. The logical plans wrapped in SubqueryAlias
+ * will later be executed individually. However, some operators such as Project and Filter can be
+ * optimized together with the wrapped logical plans. Thus, this rule optimizes the wrapped
+ * logical plans along with the operators on top of the SubqueryAlias.
+ */
+case class OptimizeCommonSubqueries(optimizer: Optimizer)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  // Optimizes the subqueries which all have a Project parent node and produce the same results.
+  private def optimizeProjectWithSubqueries(
+      plan: LogicalPlan,
+      keyPlan: LogicalPlan,
+      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
+    plan transform {
+      case p @ Project(pList, s @ SubqueryAlias(alias, subquery, v, true))
+          if s.sameResult(keyPlan) =>
+        val pListForAll: Seq[NamedExpression] = subqueries.flatMap { case Project(pList, child) =>
+          val rewrites = buildRewrites(child, subquery)
+          pList.map(pushToOtherPlan(_, rewrites))
+        }
+
+        val newSubquery = Project(pListForAll, subquery)
+        val optimized = optimizer.execute(newSubquery)
+        // Check if any optimization is performed.
+        if (optimized.sameResult(newSubquery)) {
+          // No optimization happens. Let's keep the original subquery.
+          p
+        } else {
+          Project(pList.map(_.toAttribute), SubqueryAlias(alias, newSubquery, v, true))
+        }
+    }
+  }
+
+  /**
+   * Maps Attributes from the source side to the corresponding Attribute on the target side.
+   */
+  private def buildRewrites(source: LogicalPlan, target: LogicalPlan): AttributeMap[Attribute] = {
+    assert(source.output.size == target.output.size)
+    AttributeMap(source.output.zip(target.output))
+  }
+
+  /**
+   * Rewrites an expression so that it can be pushed to another LogicalPlan.
+   */
+  private def pushToOtherPlan[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
+    val result = e transformUp {
+      case a: Attribute => rewrites.get(a).getOrElse(a)
+    }
+
+    // We must promise the compiler that we did not discard the names in the case of project
+    // expressions. This is safe since the only transformation is from Attribute => Attribute.
+    result.asInstanceOf[A]
+  }
+
+  private def optimizeFilterWithSubqueries(
+      plan: LogicalPlan,
+      keyPlan: LogicalPlan,
+      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
+    var pushdownConds = splitConjunctivePredicates(subqueries(0).asInstanceOf[Filter].condition)
+    subqueries.tail.foreach {
+      case Filter(otherCond, child) =>
+        val rewrites = buildRewrites(child, subqueries(0).asInstanceOf[Filter].child)
+        // We can't simply push down all conditions from the other Filters by concatenating them
+        // with [[Or]], because if any condition contains a sub-condition which can't be pushed
+        // down through intermediate operators, none of the concatenated conditions can be pushed
+        // down. E.g., if the first condition is [a && b], the second condition is [c], and b
+        // can't be pushed down, then the combined condition [[a && b] || c] can't be pushed down
+        // either.
+        val pushdowns = new ArrayBuffer[Expression]()
+        splitConjunctivePredicates(otherCond).foreach { cond =>
+          val rewritten = pushToOtherPlan(cond, rewrites)
+          pushdownConds.flatMap { pushdown =>
+            val subConds = splitDisjunctivePredicates(pushdown)
+            val orCond = Or(pushdown, rewritten)
+            // To avoid exponential explosion of predicates, we skip [[IsNotNull]] and predicates
+            // which are semantically equal to existing predicates.
+            if (rewritten.isInstanceOf[IsNotNull]
+                || pushdown.isInstanceOf[IsNotNull]
+                || subConds.exists(rewritten.semanticEquals(_))
+                || pushdowns.exists(orCond.semanticEquals(_))
+                || pushdownConds.exists(orCond.semanticEquals(_))) {
+              None
+            } else {
+              Some(orCond)
+            }
+          }.map { cond =>
+            if (!pushdowns.exists(cond.semanticEquals(_))) {
+              pushdowns += cond
+            }
+          }
+        }
+        pushdownConds = pushdowns.toSeq
+    }
+    // No conditions to push down into the common subqueries.
+    if (pushdownConds.isEmpty) {
+      plan
+    } else {
+      val finalPushdownCondition: Expression = pushdownConds.reduce(And)
+      plan transformDown {
+        case f @ Filter(cond, s @ SubqueryAlias(a, subquery, v, true)) if s.sameResult(keyPlan) =>
+          val pushdownCond: Expression = subqueries.foldLeft(finalPushdownCondition) {
+            case (currentCond, sub) =>
+              val rewrites = buildRewrites(sub.asInstanceOf[Filter].child, subquery)
+              pushToOtherPlan(currentCond, rewrites)
+          }
+
+          val newSubquery = Filter(pushdownCond, subquery)
+          val optimized = optimizer.execute(newSubquery)
+
+          // Check if any optimization is performed.
+          if (optimized.sameResult(newSubquery)) {
+            // No optimization happens. Let's keep the original subquery.
+            f
+          } else {
+            Filter(cond, SubqueryAlias(a, newSubquery, v, true))
+          }
+      }
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val subqueryMap = HashMap.empty[LogicalPlan, ArrayBuffer[LogicalPlan]]
+
+    // Constructs the groups of subqueries with the same results.
+    plan.foreach {
+      case u: UnaryNode
+          if u.child.isInstanceOf[SubqueryAlias] &&
+            u.child.asInstanceOf[SubqueryAlias].commonSubquery =>
+
+        val child = u.child.asInstanceOf[SubqueryAlias].child
+        // Look for an existing group with the same results.
+        subqueryMap.find { case (key, _) =>
+          if (key.sameResult(child)) {
+            true
+          } else {
+            false
+          }
+        }.map { case (_, subqueries) =>
+          // If found, add the current logical plan to this group.
+          subqueries += u
+        }.getOrElse {
+          // If not, create a new group.
+          subqueryMap += ((child, ArrayBuffer[LogicalPlan](u)))
+        }
+      case _ =>
+    }
+
+    // Begins to optimize the common SubqueryAlias operators with the operators on top of them.
+    // We only need to take care of two cases:
+    // 1. All subqueries have a Project on them.
+    // 2. All subqueries have a Filter on them.
+    var currentPlan = plan
+    subqueryMap.foreach { case (key, subqueries) =>
+      if (subqueries.length > 1) {
+        val allProject = subqueries.forall(_.isInstanceOf[Project])
+        if (allProject) {
+          currentPlan = optimizeProjectWithSubqueries(currentPlan, key, subqueries)
+        } else {
+          val allFilter = subqueries.forall(_.isInstanceOf[Filter])
+          if (allFilter) {
+            currentPlan = optimizeFilterWithSubqueries(currentPlan, key, subqueries)
+          }
+        }
+      }
+    }
+    currentPlan
+  }
+}
+
+/**
+ * Removes the [[SubqueryAlias]] operators which are not duplicated in the query plan.
+ */
+object EliminateNonDuplicatedSubqueryAliases extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val subqueries = ArrayBuffer[LogicalPlan]()
+    val duplicateSubqueries = ArrayBuffer[LogicalPlan]()
+
+    // Eliminates recursively nested subqueries which have the same output.
+    val cleanedPlan = plan.transformDown {
+      case s @ SubqueryAlias(_, child, _, _)
+          if child.find(p => p.isInstanceOf[SubqueryAlias] && p.sameResult(s)).isDefined =>
+        child
+    }
+
+    // Collects duplicated subqueries but ignores the SubqueryAlias of a table scan.
+    cleanedPlan.foreach {
+      case SubqueryAlias(_, child, _, _) if !child.isInstanceOf[MultiInstanceRelation] =>
+        if (subqueries.indexWhere(s => s.sameResult(child)) >= 0) {
+          duplicateSubqueries += child
+        } else {
+          subqueries += child
+        }
+      case _ =>
+    }
+
+    // Eliminates non-duplicated subqueries.
+    cleanedPlan.transformDown {
+      case SubqueryAlias(alias, child, v, _) =>
+        if (duplicateSubqueries.indexWhere(s => s.sameResult(child)) < 0) {
+          child
+        } else {
+          SubqueryAlias(alias, child, v, commonSubquery = true)
+        }
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index f14aaab72a98f..f8057097f6708 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -163,7 +163,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     // and Project operators, followed by an optional Filter, followed by an
     // Aggregate. Traverse the operators recursively.
     def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match {
-      case SubqueryAlias(_, child, _) => evalPlan(child)
+      case SubqueryAlias(_, child, _, false) => evalPlan(child)
       case Filter(condition, child) =>
         val bindings = evalPlan(child)
         if (bindings.isEmpty) bindings
@@ -221,7 +221,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
         topPart += p
         bottomPart = child
 
-      case s @ SubqueryAlias(_, child, _) =>
+      case s @ SubqueryAlias(_, child, _, false) =>
         topPart += s
         bottomPart = child
 
@@ -292,8 +292,8 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     topPart.reverse.foreach {
       case Project(projList, _) =>
         subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
-      case s @ SubqueryAlias(alias, _, None) =>
-        subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)
+      case s @ SubqueryAlias(alias, _, None, false) =>
+        subqueryRoot = SubqueryAlias(alias, subqueryRoot, None, false)
       case op => sys.error(s"Unexpected operator $op in corelated subquery")
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0de5aa8a93950..6ed318ec0426e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -701,13 +701,37 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
   }
 }
 
+/**
+ * Represents a subquery in the query plan.
+ * @param alias The name of this subquery.
+ * @param child The logical plan of the subquery.
+ * @param commonSubquery Whether this subquery is a common subquery, i.e., a logical plan that is
+ *                       referred to more than once in the query plan. Default: false.
+ */
 case class SubqueryAlias(
     alias: String,
     child: LogicalPlan,
-    view: Option[TableIdentifier])
+    view: Option[TableIdentifier],
+    commonSubquery: Boolean = false)
   extends UnaryNode {
 
   override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
+
+  override def sameResult(plan: LogicalPlan): Boolean = plan match {
+    case c: SubqueryAlias =>
+      val thisChild = child.collectFirst {
+        case p: LogicalPlan if !p.isInstanceOf[SubqueryAlias] => p
+      }
+      val otherChild = c.child.collectFirst {
+        case p: LogicalPlan if !p.isInstanceOf[SubqueryAlias] => p
+      }
+      if (thisChild.isDefined && otherChild.isDefined) {
+        thisChild.get.sameResult(otherChild.get)
+      } else {
+        false
+      }
+    case o => child.sameResult(o)
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index 380454267eaf4..a8b59c5899d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -451,7 +451,7 @@ class SQLBuilder private (
 
   object RemoveSubqueriesAboveSQLTable extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case SubqueryAlias(_, t @ ExtractSQLTable(_), _) => t
+      case SubqueryAlias(_, t @ ExtractSQLTable(_), _, _) => t
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b3ef29f6e34c4..d646c8f4137b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCommonSubqueries
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -70,7 +71,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     sparkSession.sharedState.cacheManager.useCachedData(analyzed)
   }
 
-  lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
+  lazy val optimizedPlan: LogicalPlan = {
+    val optimized = sparkSession.sessionState.optimizer.execute(withCachedData)
+    val subqueryOptimized = OptimizeCommonSubqueries(sparkSession.sessionState.optimizer)(optimized)
+    DedupCommonSubqueries(sparkSession)(subqueryOptimized)
+  }
 
   lazy val sparkPlan: SparkPlan = {
     SparkSession.setActiveSession(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b0bbcfc934cee..95113e4fabe6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.subquery.{CommonSubquery, CommonSubqueryExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
 
@@ -417,6 +418,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
       case BroadcastHint(child) => planLater(child) :: Nil
+      case c: CommonSubquery => CommonSubqueryExec(c.output, c) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 730ca27f82bac..d9ab628c76a69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.subquery.SubqueryDedup
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType, StructType}
 
@@ -177,3 +179,16 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
     }
   }
 }
+
+/**
+ * De-duplicates common subqueries which are used more than once in the given [[LogicalPlan]].
+ */
+case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val dedup = new SubqueryDedup()
+    plan.transformDown {
+      case s @ SubqueryAlias(_, child, _, true) =>
+        dedup.createCommonSubquery(sparkSession, child)
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
new file mode 100644
index 0000000000000..05cba8bb4578a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
+
+private[sql] case class SubqueryExecHelper(executedPlan: SparkPlan) {
+  private var _computedOutput: RDD[InternalRow] = null
+
+  def computeOrGetResult(): RDD[InternalRow] = this.synchronized {
+    if (_computedOutput == null) {
+      _computedOutput = executedPlan.execute().mapPartitionsInternal { iter =>
+        iter.map { row =>
+          // A hack to create a new UnsafeRow so that each row can be serialized individually.
+          val copyRow = new UnsafeRow(row.numFields)
+          copyRow.pointTo(row.asInstanceOf[UnsafeRow])
+          copyRow.asInstanceOf[InternalRow]
+        }
+        // Use the serialized cache because in this mode the rows are written to a serialized
+        // format while iterating. If we used the deserialized mode, the rows would be queued and
+        // cached in batches. Because the rows share a common byte array, the bytes would be
+        // overwritten in that mode.
+      }.persist(StorageLevel.MEMORY_ONLY_SER)
+    }
+    _computedOutput
+  }
+}
+
+private[sql] case class CommonSubquery(
+    output: Seq[Attribute],
+    @transient child: SparkPlan)(
+    @transient val logicalChild: LogicalPlan,
+    private[sql] val _statistics: Statistics,
+    @transient private val execHelper: SubqueryExecHelper)
+  extends logical.LeafNode {
+
+  override def argString: String = Utils.truncatedString(output, "[", ", ", "]")
+
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(logicalChild)
+
+  override def producedAttributes: AttributeSet = outputSet
+
+  override lazy val statistics: Statistics = _statistics
+
+  lazy val numRows: Long = computedOutput.count
+
+  def withOutput(newOutput: Seq[Attribute]): CommonSubquery = {
+    CommonSubquery(newOutput, child)(logicalChild, _statistics, execHelper)
+  }
+
+  def computedOutput: RDD[InternalRow] = execHelper.computeOrGetResult()
+
+  override protected def otherCopyArgs: Seq[AnyRef] =
+    Seq(logicalChild, _statistics, execHelper)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubqueryExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubqueryExec.scala
new file mode 100644
index 0000000000000..e61396fd626be
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubqueryExec.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{LeafExecNode, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.util.Utils
+
+private[sql] case class CommonSubqueryExec(
+    attributes: Seq[Attribute],
+    @transient subquery: CommonSubquery)
+  extends LeafExecNode {
+
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(subquery.child)
+
+  override def argString: String = Utils.truncatedString(attributes, "[", ", ", "]")
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  override def output: Seq[Attribute] = attributes
+
+  override def outputPartitioning: Partitioning = subquery.child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = subquery.child.outputOrdering
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val subqueryOutput = subquery.computedOutput
+    numOutputRows += subquery.numRows
+    subqueryOutput
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/SubqueryDedup.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/SubqueryDedup.scala
new file mode 100644
index 0000000000000..bf3f358a58b62
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/SubqueryDedup.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.SparkSession
+
+/** Holds a subquery logical plan and its data */
+private[sql] case class CommonSubqueryItem(plan: LogicalPlan, subquery: CommonSubquery)
+
+/**
+ * Provides support for eliminating unnecessary subquery execution. Unlike the queries that need
+ * to be cached with [[CacheManager]], the subqueries are only used in a single query but could be
+ * executed multiple times during the query execution. [[CommonSubquery]] is used to keep the
+ * common [[SparkPlan]] for the duplicate subqueries in a query.
+ */
+private[sql] class SubqueryDedup {
+
+  private val subqueryData = new scala.collection.mutable.ArrayBuffer[CommonSubqueryItem]
+
+  /**
+   * Creates a [[CommonSubquery]] for the given logical plan. A [[CommonSubquery]] wraps the
+   * output of the logical plan, the [[SparkPlan]] of the logical plan and its statistics.
+   * This method first looks up whether there is already a logical plan with the same results in
+   * the subquery list. If so, it returns the previously created [[CommonSubquery]]. If not,
+   * this method will create a new [[CommonSubquery]]. Thus, all the logical plans which
+   * produce the same results will refer to the same [[SparkPlan]], which will be executed
+   * only once at execution time.
+   */
+  private[sql] def createCommonSubquery(
+      sparkSession: SparkSession,
+      planToDedup: LogicalPlan): CommonSubquery = {
+    val execution = sparkSession.sessionState.executePlan(planToDedup)
+    lookupCommonSubquery(planToDedup).map(_.subquery).getOrElse {
+      val common =
+        CommonSubqueryItem(
+          planToDedup,
+          CommonSubquery(planToDedup.output,
+            execution.executedPlan)
+            (execution.optimizedPlan,
+              planToDedup.statistics,
+              SubqueryExecHelper(execution.executedPlan)))
+      subqueryData += common
+      common.subquery
+    }.withOutput(planToDedup.output)
+  }
+
+  /** Optionally returns the common subquery for the given [[LogicalPlan]]. */
+  private[sql] def lookupCommonSubquery(plan: LogicalPlan): Option[CommonSubqueryItem] = {
+    subqueryData.find(cd => plan.sameResult(cd.plan))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 2ef8b18c04612..f241b52c59d52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -17,7 +17,12 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, GreaterThan, Literal}
+import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec}
+import org.apache.spark.sql.execution.subquery.CommonSubqueryExec
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{DoubleType, IntegerType}
 
 class SubquerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -45,6 +50,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     row(null, 5.0),
     row(6, null)).toDF("c", "d")
 
+  lazy val tab1 = Seq(
+    row(1, 2),
+    row(2, 3),
+    row(5, 6),
+    row(6, 7),
+    row(10, 11),
+    row(11, 12)).toDF("c1", "c2")
+
+  lazy val tab2 = Seq(
+    row(1, 2),
+    row(2, 3),
+    row(5, 6),
+    row(6, 7),
+    row(10, 11),
+    row(11, 12)).toDF("c1", "c2")
+
   lazy val t = r.filter($"c".isNotNull && $"d".isNotNull)
 
   protected override def beforeAll(): Unit = {
@@ -52,6 +73,8 @@
     l.createOrReplaceTempView("l")
     r.createOrReplaceTempView("r")
     t.createOrReplaceTempView("t")
+    tab1.createOrReplaceTempView("tab1")
+    tab2.createOrReplaceTempView("tab2")
   }
 
   test("SPARK-18854 numberedTreeString for subquery") {
@@ -627,6 +650,158 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil)
   }
 
+  test("Dedup subqueries") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      withTempView("dedup") {
+        spark.range(10).createOrReplaceTempView("dedup")
+        val df = sql("WITH s AS (SELECT 1 FROM dedup) SELECT * FROM s s1 join s s2")
+
+        val commonSubqueries = df.queryExecution.sparkPlan.collect {
+          case c: CommonSubqueryExec => c.subquery.child
+        }.distinct
+        assert(commonSubqueries.length == 1)
+
+        val df2 = sql("WITH s1 AS (SELECT 1 FROM dedup), s2 AS (SELECT 1 FROM dedup) " +
+          "SELECT * FROM s1 JOIN (SELECT * FROM s1, s2)")
+
+        val commonSubqueries2 = df2.queryExecution.sparkPlan.collect {
+          case c: CommonSubqueryExec => c.subquery.child
+        }.distinct
+        assert(commonSubqueries2.length == 1)
+
+        val df3 = sql("WITH t1 AS (SELECT 1 AS id FROM dedup) SELECT * FROM t1 a, t1 b " +
+          "WHERE a.id = 1 AND b.id > 0")
+
+        val commonSubqueries3 = df3.queryExecution.sparkPlan.collect {
+          case c: CommonSubqueryExec => c.subquery.child
+        }.distinct
+        assert(commonSubqueries3.length == 1)
+      }
+
+      // Use a self-join as the CTE to test whether de-duplicated attributes work for this case.
+      val df4 = sql("WITH j AS (SELECT * FROM (SELECT * FROM l JOIN l)) SELECT * FROM j j1, j j2")
+      val commonSubqueries4 = df4.queryExecution.sparkPlan.collect {
+        case c: CommonSubqueryExec => c.subquery.child
+      }.distinct
+      assert(commonSubqueries4.length == 1)
+
+      val df4WithoutCTE = sql("SELECT * FROM (SELECT * FROM (SELECT * FROM l JOIN l)) j1, " +
+        "(SELECT * FROM (SELECT * FROM l JOIN l)) j2")
+      assert(df4.collect() === df4WithoutCTE.collect())
+
+      // CTE subquery refers to previous CTE subquery.
+      val df5 = sql("WITH cte AS (SELECT * FROM l a, l b), cte2 AS (SELECT * FROM cte j1, cte) " +
+        "SELECT * FROM cte2 j3, cte j4")
+      val commonSubqueries5 = df5.queryExecution.sparkPlan.collect {
+        case c: CommonSubqueryExec => c.subquery.child
+      }.distinct
+      assert(commonSubqueries5.length == 1)
+    }
+  }
+
+  test("Dedup subqueries with optimization: Filter pushdown") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      val df = sql("WITH cte AS (SELECT a.a AS a, a.b AS b, b.a AS c, b.b AS d FROM l a, l b) " +
+        "SELECT * FROM (SELECT * FROM cte WHERE a = 1) x JOIN (SELECT * FROM cte WHERE b = 1.0) y")
+      val commonSubqueries = df.queryExecution.sparkPlan.collect {
+        case c: CommonSubqueryExec => c.subquery.child
+      }.distinct
+      assert(commonSubqueries.length == 1)
+      val pushdownFilter = commonSubqueries(0).collect {
+        case f: FilterExec => f
+      }
+      assert(pushdownFilter.length == 1)
+      val intConditions = pushdownFilter(0).asInstanceOf[FilterExec].condition.collect {
+        case EqualTo(a: AttributeReference, Literal(i, IntegerType)) => (a.name, i)
+      }.distinct
+      assert(intConditions.length == 1 && intConditions(0)._1 == "a" && intConditions(0)._2 == 1)
+      val doubleConditions = pushdownFilter(0).asInstanceOf[FilterExec].condition.collect {
+        case EqualTo(a: AttributeReference, Literal(d, DoubleType)) => (a.name, d)
+      }.distinct
+      assert(doubleConditions.length == 1 &&
+        doubleConditions(0)._1 == "b" && doubleConditions(0)._2 == 1.0)
+
+      // There are two Filters:
+      // 1. x.a = 1 && x.b = 2.0
+      // 2. x.b = 2.0
+      // The conditions (x.a = 1 && x.b = 2.0) should be pushed down.
+      val df2 = sql("WITH cte AS (SELECT a.a AS a, a.b AS b, b.a AS c, b.b AS d FROM l a, l b) " +
+        "SELECT * FROM cte x, cte y WHERE x.b = y.b AND x.a = 1 AND x.b = 1 + 1")
+      val commonSubqueries2 = df2.queryExecution.sparkPlan.collect {
+        case c: CommonSubqueryExec => c.subquery.child
+      }.distinct
+      assert(commonSubqueries2.length == 1)
+      val pushdownFilter2 = commonSubqueries2(0).collect {
+        case f: FilterExec => f
+      }
+      assert(pushdownFilter2.length == 1)
+      val intConditions2 = pushdownFilter2(0).asInstanceOf[FilterExec].condition.collect {
+        case EqualTo(a: AttributeReference, Literal(i, IntegerType)) => (a.name, i)
+      }.distinct
+      assert(intConditions2.length == 1 &&
+        intConditions2(0)._1 == "a" && intConditions2(0)._2 == 1)
+      val doubleConditions2 = pushdownFilter2(0).asInstanceOf[FilterExec].condition.collect {
+        case EqualTo(a: AttributeReference, Literal(d, DoubleType)) => (a.name, d)
+      }.distinct
+      assert(doubleConditions2.length == 1 &&
+        doubleConditions2(0)._1 == "b" && doubleConditions2(0)._2 == 2.0)
+
+      val df3 = sql("with cte as (select c1, case mycount when 0 then null else 1 end cov " +
+        "from (select tab1.c1, count(tab1.c2) as mycount from tab1, tab2 " +
+        "where tab1.c1 = tab2.c1 group by tab1.c1) foo " +
+        "where case mycount when 0 then null else 1 end > 1.0) " +
+        "select * from cte a, cte b where a.c1 > 5 and b.c1 > 10 and a.cov > 2")
+      val commonSubqueries3 = df3.queryExecution.sparkPlan.collect {
+        case c: CommonSubqueryExec => c.subquery.child
+      }.distinct
+      assert(commonSubqueries3.length == 1)
+      val pushdownFilter3 = commonSubqueries3(0).collect {
+        case f: FilterExec => f
+      }
+      // Besides the original Filter, two Filters are pushed down:
+      // one is (tab1.c1 > 5 or tab1.c1 > 10), and the other is (tab2.c1 > 5 or tab2.c1 > 10).
+      assert(pushdownFilter3.length == 3)
+      val intConditions3 = pushdownFilter3(1).asInstanceOf[FilterExec].condition.collect {
+        case GreaterThan(a: AttributeReference, Literal(i, IntegerType)) => (a.name, i)
+      }.distinct
+      assert(intConditions3.length == 2)
+      assert(intConditions3(0)._2 == 5 && intConditions3(1)._2 == 10)
+    }
+  }
+
+  test("Dedup subqueries with uncorrelated scalar subquery in CTE") {
+    val df = sql("WITH t1 AS (SELECT 1 AS b, 2 AS c), " +
+      "t2 AS (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t " +
+      "WHERE a = (SELECT max(b) FROM t1)) SELECT * FROM t2 x UNION ALL SELECT * FROM t2 y")
+    checkAnswer(df, Array(Row(1), Row(1)))
+    val commonSubqueries = df.queryExecution.sparkPlan.collect {
+      case c: CommonSubqueryExec => c.subquery.child
+    }.distinct
+    assert(commonSubqueries.length == 1)
+  }
+
+  test("Dedup subqueries with optimization: Project pushdown") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      val df = sql("WITH cte AS (SELECT a.a AS a, a.b AS b, b.a AS c, b.b AS d FROM l a, l b) " +
+        "SELECT * FROM (SELECT a FROM cte) x JOIN (SELECT b FROM cte) y")
+
+      val commonSubqueries = df.queryExecution.sparkPlan.collect {
+        case c: CommonSubqueryExec => c.subquery.child
+      }.distinct
+      assert(commonSubqueries.length == 1)
+
+      val localTableScan = commonSubqueries(0).collect {
+        case l: LocalTableScanExec => l
+      }
+
+      // As we project `a` and `b`, which are both located on only one side of the inner join,
+      // one [[LocalTableScanExec]] will have empty output after column pruning.
+      assert(localTableScan.length == 2)
+      assert(localTableScan(0).asInstanceOf[LocalTableScanExec].output.isEmpty ||
+        localTableScan(1).asInstanceOf[LocalTableScanExec].output.isEmpty)
+    }
+  }
+
   test("SPARK-16804: Correlated subqueries containing LIMIT - 1") {
     withTempView("onerow") {
       Seq(1).toDF("c1").createOrReplaceTempView("onerow")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/subquery/SubqueryDedupSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/subquery/SubqueryDedupSuite.scala
new file mode 100644
index 0000000000000..02510685f9c43
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/subquery/SubqueryDedupSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.sql.{Dataset, QueryTest}
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SubqueryDedupSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("Create common subquery") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      val subqueryDedup = new SubqueryDedup()
+
+      spark.range(10).createOrReplaceTempView("t1")
+      val df = sql("SELECT 1 FROM t1")
+      val project = df.queryExecution.analyzed.collect {
+        case p: Project => p
+      }.head
+
+      val commonSubqueryAlias = subqueryDedup.createCommonSubquery(spark, project)
+      assert(commonSubqueryAlias.output === project.output)
+      assert(Dataset.ofRows(spark, commonSubqueryAlias).collect() === df.collect())
+
+      spark.range(10).createOrReplaceTempView("t2")
+
+      val df2 = sql("SELECT * FROM t1 join t2")
+      val project2 = df2.queryExecution.analyzed.collect {
+        case p: Project => p
+      }.head
+
+      val commonSubqueryAlias2 = subqueryDedup.createCommonSubquery(spark, project2)
+      assert(commonSubqueryAlias2.output === project2.output)
+      assert(Dataset.ofRows(spark, commonSubqueryAlias2).collect() === df2.collect())
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 0a280b495215c..433b09985ba23 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -62,7 +62,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils {
       spark.sql("create view vw1 as select 1 as id")
       val plan = spark.sql("select id from vw1").queryExecution.analyzed
       val aliases = plan.collect {
-        case x @ SubqueryAlias("vw1", _, Some(TableIdentifier("vw1", Some("default")))) => x
+        case x @ SubqueryAlias("vw1", _, Some(TableIdentifier("vw1", Some("default"))), _) => x
       }
       assert(aliases.size == 1)
     }