From 387feeed0dd642720592f88db336ad935bf0f47e Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 21 Jul 2020 17:33:43 -0700 Subject: [PATCH 1/2] Preserve hash join (BHJ and SHJ) stream side ordering --- .../spark/sql/execution/joins/HashJoin.scala | 2 + .../org/apache/spark/sql/JoinSuite.scala | 38 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 8d9ba54f6568d..638b0ddb6ff6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -54,6 +54,8 @@ trait HashJoin extends BaseJoinExec { override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning + override def outputOrdering: Seq[SortOrder] = streamedPlan.outputOrdering + protected lazy val (buildPlan, streamedPlan) = buildSide match { case BuildLeft => (left, right) case BuildRight => (right, left) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index b4f626270cfc9..589ccbd8b636b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1104,4 +1104,42 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan }) } } + + test("SPARK-32383: Preserve hash join (BHJ and SHJ) stream side ordering") { + val df1 = spark.range(100).select($"id".as("k1")) + val df2 = spark.range(100).select($"id".as("k2")) + val df3 = spark.range(3).select($"id".as("k3")) + val df4 = spark.range(100).select($"id".as("k4")) + + // Test broadcast hash join + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") { + val plan = df1.join(df2, $"k1" === $"k2") + .join(df3, $"k1" === $"k3") + .join(df4, $"k1" === $"k4") + .queryExecution + .executedPlan + assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2) + assert(plan.collect { case _: BroadcastHashJoinExec => true }.size === 1) + // No extra sort before last sort merge join + assert(plan.collect { case _: SortExec => true }.size === 3) + } + + // Test shuffled hash join + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df3 = spark.range(10).select($"id".as("k3")) + val plan = df1.join(df2, $"k1" === $"k2") + .join(df3, $"k1" === $"k3") + .join(df4, $"k1" === $"k4") + .queryExecution + .executedPlan + assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2) + assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1) + // No extra sort before last sort merge join + assert(plan.collect { case _: SortExec => true }.size === 3) + } + } } From df8b32a51d0b128fcf106506ecdb4eb9c33e0bc0 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 23 Jul 2020 10:01:56 -0700 Subject: [PATCH 2/2] Explicitly match join type and build side for partitioning and ordering --- .../spark/sql/execution/joins/HashJoin.scala | 36 +++++++++++++++- .../org/apache/spark/sql/JoinSuite.scala | 41 +++++++++++-------- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 638b0ddb6ff6a..fa3a55aa5ad94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -52,9 +52,41 @@ trait HashJoin extends BaseJoinExec { } } - override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning + override def outputPartitioning: Partitioning = buildSide match { + case BuildLeft => + joinType match { + case _: InnerLike | RightOuter => right.outputPartitioning + case x => + throw new IllegalArgumentException( + s"HashJoin should not take $x as the JoinType with building left side") + } + case BuildRight => + joinType match { + case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => + left.outputPartitioning + case x => + throw new IllegalArgumentException( + s"HashJoin should not take $x as the JoinType with building right side") + } + } - override def outputOrdering: Seq[SortOrder] = streamedPlan.outputOrdering + override def outputOrdering: Seq[SortOrder] = buildSide match { + case BuildLeft => + joinType match { + case _: InnerLike | RightOuter => right.outputOrdering + case x => + throw new IllegalArgumentException( + s"HashJoin should not take $x as the JoinType with building left side") + } + case BuildRight => + joinType match { + case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => + left.outputOrdering + case x => + throw new IllegalArgumentException( + s"HashJoin should not take $x as the JoinType with building right side") + } + } protected lazy val (buildPlan, streamedPlan) = buildSide match { case BuildLeft => (left, right) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 589ccbd8b636b..c42d4c6f74a93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1114,15 +1114,17 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan // Test broadcast hash join withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") { - val plan = df1.join(df2, $"k1" === $"k2") - .join(df3, $"k1" === $"k3") - .join(df4, $"k1" === $"k4") - .queryExecution - .executedPlan - assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2) - assert(plan.collect { case _: BroadcastHashJoinExec => true }.size === 1) - // No extra sort before last sort merge join - assert(plan.collect { case _: SortExec => true }.size === 3) + Seq("inner", "left_outer").foreach(joinType => { + val plan = df1.join(df2, $"k1" === $"k2", joinType) + .join(df3, $"k1" === $"k3", joinType) + .join(df4, $"k1" === $"k4", joinType) + .queryExecution + .executedPlan + assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2) + assert(plan.collect { case _: BroadcastHashJoinExec => true }.size === 1) + // No extra sort before last sort merge join + assert(plan.collect { case _: SortExec => true }.size === 3) + }) } // Test shuffled hash join @@ -1131,15 +1133,18 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { val df3 = spark.range(10).select($"id".as("k3")) - val plan = df1.join(df2, $"k1" === $"k2") - .join(df3, $"k1" === $"k3") - .join(df4, $"k1" === $"k4") - .queryExecution - .executedPlan - assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2) - assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1) - // No extra sort before last sort merge join - assert(plan.collect { case _: SortExec => true }.size === 3) + + Seq("inner", "left_outer").foreach(joinType => { + val plan = df1.join(df2, $"k1" === $"k2", joinType) + .join(df3, $"k1" === $"k3", joinType) + .join(df4, $"k1" === $"k4", joinType) + .queryExecution + .executedPlan + assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2) + assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1) + // No extra sort before last sort merge join + assert(plan.collect { case _: SortExec => true }.size === 3) + }) } } }