Skip to content

Commit 89f9980

Browse files
committed
Change to Java enum and added Java test
1 parent 7c57351 commit 89f9980

File tree

6 files changed

+71
-77
lines changed

6 files changed

+71
-77
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
3232
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
3333

3434
import org.apache.spark._
35-
import org.apache.spark.annotation.Experimental
35+
import org.apache.spark.annotation.{DeveloperApi, Experimental}
3636
import org.apache.spark.input.FixedLengthBinaryInputFormat
3737
import org.apache.spark.rdd.RDD
3838
import org.apache.spark.storage.StorageLevel
@@ -511,8 +511,11 @@ class StreamingContext private[streaming] (
511511
}
512512

513513
/**
514+
* :: DeveloperApi ::
515+
*
514516
* Return the current state of the context.
515517
*/
518+
@DeveloperApi
516519
def getState(): StreamingContextState = {
517520
state
518521
}
@@ -601,25 +604,27 @@ class StreamingContext private[streaming] (
601604
* received data to be completed
602605
*/
603606
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
604-
state = STOPPED
605-
StreamingContext.setActiveContext(null)
606-
607-
state match {
608-
case INITIALIZED =>
609-
logWarning("StreamingContext has not been started yet")
610-
case STOPPED =>
611-
logWarning("StreamingContext has already been stopped")
612-
case STARTED =>
613-
scheduler.stop(stopGracefully)
614-
logInfo("StreamingContext stopped successfully")
615-
waiter.notifyStop()
607+
try {
608+
state match {
609+
case INITIALIZED =>
610+
logWarning("StreamingContext has not been started yet")
611+
case STOPPED =>
612+
logWarning("StreamingContext has already been stopped")
613+
case STARTED =>
614+
scheduler.stop(stopGracefully)
615+
uiTab.foreach(_.detach())
616+
// Even if the streaming context has not been started, we still need to stop the SparkContext.
617+
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
618+
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
619+
StreamingContext.setActiveContext(null)
620+
logInfo("StreamingContext stopped successfully")
621+
waiter.notifyStop()
622+
}
623+
if (stopSparkContext) sc.stop()
624+
} finally {
625+
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
626+
state = STOPPED
616627
}
617-
// Even if the streaming context has not been started, we still need to stop the SparkContext.
618-
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
619-
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
620-
if (stopSparkContext) sc.stop()
621-
uiTab.foreach(_.detach())
622-
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
623628
}
624629
}
625630

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming;
19+
20+
import org.apache.spark.annotation.DeveloperApi;
21+
22+
/**
23+
* :: DeveloperApi ::
24+
*
25+
* Represents the state of the StreamingContext.
26+
*/
27+
@DeveloperApi
28+
public enum StreamingContextState {
29+
INITIALIZED, STARTED, STOPPED
30+
}

streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.scala

Lines changed: 0 additions & 57 deletions
This file was deleted.

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,9 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
578578
ssc.addStreamingListener(streamingListener)
579579
}
580580

581-
582581
/**
582+
* :: DeveloperApi ::
583+
*
583584
* Return the current state of the context.
584585
*/
585586
def getState(): StreamingContextState = {

streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@ public void testInitialization() {
7070
Assert.assertNotNull(ssc.sparkContext());
7171
}
7272

73+
@SuppressWarnings("unchecked")
74+
@Test
75+
public void testContextState() {
76+
List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
77+
Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
78+
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
79+
JavaTestUtils.attachTestOutputStream(stream);
80+
Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
81+
ssc.start();
82+
Assert.assertTrue(ssc.getState() == StreamingContextState.STARTED);
83+
ssc.stop();
84+
Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
85+
}
86+
7387
@SuppressWarnings("unchecked")
7488
@Test
7589
public void testCount() {

streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
7070
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
7171
implicit val cm: ClassTag[V] =
7272
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
73+
ssc.getState()
7374
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
7475
val out = new ArrayList[JList[V]]()
7576
res.map(entry => out.append(new ArrayList[V](entry)))

0 commit comments

Comments
 (0)