From 60035fa865fd85cc7e9441d2dc55d46693b16dee Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 28 Nov 2017 14:03:50 -0800 Subject: [PATCH 1/2] Use a separate query for StreamingQueryListenerBus --- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 4 +++- .../sql/execution/streaming/StreamingQueryListenerBus.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 2f93c497c577..23121402b102 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -87,7 +87,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) { * of each other (each one uses a separate thread for delivering events), allowing slower * listeners to be somewhat isolated from others. */ - private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { + private[spark] def addToQueue( + listener: SparkListenerInterface, + queue: String): Unit = synchronized { if (stopped.get()) { throw new IllegalStateException("LiveListenerBus is stopped.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 07e39023c836..f5f4081f03d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) import StreamingQueryListener._ - sparkListenerBus.addToSharedQueue(this) + sparkListenerBus.addToQueue(this, "streams") /** * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus From 9b4ce99baf391cf1033fc24553ca803ad86e71d2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 29 Nov 2017 15:35:30 -0800 Subject: [PATCH 2/2] Address Burak's comment --- .../sql/execution/streaming/StreamingQueryListenerBus.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index f5f4081f03d1..7dd491ede9d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) import StreamingQueryListener._ - sparkListenerBus.addToQueue(this, "streams") + sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY) /** * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus @@ -130,3 +130,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) } } } + +object StreamingQueryListenerBus { + val STREAM_EVENT_QUERY = "streams" +}