Skip to content

Commit 1b46556

Browse files
committed
[SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start multiple StreamingContexts in the same JVM
Currently, attempting to start a StreamingContext while another one is already started throws a confusing exception saying that the actor name JobScheduler is already registered. Instead it's best to throw a proper exception, as this is not supported. Author: Tathagata Das <[email protected]> Closes apache#5907 from tdas/SPARK-7361 and squashes the following commits: fb81c4a [Tathagata Das] Fix typo a9cd5bb [Tathagata Das] Added startSite to StreamingContext 5fdfc0d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7361 5870e2b [Tathagata Das] Added check for multiple streaming contexts
1 parent 4f8a155 commit 1b46556

File tree

2 files changed

+58
-8
lines changed

2 files changed

+58
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.streaming
1919

2020
import java.io.InputStream
21-
import java.util.concurrent.atomic.AtomicInteger
21+
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
2222

2323
import scala.collection.Map
2424
import scala.collection.mutable.Queue
@@ -28,17 +28,19 @@ import akka.actor.{Props, SupervisorStrategy}
2828
import org.apache.hadoop.conf.Configuration
2929
import org.apache.hadoop.fs.Path
3030
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
31-
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
3231
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
32+
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
33+
3334
import org.apache.spark._
3435
import org.apache.spark.annotation.Experimental
3536
import org.apache.spark.input.FixedLengthBinaryInputFormat
3637
import org.apache.spark.rdd.RDD
3738
import org.apache.spark.storage.StorageLevel
3839
import org.apache.spark.streaming.dstream._
3940
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
40-
import org.apache.spark.streaming.scheduler._
41+
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
4142
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
43+
import org.apache.spark.util.CallSite
4244

4345
/**
4446
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -202,6 +204,8 @@ class StreamingContext private[streaming] (
202204
import StreamingContextState._
203205
private[streaming] var state = Initialized
204206

207+
private val startSite = new AtomicReference[CallSite](null)
208+
205209
/**
206210
* Return the associated Spark context
207211
*/
@@ -518,17 +522,23 @@ class StreamingContext private[streaming] (
518522
* @throws SparkException if the context has already been started or stopped.
519523
*/
520524
def start(): Unit = synchronized {
  import StreamingContext._
  if (state == Started) {
    throw new SparkException("StreamingContext has already been started")
  }
  if (state == Stopped) {
    throw new SparkException("StreamingContext has already been stopped")
  }
  validate()
  // Remember where start() was invoked so that, if a second context tries to
  // start later, its error message can point at the site that started this one.
  startSite.set(DStream.getCreationSite())
  sparkContext.setCallSite(startSite.get)
  // Only one StreamingContext may be active per JVM. Taking ACTIVATION_LOCK
  // makes the check-then-activate sequence atomic with respect to other
  // contexts calling start() concurrently.
  ACTIVATION_LOCK.synchronized {
    assertNoOtherContextIsActive()
    scheduler.start()
    uiTab.foreach(_.attach())
    state = Started
    setActiveContext(this)
  }
}
533543

534544
/**
@@ -603,6 +613,7 @@ class StreamingContext private[streaming] (
603613
uiTab.foreach(_.detach())
604614
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
605615
state = Stopped
616+
StreamingContext.setActiveContext(null)
606617
}
607618
}
608619

@@ -612,8 +623,29 @@ class StreamingContext private[streaming] (
612623
*/
613624

614625
object StreamingContext extends Logging {
626+
/**
627+
* Lock that guards access to global variables that track active StreamingContext.
628+
*/
629+
private val ACTIVATION_LOCK = new Object()
615630

616-
private[streaming] val DEFAULT_CLEANER_TTL = 3600
631+
private val activeContext = new AtomicReference[StreamingContext](null)
632+
633+
/**
 * Throw an unambiguous error if another StreamingContext is already active in
 * this JVM. Must be called while holding `ACTIVATION_LOCK` conceptually; it
 * re-synchronizes internally so callers cannot race the check.
 *
 * @throws SparkException if a StreamingContext has already been started.
 */
private def assertNoOtherContextIsActive(): Unit = {
  ACTIVATION_LOCK.synchronized {
    if (activeContext.get() != null) {
      // Include the call site that started the active context so users can
      // locate the offending start() call. Note the trailing space before the
      // concatenated call site (the original message ran the words together).
      throw new SparkException(
        "Only one StreamingContext may be started in this JVM. " +
          "Currently running StreamingContext was started at " +
          activeContext.get.startSite.get.longForm)
    }
  }
}
643+
644+
/**
 * Record `ssc` as the single active StreamingContext of this JVM.
 * Pass `null` to clear the slot (done by `stop()`).
 */
private def setActiveContext(ssc: StreamingContext): Unit = {
  ACTIVATION_LOCK.synchronized { activeContext.set(ssc) }
}
617649

618650
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
619651
"kept here only for backward compatibility.", "1.3.0")

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
480480
}
481481
}
482482

483+
test("multiple streaming contexts") {
  // Start a first, fully wired streaming context.
  sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
  ssc = new StreamingContext(sc, Seconds(1))
  val firstInput = addInputStream(ssc)
  firstInput.foreachRDD { rdd => rdd.count }
  ssc.start()

  // Merely constructing a second context while the first runs must succeed.
  val secondSsc = new StreamingContext(sc, Seconds(10))
  val secondInput = addInputStream(secondSsc)
  secondInput.foreachRDD { rdd => rdd.count }

  // Starting it, however, must fail with an unambiguous SparkException.
  val exception = intercept[SparkException] {
    secondSsc.start()
  }
  assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
}
500+
483501
test("DStream and generated RDD creation sites") {
484502
testPackage.test()
485503
}

0 commit comments

Comments
 (0)