@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
2424
2525import org .scalatest .Matchers
2626
27+ import org .apache .spark .SparkException
2728import org .apache .spark .executor .TaskMetrics
2829import org .apache .spark .util .ResetSystemProperties
2930import org .apache .spark .{LocalSparkContext , SparkConf , SparkContext , SparkFunSuite }
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
3637
3738 val jobCompletionTime = 1421191296660L
3839
40+ test(" don't call sc.stop in listener" ) {
41+ sc = new SparkContext (" local" , " SparkListenerSuite" )
42+ val listener = new SparkContextStoppingListener (sc)
43+ val bus = new LiveListenerBus
44+ bus.addListener(listener)
45+
46+ // Starting listener bus should flush all buffered events
47+ bus.start(sc)
48+ bus.post(SparkListenerJobEnd (0 , jobCompletionTime, JobSucceeded ))
49+ bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS )
50+
51+ bus.stop()
52+ assert(listener.sparkExSeen)
53+ }
54+
3955 test(" basic creation and shutdown of LiveListenerBus" ) {
4056 val counter = new BasicJobCounter
4157 val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
443459 override def onJobEnd (job : SparkListenerJobEnd ): Unit = count += 1
444460}
445461
462+ /**
463+ * A simple listener that tries to stop SparkContext.
464+ */
465+ private class SparkContextStoppingListener (val sc : SparkContext ) extends SparkListener {
466+ @ volatile var sparkExSeen = false
467+ override def onJobEnd (job : SparkListenerJobEnd ): Unit = {
468+ try {
469+ sc.stop()
470+ } catch {
471+ case se : SparkException =>
472+ sparkExSeen = true
473+ }
474+ }
475+ }
476+
446477private class ListenerThatAcceptsSparkConf (conf : SparkConf ) extends SparkListener {
447478 var count = 0
448479 override def onJobEnd (job : SparkListenerJobEnd ): Unit = count += 1
0 commit comments