From f0e1815cef8954f846c1eb6195fc8a6c9f931ce0 Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Sat, 12 Apr 2014 05:59:37 +0800 Subject: [PATCH 1/2] support leftsemijoin for sparkSQL --- .../spark/sql/catalyst/plans/joinTypes.scala | 1 + .../plans/logical/basicOperators.scala | 9 ++- .../apache/spark/sql/execution/joins.scala | 80 +++++++++++++------ .../org/apache/spark/sql/hive/HiveQl.scala | 1 + ...emijoin-0-80b6466213face7fbcb0de044611e1f5 | 0 ...emijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 | 0 ...mijoin-10-89737a8857b5b61cc909e0c797f86aea | 4 + ...mijoin-11-80b6466213face7fbcb0de044611e1f5 | 0 ...mijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 | 0 ...emijoin-2-43d53504df013e6b35f81811138a167a | 1 + ...emijoin-3-b07d292423312aafa5e5762a579decd2 | 0 ...emijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be | 0 ...emijoin-5-9c307c0559d735960ce77efa95b2b17b | 0 ...emijoin-6-82921fc96eef547ec0f71027ee88298c | 0 ...emijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 | 0 ...tsemijoin-8-73cad58a10a1483ccb15e94a857013 | 4 + ...emijoin-9-c5efa6b8771a51610d655be461670e1e | 2 + ...join_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 | 0 ...join_mr-1-aa3f07f028027ffd13ab5535dc821593 | 0 ...oin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 | 1 + ...oin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 | 2 + ...join_mr-2-3f65953ae60375156367c54533978782 | 0 ...join_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 | 0 ...join_mr-4-333895fe6abca27c8edb5c91bfe10d2f | 2 + ...join_mr-5-896d0948c1df849df9764a6d8ad8fff9 | 20 +++++ ...join_mr-6-b1e2ade89ae898650f0be4f796d8947b | 1 + ...join_mr-7-8e9c2969b999557363e40f9ebb3f6d7c | 1 + ...join_mr-8-c61b972d4409babe41d8963e841af45b | 1 + ...join_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 | 2 + .../execution/HiveCompatibilitySuite.scala | 2 + 30 files changed, 106 insertions(+), 28 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-0-80b6466213face7fbcb0de044611e1f5 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-3-b07d292423312aafa5e5762a579decd2 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-5-9c307c0559d735960ce77efa95b2b17b create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-6-82921fc96eef547ec0f71027ee88298c create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-1-aa3f07f028027ffd13ab5535dc821593 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-2-3f65953ae60375156367c54533978782 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index ae8d7d3e4257f..613f4bb09daf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -22,3 +22,4 @@ case object Inner extends JoinType case object LeftOuter extends JoinType case object RightOuter extends JoinType case object FullOuter extends JoinType +case object LeftSemi extends JoinType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 397473e178867..a96a46e47d5c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.{LeftSemi, JoinType} import org.apache.spark.sql.catalyst.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { @@ -81,7 +81,12 @@ case class Join( condition: Option[Expression]) extends BinaryNode { def references = condition.map(_.references).getOrElse(Set.empty) - def output = left.output ++ right.output + def output = joinType match { + case LeftSemi => + left.output + case _ => + left.output ++ right.output + } } case class InsertIntoTable( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index c89dae9358bf7..0c52ba6771460 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -165,36 +165,64 @@ case class BroadcastNestedLoopJoin( def execute() = { val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) - val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter => - val matchedRows = new ArrayBuffer[Row] - // TODO: Use Spark's BitSet. - val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size) - val joinedRow = new JoinedRow - - streamedIter.foreach { streamedRow => - var i = 0 - var matched = false - - while (i < broadcastedRelation.value.size) { - // TODO: One bitset per partition instead of per row. - val broadcastedRow = broadcastedRelation.value(i) - if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { - matchedRows += buildRow(streamedRow ++ broadcastedRow) - matched = true - includedBroadcastTuples += i - } - i += 1 + val streamedPlusMatches = joinType match { + case LeftSemi => + streamed.execute().mapPartitions { + streamedIter => + val matchedRows = new ArrayBuffer[Row] + val joinedRow = new JoinedRow + + streamedIter.foreach { + streamedRow => + var i = 0 + var matched = false + + while (i < broadcastedRelation.value.size && !matched) { + // TODO: One bitset per partition instead of per row. + val broadcastedRow = broadcastedRelation.value(i) + if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { + matchedRows += buildRow(streamedRow) + matched = true + } + i += 1 + } + } + Iterator((matchedRows, null)) } - - if (!matched && (joinType == LeftOuter || joinType == FullOuter)) { - matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null)) + case _ => + streamed.execute().mapPartitions { + streamedIter => + val matchedRows = new ArrayBuffer[Row] + // TODO: Use Spark's BitSet. + val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size) + val joinedRow = new JoinedRow + + streamedIter.foreach { + streamedRow => + var i = 0 + var matched = false + + while (i < broadcastedRelation.value.size) { + // TODO: One bitset per partition instead of per row. + val broadcastedRow = broadcastedRelation.value(i) + if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { + matchedRows += buildRow(streamedRow ++ broadcastedRow) + matched = true + includedBroadcastTuples += i + } + i += 1 + } + + if (!matched && (joinType == LeftOuter || joinType == FullOuter)) { + matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null)) + } + } + Iterator((matchedRows, includedBroadcastTuples)) } - } - Iterator((matchedRows, includedBroadcastTuples)) } - val includedBroadcastTuples = streamedPlusMatches.map(_._2) - val allIncludedBroadcastTuples = + lazy val includedBroadcastTuples = streamedPlusMatches.map(_._2) + lazy val allIncludedBroadcastTuples = if (includedBroadcastTuples.count == 0) { new scala.collection.mutable.BitSet(broadcastedRelation.value.size) } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 4dac25b3f60e4..dcafb1ea4456b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -676,6 +676,7 @@ object HiveQl { case "TOK_RIGHTOUTERJOIN" => RightOuter case "TOK_LEFTOUTERJOIN" => LeftOuter case "TOK_FULLOUTERJOIN" => FullOuter + case "TOK_LEFTSEMIJOIN" => LeftSemi } assert(other.size <= 1, "Unhandled join clauses.") Join(nodeToRelation(relation1), diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-0-80b6466213face7fbcb0de044611e1f5 b/sql/hive/src/test/resources/golden/leftsemijoin-0-80b6466213face7fbcb0de044611e1f5 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 b/sql/hive/src/test/resources/golden/leftsemijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea b/sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea new file mode 100644 index 0000000000000..25ce912507d55 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea @@ -0,0 +1,4 @@ +Hank 2 +Hank 2 +Joe 2 +Joe 2 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 b/sql/hive/src/test/resources/golden/leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 b/sql/hive/src/test/resources/golden/leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a b/sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-3-b07d292423312aafa5e5762a579decd2 b/sql/hive/src/test/resources/golden/leftsemijoin-3-b07d292423312aafa5e5762a579decd2 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be b/sql/hive/src/test/resources/golden/leftsemijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-5-9c307c0559d735960ce77efa95b2b17b b/sql/hive/src/test/resources/golden/leftsemijoin-5-9c307c0559d735960ce77efa95b2b17b new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-6-82921fc96eef547ec0f71027ee88298c b/sql/hive/src/test/resources/golden/leftsemijoin-6-82921fc96eef547ec0f71027ee88298c new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 b/sql/hive/src/test/resources/golden/leftsemijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 b/sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 new file mode 100644 index 0000000000000..25ce912507d55 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 @@ -0,0 +1,4 @@ +Hank 2 +Hank 2 +Joe 2 +Joe 2 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e b/sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e new file mode 100644 index 0000000000000..f1470bad5782b --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e @@ -0,0 +1,2 @@ +2 Tie +2 Tie diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-1-aa3f07f028027ffd13ab5535dc821593 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-1-aa3f07f028027ffd13ab5535dc821593 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 new file mode 100644 index 0000000000000..6ed281c757a96 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 @@ -0,0 +1,2 @@ +1 +1 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-2-3f65953ae60375156367c54533978782 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-2-3f65953ae60375156367c54533978782 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f b/sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f new file mode 100644 index 0000000000000..6ed281c757a96 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f @@ -0,0 +1,2 @@ +1 +1 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 new file mode 100644 index 0000000000000..179ef0e0209e9 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 @@ -0,0 +1,20 @@ +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b b/sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c b/sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b b/sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 new file mode 100644 index 0000000000000..6ed281c757a96 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 @@ -0,0 +1,2 @@ +1 +1 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index f76e16bc1afc5..d842b083136db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -440,6 +440,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest { "join_view", "lateral_view_cp", "lateral_view_ppd", + "leftsemijoin", + "leftsemijoin_mr", "lineage1", "literal_double", "literal_ints", From 277e4e977c06f9db1f3af51f7397add4edaef3a3 Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Mon, 12 May 2014 12:49:06 +0800 Subject: [PATCH 2/2] modify minor code according to michael --- .../main/scala/org/apache/spark/sql/execution/joins.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 0c52ba6771460..40ea856602fae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -181,7 +181,7 @@ case class BroadcastNestedLoopJoin( // TODO: One bitset per partition instead of per row. val broadcastedRow = broadcastedRelation.value(i) if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { - matchedRows += buildRow(streamedRow) + matchedRows += streamedRow matched = true } i += 1 @@ -197,6 +197,8 @@ case class BroadcastNestedLoopJoin( val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size) val joinedRow = new JoinedRow + val rightNull = Array.fill(right.output.size)(null) + streamedIter.foreach { streamedRow => var i = 0 @@ -206,7 +208,7 @@ case class BroadcastNestedLoopJoin( // TODO: One bitset per partition instead of per row. val broadcastedRow = broadcastedRelation.value(i) if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { - matchedRows += buildRow(streamedRow ++ broadcastedRow) + matchedRows += joinedRow(streamedRow.copy, broadcastedRow) matched = true includedBroadcastTuples += i } @@ -214,7 +216,7 @@ case class BroadcastNestedLoopJoin( } if (!matched && (joinType == LeftOuter || joinType == FullOuter)) { - matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null)) + matchedRows += joinedRow(streamedRow.copy, rightNull) } } Iterator((matchedRows, includedBroadcastTuples))