Skip to content

Commit 9f8e392

Browse files
Eyal Zitunyzsxwing
authored andcommitted
[SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists
## What changes were proposed in this pull request? currently if multiple streaming queries listeners exists, when a QueryTerminatedEvent is triggered, only one of the listeners will be invoked while the rest of the listeners will ignore the event. this is caused since the the streaming queries listeners bus holds a set of running queries ids and when a termination event is triggered, after the first listeners is handling the event, the terminated query id is being removed from the set. in this PR, the query id will be removed from the set only after all the listeners handles the event ## How was this patch tested? a test with multiple listeners has been added to StreamingQueryListenerSuite Author: Eyal Zituny <[email protected]> Closes #16991 from eyalzit/master.
1 parent 68f2142 commit 9f8e392

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

core/src/main/scala/org/apache/spark/util/ListenerBus.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
5252
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
5353
* `postToAll` in the same thread for all events.
5454
*/
55-
final def postToAll(event: E): Unit = {
55+
def postToAll(event: E): Unit = {
5656
// JavaConverters can create a JIterableWrapper if we use asScala.
5757
// However, this method will be called frequently. To avoid the wrapper cost, here we use
5858
// Java Iterator directly.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
7575
}
7676
}
7777

78+
/**
79+
* Override the parent `postToAll` to remove the query id from `activeQueryRunIds` after all
80+
* the listeners process `QueryTerminatedEvent`. (SPARK-19594)
81+
*/
82+
override def postToAll(event: Event): Unit = {
83+
super.postToAll(event)
84+
event match {
85+
case t: QueryTerminatedEvent =>
86+
activeQueryRunIds.synchronized { activeQueryRunIds -= t.runId }
87+
case _ =>
88+
}
89+
}
90+
7891
override def onOtherEvent(event: SparkListenerEvent): Unit = {
7992
event match {
8093
case e: StreamingQueryListener.Event =>
@@ -112,7 +125,6 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
112125
case queryTerminated: QueryTerminatedEvent =>
113126
if (shouldReport(queryTerminated.runId)) {
114127
listener.onQueryTerminated(queryTerminated)
115-
activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
116128
}
117129
case _ =>
118130
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
133133
}
134134
}
135135

136+
test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
137+
val df = MemoryStream[Int].toDS().as[Long]
138+
val listeners = (1 to 5).map(_ => new EventCollector)
139+
try {
140+
listeners.foreach(listener => spark.streams.addListener(listener))
141+
testStream(df, OutputMode.Append)(
142+
StartStream(),
143+
StopStream,
144+
AssertOnQuery { query =>
145+
eventually(Timeout(streamingTimeout)) {
146+
listeners.foreach(listener => assert(listener.terminationEvent !== null))
147+
listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
148+
listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
149+
listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
150+
}
151+
listeners.foreach(listener => listener.checkAsyncErrors())
152+
listeners.foreach(listener => listener.reset())
153+
true
154+
}
155+
)
156+
} finally {
157+
listeners.foreach(spark.streams.removeListener)
158+
}
159+
}
160+
136161
test("adding and removing listener") {
137162
def isListenerActive(listener: EventCollector): Boolean = {
138163
listener.reset()

0 commit comments

Comments
 (0)