Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
// TODO: Compute the set of star-joins and use them in the join enumeration
// algorithm to prune un-optimal plan choices.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the overall approach? Run star join reorder rule first, and make the DP join reorder rule respect star join and keep the join order generated by star join reorder rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Star-schema detection is first called to compute the set of tables connected by star-schema relationship e.g. {F1, D1, D2} in our code example. This call does not do any join reordering among the tables. It simply computes the set of tables in a star-schema relationship. Then, DP join enumeration generates all possible plan combinations among the entire set of tables in a the join e.g. {F1, D1}, {F1, T1}, {T2, T3}, etc. Star-filter, if called, will eliminate plan combinations among the star and non-star tables until the star join combinations are built. For example, {F1, D1} combination will be retained since it involves tables in a star schema, but {F1, T1} will be eliminated since it mixes star and non-star tables. Star-filter simply decides what combinations to retain but it will not decide on the order of execution of those tables. The order of the joins within a star-join and for the overall plan is decided by the DP join enumeration. Star-filter only ensures that tables in a star-join are planned together.

Copy link
Contributor

@cloud-fan cloud-fan Apr 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so if users enable the star join reorder and cbo join reorder rules together, stat join will still be overwritten by cbo join reorder rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan That’s correct. If CBO is enabled, it will do the final planning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a plan to completely merge these 2 rules?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Once CBO is enabled by default, I can remove the call from ReorderJoin.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the meaning of "remove the call from ReorderJoin"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Star-schema detection is called from both CostBasedJoinReorder and ReorderJoin. In the latter case, it is called to reorder star-joins based on heuristics if cbo is disabled.

When cost-based optimizer becomes the default optimizer, we don’t need to reorder star-joins in ReorderJoin based on heuristics since the cost-based optimizer will choose the best plan based on cost.

val result =
// Do reordering if the number of items is appropriate and join conditions exist.
// We also need to check if costs of all items can be evaluated.
Expand Down Expand Up @@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
}.toMap)

// Build filters from the join graph to be used by the search algorithm.
val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
Copy link
Contributor

@cloud-fan cloud-fan Apr 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call it filters? should we name it joinInfo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I call it "filters" since the join graph information is used as filters on top of the DP join enumeration. It suggests the purpose for which the graph info was gathered.

If this is confusing, I can rename. Let me know.


// Build plans for next levels until the last level has only one plan. This plan contains
// all items that can be joined, so there's no need to continue.
val topOutputSet = AttributeSet(output)
while (foundPlans.size < items.length && foundPlans.last.size > 1) {
while (foundPlans.size < items.length) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy Condition "foundPlans.last.size > 1" does not apply when filters are used with the join enumeration since not all the plan combinations are generated. Without filters, I think the condition will always be satisfied, so I removed it completely. Let me know if you have a counter example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea we should remove this.

// Build plans for the next level.
foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
}

val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
Expand All @@ -179,7 +180,8 @@ object JoinReorderDP extends PredicateHelper with Logging {
existingLevels: Seq[JoinPlanMap],
conf: SQLConf,
conditions: Set[Expression],
topOutput: AttributeSet): JoinPlanMap = {
topOutput: AttributeSet,
filters: Option[JoinGraphInfo]): JoinPlanMap = {

val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
var k = 0
Expand All @@ -200,7 +202,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
}

otherSideCandidates.foreach { otherSidePlan =>
buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput, filters) match {
case Some(newJoinPlan) =>
// Check if it's the first plan for the item set, or it's a better plan than
// the existing one due to lower cost.
Expand All @@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
}

/**
* Builds a new JoinPlan when both conditions hold:
* Builds a new JoinPlan if the following conditions hold:
* - the sets of items contained in left and right sides do not overlap.
* - there exists at least one join condition involving references from both sides.
* - if star-join filter is enabled, allow the following combinations:
* 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
* 2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
* 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
*
* @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
* @param otherJoinPlan The other side JoinPlan for building a new join node.
* @param conf SQLConf for statistics computation.
* @param conditions The overall set of join conditions.
* @param topOutput The output attributes of the final plan.
* @param filters Join graph info to be used as filters by the search algorithm.
* @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
*/
private def buildJoin(
oneJoinPlan: JoinPlan,
otherJoinPlan: JoinPlan,
conf: SQLConf,
conditions: Set[Expression],
topOutput: AttributeSet): Option[JoinPlan] = {
topOutput: AttributeSet,
filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
// Should not join two overlapping item sets.
return None
}

if (filters.isDefined) {
// Apply star-join filter, which ensures that tables in a star schema relationship
// are planned together. The star-filter will eliminate joins among star and non-star
// tables until the star joins are built. The following combinations are allowed:
// 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
// 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
// 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
val isValidJoinCombination =
JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
filters.get)
if (!isValidJoinCombination) return None
}

val onePlan = oneJoinPlan.plan
val otherPlan = otherJoinPlan.plan
val joinConds = conditions
Expand Down Expand Up @@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
case class Cost(card: BigInt, size: BigInt) {
def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
}

/**
* Implements optional filters to reduce the search space for join enumeration.
*
* 1) Star-join filters: Plan star-joins together since they are assumed
* to have an optimal execution based on their RI relationship.
* 2) Cartesian products: Defer their planning later in the graph to avoid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have this logic in the dp join reorder algorithm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan DP join enumeration relies on ReorderJoin to pull up the Cartesian products. Instead, I think that Cartesian pull-up should be implemented as another filter on top of DP join enumeration.

* large intermediate results (expanding joins, in general).
* 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
* intermediate results.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it, doesn't left-deep tree materialize intermediate results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Left-deep trees are executed in a pipelined fashion. Given the following join trees:

left-deep tree:

     join
    /     \ 
  join     t3 
 /     \
t1     t2

bushy-tree:

  join
 /     \
t1      join
        /    \
      t2     t3

The bushy-tree plan (right-deep in this case) requires the result of (t2 join t3) to be materialized before joining with t1. The left-deep tree doesn’t have this requirement.

*
* Filters (2) and (3) are not implemented.
*/
object JoinReorderDPFilters extends PredicateHelper {
/**
* Builds join graph information to be used by the filtering strategies.
* Currently, it builds the sets of star/non-star joins.
* It can be extended with the sets of connected/unconnected joins, which
* can be used to filter Cartesian products.
*/
def buildJoinGraphInfo(
conf: SQLConf,
items: Seq[LogicalPlan],
conditions: Set[Expression],
itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {

if (conf.joinReorderDPStarFilter) {
// Compute the tables in a star-schema relationship.
val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
val nonStarJoin = items.filterNot(starJoin.contains(_))

if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
val itemMap = itemIndex.toMap
Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
} else {
// Nothing interesting to return.
None
}
} else {
// Star schema filter is not enabled.
None
}
}

/**
* Applies the star-join filter that eliminates join combinations among star
* and non-star tables until the star join is built.
*
* Given the oneSideJoinPlan/otherSideJoinPlan, which represent all the plan
* permutations generated by the DP join enumeration, and the star/non-star plans,
* the following plan combinations are allowed:
* 1. (oneSideJoinPlan U otherSideJoinPlan) is a subset of star-join
* 2. star-join is a subset of (oneSideJoinPlan U otherSideJoinPlan)
* 3. (oneSideJoinPlan U otherSideJoinPlan) is a subset of non star-join
*
* It assumes the sets are disjoint.
*
* Example query graph:
*
* t1 d1 - t2 - t3
* \ /
* f1
* |
* d2
*
* star: {d1, f1, d2}
* non-star: {t2, t1, t3}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also have an example about outer and inner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan outer/inner i.e. oneJoinPlan/otherJoinPlan represent all the plan permutations generated by JoinReorderDP. For example, at level 1, join enumeration will combine plans from level 0 e.g. oneJoinPlan = (f1) and otherJoinPlan = (d1). At level 2, it will generate plans from plan combinations at level 0 and level 1 e.g. oneJoinPlan = (d2) and otherJoinPlan = {f1, d1}, and so on. I will clarify the comment with more details.

*
* level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
* level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
* level 2: {d2 f1 d1 }
* level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
* level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
* level 5: {d1 t3 t2 f1 t1 d2 }
*
* @param oneSideJoinPlan One side of the join represented as a set of plan ids.
* @param otherSideJoinPlan The other side of the join represented as a set of plan ids.
* @param filters Star and non-star plans represented as sets of plan ids
*/
def starJoinFilter(
oneSideJoinPlan: Set[Int],
otherSideJoinPlan: Set[Int],
filters: JoinGraphInfo) : Boolean = {
val starJoins = filters.starJoins
val nonStarJoins = filters.nonStarJoins
val join = oneSideJoinPlan.union(otherSideJoinPlan)

// Disjoint sets
oneSideJoinPlan.intersect(otherSideJoinPlan).isEmpty &&
// Either star or non-star is empty
(starJoins.isEmpty || nonStarJoins.isEmpty ||
// Join is a subset of the star-join
join.subsetOf(starJoins) ||
// Star-join is a subset of join
starJoins.subsetOf(join) ||
Copy link
Member

@viirya viirya Apr 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't a star join be reordered later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya The star filter ensures that star-joins will be planned together. It is the cost based optimizer that decide on the best execution plan within a star-join. Let me know if I answer your question.

Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReorderJoin will reorder the star join plans based on heuristics. Doesn't this cost-based join reorder rule breaks the order created by ReorderJoin? Here we only ask this rule doesn't try to reorder part of star join plans and non-star join plans, but it still can reorder the order among star join plans.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this cost-based join reorder rule breaks the order created by ReorderJoin?

This is expected from cost based reordering. ReorderJoin only puts connected items together, the order among these items is not optimized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, with this added filter, CostBasedJoinReorder can also let the star join plans together, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do we still need ReorderJoin? Looks like we don't need it anymore if we don't care about the order created by it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReorderJoin is done heuristically. It can be useful when cbo is off.

Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. right. forgot that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya The cost-based optimizer will find the best plan for the star-join. The star filter is a heuristic within join enumeration to limit the join sequences evaluated.

// Join is a subset of non-star
join.subsetOf(nonStarJoins))
}
}

/**
* Helper class that keeps information about the join graph as sets of item/plan ids.
* It currently stores the star/non-star plans. It can be
* extended with the set of connected/unconnected plans.
*/
case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {

val emptyStarJoinPlan = Seq.empty[LogicalPlan]

if (!conf.starSchemaDetection || input.size < 2) {
if (input.size < 2) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed unnecessary call to conf.starSchemaDetection.

emptyStarJoinPlan
} else {
// Find if the input plans are eligible for star join detection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,12 @@ object SQLConf {
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
.createWithDefault(0.7)

val JOIN_REORDER_DP_STAR_FILTER =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any cases we don't want to enable this if cbo is enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Star join plans are expected to have an optimal execution based on their referential integrity constraints among the tables. It is a good heuristic. I expect that once CBO is enabled by default, star joins will also be enabled.

Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can have this as true by default? Or even we don't need this flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Yes, we can enable the feature by default, but I would like to keep the filter under the config option.

buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
.doc("Applies star-join filter heuristics to cost based join enumeration.")
.booleanConf
.createWithDefault(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logics will be enabled if and only if both conf.cboEnabled and conf.joinReorderEnabled are true. Thus, it is safe to be true by default?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Regardless of the default value, I still want to control the filters with their own knobs. The filters are applied on top of the join enumeration. They need to have their own control.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine to keep this conf. I am just thinking whether we should change the default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I am also fine with changing the default.
@wzhfy What do you think?

Copy link
Contributor

@ron8hu ron8hu Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Spark 2.2, we introduced a couple of new configuration parameters in optimizer area. In order to play on the safe side, we set the default value to false. I suggest that we can change the default value to true AFTER we are sure that the new optimizer feature does not cause any regression. I think the system regression/integration test suites can help us make a decision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ron8hu Thank you. We will keep the default false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I also think we keep the default false.


val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
.doc("When true, it enables join reordering based on star schema detection. ")
.booleanConf
Expand Down Expand Up @@ -1011,6 +1017,8 @@ class SQLConf extends Serializable with Logging {

def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)

def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)

def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

def sortMergeJoinExecBufferSpillThreshold: Int =
Expand Down
Loading