Skip to content

Commit 771abeb

Browse files
wangyumhvanhovell
authored andcommitted
[SPARK-17685][SQL] Make SortMergeJoinExec's currentVars is null when calling createJoinKey
## What changes were proposed in this pull request? The following SQL query cause `IndexOutOfBoundsException` issue when `LIMIT > 1310720`: ```sql CREATE TABLE tab1(int int, int2 int, str string); CREATE TABLE tab2(int int, int2 int, str string); INSERT INTO tab1 values(1,1,'str'); INSERT INTO tab1 values(2,2,'str'); INSERT INTO tab2 values(1,1,'str'); INSERT INTO tab2 values(2,3,'str'); SELECT count(*) FROM ( SELECT t1.int, t2.int2 FROM (SELECT * FROM tab1 LIMIT 1310721) t1 INNER JOIN (SELECT * FROM tab2 LIMIT 1310721) t2 ON (t1.int = t2.int AND t1.int2 = t2.int2) ) t; ``` This pull request fix this issue. ## How was this patch tested? unit tests Author: Yuming Wang <[email protected]> Closes #17920 from wangyum/SPARK-17685.
1 parent c0189ab commit 771abeb

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ case class SortMergeJoinExec(
371371
keys: Seq[Expression],
372372
input: Seq[Attribute]): Seq[ExprCode] = {
373373
ctx.INPUT_ROW = row
374+
ctx.currentVars = null
374375
keys.map(BindReferences.bindReference(_, input).genCode(ctx))
375376
}
376377

sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
264264
val ab = a.join(b, Seq("a"), "fullouter")
265265
checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
266266
}
267+
268+
test("SPARK-17685: WholeStageCodegenExec throws IndexOutOfBoundsException") {
269+
val df = Seq((1, 1, "1"), (2, 2, "3")).toDF("int", "int2", "str")
270+
val df2 = Seq((1, 1, "1"), (2, 3, "5")).toDF("int", "int2", "str")
271+
val limit = 1310721
272+
val innerJoin = df.limit(limit).join(df2.limit(limit), Seq("int", "int2"), "inner")
273+
.agg(count($"int"))
274+
checkAnswer(innerJoin, Row(1) :: Nil)
275+
}
276+
267277
}

0 commit comments

Comments
 (0)