Skip to content

Commit ee10ca7

Browse files
committed
[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
## What changes were proposed in this pull request? Use a separate Spark event queue for StreamingQueryListenerBus so that if there are many non-streaming events, streaming query listeners don't need to wait for other Spark listeners and can catch up. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes apache#19838 from zsxwing/SPARK-22638.
1 parent 9d06a9e commit ee10ca7

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
8787
* of each other (each one uses a separate thread for delivering events), allowing slower
8888
* listeners to be somewhat isolated from others.
8989
*/
90-
private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
90+
private[spark] def addToQueue(
91+
listener: SparkListenerInterface,
92+
queue: String): Unit = synchronized {
9193
if (stopped.get()) {
9294
throw new IllegalStateException("LiveListenerBus is stopped.")
9395
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
4040

4141
import StreamingQueryListener._
4242

43-
sparkListenerBus.addToSharedQueue(this)
43+
sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
4444

4545
/**
4646
* RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -130,3 +130,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
130130
}
131131
}
132132
}
133+
134+
object StreamingQueryListenerBus {
135+
val STREAM_EVENT_QUERY = "streams"
136+
}

0 commit comments

Comments
 (0)