Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ case class StreamingSymmetricHashJoinExec(
postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
}
}

// NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
// elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
// the match flag which the logic for outer join is relying on.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify: this comment is not related to the bug and just to document an existing assumption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes right.

TBH I suspected this first and crafted a patch including the part with new iterator explicitly runs the logic after evaluating innerOutputIter, and later realized current logic already dealt with this properly, because removeOldState() doesn't materialize the candidates and evaluate lazily. This patch contains minimal change.

Worth to mention how it works for someone who may need to touch here.

val removedRowIter = leftSideJoiner.removeOldState()
val outerOutputIter = removedRowIter.filterNot { kv =>
stateFormatVersion match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,25 @@ class SymmetricHashJoinStateManager(
}

private trait KeyWithIndexToValueRowConverter {
/** Defines the schema of the value row (the value side of K-V in state store). */
def valueAttributes: Seq[Attribute]

/**
* Convert the value row to (actual value, match) pair.
*
* NOTE: implementations should ensure the result row is NOT reused during execution, so
* that caller can safely read the value in any time.
*/
def convertValue(value: UnsafeRow): ValueAndMatchPair

/**
* Build the value row from (actual value, match) pair. This is expected to be called just
* before storing to the state store.
*
* NOTE: depending on the implementation, the result row "may" be reused during execution
* (to avoid initialization of object), so the caller should ensure that the logic doesn't
* affect by such behavior. Call copy() against the result row if needed.
*/
def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
}

Expand Down Expand Up @@ -493,7 +508,7 @@ class SymmetricHashJoinStateManager(

override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
if (value != null) {
ValueAndMatchPair(valueRowGenerator(value),
ValueAndMatchPair(valueRowGenerator(value).copy(),
value.getBoolean(indexOrdinalInValueWithMatchedRow))
} else {
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.io.File
import java.sql.Timestamp
import java.util.{Locale, UUID}

import scala.util.Random
Expand Down Expand Up @@ -996,4 +997,47 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
)
}
}

test("SPARK-32148 stream-stream join regression on Spark 3.0.0") {
val input1 = MemoryStream[(Timestamp, String, String)]
val df1 = input1.toDF
.selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason why not use select? I don't see any expression here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's pretty much simpler and more readable than select('_1.as("eventTime"), '_2.as("id"), '_3.as("comment")) (or even with col(...) if ' notation doesn't work for _1, _2, _3).

.withWatermark(s"eventTime", "2 minutes")

val input2 = MemoryStream[(Timestamp, String, String)]
val df2 = input2.toDF
.selectExpr("_1 as eventTime", "_2 as id", "_3 as name")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here as well.

.withWatermark(s"eventTime", "4 minutes")

val joined = df1.as("left")
.join(df2.as("right"),
expr("""
|left.id = right.id AND left.eventTime BETWEEN
| right.eventTime - INTERVAL 30 seconds AND
| right.eventTime + INTERVAL 30 seconds
""".stripMargin),
joinType = "leftOuter")

val inputDataForInput1 = Seq(
(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B"))

val inputDataForInput2 = Seq(
(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"),
(Timestamp.valueOf("2020-01-02 02:00:00"), "abc", "C"))

val expectedOutput = Seq(
(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner", null, null, null),
(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A",
Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B",
Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"))

testStream(joined)(
MultiAddData((input1, inputDataForInput1), (input2, inputDataForInput2)),
CheckNewAnswer(expectedOutput.head, expectedOutput.tail: _*)
)
}
}