@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   test("bus.stop() waits for the event queue to completely drain") {
     @volatile var drained = false

+    // When Listener has started
+    val listenerStarted = new Semaphore(0)
+
     // Tells the listener to stop blocking
-    val listenerWait = new Semaphore(1)
+    val listenerWait = new Semaphore(0)
+
+    // When stopper has started
+    val stopperStarted = new Semaphore(0)

-    // When stop has returned
-    val stopReturned = new Semaphore(1)
+    // When stopper has returned
+    val stopperReturned = new Semaphore(0)

     class BlockingListener extends SparkListener {
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+        listenerStarted.release()
         listenerWait.acquire()
         drained = true
       }
@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     bus.start()
     bus.post(SparkListenerJobEnd(0, JobSucceeded))

-    // the queue should not drain immediately
+    listenerStarted.acquire()
+    // Listener should be blocked after start
     assert(!drained)

     new Thread("ListenerBusStopper") {
       override def run() {
+        stopperStarted.release()
         // stop() will block until notify() is called below
         bus.stop()
-        stopReturned.release(1)
+        stopperReturned.release()
       }
     }.start()

-    while (!bus.stopCalled) {
-      Thread.sleep(10)
-    }
+    stopperStarted.acquire()
+    // Listener should remain blocked after stopper started
+    assert(!drained)

+    // unblock Listener to let queue drain
     listenerWait.release()
-    stopReturned.acquire()
+    stopperReturned.acquire()
     assert(drained)
   }
