Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4f52877
Eliminate outer join before project
ankurdave Aug 4, 2015
ae46ab0
Use KeyHint to do join elimination
ankurdave Aug 12, 2015
df9ef14
Add foreign keys
ankurdave Aug 12, 2015
b22f702
Alias-aware join elimination + bugfixes
ankurdave Aug 13, 2015
9072cb7
Propagate foreign keys through Join operator
ankurdave Aug 13, 2015
f430ea2
Remove key hints after join elimination
ankurdave Aug 13, 2015
1302531
Support inner joins based on referential integrity
ankurdave Aug 17, 2015
35949f5
Correctness fixes for join elimination
ankurdave Aug 18, 2015
945e523
Do key hint resolution during analysis
ankurdave Aug 19, 2015
504c9d8
Don't crash when foreign key refers to unresolved relation
ankurdave Aug 19, 2015
83c8ff9
Fix JoinEliminationSuite
ankurdave Aug 19, 2015
0b0b840
Merge remote-tracking branch 'apache-spark/master' into GraphFrames
ankurdave Aug 19, 2015
9150dda
Fix KeyHintSuite after merge
ankurdave Aug 19, 2015
873b322
In ForeignKey, store referencedRelation as logical plan
ankurdave Oct 13, 2015
98e0b5e
Use semanticEquals for Attributes
ankurdave Oct 13, 2015
d43a2c0
Remove TODOs
ankurdave Oct 13, 2015
f4e7e01
Add more comments
ankurdave Oct 13, 2015
49b196e
Merge remote-tracking branch 'apache-spark/master' into GraphFrames
ankurdave Oct 13, 2015
578797c
Use SharedSQLContext in KeyHintSuite
ankurdave Oct 13, 2015
7c7357b
Remove long URLs
ankurdave Oct 13, 2015
5071759
Fix override of KeyHint#transformExpressions{Up,Down}
ankurdave Oct 13, 2015
ec2b80b
Declare new DataFrame methods extra-experimental
ankurdave Oct 13, 2015
55bb135
Explain why we keep old keys in self-join rewrite
ankurdave Oct 13, 2015
e1ec23d
Revert "Fix override of KeyHint#transformExpressions{Up,Down}"
ankurdave Oct 15, 2015
0cd8a91
Update transformExpressions override comments
ankurdave Oct 15, 2015
5abceae
Merge remote-tracking branch 'apache-spark/master' into SPARK-11077-J…
ankurdave Nov 6, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,35 @@ class Analyzer(
j
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
def applyRewrites(plan: LogicalPlan): LogicalPlan =
plan transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}
}
j.copy(right = newRight)
val newRight = applyRewrites(right)
// Also apply the rewrites to foreign keys on the left side, because these are meant to
// reference the right side.
val newLeft =
if (left.keys.nonEmpty) {
left.transform {
case KeyHint(keys, child) =>
val newKeys = keys.collect {
case ForeignKey(attr, referencedRelation, referencedAttr) =>
ForeignKey(
attr,
applyRewrites(referencedRelation),
attributeRewrites.get(referencedAttr).getOrElse(referencedAttr))
}
// Keep the old keys as well to accommodate future self-joins
KeyHint((keys ++ newKeys).distinct, child)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just use newKeys here? Why do we need to keep old keys?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good eye! This is to accommodate future self-joins. If we got rid of the old foreign keys, a future self-join would not recognize that the new keys applied to it, because the attributes would have been rewritten. I just added a comment noting this.

There's a unit test that covers this (fails if you remove the old keys).

}
} else {
left
}
j.copy(left = newLeft, right = newRight)
}

// When resolve `SortOrder`s in Sort based on child, don't report errors as
Expand All @@ -399,6 +420,32 @@ class Analyzer(
val newOrdering = resolveSortOrders(ordering, child, throws = false)
Sort(newOrdering, global, child)

// Resolve referenced attributes of foreign keys using the referenced relation
case h @ KeyHint(keys, child) if child.resolved && !h.foreignKeyReferencesResolved =>
KeyHint(keys.map {
case ForeignKey(k, r, u @ UnresolvedAttribute(nameParts)) => withPosition(u) {
// The referenced relation r is itself guaranteed to be resolved already, so we can
// resolve u against it
val referencedAttr = r.resolve(nameParts, resolver).getOrElse(u).toAttribute

// Enforce the constraint that foreign keys can only reference unique keys
if (referencedAttr.resolved) {
val referencedAttrIsUnique = r.keys.exists {
case UniqueKey(attr) if attr semanticEquals referencedAttr => true
case _ => false
}
if (!referencedAttrIsUnique) {
failAnalysis("Foreign keys can only reference unique keys, but " +
s"$k references $referencedAttr which is not unique.")
}
}

ForeignKey(k, r, referencedAttr)
}

case otherKey => otherKey
}, child)

// A special case for Generate, because the output of Generate should not be resolved by
// ResolveReferences. Attributes in the output will be resolved by ResolveGenerate.
case g @ Generate(generator, join, outer, qualifier, output, child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.RightOuter
Expand All @@ -32,14 +33,7 @@ import org.apache.spark.sql.types._
abstract class Optimizer extends RuleExecutor[LogicalPlan]

object DefaultOptimizer extends Optimizer {
val batches =
// SubQueries are only needed for analysis and can be removed before execution.
Batch("Remove SubQueries", FixedPoint(100),
EliminateSubQueries) ::
Batch("Aggregate", FixedPoint(100),
ReplaceDistinctWithAggregate,
RemoveLiteralFromGroupExpressions) ::
Batch("Operator Optimizations", FixedPoint(100),
val operatorOptimizations = Seq(
// Operator push down
SetOperationPushDown,
SamplePushDown,
Expand All @@ -50,6 +44,8 @@ object DefaultOptimizer extends Optimizer {
ColumnPruning,
// Operator combine
ProjectCollapsing,
KeyHintCollapsing,
JoinElimination,
CombineFilters,
CombineLimits,
// Constant folding
Expand All @@ -61,7 +57,21 @@ object DefaultOptimizer extends Optimizer {
RemoveDispensable,
SimplifyFilters,
SimplifyCasts,
SimplifyCaseConversionExpressions) ::
SimplifyCaseConversionExpressions)

val batches =
// SubQueries are only needed for analysis and can be removed before execution.
Batch("Remove SubQueries", FixedPoint(100),
EliminateSubQueries) ::
Batch("Aggregate", FixedPoint(100),
ReplaceDistinctWithAggregate,
RemoveLiteralFromGroupExpressions) ::
// Hints are necessary for some operator optimizations but interfere with others, so we run the
// rules with them, then remove them and run the rules again.
Batch("Operator Optimizations", FixedPoint(100),
operatorOptimizations: _*) ::
Batch("Remove Hints", FixedPoint(100),
(RemoveKeyHints +: operatorOptimizations): _*) ::
Batch("Decimal Optimizations", FixedPoint(100),
DecimalAggregates) ::
Batch("LocalRelation", FixedPoint(100),
Expand Down Expand Up @@ -325,6 +335,46 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
}
}

/**
* Combines two adjacent [[KeyHint]]s into one by merging their key lists.
*/
object KeyHintCollapsing extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case KeyHint(keys1, KeyHint(keys2, child)) =>
KeyHint((keys1 ++ keys2).distinct, child)
}
}

/**
* Eliminates keyed equi-joins when followed by a [[Project]] that only keeps columns from one side.
*/
object JoinElimination extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case CanEliminateUniqueKeyOuterJoin(outer, projectList) =>
Project(projectList, outer)
case CanEliminateReferentialIntegrityJoin(parent, child, primaryForeignMap, projectList) =>
Project(substituteParentForChild(projectList, parent, primaryForeignMap), child)
}

/**
* In the given expressions, substitute all references to parent columns with references to the
* corresponding child columns. The `primaryForeignMap` contains these equivalences, extracted
* from the equality join expressions.
*/
private def substituteParentForChild(
expressions: Seq[NamedExpression],
parent: LogicalPlan,
primaryForeignMap: AttributeMap[Attribute])
: Seq[NamedExpression] = {
expressions.map(_.transform {
case a: Attribute =>
if (parent.outputSet.contains(a)) Alias(primaryForeignMap(a), a.name)(a.exprId)
else a
}.asInstanceOf[NamedExpression])
}

}

/**
* Simplifies LIKE expressions that do not need full regular expressions to evaluate the condition.
* For example, when the expression is just checking to see if a string starts with a given
Expand Down Expand Up @@ -844,6 +894,15 @@ object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] {
}
}

/**
* Removes [[KeyHint]]s from the plan to avoid interfering with other rules.
*/
object RemoveKeyHints extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case KeyHint(_, child) => child
}
}

/**
* Speeds up aggregates on fixed-precision decimals by executing them on unscaled Long values.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._

/**
* Finds left or right outer joins where only the outer table's columns are kept, and a key from the
* inner table is involved in the join so no duplicates would be generated.
*/
object CanEliminateUniqueKeyOuterJoin {
/** (outer, projectList) */
type ReturnType = (LogicalPlan, Seq[NamedExpression])

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
case p @ Project(projectList,
ExtractEquiJoinKeys(
joinType @ (LeftOuter | RightOuter), leftJoinExprs, rightJoinExprs, _, left, right)) =>
val (outer, inner, innerJoinExprs) = (joinType: @unchecked) match {
case LeftOuter => (left, right, rightJoinExprs)
case RightOuter => (right, left, leftJoinExprs)
}

val onlyOuterColsKept = AttributeSet(projectList).subsetOf(outer.outputSet)

val innerUniqueKeys = AttributeSet(inner.keys.collect { case UniqueKey(attr) => attr })
val innerKeyIsInvolved = innerUniqueKeys.intersect(AttributeSet(innerJoinExprs)).nonEmpty

if (onlyOuterColsKept && innerKeyIsInvolved) {
Some((outer, projectList))
} else {
None
}

case _ => None
}
}

/**
* Finds joins based on foreign-key referential integrity, followed by [[Project]]s that reference
* no columns from the parent table other than the referenced unique keys. Such joins can be
* eliminated and replaced by the child table.
*
* The table containing the foreign key is referred to as the child table, while the table
* containing the referenced unique key is referred to as the parent table.
*
* For inner joins, all involved foreign keys must be non-nullable.
*/
object CanEliminateReferentialIntegrityJoin {
/** (parent, child, primaryForeignMap, projectList) */
type ReturnType =
(LogicalPlan, LogicalPlan, AttributeMap[Attribute], Seq[NamedExpression])

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
case p @ Project(projectList, ExtractEquiJoinKeys(
joinType @ (Inner | LeftOuter | RightOuter),
leftJoinExprs, rightJoinExprs, _, left, right)) =>
val innerJoin = joinType == Inner

val leftParentPFM = getPrimaryForeignMap(left, right, leftJoinExprs, rightJoinExprs)
val rightForeignKeysAreNonNullable = leftParentPFM.values.forall(!_.nullable)
val leftIsParent =
(leftParentPFM.nonEmpty && onlyPrimaryKeysKept(projectList, leftParentPFM, left)
&& (!innerJoin || rightForeignKeysAreNonNullable))

val rightParentPFM = getPrimaryForeignMap(right, left, rightJoinExprs, leftJoinExprs)
val leftForeignKeysAreNonNullable = rightParentPFM.values.forall(!_.nullable)
val rightIsParent =
(rightParentPFM.nonEmpty && onlyPrimaryKeysKept(projectList, rightParentPFM, right)
&& (!innerJoin || leftForeignKeysAreNonNullable))

if (leftIsParent) {
Some((left, right, leftParentPFM, projectList))
} else if (rightIsParent) {
Some((right, left, rightParentPFM, projectList))
} else {
None
}

case _ => None
}

/**
* Return a map where, for each PK=FK join expression based on referential integrity between
* `parent` and `child`, the unique key from `parent` is mapped to its corresponding foreign
* key from `child`.
*/
private def getPrimaryForeignMap(
parent: LogicalPlan,
child: LogicalPlan,
parentJoinExprs: Seq[Expression],
childJoinExprs: Seq[Expression])
: AttributeMap[Attribute] = {
val primaryKeys = AttributeSet(parent.keys.collect { case UniqueKey(attr) => attr })
val foreignKeys = new ForeignKeyFinder(child, parent)
AttributeMap(parentJoinExprs.zip(childJoinExprs).collect {
case (parentExpr: NamedExpression, childExpr: NamedExpression)
if primaryKeys.contains(parentExpr.toAttribute)
&& foreignKeys.foreignKeyExists(childExpr.toAttribute, parentExpr.toAttribute) =>
(parentExpr.toAttribute, childExpr.toAttribute)
})
}

/**
* Return true if `kept` references no columns from `parent` except those involved in a PK=FK
* join expression. Such join expressions are stored in `primaryForeignMap`.
*/
private def onlyPrimaryKeysKept(
kept: Seq[NamedExpression],
primaryForeignMap: AttributeMap[Attribute],
parent: LogicalPlan)
: Boolean = {
AttributeSet(kept).forall { keptAttr =>
if (parent.outputSet.contains(keptAttr)) {
primaryForeignMap.contains(keptAttr)
} else {
true
}
}
}
}

private class ForeignKeyFinder(plan: LogicalPlan, referencedPlan: LogicalPlan) {
val equivalent = equivalences(referencedPlan)

def foreignKeyExists(attr: Attribute, referencedAttr: Attribute): Boolean = {
plan.keys.exists {
case ForeignKey(attr2, _, referencedAttr2)
if (attr semanticEquals attr2)
&& equivalent.query(referencedAttr, referencedAttr2) => true
case _ => false
}
}

private def equivalences(plan: LogicalPlan): MutableDisjointAttributeSets = {
val s = new MutableDisjointAttributeSets
plan.collect {
case Project(projectList, _) => projectList.collect {
case a @ Alias(old: Attribute, _) => s.union(old, a.toAttribute)
}
}
s
}
}

private class MutableDisjointAttributeSets() {
private var sets = Set[AttributeSet]()
def add(x: Attribute): Unit = {
if (!sets.exists(_.contains(x))) {
sets += AttributeSet(x)
}
}
def union(x: Attribute, y: Attribute): Unit = {
add(x)
add(y)
val xSet = sets.find(_.contains(x)).get
val ySet = sets.find(_.contains(y)).get
sets -= xSet
sets -= ySet
sets += (xSet ++ ySet)
}
def query(x: Attribute, y: Attribute): Boolean = {
(x semanticEquals y) || sets.exists(s => s.contains(x) && s.contains(y))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}
}

/**
* The unique and foreign key constraints that will hold for the output of this plan. Specific
* plan nodes can override this to introduce or propagate keys.
*/
def keys: Seq[Key] = Seq.empty

/**
* Computes [[Statistics]] for this plan. The default implementation assumes the output
* cardinality is the product of of all child plan's cardinality, i.e. applies in the case
Expand Down
Loading