
Commit e0f0a05

Added getState and exposed StreamingContextState
1 parent 9f1f9b1 commit e0f0a05

File tree

4 files changed: +79 additions, -21 deletions

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 23 additions & 18 deletions
@@ -35,6 +35,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler._
@@ -193,14 +194,7 @@ class StreamingContext private[streaming] (
   assert(env.metricsSystem != null)
   env.metricsSystem.registerSource(streamingSource)

-  /** Enumeration to identify current state of the StreamingContext */
-  private[streaming] object StreamingContextState extends Enumeration {
-    type CheckpointState = Value
-    val Initialized, Started, Stopped = Value
-  }
-
-  import StreamingContextState._
-  private[streaming] var state = Initialized
+  private var state: StreamingContextState = INITIALIZED

   /**
    * Return the associated Spark context
@@ -512,23 +506,32 @@ class StreamingContext private[streaming] (
     )
   }

+  /**
+   * Return the current state of the context.
+   */
+  def getState(): StreamingContextState = {
+    state
+  }
+
   /**
    * Start the execution of the streams.
    *
    * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
-    if (state == Started) {
-      throw new SparkException("StreamingContext has already been started")
-    }
-    if (state == Stopped) {
-      throw new SparkException("StreamingContext has already been stopped")
+    state match {
+      case INITIALIZED =>
+        // good to start
+      case STARTED =>
+        throw new SparkException("StreamingContext has already been started")
+      case STOPPED =>
+        throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
     sparkContext.setCallSite(DStream.getCreationSite())
     scheduler.start()
     uiTab.foreach(_.attach())
-    state = Started
+    state = StreamingContextState.STARTED
   }

   /**
@@ -585,9 +588,11 @@ class StreamingContext private[streaming] (
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
     state match {
-      case Initialized => logWarning("StreamingContext has not been started yet")
-      case Stopped => logWarning("StreamingContext has already been stopped")
-      case Started =>
+      case INITIALIZED =>
+        logWarning("StreamingContext has not been started yet")
+      case STOPPED =>
+        logWarning("StreamingContext has already been stopped")
+      case STARTED =>
         scheduler.stop(stopGracefully)
         logInfo("StreamingContext stopped successfully")
         waiter.notifyStop()
@@ -598,7 +603,7 @@ class StreamingContext private[streaming] (
     if (stopSparkContext) sc.stop()
     uiTab.foreach(_.detach())
     // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
-    state = Stopped
+    state = STOPPED
   }
 }
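The diff above turns the previously private state flag into a value that callers can query through the new getState(). A minimal sketch of how that might look from user code, where the app name, master URL, and socket source are illustrative assumptions and not part of this commit:

    import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

    object GetStateSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical local context; batch interval and names are illustrative only.
        val ssc = new StreamingContext("local[2]", "getState-sketch", Seconds(1))
        assert(ssc.getState() == StreamingContextState.INITIALIZED)

        // start() requires at least one registered output operation,
        // so attach a trivial source and print() before starting.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.print()

        ssc.start()
        assert(ssc.getState() == StreamingContextState.STARTED)

        ssc.stop(stopSparkContext = true, stopGracefully = false)
        assert(ssc.getState() == StreamingContextState.STOPPED)
      }
    }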

streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.scala

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+package org.apache.spark.streaming
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Represents the state of the StreamingContext.
+ */
+@DeveloperApi
+class StreamingContextState private (enumValue: Int) {
+
+  override def hashCode: Int = enumValue
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case otherState: StreamingContextState =>
+        otherState.hashCode == this.hashCode
+      case _ =>
+        false
+    }
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Object enumerating all the states that a StreamingContext can be.
+ */
+@DeveloperApi
+object StreamingContextState {
+  val INITIALIZED = new StreamingContextState(0)
+  val STARTED = new StreamingContextState(1)
+  val STOPPED = new StreamingContextState(2)
+}
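Because equals and hashCode are both defined on the wrapped Int, the three singleton values compare by value and can be used as stable identifiers in a match expression. A short sketch of how user code might branch on the state; the describe helper and its labels are hypothetical:

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.StreamingContextState._

    object StateDescribe {
      // Hypothetical helper mapping the state to a human-readable label.
      // INITIALIZED, STARTED and STOPPED are stable identifiers, so each case
      // is compared with the value-based equals defined on StreamingContextState.
      def describe(ssc: StreamingContext): String = ssc.getState() match {
        case INITIALIZED => "created, not yet started"
        case STARTED     => "running"
        case STOPPED     => "stopped"
      }
    }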

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 5 additions & 0 deletions
@@ -578,6 +578,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.addStreamingListener(streamingListener)
   }

+
+  def getState(): StreamingContextState = {
+    ssc.getState()
+  }
+
   /**
    * Start the execution of the streams.
    */
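The Java-facing wrapper simply delegates to the underlying Scala context, so both views report the same state value. A hedged sketch exercising that delegation from Scala; the object and app names are illustrative:

    import org.apache.spark.streaming.api.java.JavaStreamingContext
    import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

    object JavaWrapperStateSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative local setup; the wrapper shares the Scala context's state.
        val ssc = new StreamingContext("local[2]", "java-wrapper-state", Seconds(1))
        val jssc = new JavaStreamingContext(ssc)

        assert(jssc.getState() == ssc.getState())
        assert(jssc.getState() == StreamingContextState.INITIALIZED)
      }
    }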

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 16 additions & 3 deletions
@@ -109,38 +109,49 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }

+  test("state matching") {
+    import StreamingContextState._
+    assert(INITIALIZED === INITIALIZED)
+    assert(INITIALIZED != STARTED)
+  }
+
   test("start and stop state check") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()

-    assert(ssc.state === ssc.StreamingContextState.Initialized)
+    assert(ssc.getState() === StreamingContextState.INITIALIZED)
     ssc.start()
-    assert(ssc.state === ssc.StreamingContextState.Started)
+    assert(ssc.getState() === StreamingContextState.STARTED)
     ssc.stop()
-    assert(ssc.state === ssc.StreamingContextState.Stopped)
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.start()
+    assert(ssc.getState() === StreamingContextState.STARTED)
     intercept[SparkException] {
       ssc.start()
     }
+    assert(ssc.getState() === StreamingContextState.STARTED)
   }

   test("stop multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(ssc.getState() === StreamingContextState.STOPPED)
     ssc.stop()
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("stop before start") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop() // stop before start should not throw exception
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("start after stop") {
@@ -151,6 +162,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     intercept[SparkException] {
       ssc.start() // start after stop should throw exception
     }
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }

   test("stop only streaming context") {
@@ -159,6 +171,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
+    assert(ssc.getState() === StreamingContextState.STOPPED)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
     ssc = new StreamingContext(sc, batchDuration)
     addInputStream(ssc).register()