@@ -279,11 +279,15 @@ case class StreamingSymmetricHashJoinExec(
     // * Getting an iterator over the rows that have aged out on the left side. These rows are
     //   candidates for being null joined. Note that to avoid doing two passes, this iterator
     //   removes the rows from the state manager as they're processed.
-    // * Checking whether the current row matches a key in the right side state, and that key
-    //   has any value which satisfies the filter function when joined. If it doesn't,
-    //   we know we can join with null, since there was never (including this batch) a match
-    //   within the watermark period. If it does, there must have been a match at some point, so
-    //   we know we can't join with null.
+    // * (state format version 1) Checking whether the current row matches a key in the
+    //   right side state, and whether that key has any value which satisfies the filter
+    //   function when joined. If it doesn't, we know we can join with null, since there was
+    //   never (including this batch) a match within the watermark period. If it does, there
+    //   must have been a match at some point, so we know we can't join with null.
+    // * (state format version 2) The approach above has an edge case that causes a
+    //   correctness issue, so a different approach was needed (see SPARK-26154); Spark now
+    //   stores a 'matched' flag along with each row, which is set to true when there is any
+    //   matching row on the right.
+
     def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
       rightSideJoiner.get(leftKeyValue.key).exists { rightValue =>
         postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
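The two strategies described in the comment can be contrasted with a minimal sketch in plain Scala (not Spark's internal API — `StateRow`, `leftState`, and the driver logic here are hypothetical). Version 2 flips a per-row `matched` flag whenever a join match occurs, so eviction no longer needs to re-probe the opposite side's state:

```scala
// Hedged sketch of the two state format versions, using hypothetical names.
object MatchedFlagSketch {
  // In state format version 2, each stored row carries a mutable `matched` flag.
  final case class StateRow(value: String, var matched: Boolean = false)

  def main(args: Array[String]): Unit = {
    // Left-side state, keyed by join key (sorted for deterministic output).
    val leftState = scala.collection.mutable.SortedMap(
      1 -> StateRow("a"),
      2 -> StateRow("b"))

    // A right-side row with key 1 arrives: set the flag on the matching left row.
    val incomingRightKeys = Seq(1)
    incomingRightKeys.foreach(k => leftState.get(k).foreach(_.matched = true))

    // On watermark eviction, rows whose flag is still false are null-joined.
    // Version 1 would instead re-probe the right side state at this point, the
    // approach whose correctness edge case SPARK-26154 describes.
    for ((k, row) <- leftState if !row.matched)
      println(s"key=$k value=${row.value} -> null join")
  }
}
```

The flag makes eviction a purely local decision on the left side's state, which is what removes the dependence on what the right side state happens to contain at eviction time.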