Skip to content

Commit d2cddc8

Browse files
Marcelo Vanzin authored and squito committed
[SPARK-22850][CORE] Ensure queued events are delivered to all event queues.
The code in LiveListenerBus was queueing events before start in the queues themselves; so in situations like the following:

    bus.post(someEvent)
    bus.addToEventLogQueue(listener)
    bus.start()

"someEvent" would not be delivered to "listener" if that was the first listener in the queue, because the queue wouldn't exist when the event was posted.

This change buffers the events before starting the bus in the bus itself, so that they can be delivered to all registered queues when the bus is started.

Also tweaked the unit tests to cover the behavior above.

Author: Marcelo Vanzin <[email protected]>

Closes #20039 from vanzin/SPARK-22850.
1 parent 93f92c0 commit d2cddc8

File tree

2 files changed

+52
-14
lines changed

2 files changed

+52
-14
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 39 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
6262

6363
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
6464

65+
// Visible for testing.
66+
@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
67+
6568
/** Add a listener to queue shared by all non-internal listeners. */
6669
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
6770
addToQueue(listener, SHARED_QUEUE)
@@ -125,13 +128,39 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
125128

126129
/** Post an event to all queues. */
127130
def post(event: SparkListenerEvent): Unit = {
128-
if (!stopped.get()) {
129-
metrics.numEventsPosted.inc()
130-
val it = queues.iterator()
131-
while (it.hasNext()) {
132-
it.next().post(event)
131+
if (stopped.get()) {
132+
return
133+
}
134+
135+
metrics.numEventsPosted.inc()
136+
137+
// If the event buffer is null, it means the bus has been started and we can avoid
138+
// synchronization and post events directly to the queues. This should be the most
139+
// common case during the life of the bus.
140+
if (queuedEvents == null) {
141+
postToQueues(event)
142+
return
143+
}
144+
145+
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
146+
// calling start() picks up the new event.
147+
synchronized {
148+
if (!started.get()) {
149+
queuedEvents += event
150+
return
133151
}
134152
}
153+
154+
// If the bus was already started when the check above was made, just post directly to the
155+
// queues.
156+
postToQueues(event)
157+
}
158+
159+
private def postToQueues(event: SparkListenerEvent): Unit = {
160+
val it = queues.iterator()
161+
while (it.hasNext()) {
162+
it.next().post(event)
163+
}
135164
}
136165

137166
/**
@@ -149,7 +178,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
149178
}
150179

151180
this.sparkContext = sc
152-
queues.asScala.foreach(_.start(sc))
181+
queues.asScala.foreach { q =>
182+
q.start(sc)
183+
queuedEvents.foreach(q.post)
184+
}
185+
queuedEvents = null
153186
metricsSystem.registerSource(metrics)
154187
}
155188

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
4848
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
4949
}
5050

51-
private def queueSize(bus: LiveListenerBus): Int = {
51+
private def sharedQueueSize(bus: LiveListenerBus): Int = {
5252
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
5353
.asInstanceOf[Int]
5454
}
@@ -73,12 +73,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
7373
val conf = new SparkConf()
7474
val counter = new BasicJobCounter
7575
val bus = new LiveListenerBus(conf)
76-
bus.addToSharedQueue(counter)
7776

7877
// Metrics are initially empty.
7978
assert(bus.metrics.numEventsPosted.getCount === 0)
8079
assert(numDroppedEvents(bus) === 0)
81-
assert(queueSize(bus) === 0)
80+
assert(bus.queuedEvents.size === 0)
8281
assert(eventProcessingTimeCount(bus) === 0)
8382

8483
// Post five events:
@@ -87,17 +86,23 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
8786
// Five messages should be marked as received and queued, but no messages should be posted to
8887
// listeners yet because the listener bus hasn't been started.
8988
assert(bus.metrics.numEventsPosted.getCount === 5)
90-
assert(queueSize(bus) === 5)
89+
assert(bus.queuedEvents.size === 5)
90+
91+
// Add the counter to the bus after messages have been queued for later delivery.
92+
bus.addToSharedQueue(counter)
9193
assert(counter.count === 0)
9294

9395
// Starting listener bus should flush all buffered events
9496
bus.start(mockSparkContext, mockMetricsSystem)
9597
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
9698
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
9799
assert(counter.count === 5)
98-
assert(queueSize(bus) === 0)
100+
assert(sharedQueueSize(bus) === 0)
99101
assert(eventProcessingTimeCount(bus) === 5)
100102

103+
// After the bus is started, there should be no more queued events.
104+
assert(bus.queuedEvents === null)
105+
101106
// After listener bus has stopped, posting events should not increment counter
102107
bus.stop()
103108
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -188,18 +193,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
188193
// Post a message to the listener bus and wait for processing to begin:
189194
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
190195
listenerStarted.acquire()
191-
assert(queueSize(bus) === 0)
196+
assert(sharedQueueSize(bus) === 0)
192197
assert(numDroppedEvents(bus) === 0)
193198

194199
// If we post an additional message then it should remain in the queue because the listener is
195200
// busy processing the first event:
196201
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
197-
assert(queueSize(bus) === 1)
202+
assert(sharedQueueSize(bus) === 1)
198203
assert(numDroppedEvents(bus) === 0)
199204

200205
// The queue is now full, so any additional events posted to the listener will be dropped:
201206
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
202-
assert(queueSize(bus) === 1)
207+
assert(sharedQueueSize(bus) === 1)
203208
assert(numDroppedEvents(bus) === 1)
204209

205210
// Allow the remaining events to be processed so we can stop the listener bus:

0 commit comments

Comments
 (0)