Skip to content

Commit d8cb577

Browse files
author
Neelesh Srinivas Salian
committed
Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration
1 parent c472eb1 commit d8cb577

File tree

2 files changed

+48
-6
lines changed

2 files changed

+48
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,8 @@ class StreamingContext private[streaming] (
192192
None
193193
}
194194

195-
/** Register streaming source to metrics system */
195+
/* Initializing a streamingSource to register metrics */
196196
private val streamingSource = new StreamingSource(this)
197-
assert(env != null)
198-
assert(env.metricsSystem != null)
199-
env.metricsSystem.registerSource(streamingSource)
200197

201198
private var state: StreamingContextState = INITIALIZED
202199

@@ -606,6 +603,9 @@ class StreamingContext private[streaming] (
606603
}
607604
shutdownHookRef = Utils.addShutdownHook(
608605
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
606+
// Registering Streaming Metrics at the start of the StreamingContext
607+
assert(env.metricsSystem != null)
608+
env.metricsSystem.registerSource(streamingSource)
609609
uiTab.foreach(_.attach())
610610
logInfo("StreamingContext started")
611611
case ACTIVE =>
@@ -682,6 +682,8 @@ class StreamingContext private[streaming] (
682682
logWarning("StreamingContext has already been stopped")
683683
case ACTIVE =>
684684
scheduler.stop(stopGracefully)
685+
// Removing the streamingSource to de-register the metrics on stop()
686+
env.metricsSystem.removeSource(streamingSource)
685687
uiTab.foreach(_.detach())
686688
StreamingContext.setActiveContext(null)
687689
waiter.notifyStop()

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ import org.apache.spark.storage.StorageLevel
3333
import org.apache.spark.streaming.dstream.DStream
3434
import org.apache.spark.streaming.receiver.Receiver
3535
import org.apache.spark.util.Utils
36-
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}
37-
36+
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
37+
import org.apache.spark.metrics.MetricsSystem
38+
import org.apache.spark.metrics.source.Source
39+
import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter}
40+
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
41+
import scala.collection.mutable.ArrayBuffer
3842

3943
class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
4044

@@ -299,6 +303,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
299303
Thread.sleep(100)
300304
}
301305

306+
test ("registering and de-registering of streamingSource") {
307+
val conf = new SparkConf().setMaster(master).setAppName(appName)
308+
ssc = new StreamingContext(conf, batchDuration)
309+
assert(ssc.getState() === StreamingContextState.INITIALIZED)
310+
addInputStream(ssc).register()
311+
ssc.start()
312+
313+
val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
314+
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
315+
assert(sources.contains(streamingSource))
316+
assert(ssc.getState() === StreamingContextState.ACTIVE)
317+
Thread.sleep(100)
318+
319+
ssc.stop()
320+
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
321+
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
322+
assert(ssc.getState() === StreamingContextState.STOPPED)
323+
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
324+
}
325+
302326
test("awaitTermination") {
303327
ssc = new StreamingContext(master, appName, batchDuration)
304328
val inputStream = addInputStream(ssc)
@@ -811,3 +835,19 @@ package object testPackage extends Assertions {
811835
}
812836
}
813837
}
838+
839+
/**
840+
* Helper methods for testing StreamingContextSuite
841+
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem
842+
*/
843+
844+
private object StreamingContextSuite extends PrivateMethodTester {
845+
private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
846+
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
847+
metricsSystem.invokePrivate(_sources())
848+
}
849+
private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
850+
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
851+
streamingContext.invokePrivate(_streamingSource())
852+
}
853+
}

0 commit comments

Comments
 (0)