Skip to content

Commit 975eb46

Browse files
Add DefaultJoin Strategy
1 parent 024a1fb commit 975eb46

File tree

4 files changed

+31
-36
lines changed

4 files changed

+31
-36
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,9 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
4444
EquiJoinSelection ::
4545
InMemoryScans ::
4646
BasicOperators ::
47+
BroadcastNestedLoop ::
4748
CartesianProduct ::
48-
BroadcastNestedLoopJoin :: Nil)
49+
DefaultJoin :: Nil)
4950

5051
/**
5152
* Used to build table scan operators where complex projection and filtering are done using

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 26 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -294,46 +294,24 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
294294
}
295295
}
296296

297-
298-
object BroadcastNestedLoopJoin extends Strategy {
297+
object BroadcastNestedLoop extends Strategy {
299298
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
300-
case logical.Join(left, right, joinType, condition) =>
301-
val buildSide =
302-
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
303-
joins.BuildRight
304-
} else {
305-
joins.BuildLeft
306-
}
307-
joins.BroadcastNestedLoopJoin(
308-
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
299+
case logical.Join(
300+
CanBroadcast(left), right, joinType, condition) if joinType != LeftSemiJoin =>
301+
execution.joins.BroadcastNestedLoopJoin(
302+
planLater(left), planLater(right), joins.BuildLeft, joinType, condition) :: Nil
303+
case logical.Join(
304+
left, CanBroadcast(right), joinType, condition) if joinType != LeftSemiJoin =>
305+
execution.joins.BroadcastNestedLoopJoin(
306+
planLater(left), planLater(right), joins.BuildRight, joinType, condition) :: Nil
309307
case _ => Nil
310308
}
311309
}
312310

313311
object CartesianProduct extends Strategy {
314312
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
315-
// Not like the equal-join, BroadcastNestedLoopJoin doesn't support condition
316-
// for cartesian join, as in cartesian join, probably, the records satisfy the
317-
// condition, but exists in another partition of the large table, so we may not able
318-
// to eliminate the duplicates.
319-
case logical.Join(
320-
CanBroadcast(left), right, joinType @ (FullOuter | LeftOuter | RightOuter), None) =>
321-
execution.joins.BroadcastNestedLoopJoin(
322-
planLater(left), planLater(right), joins.BuildLeft, joinType, None) :: Nil
323-
case logical.Join(
324-
left, CanBroadcast(right), joinType @ (FullOuter | LeftOuter | RightOuter), None) =>
325-
execution.joins.BroadcastNestedLoopJoin(
326-
planLater(left), planLater(right), joins.BuildRight, joinType, None) :: Nil
327-
// Since BroadCastNestedLoopJoin supports condition already, we simply passed it down.
328-
case logical.Join(
329-
CanBroadcast(left), right, Inner, condition) =>
330-
execution.joins.BroadcastNestedLoopJoin(
331-
planLater(left), planLater(right), joins.BuildLeft, Inner, condition) :: Nil
332-
case logical.Join(
333-
left, CanBroadcast(right), Inner, condition) =>
334-
execution.joins.BroadcastNestedLoopJoin(
335-
planLater(left), planLater(right), joins.BuildRight, Inner, condition) :: Nil
336-
case logical.Join(left, right, _, None) =>
313+
// TODO CartesianProduct doesn't support the Left Semi Join
314+
case logical.Join(left, right, joinType, None) if joinType != LeftSemiJoin =>
337315
execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
338316
case logical.Join(left, right, Inner, Some(condition)) =>
339317
execution.Filter(condition,
@@ -342,6 +320,21 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
342320
}
343321
}
344322

323+
object DefaultJoin extends Strategy {
324+
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
325+
case logical.Join(left, right, joinType, condition) =>
326+
val buildSide =
327+
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
328+
joins.BuildRight
329+
} else {
330+
joins.BuildLeft
331+
}
332+
joins.BroadcastNestedLoopJoin(
333+
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
334+
case _ => Nil
335+
}
336+
}
337+
345338
protected lazy val singleRowRdd = sparkContext.parallelize(Seq(InternalRow()), 1)
346339

347340
object TakeOrderedAndProject extends Strategy {

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ case class BroadcastNestedLoopJoin(
6868
case FullOuter =>
6969
left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
7070
case Inner => left.output ++ right.output
71-
case x =>
71+
case x => // TODO support the Left Semi Join
7272
throw new IllegalArgumentException(
7373
s"BroadcastNestedLoopJoin should not take $x as the JoinType")
7474
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -587,8 +587,9 @@ class HiveContext private[hive](
587587
LeftSemiJoin,
588588
EquiJoinSelection,
589589
BasicOperators,
590+
BroadcastNestedLoop,
590591
CartesianProduct,
591-
BroadcastNestedLoopJoin
592+
DefaultJoin
592593
)
593594
}
594595

0 commit comments

Comments
 (0)