Skip to content
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ object MimaExcludes {
"org.apache.spark.sql.parquet.ParquetTestData$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.TestGroupWriteSupport")
) ++ Seq(
// SPARK-7530 Added StreamingContext.getState()
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.StreamingContext.state_=")
)

case v if v.startsWith("1.3") =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
Expand Down Expand Up @@ -195,14 +196,7 @@ class StreamingContext private[streaming] (
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
val Initialized, Started, Stopped = Value
}

import StreamingContextState._
private[streaming] var state = Initialized
private var state: StreamingContextState = INITIALIZED

private val startSite = new AtomicReference[CallSite](null)

Expand Down Expand Up @@ -516,18 +510,35 @@ class StreamingContext private[streaming] (
)
}

/**
* :: DeveloperApi ::
*
* Return the current state of the context. The context can be in three possible states -
* - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
* Input DStreams, transformations and output operations can be created on the context.
* - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
* Input DStreams, transformations and output operations cannot be created on the context.
* - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
*/
@DeveloperApi
def getState(): StreamingContextState = synchronized {
state
}

/**
* Start the execution of the streams.
*
* @throws SparkException if the context has already been started or stopped.
*/
def start(): Unit = synchronized {
import StreamingContext._
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
state match {
case INITIALIZED =>
// good to start
case ACTIVE =>
throw new SparkException("StreamingContext has already been started")
case STOPPED =>
throw new SparkException("StreamingContext has already been stopped")
}
validate()
startSite.set(DStream.getCreationSite())
Expand All @@ -536,7 +547,7 @@ class StreamingContext private[streaming] (
assertNoOtherContextIsActive()
scheduler.start()
uiTab.foreach(_.attach())
state = Started
state = StreamingContextState.ACTIVE
setActiveContext(this)
}
}
Expand Down Expand Up @@ -598,22 +609,26 @@ class StreamingContext private[streaming] (
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
state match {
case Initialized => logWarning("StreamingContext has not been started yet")
case Stopped => logWarning("StreamingContext has already been stopped")
case Started =>
scheduler.stop(stopGracefully)
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
try {
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
case STOPPED =>
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
logInfo("StreamingContext stopped successfully")
}
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
}
// Even if the streaming context has not been started, we still need to stop the SparkContext.
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
uiTab.foreach(_.detach())
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped
StreamingContext.setActiveContext(null)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming;

import org.apache.spark.annotation.DeveloperApi;

/**
* :: DeveloperApi ::
*
* Represents the state of a StreamingContext.
*/
@DeveloperApi
public enum StreamingContextState {
/**
* The context has been created, but not been started yet.
* Input DStreams, transformations and output operations can be created on the context.
*/
INITIALIZED,

/**
* The context has been started, and been not stopped.
* Input DStreams, transformations and output operations cannot be created on the context.
*/
ACTIVE,

/**
* The context has been stopped and cannot be used any more.
*/
STOPPED
}
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,28 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.addStreamingListener(streamingListener)
}

/**
* :: DeveloperApi ::
*
* Return the current state of the context. The context can be in three possible states -
* <ul>
* <li>
* StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
* Input DStreams, transformations and output operations can be created on the context.
* </li>
* <li>
* StreamingContextState.ACTIVE - The context has been started, and been not stopped.
* Input DStreams, transformations and output operations cannot be created on the context.
* </li>
* <li>
* StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
* </li>
* </ul>
*/
def getState(): StreamingContextState = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure Java can handle this. Also, please add a test to JavaStreamingContextSuite

ssc.getState()
}

/**
* Start the execution of the streams.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ public void testInitialization() {
Assert.assertNotNull(ssc.sparkContext());
}

@SuppressWarnings("unchecked")
@Test
public void testContextState() {
List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaTestUtils.attachTestOutputStream(stream);
Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
ssc.start();
Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
ssc.stop();
Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
}

@SuppressWarnings("unchecked")
@Test
public void testCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}

test("state matching") {
import StreamingContextState._
assert(INITIALIZED === INITIALIZED)
assert(INITIALIZED != ACTIVE)
}

test("start and stop state check") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()

assert(ssc.state === ssc.StreamingContextState.Initialized)
assert(ssc.getState() === StreamingContextState.INITIALIZED)
ssc.start()
assert(ssc.state === ssc.StreamingContextState.Started)
assert(ssc.getState() === StreamingContextState.ACTIVE)
ssc.stop()
assert(ssc.state === ssc.StreamingContextState.Stopped)
assert(ssc.getState() === StreamingContextState.STOPPED)

// Make sure that the SparkContext is also stopped by default
intercept[Exception] {
Expand All @@ -129,23 +135,28 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.start()
assert(ssc.getState() === StreamingContextState.ACTIVE)
intercept[SparkException] {
ssc.start()
}
assert(ssc.getState() === StreamingContextState.ACTIVE)
}

test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.start()
ssc.stop()
assert(ssc.getState() === StreamingContextState.STOPPED)
ssc.stop()
assert(ssc.getState() === StreamingContextState.STOPPED)
}

test("stop before start") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.stop() // stop before start should not throw exception
assert(ssc.getState() === StreamingContextState.STOPPED)
}

test("start after stop") {
Expand All @@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
intercept[SparkException] {
ssc.start() // start after stop should throw exception
}
assert(ssc.getState() === StreamingContextState.STOPPED)
}

test("stop only streaming context") {
Expand All @@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
addInputStream(ssc).register()
ssc.start()
ssc.stop(stopSparkContext = false)
assert(ssc.getState() === StreamingContextState.STOPPED)
assert(sc.makeRDD(1 to 100).collect().size === 100)
sc.stop()

Expand Down