[SPARK-19594][Structured Streaming] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists #16991
…vent if more then one listeners exists
postToAll(s)
case t: QueryTerminatedEvent =>
  // run all the listeners synchronized before removing the id from the list
  postToAll(t)
It will post QueryTerminatedEvent to all listeners directly in the current thread. Hence, the listener may see QueryProgressEvent after QueryTerminatedEvent.
You can override postToAll. It's fine to remove final from the postToAll method.
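The suggestion above can be sketched roughly as follows. This is a minimal, simplified model and not Spark's actual code: `SimpleBus`, `QueryBus`, `Listener`, `QueryProgress`, `QueryTerminated`, `register` and `isActive` are hypothetical stand-ins. It only illustrates the shape of the idea: keep `postToAll` non-final in the parent, override it in the subclass, and drop the run id once every listener has been called.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; these are not Spark's classes.
sealed trait Event { def runId: String }
final case class QueryProgress(runId: String) extends Event
final case class QueryTerminated(runId: String) extends Event

trait Listener { def onEvent(event: Event): Unit }

// Parent bus: `postToAll` is deliberately not final so a subclass can wrap it.
class SimpleBus {
  protected val listeners = mutable.ArrayBuffer.empty[Listener]

  def addListener(l: Listener): Unit = listeners += l

  def postToAll(event: Event): Unit =
    listeners.foreach(l => doPostEvent(l, event))

  protected def doPostEvent(l: Listener, event: Event): Unit = l.onEvent(event)
}

class QueryBus extends SimpleBus {
  private val activeRunIds = mutable.Set.empty[String]

  def register(runId: String): Unit =
    activeRunIds.synchronized { activeRunIds += runId }

  def isActive(runId: String): Boolean =
    activeRunIds.synchronized { activeRunIds.contains(runId) }

  // Only deliver events that belong to a run this bus still tracks.
  override protected def doPostEvent(l: Listener, event: Event): Unit =
    if (isActive(event.runId)) super.doPostEvent(l, event)

  // Let every listener see the event first, then forget a terminated run.
  override def postToAll(event: Event): Unit = {
    super.postToAll(event)
    event match {
      case t: QueryTerminated =>
        activeRunIds.synchronized { activeRunIds -= t.runId }
      case _ => // other events leave the set of active runs unchanged
    }
  }
}
```

Because the removal happens after `super.postToAll` returns, the per-listener filter in `doPostEvent` still sees the run as active for every listener during the termination event.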
zsxwing
left a comment
The fix looks good. Just left some comments.
/**
 * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
 * `postToAll` in the same thread for all events. also remove the query id after all listeners
 * process the QueryTerminatedEvent
Don't copy the comments from the parent class. You can just document it as:
Override the parent `postToAll` to remove query from `activeQueryRunIds` after all listeners process `QueryTerminatedEvent`. (SPARK-19594)
  }
}

testQuietly("multiple listeners, check trigger events are generated correctly") {
It's better to add a regression test rather than copying the above test. Most of this test is testing the same thing in "single listener, check trigger events are generated correctly". It doesn't make sense. How about this:
test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
  val df = MemoryStream[Int].toDS().as[Long]
  val listeners = (1 to 5).map(_ => new EventCollector)
  try {
    testStream(df, OutputMode.Append)(
      StartStream(),
      StopStream,
      AssertOnQuery { query =>
        eventually(Timeout(streamingTimeout)) {
          listeners.foreach(listener => assert(listener.terminationEvent !== null))
          listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
          listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
          listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
        }
        listeners.foreach(listener => listener.checkAsyncErrors())
        listeners.foreach(listener => listener.reset())
        true
      }
    )
  } finally {
    listeners.foreach(spark.streams.removeListener)
  }
}
I'm OK with that, with one small fix. The listeners also need to be registered, so this line has to be added:
listeners.foreach(listener => spark.streams.addListener(listener))
…vent - test and comments fix
ok to test
Test build #73496 has finished for PR 16991 at commit
LGTM. Merging to master and 2.1. Thanks!
…andle QueryTerminatedEvent if more then one listeners exists
## What changes were proposed in this pull request?
Currently, if multiple streaming query listeners exist, when a QueryTerminatedEvent is triggered only one of the listeners is invoked, while the rest of the listeners ignore the event.
This happens because the streaming query listener bus holds a set of running query IDs, and when a termination event is triggered, the terminated query's ID is removed from the set as soon as the first listener handles the event.
In this PR, the query ID is removed from the set only after all the listeners have handled the event.
## How was this patch tested?
A test with multiple listeners has been added to StreamingQueryListenerSuite.
Author: Eyal Zituny
Closes #16991 from eyalzit/master.
(cherry picked from commit 9f8e392)
Signed-off-by: Shixiong Zhu
What changes were proposed in this pull request?
Currently, if multiple streaming query listeners exist, when a QueryTerminatedEvent is triggered only one of the listeners is invoked, while the rest of the listeners ignore the event.
This happens because the streaming query listener bus holds a set of running query IDs, and when a termination event is triggered, the terminated query's ID is removed from the set as soon as the first listener handles the event.
In this PR, the query ID is removed from the set only after all the listeners have handled the event.
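To make the failure mode concrete, here is a small sketch of the behaviour described above, before the fix. It is a simplified model under stated assumptions, not Spark's implementation: `BuggyBus`, `Recorder`, `BusEvent`, `Progress`, `Terminated` and `start` are hypothetical names. The bus filters each delivery by a set of active run ids and removes the id inside the per-listener dispatch, so every listener after the first is filtered out.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the bug; not Spark's actual classes.
sealed trait BusEvent { def runId: String }
final case class Progress(runId: String) extends BusEvent
final case class Terminated(runId: String) extends BusEvent

// A listener that simply records every event it is given.
final class Recorder {
  val seen = mutable.ArrayBuffer.empty[BusEvent]
}

final class BuggyBus {
  private val listeners = mutable.ArrayBuffer.empty[Recorder]
  private val activeRunIds = mutable.Set.empty[String]

  def addListener(l: Recorder): Unit = listeners += l
  def start(runId: String): Unit = activeRunIds += runId

  def postToAll(event: BusEvent): Unit =
    listeners.foreach(l => doPostEvent(l, event))

  private def doPostEvent(l: Recorder, event: BusEvent): Unit = event match {
    case t: Terminated =>
      if (activeRunIds.contains(t.runId)) {
        l.seen += t
        // Removed while dispatching to the first listener, which is too
        // early: later listeners no longer pass the `contains` check.
        activeRunIds -= t.runId
      }
    case other =>
      if (activeRunIds.contains(other.runId)) l.seen += other
  }
}
```

With three `Recorder` instances registered and one `Terminated` event posted for an active run, only the first recorder ends up with the event in `seen`; the other two stay empty. Moving the removal to run after the loop over listeners is the change this PR describes.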
How was this patch tested?
A test with multiple listeners has been added to StreamingQueryListenerSuite.