
Commit 83f0423

HeartSaVioR authored and cloud-fan committed
[SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row
### What changes were proposed in this pull request?

This patch fixes the odd join results produced by stream-stream join with state store format V2. There are some spots on the V2 path which leverage UnsafeProjection. Because the result row is reused, the row should be copied to avoid its value changing while it is being read (or the caller must be known to be unaffected by such behavior), but `SymmetricHashJoinStateManager.removeByValueCondition` violates this. This patch makes `KeyWithIndexToValueRowConverterV2.convertValue` copy the row by itself, so that callers don't need to take care of it.

This patch doesn't change the behavior of `KeyWithIndexToValueRowConverterV2.convertToValueRow`, to avoid double-copying, as the caller is expected to store the row, and the state store implementation will call `copy()` when storing. This patch documents that behavior in each method doc in `KeyWithIndexToValueRowConverter`, so that future contributors can read through it and make sure a change or new addition doesn't break the contract.

### Why are the changes needed?

Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which produces nondeterministic results.

### Does this PR introduce _any_ user-facing change?

Yes. Some Spark 3.0.0 users running stream-stream join from a new checkpoint (the bug exists only on the V2 format path) may encounter wrong join results. This patch fixes that.

### How was this patch tested?

The reported case was converted to a new UT, which was confirmed to pass. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite passed as well.

Closes #28975 from HeartSaVioR/SPARK-32148.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 526cb2d)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 29e098b commit 83f0423
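
For context on the bug class: `UnsafeProjection` reuses a single mutable `UnsafeRow` buffer across invocations, so a reference held across calls silently changes value. Below is a minimal sketch of that behavior — not part of the patch; it assumes Spark's Catalyst internals on the classpath, and the `ReusedRowDemo` object name is ours:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object ReusedRowDemo {
  def main(args: Array[String]): Unit = {
    val proj = UnsafeProjection.create(StructType(Seq(StructField("v", IntegerType))))

    val first = proj(InternalRow(1))   // backed by the projection's reused buffer
    val second = proj(InternalRow(2))  // overwrites that same buffer
    assert(first eq second)            // same object, so `first` now reads 2
    assert(first.getInt(0) == 2)

    val detached = proj(InternalRow(3)).copy() // copy() detaches from the buffer
    proj(InternalRow(4))
    assert(detached.getInt(0) == 3)    // unaffected by later projections
  }
}
```

This is exactly why `convertValue` now calls `copy()` before handing the row to callers such as `removeByValueCondition`.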

File tree: 3 files changed, +64 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -295,6 +295,10 @@ case class StreamingSymmetricHashJoinExec(
         postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
       }
     }
+
+    // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
+    // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+    // the match flag which the logic for outer join is relying on.
     val removedRowIter = leftSideJoiner.removeOldState()
     val outerOutputIter = removedRowIter.filterNot { kv =>
       stateFormatVersion match {
```
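
The ordering constraint in the new comment can be seen in miniature with plain Scala iterators: `filterNot` is lazy, so an iterator built over mutable state observes that state at consumption time. A hedged sketch of the idea (ours, not Spark code):

```scala
object IteratorOrderingDemo {
  def main(args: Array[String]): Unit = {
    var matched = Set.empty[Int]   // stands in for the per-row "match" flag in the state store

    val rows = Seq(1, 2, 3)
    val innerOutputIter = rows.iterator.map { r =>
      if (r != 3) matched += r     // the inner side lazily flags rows that found a partner
      r
    }
    // Built here, but the predicate runs lazily per element, so it sees
    // `matched` as of consumption time, not construction time.
    val outerOutputIter = rows.iterator.filterNot(r => matched.contains(r))

    innerOutputIter.foreach(_ => ())          // exhaust the inner side first...
    assert(outerOutputIter.toList == List(3)) // ...so only the unmatched row survives
  }
}
```

Consuming `outerOutputIter` first would emit rows 1 and 2 as spurious outer results, which is the hazard the comment pins down.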

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala

Lines changed: 16 additions & 1 deletion

```diff
@@ -451,10 +451,25 @@ class SymmetricHashJoinStateManager(
   }

   private trait KeyWithIndexToValueRowConverter {
+    /** Defines the schema of the value row (the value side of K-V in state store). */
     def valueAttributes: Seq[Attribute]

+    /**
+     * Convert the value row to (actual value, match) pair.
+     *
+     * NOTE: implementations should ensure the result row is NOT reused during execution, so
+     * that caller can safely read the value in any time.
+     */
     def convertValue(value: UnsafeRow): ValueAndMatchPair

+    /**
+     * Build the value row from (actual value, match) pair. This is expected to be called just
+     * before storing to the state store.
+     *
+     * NOTE: depending on the implementation, the result row "may" be reused during execution
+     * (to avoid initialization of object), so the caller should ensure that the logic doesn't
+     * affect by such behavior. Call copy() against the result row if needed.
+     */
     def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
   }

@@ -493,7 +508,7 @@ class SymmetricHashJoinStateManager(

     override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
       if (value != null) {
-        ValueAndMatchPair(valueRowGenerator(value),
+        ValueAndMatchPair(valueRowGenerator(value).copy(),
           value.getBoolean(indexOrdinalInValueWithMatchedRow))
       } else {
         null
```
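
To make the asymmetric contract in the new doc comments concrete, here is a toy stand-in — ours, not the actual `KeyWithIndexToValueRowConverterV2`, which additionally splits off the `matched` flag that this sketch omits:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.StructType

class ToyConverter(valueSchema: StructType) {
  private val proj = UnsafeProjection.create(valueSchema)

  // Read path: the returned row must NOT alias the reused buffer (the
  // SPARK-32148 fix), so detach it with copy() before handing it out.
  def convertValue(stored: InternalRow): UnsafeRow = proj(stored).copy()

  // Write path: returning the reused buffer is fine, because the state store
  // is expected to copy() the row as it persists it; copying here as well
  // would be the double-copy the patch deliberately avoids.
  def convertToValueRow(value: InternalRow): UnsafeRow = proj(value)
}
```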

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala

Lines changed: 44 additions & 0 deletions

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming

 import java.io.File
+import java.sql.Timestamp
 import java.util.{Locale, UUID}

 import scala.util.Random
@@ -991,4 +992,47 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
+
+  test("SPARK-32148 stream-stream join regression on Spark 3.0.0") {
+    val input1 = MemoryStream[(Timestamp, String, String)]
+    val df1 = input1.toDF
+      .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+      .withWatermark(s"eventTime", "2 minutes")
+
+    val input2 = MemoryStream[(Timestamp, String, String)]
+    val df2 = input2.toDF
+      .selectExpr("_1 as eventTime", "_2 as id", "_3 as name")
+      .withWatermark(s"eventTime", "4 minutes")
+
+    val joined = df1.as("left")
+      .join(df2.as("right"),
+        expr("""
+               |left.id = right.id AND left.eventTime BETWEEN
+               |  right.eventTime - INTERVAL 30 seconds AND
+               |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+        joinType = "leftOuter")
+
+    val inputDataForInput1 = Seq(
+      (Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
+      (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
+      (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B"))
+
+    val inputDataForInput2 = Seq(
+      (Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
+      (Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"),
+      (Timestamp.valueOf("2020-01-02 02:00:00"), "abc", "C"))
+
+    val expectedOutput = Seq(
+      (Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner", null, null, null),
+      (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A",
+        Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
+      (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B",
+        Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"))
+
+    testStream(joined)(
+      MultiAddData((input1, inputDataForInput1), (input2, inputDataForInput2)),
+      CheckNewAnswer(expectedOutput.head, expectedOutput.tail: _*)
+    )
+  }
 }
```
