From 92fa04b16cad1e945ab8a4d1be752f08a241e922 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 6 Jul 2015 20:16:51 -0700 Subject: [PATCH 01/20] SPARK-8743: Added the registerSource method call to the start method for the Streaming Context. Added the removeSource method to the stop method. Added comments for both --- .../spark/streaming/StreamingContext.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1708f309fc00..57e93d5cd3cd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -192,11 +192,9 @@ class StreamingContext private[streaming] ( None } - /** Register streaming source to metrics system */ + /** Initializing a streaming source to help register metrics system */ private val streamingSource = new StreamingSource(this) - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) + private var state: StreamingContextState = INITIALIZED @@ -577,6 +575,12 @@ class StreamingContext private[streaming] ( * @throws IllegalStateException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { + /** + * Registering Streaming Metrics at the start of the StreamingContext + */ + assert(env != null) + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) @@ -688,6 +692,10 @@ class StreamingContext private[streaming] ( } finally { // The state should always be Stopped after calling `stop()`, even if we haven't started yet state = STOPPED + /** + * De-registering Streaming Metrics at the stop of the StreamingContext + */ + env.metricsSystem.removeSource(streamingSource) } } From a665965a40c7b49cc13d8ab38f8da194693d9845 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Tue, 7 Jul 2015 00:23:20 -0700 Subject: [PATCH 02/20] Added // instead of /** for commenting in code --- .../org/apache/spark/streaming/StreamingContext.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 57e93d5cd3cd..fb0427c1fcf7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -575,9 +575,7 @@ class StreamingContext private[streaming] ( * @throws IllegalStateException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { - /** - * Registering Streaming Metrics at the start of the StreamingContext - */ + //Registering Streaming Metrics at the start of the StreamingContext assert(env != null) assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) @@ -692,9 +690,7 @@ class StreamingContext private[streaming] ( } finally { // The state should always be Stopped after calling `stop()`, even if we haven't started yet state = STOPPED - /** - * De-registering Streaming Metrics at the stop of the StreamingContext - */ + // De-registering Streaming Metrics of the StreamingContext env.metricsSystem.removeSource(streamingSource) } } From 7621adf368112056eb2e137f62adc429851fa570 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Tue, 7 Jul 2015 03:36:35 -0700 Subject: [PATCH 03/20] Added indentation and Space at the comment on line 578; Registering.. --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index fb0427c1fcf7..6a8be183d3df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -575,7 +575,7 @@ class StreamingContext private[streaming] ( * @throws IllegalStateException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { - //Registering Streaming Metrics at the start of the StreamingContext + // Registering Streaming Metrics at the start of the StreamingContext assert(env != null) assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) From 18bcc7e164b46ecd969a266579f4349444373a0c Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Tue, 7 Jul 2015 16:20:29 -0700 Subject: [PATCH 04/20] Added test case for de-register metrics and made a change to the scope of the sources ArrayBuffer --- .../apache/spark/metrics/MetricsSystem.scala | 2 +- .../spark/streaming/StreamingContextSuite.scala | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index ed5131c79fdc..1454d9494adf 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -73,7 +73,7 @@ private[spark] class MetricsSystem private ( private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] - private val sources = new mutable.ArrayBuffer[Source] + val sources = new mutable.ArrayBuffer[Source] private val registry = new MetricRegistry() private var running: Boolean = false diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 819dd2ccfe91..a970247b5a85 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} - class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { val master = "local[2]" @@ -297,6 +296,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) } + test("de-register codahale metrics on stop()") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + ssc = new StreamingContext(conf, batchDuration) + addInputStream(ssc).register() + + assert(ssc.getState() === StreamingContextState.INITIALIZED) + ssc.start() + assert(ssc.getState() === StreamingContextState.ACTIVE) + val sizeOfSourcesArrayBuffer = ssc.env.metricsSystem.sources.size + Thread.sleep(100) + + ssc.stop() + assert(ssc.getState() === StreamingContextState.STOPPED) + assert(sizeOfSourcesArrayBuffer == sizeOfSourcesArrayBuffer - 1 ) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) From e4f00d7577dec22278f5b6243791c4c33e0eb373 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Wed, 8 Jul 2015 11:56:30 -0700 Subject: [PATCH 05/20] Added additional variable to check the updated Sources size value to compare with the original size after removal --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a970247b5a85..f4d528c47d7d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -308,8 +308,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) ssc.stop() + val updatedSourcesSize = ssc.env.metricsSystem.sources.size assert(ssc.getState() === StreamingContextState.STOPPED) - assert(sizeOfSourcesArrayBuffer == sizeOfSourcesArrayBuffer - 1 ) + assert(updatedSourcesSize == sizeOfSourcesArrayBuffer - 1 ) } test("awaitTermination") { From f5e47e0725adee6792081a8a4ed765ede1759e9c Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Wed, 8 Jul 2015 18:02:33 -0700 Subject: [PATCH 06/20] Added the removeSource method in try --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6a8be183d3df..36f0590ac46a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -676,6 +676,8 @@ class StreamingContext private[streaming] ( logWarning("StreamingContext has already been stopped") case ACTIVE => scheduler.stop(stopGracefully) + // De-registering Streaming Metrics of the StreamingContext + env.metricsSystem.removeSource(streamingSource) uiTab.foreach(_.detach()) StreamingContext.setActiveContext(null) waiter.notifyStop() @@ -690,8 +692,7 @@ class StreamingContext private[streaming] ( } finally { // The state should always be Stopped after calling `stop()`, even if we haven't started yet state = STOPPED - // De-registering Streaming Metrics of the StreamingContext - env.metricsSystem.removeSource(streamingSource) + } } From d04fd2a2b2e4433d49c0860c4d4028564081db31 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Thu, 9 Jul 2015 09:23:17 -0700 Subject: [PATCH 07/20] Removed the assert for the env field, added the registerSource line in the INITIALIZED block and kept the removeSource() in the ACTIVE block --- .../org/apache/spark/streaming/StreamingContext.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 36f0590ac46a..929f73f2fbdd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -575,14 +575,13 @@ class StreamingContext private[streaming] ( * @throws IllegalStateException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { - // Registering Streaming Metrics at the start of the StreamingContext - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) sparkContext.setCallSite(startSite.get) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() try { From e2c3bf82c226e38282a9a17b80771b58dcc6cc55 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Thu, 9 Jul 2015 15:02:50 -0700 Subject: [PATCH 08/20] Added test to check registering and de-registering of streamingSource --- .../streaming/StreamingContextSuite.scala | 36 +++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f4d528c47d7d..0aae2e3d373c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -21,17 +21,22 @@ import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger import org.apache.commons.io.FileUtils +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter} +import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark._ + +import scala.collection._ + class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { @@ -296,21 +301,21 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) } - test("de-register codahale metrics on stop()") { + test("registering and de-registering of streamingSource") { val conf = new SparkConf().setMaster(master).setAppName(appName) ssc = new StreamingContext(conf, batchDuration) addInputStream(ssc).register() - assert(ssc.getState() === StreamingContextState.INITIALIZED) ssc.start() + assert(ssc.getState() === StreamingContextState.INITIALIZED) + assert(StreamingContextSuite.sources.get(StreamingContextSuite.streamingSource)!= "null") + assert(ssc.getState() === StreamingContextState.ACTIVE) - val sizeOfSourcesArrayBuffer = ssc.env.metricsSystem.sources.size Thread.sleep(100) ssc.stop() - val updatedSourcesSize = ssc.env.metricsSystem.sources.size assert(ssc.getState() === StreamingContextState.STOPPED) - assert(updatedSourcesSize == sizeOfSourcesArrayBuffer - 1 ) + assert(StreamingContextSuite.sources.get(StreamingContextSuite.streamingSource) == "null") } test("awaitTermination") { @@ -812,3 +817,20 @@ package object testPackage extends Assertions { } } } + +/** + * Helper methods for testing StreamingContextSuite. + * This includes methods to access private methods and fields in ExecutorAllocationManager. + */ +object StreamingContextSuite { + val metricsSystemsObject = Class.forName("org.apache.spark.metrics.MetricsSystem") + val sources = metricsSystemsObject.getDeclaredField("sources") + sources.setAccessible(true) + + val streamingContextObject = classOf[StreamingContext] + val streamingSource = getClass.getDeclaredField("streamingSource") + streamingSource.setAccessible(true) +} + + + From 742398c334c71bcd1b2b702a9abc5e4ab1288d9e Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Thu, 9 Jul 2015 15:08:55 -0700 Subject: [PATCH 09/20] Removed unused imports --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 0aae2e3d373c..60dfe8f7af1e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -21,13 +21,11 @@ import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger import org.apache.commons.io.FileUtils -import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.metrics.source.Source import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} +import org.scalatest.{Assertions, BeforeAndAfter} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream From ca081fa3effd0303d04a034af5c5a0e8facd3b2d Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Thu, 9 Jul 2015 19:33:38 -0700 Subject: [PATCH 10/20] Moved the registerSource() call before line 601 --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 929f73f2fbdd..4507f97a0e1e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -579,9 +579,6 @@ class StreamingContext private[streaming] ( case INITIALIZED => startSite.set(DStream.getCreationSite()) sparkContext.setCallSite(startSite.get) - // Registering Streaming Metrics at the start of the StreamingContext - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() try { @@ -599,6 +596,9 @@ class StreamingContext private[streaming] ( } shutdownHookRef = Utils.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => From 33a2091a4984b8e29143ab2e1202751a87e838b3 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Fri, 10 Jul 2015 14:31:53 -0700 Subject: [PATCH 11/20] Changed scope of sources and corrected comments for helper --- .../src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 2 +- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 1454d9494adf..ed5131c79fdc 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -73,7 +73,7 @@ private[spark] class MetricsSystem private ( private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] - val sources = new mutable.ArrayBuffer[Source] + private val sources = new mutable.ArrayBuffer[Source] private val registry = new MetricRegistry() private var running: Boolean = false diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 60dfe8f7af1e..18e170acb7ef 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -818,7 +818,7 @@ package object testPackage extends Assertions { /** * Helper methods for testing StreamingContextSuite. - * This includes methods to access private methods and fields in ExecutorAllocationManager. + * This includes methods to access private methods and fields in StreamingContext and MetricsSystem. */ object StreamingContextSuite { val metricsSystemsObject = Class.forName("org.apache.spark.metrics.MetricsSystem") From a67918cb9732936ea84427f728086985f7319e3a Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Fri, 10 Jul 2015 14:41:16 -0700 Subject: [PATCH 12/20] Removed extra line in Helper Methods section --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 18e170acb7ef..7422797497df 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -824,7 +824,6 @@ object StreamingContextSuite { val metricsSystemsObject = Class.forName("org.apache.spark.metrics.MetricsSystem") val sources = metricsSystemsObject.getDeclaredField("sources") sources.setAccessible(true) - val streamingContextObject = classOf[StreamingContext] val streamingSource = getClass.getDeclaredField("streamingSource") streamingSource.setAccessible(true) From 74598cec17a6ec54a64f3fc0f8c336d6ba19cc1e Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Fri, 10 Jul 2015 19:16:18 -0700 Subject: [PATCH 13/20] Added helper method for private methods and changed the test logic to check for Sources containing or not containing StreamingSource --- .../streaming/StreamingContextSuite.scala | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 7422797497df..300937f35e51 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -21,19 +21,23 @@ import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger import org.apache.commons.io.FileUtils +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter} +import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils + import org.apache.spark._ import scala.collection._ +import scala.collection.mutable.ArrayBuffer class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { @@ -303,17 +307,20 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo val conf = new SparkConf().setMaster(master).setAppName(appName) ssc = new StreamingContext(conf, batchDuration) addInputStream(ssc).register() - ssc.start() - assert(ssc.getState() === StreamingContextState.INITIALIZED) - assert(StreamingContextSuite.sources.get(StreamingContextSuite.streamingSource)!= "null") + assert(ssc.getState() === StreamingContextState.INITIALIZED) + val sources: ArrayBuffer[Source] = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSource: StreamingSource = StreamingContextSuite.getStreamingSource(ssc) + assert(sources.contains(streamingSource)) assert(ssc.getState() === StreamingContextState.ACTIVE) Thread.sleep(100) ssc.stop() + val sourcesAfterStop: ArrayBuffer[Source] = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSourceAfterStop: StreamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(ssc.getState() === StreamingContextState.STOPPED) - assert(StreamingContextSuite.sources.get(StreamingContextSuite.streamingSource) == "null") + assert(sourcesAfterStop.contains(streamingSourceAfterStop)) } test("awaitTermination") { @@ -820,13 +827,15 @@ package object testPackage extends Assertions { * Helper methods for testing StreamingContextSuite. * This includes methods to access private methods and fields in StreamingContext and MetricsSystem. */ -object StreamingContextSuite { - val metricsSystemsObject = Class.forName("org.apache.spark.metrics.MetricsSystem") - val sources = metricsSystemsObject.getDeclaredField("sources") - sources.setAccessible(true) - val streamingContextObject = classOf[StreamingContext] - val streamingSource = getClass.getDeclaredField("streamingSource") - streamingSource.setAccessible(true) +private object StreamingContextSuite extends PrivateMethodTester { + private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) + private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { + metricsSystem invokePrivate _sources() + } + private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) + private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { + streamingContext invokePrivate _streamingSource() + } } From e37a2f3cc3364f6819205b5eac39d0603eb91ac5 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 07:38:09 -0700 Subject: [PATCH 14/20] Changed import statements to remove unnecessary imports and add specific imports --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 300937f35e51..92d0852bc448 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -34,9 +34,8 @@ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark._ +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} -import scala.collection._ import scala.collection.mutable.ArrayBuffer From f54afcf78819ad30a59318164515457b47c31d7d Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 07:43:29 -0700 Subject: [PATCH 15/20] Removed types for fields in test for registering and deregistering metrics --- .../apache/spark/streaming/StreamingContextSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 92d0852bc448..2f9d1922c5ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -309,15 +309,15 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc.start() assert(ssc.getState() === StreamingContextState.INITIALIZED) - val sources: ArrayBuffer[Source] = StreamingContextSuite.getSources(ssc.env.metricsSystem) - val streamingSource: StreamingSource = StreamingContextSuite.getStreamingSource(ssc) + val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(sources.contains(streamingSource)) assert(ssc.getState() === StreamingContextState.ACTIVE) Thread.sleep(100) ssc.stop() - val sourcesAfterStop: ArrayBuffer[Source] = StreamingContextSuite.getSources(ssc.env.metricsSystem) - val streamingSourceAfterStop: StreamingSource = StreamingContextSuite.getStreamingSource(ssc) + val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) assert(ssc.getState() === StreamingContextState.STOPPED) assert(sourcesAfterStop.contains(streamingSourceAfterStop)) } From ea0dc1a74848af8682bb8fc1e0f03b5261591f6e Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 15:38:00 -0700 Subject: [PATCH 16/20] Changed imports statements, negated test statement and removed postfix --- .../spark/streaming/StreamingContextSuite.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 2f9d1922c5ac..ada77bcca564 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger - import org.apache.commons.io.FileUtils import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source @@ -28,14 +27,11 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} - import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils - import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} - import scala.collection.mutable.ArrayBuffer @@ -308,7 +304,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo addInputStream(ssc).register() ssc.start() - assert(ssc.getState() === StreamingContextState.INITIALIZED) + //assert(ssc.getState() === StreamingContextState.INITIALIZED) val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(sources.contains(streamingSource)) @@ -319,7 +315,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) assert(ssc.getState() === StreamingContextState.STOPPED) - assert(sourcesAfterStop.contains(streamingSourceAfterStop)) + assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) } test("awaitTermination") { @@ -829,11 +825,11 @@ package object testPackage extends Assertions { private object StreamingContextSuite extends PrivateMethodTester { private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { - metricsSystem invokePrivate _sources() + metricsSystem.invokePrivate(_sources()) } private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { - streamingContext invokePrivate _streamingSource() + streamingContext.invokePrivate(_streamingSource()) } } From a0f1950937c36d7f568c5be545cf72ba5afe36ee Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 15:43:39 -0700 Subject: [PATCH 17/20] Removed added comment to Assert for INITIALIZED state --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index ada77bcca564..3443ccd282c1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -304,7 +304,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo addInputStream(ssc).register() ssc.start() - //assert(ssc.getState() === StreamingContextState.INITIALIZED) + assert(ssc.getState() === StreamingContextState.INITIALIZED) val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(sources.contains(streamingSource)) From 2a812878643a1e97e0113c7be7a195ae3c740b48 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 15:49:13 -0700 Subject: [PATCH 18/20] Removing the INITIALIZED check since after start() the state moves to ACTIVE and this check fails --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 3443ccd282c1..4c56b81e91e0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -304,7 +304,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo addInputStream(ssc).register() ssc.start() - assert(ssc.getState() === StreamingContextState.INITIALIZED) val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(sources.contains(streamingSource)) From 5d3af311abe18e0db8cecb36141432af07d3afcb Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 12 Jul 2015 16:03:34 -0700 Subject: [PATCH 19/20] Move the INITIALIZED state check to when the ssc is initialized --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 4c56b81e91e0..3cff9d1e7c61 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -301,6 +301,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo test("registering and de-registering of streamingSource") { val conf = new SparkConf().setMaster(master).setAppName(appName) ssc = new StreamingContext(conf, batchDuration) + assert(ssc.getState() === StreamingContextState.INITIALIZED) addInputStream(ssc).register() ssc.start() From 299a57d0b909b2be968f17723736c66c0e61fdcd Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 6 Jul 2015 20:16:51 -0700 Subject: [PATCH 20/20] SPARK-8743: Added the registerSource method call to the start method for the Streaming Context. Added the removeSource method to the stop method. Added comments for both Added // instead of /** for commenting in code Added indentation and Space at the comment on line 578; Registering.. Added test case for de-register metrics and made a change to the scope of the sources ArrayBuffer Added additional variable to check the updated Sources size value to compare with the original size after removal Added the removeSource method in try Removed the assert for the env field, added the registerSource line in the INITIALIZED block and kept the removeSource() in the ACTIVE block Added test to check registering and de-registering of streamingSource Removed unused imports Moved the registerSource() call before line 601 Changed scope of sources and corrected comments for helper Removed extra line in Helper Methods section Added helper method for private methods and changed the test logic to check for Sources containing or not containing StreamingSource Changed import statements to remove unnecessary imports and add specific imports Removed types for fields in test for registering and deregistering metrics Changed imports statements, negated test statement and removed postfix Removed added comment to Assert for INITIALIZED state Removing the INITIALIZED check since after start() the state moves to ACTIVE and this check fails Move the INITIALIZED state check to when the ssc is initialized --- .../spark/streaming/StreamingContext.scala | 12 +++-- .../streaming/StreamingContextSuite.scala | 47 +++++++++++++++++-- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1708f309fc00..4507f97a0e1e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -192,11 +192,9 @@ class StreamingContext private[streaming] ( None } - /** Register streaming source to metrics system */ + /** Initializing a streaming source to help register metrics system */ private val streamingSource = new StreamingSource(this) - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) + private var state: StreamingContextState = INITIALIZED @@ -598,6 +596,9 @@ class StreamingContext private[streaming] ( } shutdownHookRef = Utils.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => @@ -674,6 +675,8 @@ class StreamingContext private[streaming] ( logWarning("StreamingContext has already been stopped") case ACTIVE => scheduler.stop(stopGracefully) + // De-registering Streaming Metrics of the StreamingContext + env.metricsSystem.removeSource(streamingSource) uiTab.foreach(_.detach()) StreamingContext.setActiveContext(null) waiter.notifyStop() @@ -688,6 +691,7 @@ class StreamingContext private[streaming] ( } finally { // The state should always be Stopped after calling `stop()`, even if we haven't started yet state = STOPPED + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 819dd2ccfe91..3cff9d1e7c61 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -19,19 +19,20 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger - import org.apache.commons.io.FileUtils +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter} - +import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} +import scala.collection.mutable.ArrayBuffer class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { @@ -297,6 +298,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) } + test("registering and de-registering of streamingSource") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + ssc = new StreamingContext(conf, batchDuration) + assert(ssc.getState() === StreamingContextState.INITIALIZED) + addInputStream(ssc).register() + ssc.start() + + val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSource = StreamingContextSuite.getStreamingSource(ssc) + assert(sources.contains(streamingSource)) + assert(ssc.getState() === StreamingContextState.ACTIVE) + Thread.sleep(100) + + ssc.stop() + val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) + assert(ssc.getState() === StreamingContextState.STOPPED) + assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) @@ -796,3 +817,21 @@ package object testPackage extends Assertions { } } } + +/** + * Helper methods for testing StreamingContextSuite. + * This includes methods to access private methods and fields in StreamingContext and MetricsSystem. + */ +private object StreamingContextSuite extends PrivateMethodTester { + private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) + private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { + metricsSystem.invokePrivate(_sources()) + } + private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) + private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { + streamingContext.invokePrivate(_streamingSource()) + } +} + + +