From bc40285323dc7aa0d846e4e07a6e28882b9c557d Mon Sep 17 00:00:00 2001 From: tedyu Date: Mon, 16 Nov 2015 11:02:35 -0800 Subject: [PATCH 1/8] Prevent the call to StreamingContext#stop() in the listener bus's thread --- .../spark/util/AsynchronousListenerBus.scala | 45 +++++++++++-------- .../spark/streaming/StreamingContext.scala | 5 ++- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index c20627b056bef..65dc59aaed946 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean +import scala.util.DynamicVariable import org.apache.spark.SparkContext @@ -60,25 +61,27 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri private val listenerThread = new Thread(name) { setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - while (true) { - eventLock.acquire() - self.synchronized { - processingEvent = true - } - try { - val event = eventQueue.poll - if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. 
So `stopped` must be true") - } - return - } - postToAll(event) - } finally { + AsynchronousListenerBus.withinListenerThread.withValue(true) { + while (true) { + eventLock.acquire() self.synchronized { - processingEvent = false + processingEvent = true + } + try { + val event = eventQueue.poll + if (event == null) { + // Get out of the while loop and shutdown the daemon thread + if (!stopped.get) { + throw new IllegalStateException("Polling `null` from eventQueue means" + + " the listener bus has been stopped. So `stopped` must be true") + } + return + } + postToAll(event) + } finally { + self.synchronized { + processingEvent = false + } } } } @@ -177,3 +180,9 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri */ def onDropEvent(event: E): Unit } + +private[spark] object AsynchronousListenerBus { + /* Allows for Context to check whether stop() call is made within listener thread + */ + val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) +} \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 97113835f3bd0..1edcc3dc3a0c1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver} import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} -import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils} +import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils} /** * Main entry point for Spark Streaming functionality. 
It provides methods used to create @@ -693,6 +693,9 @@ class StreamingContext private[streaming] ( */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { var shutdownHookRefToRemove: AnyRef = null + if (AsynchronousListenerBus.withinListenerThread.value) { + throw new SparkException("Cannot stop StreamingContext within listener thread of AsynchronousListenerBus") + } synchronized { try { state match { From 8f583b992c26ccdc8098369ccbf9323c4f8717e6 Mon Sep 17 00:00:00 2001 From: tedyu Date: Mon, 16 Nov 2015 11:28:47 -0800 Subject: [PATCH 2/8] Address Scalastyle check failures --- .../scala/org/apache/spark/util/AsynchronousListenerBus.scala | 3 ++- .../scala/org/apache/spark/streaming/StreamingContext.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 65dc59aaed946..6c1fca71f2281 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -185,4 +185,5 @@ private[spark] object AsynchronousListenerBus { /* Allows for Context to check whether stop() call is made within listener thread */ val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) -} \ No newline at end of file +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1edcc3dc3a0c1..aa351bfa7612c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -694,7 +694,8 @@ class StreamingContext private[streaming] ( def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { var shutdownHookRefToRemove: AnyRef = null if 
(AsynchronousListenerBus.withinListenerThread.value) { - throw new SparkException("Cannot stop StreamingContext within listener thread of AsynchronousListenerBus") + throw new SparkException("Cannot stop StreamingContext within listener thread of + AsynchronousListenerBus") } synchronized { try { From abab4616c45f0db95434b9b9664c07cbea5d731f Mon Sep 17 00:00:00 2001 From: tedyu Date: Mon, 16 Nov 2015 11:52:40 -0800 Subject: [PATCH 3/8] Address Scalastyle check --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index aa351bfa7612c..aee172a4f549a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -694,8 +694,8 @@ class StreamingContext private[streaming] ( def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { var shutdownHookRefToRemove: AnyRef = null if (AsynchronousListenerBus.withinListenerThread.value) { - throw new SparkException("Cannot stop StreamingContext within listener thread of - AsynchronousListenerBus") + throw new SparkException("Cannot stop StreamingContext within listener thread of" + + " AsynchronousListenerBus") } synchronized { try { From d67a133017467a9ef17fa4ed331b6c2731b5482b Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Nov 2015 16:19:22 -0800 Subject: [PATCH 4/8] Add test --- .../streaming/StreamingListenerSuite.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 5dc0472c7770c..c99fca73cb729 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global +import org.apache.spark.SparkException import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver @@ -171,6 +172,14 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { "A successful batch should not set errorMessage") } + test("don't call ssc.stop in listener") { + ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000)) + val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) + inputStream.foreachRDD(_.count) + + val failureReasons = startStreamingContextAndCallStop(ssc) + } + test("onBatchCompleted with failed batch and one failed job") { ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) @@ -207,6 +216,19 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { assert(failureReasons(1).contains("This is another failed job")) } + private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = { + val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc) + _ssc.addStreamingListener(contextStoppingCollector) + val batchCounter = new BatchCounter(_ssc) + _ssc.start() + // Make sure running at least one batch + batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000) + intercept[SparkException] { + _ssc.awaitTerminationOrTimeout(10000) + } + _ssc.stop() + } + private def startStreamingContextAndCollectFailureReasons( _ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = { val failureReasonsCollector = new 
FailureReasonsCollector() @@ -320,3 +342,12 @@ class FailureReasonsCollector extends StreamingListener { } } } +/** + * A StreamingListener that calls StreamingContext.stop(). + */ +class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { + override def onOutputOperationStarted( + outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { + ssc.stop() + } +} From e0c61631dd480854e254ba2eb445e988efc3e05e Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Nov 2015 16:54:21 -0800 Subject: [PATCH 5/8] Modify the new test --- .../streaming/StreamingListenerSuite.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index c99fca73cb729..571c73bd8fec8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -162,22 +162,22 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } } - test("onBatchCompleted with successful batch") { - ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) + test("don't call ssc.stop in listener") { + ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000)) val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) inputStream.foreachRDD(_.count) - val failureReasons = startStreamingContextAndCollectFailureReasons(ssc) - assert(failureReasons != null && failureReasons.isEmpty, - "A successful batch should not set errorMessage") + val failureReasons = startStreamingContextAndCallStop(ssc) } - test("don't call ssc.stop in listener") { - ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000)) + test("onBatchCompleted with successful batch") { + ssc = new StreamingContext("local[2]", "test", 
Milliseconds(1000)) val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) inputStream.foreachRDD(_.count) - val failureReasons = startStreamingContextAndCallStop(ssc) + val failureReasons = startStreamingContextAndCollectFailureReasons(ssc) + assert(failureReasons != null && failureReasons.isEmpty, + "A successful batch should not set errorMessage") } test("onBatchCompleted with failed batch and one failed job") { @@ -223,10 +223,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { _ssc.start() // Make sure running at least one batch batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000) - intercept[SparkException] { - _ssc.awaitTerminationOrTimeout(10000) - } _ssc.stop() + assert(contextStoppingCollector.sparkExSeen) } private def startStreamingContextAndCollectFailureReasons( @@ -346,8 +344,13 @@ class FailureReasonsCollector extends StreamingListener { * A StreamingListener that calls StreamingContext.stop(). */ class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { - override def onOutputOperationStarted( - outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { + var sparkExSeen = false + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { + try { ssc.stop() + } catch { + case se: SparkException => + sparkExSeen = true + } } } From 8ff4c61b093214b3a9c94c42d3cba9e2519d1fda Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Nov 2015 17:01:50 -0800 Subject: [PATCH 6/8] Add volatile to sparkExSeen --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 571c73bd8fec8..788bfd15f221d 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -344,7 +344,7 @@ class FailureReasonsCollector extends StreamingListener { * A StreamingListener that calls StreamingContext.stop(). */ class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { - var sparkExSeen = false + volatile var sparkExSeen = false override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { try { ssc.stop() From 4ef7de233744c89d09daf1b836c246ae990dab5a Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Nov 2015 17:03:57 -0800 Subject: [PATCH 7/8] Remove unused variable --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 788bfd15f221d..cc2c5d427f064 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -167,7 +167,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) inputStream.foreachRDD(_.count) - val failureReasons = startStreamingContextAndCallStop(ssc) + startStreamingContextAndCallStop(ssc) } test("onBatchCompleted with successful batch") { From 922bf0bece6b667350c6105dc84be1d3bd9f58c7 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Nov 2015 17:06:17 -0800 Subject: [PATCH 8/8] Replace invalid `volatile` modifier with @volatile annotation on sparkExSeen --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index cc2c5d427f064..df4575ab25aad 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -344,7 +344,7 @@ class FailureReasonsCollector extends StreamingListener { * A StreamingListener that calls StreamingContext.stop(). */ class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { - volatile var sparkExSeen = false + @volatile var sparkExSeen = false override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { try { ssc.stop()