Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
outputStreams.foreach(_.validateAtStart)
inputStreams.par.foreach(_.start())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {

validateAtInit()

// =======================================================================
// Methods that should be implemented by subclasses of DStream
// =======================================================================
Expand Down Expand Up @@ -171,7 +173,22 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.initialize(zeroTime))
}

private[streaming] def validate() {
private def validateAtInit(): Unit = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed validate() to validateAtStart(), and added validateAtInit()

ssc.getState() match {
case StreamingContextState.INITIALIZED =>
// good to go
case StreamingContextState.ACTIVE =>
throw new SparkException(
"Adding new inputs, transformations, and output operations after " +
"starting a context is not supported")
case StreamingContextState.STOPPED =>
throw new SparkException(
"Adding new inputs, transformations, and output operations after " +
"stopping a context is not supported")
}
}

private[streaming] def validateAtStart() {
assert(rememberDuration != null, "Remember duration is set to null")

assert(
Expand Down Expand Up @@ -226,7 +243,7 @@ abstract class DStream[T: ClassTag] (
math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
)

dependencies.foreach(_.validate())
dependencies.foreach(_.validateAtStart())

logInfo("Slide time = " + slideDuration)
logInfo("Storage level = " + storageLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,45 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
testPackage.test()
}

test("throw exception on using active or stopped context") {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
ssc = new StreamingContext(conf, batchDuration)
require(ssc.getState() === StreamingContextState.INITIALIZED)
val input = addInputStream(ssc)
val transformed = input.map { x => x}
transformed.foreachRDD { rdd => rdd.count }

def testForException(clue: String, expectedErrorMsg: String)(body: => Unit): Unit = {
withClue(clue) {
val ex = intercept[SparkException] {
body
}
assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))
}
}

ssc.start()
require(ssc.getState() === StreamingContextState.ACTIVE)
testForException("no error on adding input after start", "start") {
addInputStream(ssc) }
testForException("no error on adding transformation after start", "start") {
input.map { x => x * 2 } }
testForException("no error on adding output operation after start", "start") {
transformed.foreachRDD { rdd => rdd.collect() } }

ssc.stop()
require(ssc.getState() === StreamingContextState.STOPPED)
testForException("no error on adding input after stop", "stop") {
addInputStream(ssc) }
testForException("no error on adding transformation after stop", "stop") {
input.map { x => x * 2 } }
testForException("no error on adding output operation after stop", "stop") {
transformed.foreachRDD { rdd => rdd.collect() } }
}

def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)
Expand Down