-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows #26108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows #26108
Conversation
…uter nulls for already matched rows
|
cc. @tdas @zsxwing @jose-torres @gaborgsomogyi This is a revised version of the patch #23634 to reduce the diff. Please take a look. Thanks in advance. |
|
|
||
| /** A wrapper around a [[StateStore]] that stores [(key, index) -> value]. */ | ||
| private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValueType) { | ||
| private trait KeyWithIndexToValueRowConverter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unavoidable refactoring, as we need to deal with both schemas depending on the state format version.
I didn't let inner join use state format version 1 (though it could be one of optimizations), as the inner join query could be changed to outer join which Spark will not be able to determine the change - correctness issue will occur then.
| var key: UnsafeRow = null, | ||
| var valueIndex: Long = -1, | ||
| var value: UnsafeRow = null, | ||
| var matched: Boolean = false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here matched also has a default value for state format version 1 - the main version of state format version is 2, so focused to optimize for state format version 2. (Option[Boolean] vs Boolean) The value is not used in caller if the state format version is 1, so I think this is OK.
| // we know we can join with null, since there was never (including this batch) a match | ||
| // within the watermark period. If it does, there must have been a match at some point, so | ||
| // we know we can't join with null. | ||
| // * (state format version 1) Checking whether the current row matches a key in the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left origin comment as it is since it is still applied for state format version 1, and added comment for explaining state format version 2 and the reason of making a change. Please let me know if we just want to remove explanation of state format version 1.
|
Test build #112006 has finished for PR 26108 at commit
|
|
Test build #112009 has finished for PR 26108 at commit
|
vanzin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I think I get what's going on. Things look correct, I just can't comment much on details of the APIs being used (e.g. whether there's a better / cheaper / easier way to turn an UnsafeRow into another row + some extra data, like the new matched state being tracked).
Also, KeyWithIndexAndValueWithMatched is a pretty long and very specific name. Maybe just keep the old name?
| "State between versions are tend to be incompatible, so state format version shouldn't " + | ||
| "be modified after running.") | ||
| .intConf | ||
| .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkValues(Set(1, 2))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice suggestion. Btw, the line is actually consistent with the old things, STREAMING_AGGREGATION_STATE_FORMAT_VERSION, FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION. Would we like to change them as well, or just leave it be consistent? Maybe from follow-up PR then to exclude unrelated changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to change the others, feel free to open another PR. But I'd like the better API to be used here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Ignoring the discussion about removing this, in the other comment below.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to leave this as it is, and handle this entirely. We can keep consistency regardless of follow-up PR be accepted or not.
| } | ||
|
|
||
| if (stateFormatVersion < 2 && joinType != Inner) { | ||
| logError(s"The query is using stream-stream outer join with state format version" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is logging enough? Shouldn't you just fail the query?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually this correctness issue is somewhat occurred in "corner case", so I wasn't 100% sure we want to force end users to discard their checkpoint. #24890 is a similar case for "possible" correctness issue and we just log warn message.
If we would like to avoid possible correctness issue in any case, I'm OK to let the query fail. Forcing end users to drop checkpoint might be unhappy one, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I would suggest a config option to choose between error or continue, but then I dislike adding more and more config options.
I guess the question is: in which situations will you have the new version of Spark using the old snapshot format?
The way I understand it, it should only happen if you restart an app that was running 2.4, now running with 3.0. The new query will pick up the state store from the previous run and go from there.
In that situation it doesn't seem horrible to fail the query. But anyway, I'll leave it to your judgement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the impact matters. They need to discard checkpoint and replay the query with their previous inputs from the scratch. The impact would depend on how much data they need to replay - join may store rows depending on the condition of join, may couple of hours, even days (though it doesn't seem to be realistic in production).
Maybe replaying couple of hours of inputs wouldn't matter too much, and they still be able to run the query with old Spark version, so yes I agree it wouldn't be so horrible if we guide how to discard checkpoint and rerun the query.
Btw, as I described below, we may try to provide offline migration tool - though this patch shouldn't be blocked by that.
| case 1 => matchesWithRightSideState( | ||
| new UnsafeRowPair(kvAndMatched.key, kvAndMatched.value)) | ||
| case 2 => kvAndMatched.matched | ||
| case _ => throw new IllegalStateException("Incorrect state format version! " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Incorrect/Invalid (or Unexpected).
| case 1 => matchesWithLeftSideState( | ||
| new UnsafeRowPair(kvAndMatched.key, kvAndMatched.value)) | ||
| case 2 => kvAndMatched.matched | ||
| case _ => throw new IllegalStateException("Incorrect state format version! " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
| thisRow: UnsafeRow, | ||
| subIter: Iterator[JoinedRow]) | ||
| extends CompletionIterator[JoinedRow, Iterator[JoinedRow]](subIter) { | ||
| private var iteratorNotEmpty: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't this just be a val? hasNext is supposed to be cheap, or at the very least idempotent, so should be safe to call here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess yes, if the implementation of iterator is respecting the spec of iterator. Let me change it and see whether it doesn't break anything.
| val joinedRow = generateJoinedRow(keyIdxToValue.value) | ||
| if (predicate(joinedRow)) { | ||
| if (!keyIdxToValue.matched) { | ||
| // only update when matched flag is false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment just repeats the code.
| } | ||
|
|
||
| object SymmetricHashJoinStateManager { | ||
| val supportedVersions = Seq(1, 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to just reference the same values as the config entry.
In fact, since the config is internal, you could probably just drop the check there, and rely on the checks in this code to make sure the version if correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. Sounds nice, but may want to keep consistency. So I'm in favor of changing all, or leaving it as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as before. I'd like the better approach to be used. If you want to go later and fix the other spots in the code, that's up to you.
| expr("leftId = rightId AND leftTime >= rightTime AND " + | ||
| "leftTime <= rightTime + interval 5 seconds"), | ||
| joinType = "rightOuter") | ||
| .select(col("rightId"), col("rightTime").cast("int"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make it easier to see / compare with the previous test, could you keep the results from the left stream to the left? (so the unmatched row looks like Row(null, null, 1, 1L) instead)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah OK. Actually that was to help reviewers to compare the result with previous test easily (as it would be same), but looks like it doesn't work as intended. I'll make a change.
|
Thanks again for reviewing!
Yes agreed. May not strictly needed to be very specific. Will keep the old name. |
|
Test build #112664 has finished for PR 26108 at commit
|
| def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow, matched: Boolean): Unit = { | ||
| val keyWithIndex = keyWithIndexRow(key, valueIndex) | ||
| stateStore.put(keyWithIndex, value) | ||
| val valueWithMatched = valueRowConverter.convertToValueRow(value, matched) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, while replying to your comment, this came to my mind.
The way I see this code, when you start from a v1 state store, you'll also be writing back v1 data.
Shouldn't you write out new data, so that you fix the problem going forward once you start the app with the fixed version of Spark?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even we write v2 data here, we only update part of state instead of full rewrite of state. It will leave v1 / v2 data co-exist and make state store not possible to read the entire rows - individual row doesn't have schema information. That's why we separate versions and isolate them. Yes that's bad, but that's the current state.
Actually we don't even have schema information for state store as well so we would encounter undefined behavior if we do such unsafe thing. SPARK-27237 (#24173) proposes to introduce schema information per state (not per row) and fail the query if schema is not compatible.
We can still rewrite the state entirely in a batch query (cannot be online modification) - please refer the README of https://github.com/HeartSaVioR/spark-state-tools. I've already implemented the migration of v1 to v2 for streaming aggregation / flatMapGroupsWithState, and I may try to implement this migration as well. I'm also proposing the project as a part of Apache Spark, but not as it is - I'm proposing batch data source for "state" first (SPARK-28190), and once we adopt it, we could also consider to have some tools for helping migration. Flink recently added the state API which denotes the necessity of the data source for "state".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, think I got it. Seems like even if we add some code that would write new data in the v2 format, the state store still may have data in v1 format that can trigger the correctness issue. So it's a small gain in the end, although that old state data would be constrained to state generated before the "fixed" Spark was used.
Given that I think that failing the query when the bad combination is detected is better (I see you already made that change).
One thing is that a "rewrite" of the state in this case, at least, is not a good option. The data is missing from the v1 data, so there's no way to generate correct v2 data from it. The only thing you'd achieve by doing that is to mask errors (since the exception you're adding wouldn't be triggered anymore).
So unless it's possible to have a "mixed" solution here, where the same store can have both v1 and v2 data, erroring out is the best outcome I can think of. I can think of ways to hack it (you could e.g. compare the column count for the UnsafeRow read from the store and see if it matches the count in the expected schema, or if it has the extra fields expected by the v2 data), but haven't thoroughly thought about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing is that a "rewrite" of the state in this case, at least, is not a good option. The data is missing from the v1 data, so there's no way to generate correct v2 data from it.
Yeah you're right. At least for this case rewrite doesn't work. I forgot what I've already found months ago. Thanks for reminding.
I can think of ways to hack it (you could e.g. compare the column count for the UnsafeRow read from the store and see if it matches the count in the expected schema, or if it has the extra fields expected by the v2 data), but haven't thoroughly thought about it.
I also had thought about it a bit (and that was maybe one of review comment in previous PR) but it can bring side-effect if the query is changed. If end users change the query to let input of join containing one less column, there's a chance Spark may read "v1 row" as "v2 row" incorrectly whereas Spark should just fail the query since schema has been changed. Relying on column count is unsafe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could take that approach if you also had schema validation, as in your other PR. The main advantage is that it would simplify some of the versioning-related changes in this PR (i.e. you would only have to deal with the read path).
Anyway, it's more important to fix the correctness issue, so that can be done later.
|
Test build #112705 has finished for PR 26108 at commit
|
|
Test build #112710 has finished for PR 26108 at commit
|
| * Helper class for representing data key to (value, matched). | ||
| * Designed for object reuse. | ||
| */ | ||
| case class KeyToValueAndMatched( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I commented on the other class name, but the comment also applies to this one. AndMatched is very specific, I'd prefer to keep a more generic name (e.g. KeyToValuePair to kinda follow the old UnsafeRowPair name).
|
Test build #112797 has finished for PR 26108 at commit
|
|
Could we go through the next round of review? Thanks in advance! |
| def removeByKeyCondition(removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { | ||
| new NextIterator[UnsafeRowPair] { | ||
| def removeByKeyCondition(removalCondition: UnsafeRow => Boolean) | ||
| : Iterator[KeyToValuePair] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fits in previous line.
| .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) | ||
| val outerOutputIter = removedRowIter.filterNot { kv => | ||
| stateFormatVersion match { | ||
| case 1 => matchesWithLeftSideState( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find that if your statement doesn't fit in the same line as the case, it's easier to read when it starts on the next line. Otherwise the indentation of the next lines look weird.
| def removeByValueCondition(removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { | ||
| new NextIterator[UnsafeRowPair] { | ||
| def removeByValueCondition(removalCondition: UnsafeRow => Boolean) | ||
| : Iterator[KeyToValuePair] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fits in previous line.
| if (hasMoreValuesForCurrentKey) { | ||
| // First search the values for the current key. | ||
| val currentValue = keyWithIndexToValue.get(currentKey, index) | ||
| val (currentValue, matched) = keyWithIndexToValue.getWithMatched(currentKey, index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getWithMatched can return null; is it guaranteed that it won't at this point? Otherwise you'll get an error here.
Maybe KeyWithIndexToValueRowConverterFormatV2 should return (null, false) instead of null? It would at least be closer to the semantics of the previous code.
And now that I noticed it, I kinda dislike getWithMatched for the same reason I disliked the other type names.
Is the extra overhead of valueRowConverter.convertValue allocating a new tuple, and this code "unapplying" that tuple, something that we need to worry about? I'm thinking that maybe if you return a proper type and take the same reuse approach as is done with KeyToValuePair, that would be better?
(But don't know if that can be done safely here.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it's more than just the allocation; every time you call KeyWithIndexToValueRowConverterFormatV2.convertValue you will not only allocate that new tuple (and unapply it, depending on the call site), but you'll also be applying a projection to the UnsafeRow from the state store.
Is that overhead acceptable here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking that maybe if you return a proper type and take the same reuse approach as is done with KeyToValuePair, that would be better?
The optimization around reusing instance is based on the fact that setter provides the iterator. Actually current optimization was really odd for me - after some digging I understood the intention but without knowing such detail, it's super easy to mess up. Iterating two getAll() results concurrently would provide interesting result.
Getting an element with applying reuse approach would be really confusing. Maybe it can be acceptable if we invert the approach and let caller provides the instance to fill (say, "out parameter") - caller is responsible to not let the instance be messed up, but easier to do that given they have context, especially if caller calls in a loop. I'll check if it could be done safely.
Regarding projection, we've done similar things in my previous PR, removing duplicated columns from key part to reduce state store size - the patch applied projection to key row - performance didn't hurt much (more clearly, the size of delta significantly matters) - #21733 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd wait for hearing voice on out parameter, as it complicates the logic (all methods in call hierarchy should support out parameter to achieve object reuse). I've changed tuple to case class to deal with null issue.
| if (numValues > 1) { | ||
| val valueAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1) | ||
| keyWithIndexToValue.put(currentKey, index, valueAtMaxIndex) | ||
| val (valueAtMaxIndex, matchedAtMaxIndex) = keyWithIndexToValue.getWithMatched(currentKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question re: getWithMatched returning null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
|
Test build #113064 has finished for PR 26108 at commit
|
| val currentValue = keyWithIndexToValue.get(currentKey, index) | ||
| if (removalCondition(currentValue)) { | ||
| return currentValue | ||
| val valuePair = keyWithIndexToValue.get(currentKey, index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can valuePair be null here? Just like before, keyWithIndexToValue.get can return null, but I'm not sure whether in this particular spot it would return null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would only return null when there's corruption on the state store - hasMoreValuesForCurrentKey == true denotes the index is valid (not an out of index), then the return value is not expected to be null.
|
Well, looks ok to the best of my knowledge (and given all the "can't do this with the current state store" replies). I'll give others one last chance to comment before pushing in a day or two. |
|
Looks like no comment from others so far; shall we consider merging this? |
|
Yes. Merging to master. |
|
Thanks for reviewing and merging! |
| .createWithDefault(2) | ||
|
|
||
| val STREAMING_JOIN_STATE_FORMAT_VERSION = | ||
| buildConf("spark.sql.streaming.join.stateFormatVersion") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @tgravescs . The incompatibility of state format was the reason of not having this in branch-2.4.
|
Hi, All.
|
|
We fixed the issue with "destructive way" on existing query as the state in existing query cannot be corrected. Please read below migration guide I added for this PR.
I'm not sure we can bring this change to "bugfix" version and force end users to follow the guide. |
|
As written in the migration guide, the situation is the same in 3.0.0 with old stored data, isn't it? AFAIK, there is no available correctness fix for the existing stored data (and queries). |
End users might not be mad if "major" version requires them to lose something. They still have Spark 2.x version line to deny the change and take the risk. If we put this to 2.4.5, they have no way to deny.
I guess not. Spark will "fail" the query when Spark detects that the query is using state format version 1 and stream-stream outer join is being used. That's what I said "destructive". |
|
So unlike other states having versions which can be co-existed, for stream-stream outer join, there're only "valid" and "invalid" state format which cannot be co-existed. Why we still keep state format 1 is that stream-stream inner join is not affected by this bug and we don't want to let the case also discard the state. It will affect too many existing queries and I'd like to reduce the impact. |
|
Got it. Thank you for the detailed explanation. |
|
@HeartSaVioR would you mind putting a comment in the jira (like you describe above) describing why it can't be back ported so its easier for us to filter and target the jiras? |
|
OK I'll migrating my comments to JIRA issue as well. Thanks for guidance. |
|
Btw, if we plan to have Spark 2.5 then makes sense to port back there (as end users still have option to reside on 2.4.x) but I'm not seeing such plan as of now. |
|
thanks |
| CheckAnswer(1.to(1000): _*)) | ||
| } | ||
|
|
||
| test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR Question on this test. The point of this test is to ensure that Spark 3.x can read join state from Spark 2.4. But since the format was changed for stream-stream outer joins (that is, only outer joins use the new format), shouldn't we do this compatibility test for outer joins?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please find SPARK-26187 restore the stream-stream outer join query from Spark 2.4 in this suite. (Yeah twos are unintentionally a bit far away, my bad. If moving test will help readability I'll do it as follow-up PR. Please let me know how you think.)
One thing, the format is changed for stream-stream joins instead of only outer joins, to allow changing the type of join inner <-> left/right outer, which is technically possible (with such correctness issue) in previous state and now possible without correctness issue.
Spark will fail the query if end users try to read state version 1 for stream-stream outer join, but will still allow state version 1 for stream-stream inner join to reduce the impact. Once they try to change the join type to outer then they'll have error message as covered in the test I mentioned above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the clarification.
What changes were proposed in this pull request?
This patch fixes the edge case of streaming left/right outer join described below:
Suppose query is provided as
select * from A join B on A.id = B.id AND (A.ts <= B.ts AND B.ts <= A.ts + interval 5 seconds)and there're two rows for L1 (from A) and R1 (from B) which ensures L1.id = R1.id and L1.ts = R1.ts.
(we can simply imagine it from self-join)
Then Spark processes L1 and R1 as below:
When determining outer rows to match with null, Spark applies some assumption commented in codebase, as below:
But as explained the edge-case earlier, the assumption is not correct. As we don't have any good assumption to optimize which doesn't have edge-case, we have to track whether such row is matched with others before, and match with null row only when the row is not matched.
To track the matching of row, the patch adds a new state to streaming join state manager, and mark whether the row is matched to others or not. We leverage the information when dealing with eviction of rows which would be candidates to match with null rows.
This approach introduces new state format which is not compatible with old state format - queries with old state format will be still running but they will still have the issue and be required to discard checkpoint and rerun to take this patch in effect.
Why are the changes needed?
This patch fixes a correctness issue.
Does this PR introduce any user-facing change?
No for compatibility viewpoint, but we'll encourage end users to discard the old checkpoint and rerun the query if they run stream-stream outer join query with old checkpoint, which might be "yes" for the question.
How was this patch tested?
Added UT which fails on current Spark and passes with this patch. Also passed existing streaming join UTs.