@@ -91,23 +91,58 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
9191 * Select the proper physical plan for join based on joining keys and size of logical plan.
9292 *
9393 * At first, uses the [[ExtractEquiJoinKeys ]] pattern to find joins where at least some of the
94- * predicates can be evaluated by matching join keys. If found, Join implementations are chosen
94+ * predicates can be evaluated by matching join keys. If found, join implementations are chosen
9595 * with the following precedence:
9696 *
97- * - Broadcast: We prefer to broadcast the join side with an explicit broadcast hint(e.g. the
98- * user applied the [[org.apache.spark.sql.functions.broadcast() ]] function to a DataFrame).
99- * If both sides have the broadcast hint, we prefer to broadcast the side with a smaller
100- * estimated physical size. If neither one of the sides has the broadcast hint,
101- * we only broadcast the join side if its estimated physical size that is smaller than
102- * the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]] threshold.
97+ * - Broadcast hash join (BHJ):
98+ * BHJ is not supported for full outer join. For right outer join, we only can broadcast the
99+ * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
100+ * we only can broadcast the right side. For inner like join, we can broadcast both sides.
101+ * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
102+ * small. However, broadcasting tables is a network-intensive operation. It could cause OOM
103+ * or perform worse than the other join algorithms, especially when the build/broadcast side
104+ * is big.
105+ *
106+ * For the supported cases, users can specify the broadcast hint (e.g. the user applied the
107+ * [[org.apache.spark.sql.functions.broadcast() ]] function to a DataFrame) and session-based
108+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]] threshold to adjust whether BHJ is used and
109+ * which join side is broadcast.
110+ *
111+ * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
112+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]]. If both sides have the hint (only when the type
113+ * is inner like join), the side with a smaller estimated physical size will be broadcast.
114+ * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]] threshold and broadcast the side
115+ * whose estimated physical size is smaller than the threshold. If both sides are below the
116+ * threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.
117+ *
103118 * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
104119 * table.
120+ *
105121 * - Sort merge: if the matching join keys are sortable.
106122 *
107123 * If there is no joining keys, Join implementations are chosen with the following precedence:
108- * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
109- * - CartesianProduct: for Inner join
110- * - BroadcastNestedLoopJoin
124+ * - BroadcastNestedLoopJoin (BNLJ):
125+ * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
126+ * For right outer join, the left side is broadcast. For left outer, left semi, left anti
127+ * and the internal join type ExistenceJoin, the right side is broadcast. For inner like
128+ * joins, either side is broadcast.
129+ *
130+ * Like BHJ, users still can specify the broadcast hint and session-based
131+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]] threshold to impact which side is broadcast.
132+ *
133+ * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
134+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]]. If both sides have the hint (i.e., just for
135+ * inner-like join), the side with a smaller estimated physical size will be broadcast.
136+ * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]] threshold and broadcast the side
137+ * whose estimated physical size is smaller than the threshold. If both sides are below the
138+ * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used.
139+ *
140+ * - CartesianProduct: for inner like join, CartesianProduct is the fallback option.
141+ *
142+ * - BroadcastNestedLoopJoin (BNLJ):
143+ * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast
144+ * side with the broadcast hint. If neither side has a hint, we broadcast the side with
145+ * the smaller estimated physical size.
111146 */
112147 object JoinSelection extends Strategy with PredicateHelper {
113148
@@ -140,8 +175,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
140175 }
141176
142177 private def canBuildRight (joinType : JoinType ): Boolean = joinType match {
143- case _ : InnerLike | LeftOuter | LeftSemi | LeftAnti => true
144- case j : ExistenceJoin => true
178+ case _ : InnerLike | LeftOuter | LeftSemi | LeftAnti | _ : ExistenceJoin => true
145179 case _ => false
146180 }
147181
@@ -244,7 +278,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
244278
245279 // --- Without joining keys ------------------------------------------------------------
246280
247- // Pick BroadcastNestedLoopJoin if one side could be broadcasted
281+ // Pick BroadcastNestedLoopJoin if one side could be broadcast
248282 case j @ logical.Join (left, right, joinType, condition)
249283 if canBroadcastByHints(joinType, left, right) =>
250284 val buildSide = broadcastSideByHints(joinType, left, right)
0 commit comments