Skip to content

Commit c9ec7c9

Browse files
committed
[SPARK-20233] Move star-filter conf as a method param and other changes.
1 parent 179747f commit c9ec7c9

File tree

3 files changed

+21
-24
lines changed

3 files changed

+21
-24
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala

Lines changed: 20 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -149,7 +149,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
149149
}.toMap)
150150

151151
// Build filters from the join graph to be used by the search algorithm.
152-
val filters = JoinReorderDPFilters(conf).buildJoinGraphInfo(items, conditions, itemIndex)
152+
val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
153153

154154
// Build plans for next levels until the last level has only one plan. This plan contains
155155
// all items that can be joined, so there's no need to continue.
@@ -249,15 +249,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
249249
return None
250250
}
251251

252-
if (conf.joinReorderDPStarFilter && filters.isDefined) {
252+
if (filters.isDefined) {
253253
// Apply star-join filter, which ensures that tables in a star schema relationship
254254
// are planned together. The star-filter will eliminate joins among star and non-star
255255
// tables until the star joins are built. The following combinations are allowed:
256256
// 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
257257
// 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
258258
// 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
259259
val isValidJoinCombination =
260-
JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
260+
JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
261261
filters.get)
262262
if (!isValidJoinCombination) return None
263263
}
@@ -362,34 +362,33 @@ case class Cost(card: BigInt, size: BigInt) {
362362
*
363363
* Filters (2) and (3) are not implemented.
364364
*/
365-
case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
365+
object JoinReorderDPFilters extends PredicateHelper {
366366
/**
367367
* Builds join graph information to be used by the filtering strategies.
368368
* Currently, it builds the sets of star/non-star joins.
369369
* It can be extended with the sets of connected/unconnected joins, which
370370
* can be used to filter Cartesian products.
371371
*/
372372
def buildJoinGraphInfo(
373+
conf: SQLConf,
373374
items: Seq[LogicalPlan],
374375
conditions: Set[Expression],
375-
planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
376-
377-
// Compute the tables in a star-schema relationship.
378-
val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
379-
val nonStarJoin = items.filterNot(starJoin.contains(_))
380-
381-
if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
382-
val (starInt, nonStarInt) = planIndex.collect {
383-
case (p, i) if starJoin.contains(p) =>
384-
(Some(i), None)
385-
case (p, i) if nonStarJoin.contains(p) =>
386-
(None, Some(i))
387-
case _ =>
388-
(None, None)
389-
}.unzip
390-
Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
376+
itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
377+
378+
if (conf.joinReorderDPStarFilter) {
379+
// Compute the tables in a star-schema relationship.
380+
val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
381+
val nonStarJoin = items.filterNot(starJoin.contains(_))
382+
383+
if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
384+
val itemMap = itemIndex.toMap
385+
Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
386+
} else {
387+
// Nothing interesting to return.
388+
None
389+
}
391390
} else {
392-
// Nothing interesting to return.
391+
// Star schema filter is not enabled.
393392
None
394393
}
395394
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
7676

7777
val emptyStarJoinPlan = Seq.empty[LogicalPlan]
7878

79-
if (!conf.starSchemaDetection || input.size < 2) {
79+
if (input.size < 2) {
8080
emptyStarJoinPlan
8181
} else {
8282
// Find if the input plans are eligible for star join detection.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,10 +31,8 @@ import org.apache.spark.sql.internal.SQLConf._
3131
class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
3232

3333
override val conf = new SQLConf().copy(
34-
CASE_SENSITIVE -> true,
3534
CBO_ENABLED -> true,
3635
JOIN_REORDER_ENABLED -> true,
37-
STARSCHEMA_DETECTION -> true,
3836
JOIN_REORDER_DP_STAR_FILTER -> true)
3937

4038
object Optimize extends RuleExecutor[LogicalPlan] {

0 commit comments

Comments
 (0)