Skip to content

Commit ecbd70b

Browse files
author
Robert Kruszewski
committed
fix invoke private usage
1 parent deea685 commit ecbd70b

File tree

3 files changed

+12
-17
lines changed

3 files changed

+12
-17
lines changed

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ private[spark] class MetricsSystem private (
159159
} else { defaultName }
160160
}
161161

162+
def getSources: Seq[Source] =
163+
sourceToListeners.keySet.to[collection.immutable.Seq]
164+
165+
def getSinks: Seq[Sink] = sinks.to[collection.immutable.Seq]
166+
162167
def getSourcesByName(sourceName: String): Seq[Source] =
163168
sourceToListeners.keySet.filter(_.sourceName == sourceName).toSeq
164169

core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.metrics
1919

20-
import scala.collection.mutable.ArrayBuffer
21-
2220
import com.codahale.metrics.{Gauge, MetricRegistry}
2321
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
2422

@@ -42,27 +40,23 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
4240
test("MetricsSystem with default config") {
4341
val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
4442
metricsSystem.start()
45-
val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources"))
46-
val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks"))
4743

48-
assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length)
49-
assert(metricsSystem.invokePrivate(sinks()).length === 0)
44+
assert(metricsSystem.getSources.length === StaticSources.allSources.length)
45+
assert(metricsSystem.getSinks.length === 0)
5046
assert(metricsSystem.getServletHandlers.nonEmpty)
5147
}
5248

5349
test("MetricsSystem with sources add") {
5450
val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
5551
metricsSystem.start()
56-
val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources"))
57-
val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks"))
5852

59-
assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length)
60-
assert(metricsSystem.invokePrivate(sinks()).length === 1)
53+
assert(metricsSystem.getSources.length === StaticSources.allSources.length)
54+
assert(metricsSystem.getSinks.length === 1)
6155
assert(metricsSystem.getServletHandlers.nonEmpty)
6256

6357
val source = new MasterSource(null)
6458
metricsSystem.registerSource(source)
65-
assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length + 1)
59+
assert(metricsSystem.getSources.length === StaticSources.allSources.length + 1)
6660
}
6761

6862
test("MetricsSystem with Driver instance") {

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,13 @@ class StreamingContextSuite
374374
addInputStream(ssc).register()
375375
ssc.start()
376376

377-
val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
377+
val sources = ssc.env.metricsSystem.getSources
378378
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
379379
assert(sources.contains(streamingSource))
380380
assert(ssc.getState() === StreamingContextState.ACTIVE)
381381

382382
ssc.stop()
383-
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
383+
val sourcesAfterStop = ssc.env.metricsSystem.getSources
384384
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
385385
assert(ssc.getState() === StreamingContextState.STOPPED)
386386
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
@@ -1026,10 +1026,6 @@ package object testPackage extends Assertions {
10261026
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem
10271027
*/
10281028
private object StreamingContextSuite extends PrivateMethodTester {
1029-
private val _sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources"))
1030-
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
1031-
metricsSystem.invokePrivate(_sources())
1032-
}
10331029
private val _streamingSource = PrivateMethod[StreamingSource](Symbol("streamingSource"))
10341030
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
10351031
streamingContext.invokePrivate(_streamingSource())

0 commit comments

Comments
 (0)