Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6bfdb32
Dedup common subqueries.
viirya Aug 1, 2016
229ae31
Add logical node CommonSubqueryAlias to represent subquery in CTE.
viirya Aug 4, 2016
734b050
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Aug 10, 2016
70af8d6
Update with the change of access privileges for metrics.
viirya Aug 10, 2016
ba11d34
Use optimized plan of common subquery.
viirya Aug 10, 2016
4680994
Add test case for self-join.
viirya Aug 10, 2016
11815da
Synchonized on computed output of common subquery.
viirya Aug 10, 2016
5282de7
Fix some bugs.
viirya Aug 12, 2016
bdb6e84
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Aug 12, 2016
9fe1fbe
Reuse all subqueries, instead of CTE subqueries.
viirya Aug 12, 2016
a14459c
Can not use optimizedPlan when replacing common subquery because we n…
viirya Aug 12, 2016
0d5eea7
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Aug 15, 2016
9d7a15d
Optimization of common subqueries.
viirya Aug 18, 2016
5ef961d
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Aug 18, 2016
f3aa0aa
Fix a bug and add test.
viirya Aug 19, 2016
e094c14
Pushdowned projection list should use attributes.
viirya Aug 19, 2016
50f0e19
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Aug 19, 2016
f0954cd
Fix a bug in filter pushdown.
viirya Aug 23, 2016
6a8011b
Fix filter pushdown.
viirya Aug 25, 2016
6cb40f1
Skip duplicating table scan nodes.
viirya Aug 25, 2016
6a08486
Don't do nested common subquery deduplication.
viirya Aug 29, 2016
3337303
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Aug 29, 2016
c6d987f
Change to common subquery query plans to make the query tree more rea…
viirya Aug 29, 2016
38c57e8
Add executed plan into otherCopyArgs.
viirya Aug 29, 2016
49eefb0
Fix the issue.
viirya Aug 30, 2016
79c45ac
Continue fixing query plan string.
viirya Aug 30, 2016
5729bb2
Improve filter pushdown.
viirya Aug 31, 2016
e9b0952
Fix filter pushdown again.
viirya Sep 1, 2016
e1d5b05
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Sep 3, 2016
e1d050f
Fix subquery execution.
viirya Sep 3, 2016
6d79beb
Fix the cnf.
viirya Sep 5, 2016
bc70354
Fix predicate explosion.
viirya Sep 9, 2016
23e2dc8
Deal with no pushdown predicate.
viirya Sep 9, 2016
cebfbf5
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Oct 5, 2016
9faf90a
Merge remote-tracking branch 'upstream/master' into single-exec-subquery
viirya Dec 23, 2016
f153c12
Cache the rdd of common subquery.
viirya Dec 26, 2016
aeba1c3
Cleaning some codes.
viirya Dec 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
this.sizeInBytes = sizeInBytes;
}

/**
* Update this UnsafeRow to point to different UnsafeRow.
*
* @param other the UnsafeRow to point to
*/
public void pointTo(UnsafeRow other) {
this.baseObject = other.baseObject;
this.baseOffset = other.baseOffset;
this.sizeInBytes = other.sizeInBytes;
}

/**
* Update this UnsafeRow to point to the underlying byte array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2219,7 +2219,7 @@ class Analyzer(
*/
object EliminateSubqueryAliases extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case SubqueryAlias(_, child, _) => child
case SubqueryAlias(_, child, _, _) => child
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.AnalysisException
Expand Down Expand Up @@ -51,7 +52,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
EliminateNonDuplicatedSubqueryAliases,
ReplaceExpressions,
ComputeCurrentTime,
GetCurrentDatabase(sessionCatalog),
Expand Down Expand Up @@ -92,6 +93,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
CombineFilters,
CombineLimits,
CombineUnions,
// Pushdown Filters again after combination
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is submitted at #14912. Because it prevents this PR pushdown some predicates, for the easy of test, I also include that change here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#14912 is changed. But the changed solution is more complicated so I don't want to include it here. So I keep this change and wait for #14912 to be merged first.

PushDownPredicate,
// Constant folding and strength reduction
NullPropagation,
FoldablePropagation,
Expand Down Expand Up @@ -421,6 +424,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Can't prune the columns on LeafNode
case p @ Project(_, _: LeafNode) => p

// Don't prune the columns on common subquery.
case p @ Project(_, SubqueryAlias(_, _, _, true)) => p

// for all other logical plans that inherits the output from it's children
case p @ Project(_, child) =>
val required = child.references ++ p.references
Expand Down Expand Up @@ -1167,3 +1173,211 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
a.copy(groupingExpressions = newGrouping)
}
}

/**
* Optimizes the logical plans wrapped in SubqueryAlias and operators on them.
* The SubqueryAlias which are remaining in optimization phase are common subqueries,
* i.e., they are duplicate in the whole query plan. The logical plans wrapped in
* SubqueryAlias will be executed individually later. However, some operators such as
* Project and Filter can be optimized with the wrapped logical plans. Thus, this rule
* considers the optimization of the wrapped logical plans and operators on SubqueryAlias.
*/
case class OptimizeCommonSubqueries(optimizer: Optimizer)
extends Rule[LogicalPlan] with PredicateHelper {
// Optimized the subqueries which all have a Project parent node and the same results.
private def optimizeProjectWithSubqueries(
plan: LogicalPlan,
keyPlan: LogicalPlan,
subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
plan transform {
case p @ Project(pList, s @ SubqueryAlias(alias, subquery, v, true))
if s.sameResult(keyPlan) =>
val pListForAll: Seq[NamedExpression] = subqueries.flatMap { case Project(pList, child) =>
val rewrites = buildRewrites(child, subquery)
pList.map(pushToOtherPlan(_, rewrites))
}

val newSubquery = Project(pListForAll, subquery)
val optimized = optimizer.execute(newSubquery)
// Check if any optimization is performed.
if (optimized.sameResult(newSubquery)) {
// No optimization happens. Let's keep original subquery.
p
} else {
Project(pList.map(_.toAttribute), SubqueryAlias(alias, newSubquery, v, true))
}
}
}

/**
* Maps Attributes from the source side to the corresponding Attribute on the target side.
*/
private def buildRewrites(source: LogicalPlan, target: LogicalPlan): AttributeMap[Attribute] = {
assert(source.output.size == target.output.size)
AttributeMap(source.output.zip(target.output))
}

/**
* Rewrites an expression so that it can be pushed to another LogicalPlan.
*/
private def pushToOtherPlan[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
val result = e transformUp {
case a: Attribute => rewrites.get(a).getOrElse(a)
}

// We must promise the compiler that we did not discard the names in the case of project
// expressions. This is safe since the only transformation is from Attribute => Attribute.
result.asInstanceOf[A]
}

private def optimizeFilterWithSubqueries(
plan: LogicalPlan,
keyPlan: LogicalPlan,
subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
var pushdownConds = splitConjunctivePredicates(subqueries(0).asInstanceOf[Filter].condition)
subqueries.tail.foreach {
case Filter(otherCond, child) =>
val rewrites = buildRewrites(child, subqueries(0).asInstanceOf[Filter].child)
// We can't simply push down all conditions from other Filter by concatenating them with
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part has been extracted out as #15558 and can be removed if that PR is merged.

// [[Or]]. Because if any conditions contains a sub-condition which can't be pushed down
// through intermediate operators, it makes all concatenated conditions not pushed doen.
// E.g., first condition is [a && b] and second condition is [c]. If b can't be pushed
// down, the final condition [[a && b] || c] can't be pushed down too.
val pushdowns = new ArrayBuffer[Expression]()
splitConjunctivePredicates(otherCond).foreach { cond =>
val rewritten = pushToOtherPlan(cond, rewrites)
pushdownConds.flatMap { pushdown =>
val subConds = splitDisjunctivePredicates(pushdown)
val orCond = Or(pushdown, rewritten)
// To avoid exponential explosion of predicates, we skip [[IsNotNull]] and predicates
// which semantically equal to existing predicates.
if (rewritten.isInstanceOf[IsNotNull]
|| pushdown.isInstanceOf[IsNotNull]
|| subConds.exists(rewritten.semanticEquals(_))
|| pushdowns.exists(orCond.semanticEquals(_))
|| pushdownConds.exists(orCond.semanticEquals(_))) {
None
} else {
Some(orCond)
}
}.map { cond =>
if (!pushdowns.exists(cond.semanticEquals(_))) {
pushdowns += cond
}
}
}
pushdownConds = pushdowns.toSeq
}
// No pushdown for common subqueries.
if (pushdownConds.isEmpty) {
plan
} else {
val finalPushdownCondition: Expression = pushdownConds.reduce(And)
plan transformDown {
case f @ Filter(cond, s @ SubqueryAlias(a, subquery, v, true)) if s.sameResult(keyPlan) =>
val pushdownCond: Expression = subqueries.foldLeft(finalPushdownCondition) {
case (currentCond, sub) =>
val rewrites = buildRewrites(sub.asInstanceOf[Filter].child, subquery)
pushToOtherPlan(currentCond, rewrites)
}

val newSubquery = Filter(pushdownCond, subquery)
val optimized = optimizer.execute(newSubquery)

// Check if any optimization is performed.
if (optimized.sameResult(newSubquery)) {
// No optimization happens. Let's keep original subquery.
f
} else {
Filter(cond, SubqueryAlias(a, newSubquery, v, true))
}
}
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
val subqueryMap = HashMap.empty[LogicalPlan, ArrayBuffer[LogicalPlan]]

// Constructs the groups of the subqueries with the same results.
plan.foreach {
case u: UnaryNode
if u.child.isInstanceOf[SubqueryAlias] &&
u.child.asInstanceOf[SubqueryAlias].commonSubquery =>

val child = u.child.asInstanceOf[SubqueryAlias].child
// Looking for the existing group with the same results.
subqueryMap.find { case (key, _) =>
if (key.sameResult(child)) {
true
} else {
false
}
}.map { case (_, subqueries) =>
// If found, add current logical plan into this group.
subqueries += u
}.getOrElse {
// If not, create a new group.
subqueryMap += ((child, ArrayBuffer[LogicalPlan](u)))
}
case _ =>
}

// Begins to optimize common SubqueryAlias with outside operators.
// We only need to take care two cases:
// 1. All subqueries have a Project on them.
// 2. All subqueries have a Filter on them.
var currentPlan = plan
subqueryMap.foreach { case (key, subqueries) =>
if (subqueries.length > 1) {
val allProject = subqueries.forall(_.isInstanceOf[Project])
if (allProject) {
currentPlan = optimizeProjectWithSubqueries(currentPlan, key, subqueries)
} else {
val allFilter = subqueries.forall(_.isInstanceOf[Filter])
if (allFilter) {
currentPlan = optimizeFilterWithSubqueries(currentPlan, key, subqueries)
}
}
}
}
currentPlan
}
}

/**
* Removes the [[SubqueryAlias]] operators which are not duplicated in the query plan.
*/
object EliminateNonDuplicatedSubqueryAliases extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val subqueries = ArrayBuffer[LogicalPlan]()
val duplicateSubqueries = ArrayBuffer[LogicalPlan]()

// Eliminates the recursive subqueries which have the same output.
val cleanedPlan = plan.transformDown {
case s @ SubqueryAlias(_, child, _, _)
if child.find(p => p.isInstanceOf[SubqueryAlias] && p.sameResult(s)).isDefined =>
child
}

// Collects duplicated subqueries but ignores the SubqueryAlias of table scan.
cleanedPlan.foreach {
case SubqueryAlias(_, child, _, _) if !child.isInstanceOf[MultiInstanceRelation] =>
if (subqueries.indexWhere(s => s.sameResult(child)) >= 0) {
duplicateSubqueries += child
} else {
subqueries += child
}
case _ =>
}

// Eliminates non-duplicated subqueries.
cleanedPlan.transformDown {
case SubqueryAlias(alias, child, v, _) =>
if (duplicateSubqueries.indexWhere(s => s.sameResult(child)) < 0) {
child
} else {
SubqueryAlias(alias, child, v, commonSubquery = true)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
// and Project operators, followed by an optional Filter, followed by an
// Aggregate. Traverse the operators recursively.
def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match {
case SubqueryAlias(_, child, _) => evalPlan(child)
case SubqueryAlias(_, child, _, false) => evalPlan(child)
case Filter(condition, child) =>
val bindings = evalPlan(child)
if (bindings.isEmpty) bindings
Expand Down Expand Up @@ -221,7 +221,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
topPart += p
bottomPart = child

case s @ SubqueryAlias(_, child, _) =>
case s @ SubqueryAlias(_, child, _, false) =>
topPart += s
bottomPart = child

Expand Down Expand Up @@ -292,8 +292,8 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
topPart.reverse.foreach {
case Project(projList, _) =>
subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
case s @ SubqueryAlias(alias, _, None) =>
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)
case s @ SubqueryAlias(alias, _, None, false) =>
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None, false)
case op => sys.error(s"Unexpected operator $op in corelated subquery")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,13 +701,37 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
}
}

/**
* Represents a subquery in query plan.
* @param alias The name of this subquery.
* @param child The logical plan of the subquery.
* @param commonSubquery Whether this subquery is a common subquery, i.e., the logical plan
* referred more than once in the query plan. Default: false.
*/
case class SubqueryAlias(
alias: String,
child: LogicalPlan,
view: Option[TableIdentifier])
view: Option[TableIdentifier],
commonSubquery: Boolean = false)
extends UnaryNode {

override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))

override def sameResult(plan: LogicalPlan): Boolean = plan match {
case c: SubqueryAlias =>
val thisChild = child.collectFirst {
case p: LogicalPlan if !p.isInstanceOf[SubqueryAlias] => p
}
val otherChild = c.child.collectFirst {
case p: LogicalPlan if !p.isInstanceOf[SubqueryAlias] => p
}
if (thisChild.isDefined && otherChild.isDefined) {
thisChild.get.sameResult(otherChild.get)
} else {
false
}
case o => child.sameResult(o)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class SQLBuilder private (

object RemoveSubqueriesAboveSQLTable extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case SubqueryAlias(_, t @ ExtractSQLTable(_), _) => t
case SubqueryAlias(_, t @ ExtractSQLTable(_), _, _) => t
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.optimizer.OptimizeCommonSubqueries
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
Expand Down Expand Up @@ -70,7 +71,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}

lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
lazy val optimizedPlan: LogicalPlan = {
val optimized = sparkSession.sessionState.optimizer.execute(withCachedData)
val subqueryOptimized = OptimizeCommonSubqueries(sparkSession.sessionState.optimizer)(optimized)
DedupCommonSubqueries(sparkSession)(subqueryOptimized)
}

lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.subquery.{CommonSubquery, CommonSubqueryExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery

Expand Down Expand Up @@ -417,6 +418,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
case BroadcastHint(child) => planLater(child) :: Nil
case c: CommonSubquery => CommonSubqueryExec(c.output, c) :: Nil
case _ => Nil
}
}
Expand Down
Loading