
Commit f9c7580

[SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Author: Tathagata Das <[email protected]>

Closes apache#6058 from tdas/SPARK-7530 and squashes the following commits:

80ee0e6 [Tathagata Das] STARTED --> ACTIVE
3da6547 [Tathagata Das] Added synchronized
dd88444 [Tathagata Das] Added more docs
e1a8505 [Tathagata Das] Fixed comment length
89f9980 [Tathagata Das] Change to Java enum and added Java test
7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
dd4e702 [Tathagata Das] Addressed comments.
3d56106 [Tathagata Das] Added Mima excludes
2b86ba1 [Tathagata Das] Added scala docs.
1722433 [Tathagata Das] Fixed style
976b094 [Tathagata Das] Added license
0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState
1 parent 35fb42a commit f9c7580
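
For orientation, a minimal usage sketch of the API this commit adds, assuming a locally created context; the SparkConf settings and the queue-backed stream are illustrative scaffolding, not part of this change:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

// Hypothetical local setup; only getState() and StreamingContextState come from this commit.
val conf = new SparkConf().setMaster("local[2]").setAppName("GetStateSketch")
val ssc = new StreamingContext(conf, Seconds(1))
assert(ssc.getState() == StreamingContextState.INITIALIZED)  // created, not yet started

// A queue-backed input stream plus an output operation, so start() has work to do.
val queue = new mutable.Queue[RDD[Int]]()
ssc.queueStream(queue).print()

ssc.start()
assert(ssc.getState() == StreamingContextState.ACTIVE)       // started, not stopped

ssc.stop(stopSparkContext = true, stopGracefully = false)
assert(ssc.getState() == StreamingContextState.STOPPED)      // terminal state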

File tree

7 files changed: +147 −33 lines

project/MimaExcludes.scala

Lines changed: 4 additions & 0 deletions
@@ -106,6 +106,10 @@ object MimaExcludes {
             "org.apache.spark.sql.parquet.ParquetTestData$"),
           ProblemFilters.exclude[MissingClassProblem](
             "org.apache.spark.sql.parquet.TestGroupWriteSupport")
+        ) ++ Seq(
+          // SPARK-7530 Added StreamingContext.getState()
+          ProblemFilters.exclude[MissingMethodProblem](
+            "org.apache.spark.streaming.StreamingContext.state_=")
         )

       case v if v.startsWith("1.3") =>

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 45 additions & 30 deletions
@@ -32,10 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

 import org.apache.spark._
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
@@ -195,14 +196,7 @@ class StreamingContext private[streaming] (
   assert(env.metricsSystem != null)
   env.metricsSystem.registerSource(streamingSource)

-  /** Enumeration to identify current state of the StreamingContext */
-  private[streaming] object StreamingContextState extends Enumeration {
-    type CheckpointState = Value
-    val Initialized, Started, Stopped = Value
-  }
-
-  import StreamingContextState._
-  private[streaming] var state = Initialized
+  private var state: StreamingContextState = INITIALIZED

   private val startSite = new AtomicReference[CallSite](null)

@@ -516,18 +510,35 @@ class StreamingContext private[streaming] (
     )
   }

+  /**
+   * :: DeveloperApi ::
+   *
+   * Return the current state of the context. The context can be in three possible states -
+   * - StreamingContextState.INITIALIZED - The context has been created, but not been started yet.
+   *   Input DStreams, transformations and output operations can be created on the context.
+   * - StreamingContextState.ACTIVE - The context has been started, and not been stopped.
+   *   Input DStreams, transformations and output operations cannot be created on the context.
+   * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+   */
+  @DeveloperApi
+  def getState(): StreamingContextState = synchronized {
+    state
+  }
+
   /**
    * Start the execution of the streams.
    *
    * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
     import StreamingContext._
-    if (state == Started) {
-      throw new SparkException("StreamingContext has already been started")
-    }
-    if (state == Stopped) {
-      throw new SparkException("StreamingContext has already been stopped")
+    state match {
+      case INITIALIZED =>
+        // good to start
+      case ACTIVE =>
+        throw new SparkException("StreamingContext has already been started")
+      case STOPPED =>
+        throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
     startSite.set(DStream.getCreationSite())
@@ -536,7 +547,7 @@ class StreamingContext private[streaming] (
       assertNoOtherContextIsActive()
       scheduler.start()
       uiTab.foreach(_.attach())
-      state = Started
+      state = StreamingContextState.ACTIVE
       setActiveContext(this)
     }
   }
@@ -598,22 +609,26 @@ class StreamingContext private[streaming] (
    * received data to be completed
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    state match {
-      case Initialized => logWarning("StreamingContext has not been started yet")
-      case Stopped => logWarning("StreamingContext has already been stopped")
-      case Started =>
-        scheduler.stop(stopGracefully)
-        logInfo("StreamingContext stopped successfully")
-        waiter.notifyStop()
+    try {
+      state match {
+        case INITIALIZED =>
+          logWarning("StreamingContext has not been started yet")
+        case STOPPED =>
+          logWarning("StreamingContext has already been stopped")
+        case ACTIVE =>
+          scheduler.stop(stopGracefully)
+          uiTab.foreach(_.detach())
+          StreamingContext.setActiveContext(null)
+          waiter.notifyStop()
+          logInfo("StreamingContext stopped successfully")
+      }
+      // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+      // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
+      if (stopSparkContext) sc.stop()
+    } finally {
+      // The state should always be Stopped after calling `stop()`, even if we haven't started yet
+      state = STOPPED
     }
-    // Even if the streaming context has not been started, we still need to stop the SparkContext.
-    // Even if we have already stopped, we still need to attempt to stop the SparkContext because
-    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
-    if (stopSparkContext) sc.stop()
-    uiTab.foreach(_.detach())
-    // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
-    state = Stopped
-    StreamingContext.setActiveContext(null)
   }
 }
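
The rewritten stop() uses try/finally so the context always ends in STOPPED, even if shutdown throws or stop() is called before start(). A hypothetical Scala helper that leans on that guarantee (the method name and the chosen stop parameters are illustrative, not part of this commit):

import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

// Hypothetical helper: shut a context down only if it is currently running.
// stop() is idempotent after this commit, so calling it from several places
// (for example, a JVM shutdown hook and a normal exit path) stays safe.
def shutdownIfActive(ssc: StreamingContext): Unit = {
  if (ssc.getState() == StreamingContextState.ACTIVE) {
    // Leave the SparkContext up for other users; drain received data first.
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}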

streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Represents the state of a StreamingContext.
+ */
+@DeveloperApi
+public enum StreamingContextState {
+  /**
+   * The context has been created, but not been started yet.
+   * Input DStreams, transformations and output operations can be created on the context.
+   */
+  INITIALIZED,
+
+  /**
+   * The context has been started, and not been stopped.
+   * Input DStreams, transformations and output operations cannot be created on the context.
+   */
+  ACTIVE,
+
+  /**
+   * The context has been stopped and cannot be used any more.
+   */
+  STOPPED
+}
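
Because the state is now a plain Java enum rather than a nested Scala Enumeration, both Java and Scala callers can branch on it directly. A small illustrative Scala sketch (the describe helper is hypothetical, not part of this commit):

import org.apache.spark.streaming.StreamingContextState
import org.apache.spark.streaming.StreamingContextState._

// Hypothetical helper mapping each state to the behaviour documented above.
def describe(state: StreamingContextState): String = state match {
  case INITIALIZED => "created; DStreams, transformations and output operations can still be added"
  case ACTIVE      => "started; the streaming computation is running and the graph is frozen"
  case STOPPED     => "stopped; the context cannot be used any more"
}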

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 22 additions & 0 deletions
@@ -578,6 +578,28 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.addStreamingListener(streamingListener)
   }

+  /**
+   * :: DeveloperApi ::
+   *
+   * Return the current state of the context. The context can be in three possible states -
+   * <ul>
+   *   <li>
+   *   StreamingContextState.INITIALIZED - The context has been created, but not been started yet.
+   *   Input DStreams, transformations and output operations can be created on the context.
+   *   </li>
+   *   <li>
+   *   StreamingContextState.ACTIVE - The context has been started, and not been stopped.
+   *   Input DStreams, transformations and output operations cannot be created on the context.
+   *   </li>
+   *   <li>
+   *   StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+   *   </li>
+   * </ul>
+   */
+  def getState(): StreamingContextState = {
+    ssc.getState()
+  }
+
   /**
    * Start the execution of the streams.
    */

streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

Lines changed: 14 additions & 0 deletions
@@ -70,6 +70,20 @@ public void testInitialization() {
     Assert.assertNotNull(ssc.sparkContext());
   }

+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContextState() {
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
+    Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream);
+    Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+    ssc.start();
+    Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+    ssc.stop();
+    Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testCount() {

streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
       ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
     implicit val cm: ClassTag[V] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    ssc.getState()
     val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
     val out = new ArrayList[JList[V]]()
     res.map(entry => out.append(new ArrayList[V](entry)))

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 16 additions & 3 deletions
@@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }

+  test("state matching") {
+    import StreamingContextState._
+    assert(INITIALIZED === INITIALIZED)
+    assert(INITIALIZED != ACTIVE)
+  }
+
   test("start and stop state check") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()

-    assert(ssc.state === ssc.StreamingContextState.Initialized)
+    assert(ssc.getState() === StreamingContextState.INITIALIZED)
     ssc.start()
-    assert(ssc.state === ssc.StreamingContextState.Started)
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
     ssc.stop()
-    assert(ssc.state === ssc.StreamingContextState.Stopped)
+    assert(ssc.getState() === StreamingContextState.STOPPED)

     // Make sure that the SparkContext is also stopped by default
     intercept[Exception] {
@@ -129,23 +135,28 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.start()
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
     intercept[SparkException] {
       ssc.start()
     }
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
   }

   test("stop multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(ssc.getState() === StreamingContextState.STOPPED)
     ssc.stop()
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("stop before start") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop() // stop before start should not throw exception
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("start after stop") {
@@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     intercept[SparkException] {
       ssc.start() // start after stop should throw exception
     }
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("stop only streaming context") {
@@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
+    assert(ssc.getState() === StreamingContextState.STOPPED)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
     sc.stop()