File tree Expand file tree Collapse file tree 1 file changed +19
-6
lines changed
streaming/src/test/scala/org/apache/spark/streaming Expand file tree Collapse file tree 1 file changed +19
-6
lines changed Original file line number Diff line number Diff line change @@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
222222 val batchCounter = new BatchCounter (_ssc)
223223 _ssc.start()
224224 // Make sure running at least one batch
225- batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1 , timeout = 10000 )
225+ if (! batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1 , timeout = 10000 )) {
226+ fail(" The first batch cannot complete in 10 seconds" )
227+ }
228+ // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
229+ // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
226230 _ssc.stop()
227231 assert(contextStoppingCollector.sparkExSeen)
228232 }
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
345349 */
346350class StreamingContextStoppingCollector (val ssc : StreamingContext ) extends StreamingListener {
347351 @ volatile var sparkExSeen = false
352+
353+ private var isFirstBatch = true
354+
348355 override def onBatchCompleted (batchCompleted : StreamingListenerBatchCompleted ) {
349- try {
350- ssc.stop()
351- } catch {
352- case se : SparkException =>
353- sparkExSeen = true
356+ if (isFirstBatch) {
357+ // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
358+ // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
359+ // `ssc.stop()` in the listener thread, which becomes a dead-lock.
360+ isFirstBatch = false
361+ try {
362+ ssc.stop()
363+ } catch {
364+ case se : SparkException =>
365+ sparkExSeen = true
366+ }
354367 }
355368 }
356369}
You can’t perform that action at this time.
0 commit comments