
Commit b93c97d

Andrew Or committed
[SPARK-7501] [STREAMING] DAG visualization: show DStream operations
This is similar to #5999, but for streaming. Roughly 200 lines are tests. One thing to note here is that we already do some kind of scoping for call sites, so this patch adds the new RDD operation scoping logic in the same place. This patch also adds a `try finally` block to set the relevant variables in a safer way. tdas zsxwing

**Before**

<img src="https://cloud.githubusercontent.com/assets/2133137/7625996/d88211b8-f9b4-11e4-90b9-e11baa52d6d7.png" width="450px"/>

**After**

<img src="https://cloud.githubusercontent.com/assets/2133137/7625997/e0878f8c-f9b4-11e4-8df3-7dd611b13c87.png" width="650px"/>

Author: Andrew Or <[email protected]>

Closes #6034 from andrewor14/dag-viz-streaming and squashes the following commits:

932a64a [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e685df9 [Andrew Or] Rename createRDDWith
84d0656 [Andrew Or] Review feedback
697c086 [Andrew Or] Fix tests
53b9936 [Andrew Or] Set scopes for foreachRDD properly
1881802 [Andrew Or] Refactor DStream scope names again
af4ba8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
fd07d22 [Andrew Or] Make MQTT lower case
f6de871 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
0ca1801 [Andrew Or] Remove a few unnecessary withScopes on aliases
fa4e5fb [Andrew Or] Pass in input stream name rather than defining it from within
1af0b0e [Andrew Or] Fix style
074c00b [Andrew Or] Review comments
d25a324 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e4a93ac [Andrew Or] Fix tests?
25416dc [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
9113183 [Andrew Or] Add tests for DStream scopes
b3806ab [Andrew Or] Fix test
bb80bbb [Andrew Or] Fix MIMA?
5c30360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
5703939 [Andrew Or] Rename operations that create InputDStreams
7c4513d [Andrew Or] Group RDDs by DStream operations and batches
bf0ab6e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
05c2676 [Andrew Or] Wrap many more methods in withScope
c121047 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
65ef3e9 [Andrew Or] Fix NPE
a0d3263 [Andrew Or] Scope streaming operations instead of RDD operations
1 parent fcf90b7 commit b93c97d
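
The `try finally` mentioned in the commit message refers to the usual "save the current scope, set the new one, run the body, restore on the way out" pattern, so the scope variables are reset even if the body throws. A minimal, self-contained sketch of that general pattern follows; `ScopeTracker` and `currentScope` are illustrative names, not Spark's actual internals (Spark stores the scope in SparkContext local properties).

```scala
// Minimal sketch of the "set the relevant variables in a `try finally`" pattern described
// above. ScopeTracker and currentScope are illustrative names, not Spark internals.
object ScopeTracker {
  private val currentScope = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def withScope[T](name: String)(body: => T): T = {
    val previous = currentScope.get()   // remember the enclosing scope, if any
    currentScope.set(Some(name))        // everything created inside body sees this scope
    try {
      body
    } finally {
      currentScope.set(previous)        // restore the old scope even if body throws
    }
  }

  def scopeOfCurrentThread: Option[String] = currentScope.get()
}
```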

File tree: 14 files changed, +484 −145 lines

core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js

Lines changed: 1 addition & 1 deletion
Generated file; diff not rendered.

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
 
   // Methods for creating RDDs

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 17 additions & 7 deletions
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{Logging, SparkContext}
 
 /**
  * A general, named code block representing an operation that instantiates RDDs.
@@ -43,9 +43,8 @@ import org.apache.spark.SparkContext
 @JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    val parent: Option[RDDOperationScope] = None) {
-
-  val id: Int = RDDOperationScope.nextScopeId()
+    val parent: Option[RDDOperationScope] = None,
+    val id: String = RDDOperationScope.nextScopeId().toString) {
 
   def toJson: String = {
     RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -75,7 +74,7 @@ private[spark] class RDDOperationScope(
  * A collection of utility methods to construct a hierarchical representation of RDD scopes.
  * An RDD scope tracks the series of operations that created a given RDD.
  */
-private[spark] object RDDOperationScope {
+private[spark] object RDDOperationScope extends Logging {
   private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
   private val scopeCounter = new AtomicInteger(0)
 
@@ -88,14 +87,25 @@ private[spark] object RDDOperationScope {
 
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
-   * The name of the scope will be the name of the method that immediately encloses this one.
+   * The name of the scope will be the first method name in the stack trace that is not the
+   * same as this method's.
    *
    * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
-    val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
+    val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
+    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
+    // Climb upwards to find the first method that's called something different
+    val callerMethodName = stackTrace
+      .find(_.getMethodName != ourMethodName)
+      .map(_.getMethodName)
+      .getOrElse {
+        // Log a warning just in case, but this should almost certainly never happen
+        logWarning("No valid method name for this RDD operation scope!")
+        "N/A"
+      }
    withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
  }
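
The stack-walking logic added above can be tried out in isolation: drop the frames that belong to the scoping machinery itself, then take the first method name that differs. A standalone sketch of the same idea is below; `NameDemo` and its methods are made-up names for this example, not part of the patch.

```scala
// Standalone sketch of the caller-name lookup used by RDDOperationScope.withScope above.
// NameDemo, withScope and buildSomething are made-up names for this example.
object NameDemo {
  def withScope[T](body: => T): T = {
    val scopeName = Thread.currentThread.getStackTrace
      .map(_.getMethodName)
      // Skip the getStackTrace frame and any frames that are still inside withScope
      .dropWhile(name => name == "getStackTrace" || name == "withScope")
      .headOption
      .getOrElse("N/A")
    println(s"scope name: $scopeName")
    body
  }

  def buildSomething(): Int = withScope { 1 + 1 }

  def main(args: Array[String]): Unit = {
    buildSomething() // prints "scope name: buildSomething"
  }
}
```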

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 3 additions & 3 deletions
@@ -116,8 +116,8 @@ private[ui] object RDDOperationGraph extends Logging {
     // which may be nested inside of other clusters
     val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
     val rddClusters = rddScopes.map { scope =>
-      val clusterId = scope.name + "_" + scope.id
-      val clusterName = scope.name
+      val clusterId = scope.id
+      val clusterName = scope.name.replaceAll("\\n", "\\\\n")
       clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
     }
     // Build the cluster hierarchy for this RDD
@@ -177,7 +177,7 @@ private[ui] object RDDOperationGraph extends Logging {
 
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} (${node.id})"]"""
+    s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }
 
   /** Return the dot representation of a subgraph in an RDDOperationGraph. */
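
With the two changes above, a cluster is keyed by the globally unique scope id, and node labels switch from `name (id)` to `name [id]`. A tiny runnable sketch of the resulting label format; `Node` here is a stand-in for `RDDOperationNode`, since only `id` and `name` matter for the label.

```scala
// Tiny sketch of the label format produced by makeDotNode after this change.
// Node is a stand-in for RDDOperationNode.
object DotNodeDemo {
  case class Node(id: Int, name: String)

  def makeDotNode(node: Node): String =
    s"""${node.id} [label="${node.name} [${node.id}]"]"""

  def main(args: Array[String]): Unit = {
    println(makeDotNode(Node(3, "map"))) // prints: 3 [label="map [3]"]
  }
}
```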

core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -22,13 +22,13 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.{TaskContext, Partition, SparkContext}
 
 /**
- *
+ * Tests whether scopes are passed from the RDD operation to the RDDs correctly.
  */
 class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
   private var sc: SparkContext = null
   private val scope1 = new RDDOperationScope("scope1")
-  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
-  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+  private val scope2 = new RDDOperationScope("scope2", Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", Some(scope2))
 
   before {
     sc = new SparkContext("local", "test")
@@ -48,9 +48,9 @@ class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
     val scope1Json = scope1.toJson
     val scope2Json = scope2.toJson
     val scope3Json = scope3.toJson
-    assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
-    assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
-    assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
+    assert(scope1Json === s"""{"id":"${scope1.id}","name":"scope1"}""")
+    assert(scope2Json === s"""{"id":"${scope2.id}","name":"scope2","parent":$scope1Json}""")
+    assert(scope3Json === s"""{"id":"${scope3.id}","name":"scope3","parent":$scope2Json}""")
     assert(RDDOperationScope.fromJson(scope1Json) === scope1)
     assert(RDDOperationScope.fromJson(scope2Json) === scope2)
     assert(RDDOperationScope.fromJson(scope3Json) === scope3)

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

Lines changed: 3 additions & 0 deletions
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 10 additions & 7 deletions
@@ -189,7 +189,7 @@ object KafkaUtils {
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange]
-    ): RDD[(K, V)] = {
+    ): RDD[(K, V)] = sc.withScope {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val leaders = leadersForRanges(kafkaParams, offsetRanges)
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: Map[TopicAndPartition, Broker],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): RDD[R] = {
+    ): RDD[R] = sc.withScope {
     val leaderMap = if (leaders.isEmpty) {
       leadersForRanges(kafkaParams, offsetRanges)
     } else {
@@ -233,7 +233,8 @@ object KafkaUtils {
         case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
       }.toMap
     }
-    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    val cleanedHandler = sc.clean(messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
   }
 
   /**
@@ -256,7 +257,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       offsetRanges: Array[OffsetRange]
-    ): JavaPairRDD[K, V] = {
+    ): JavaPairRDD[K, V] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +295,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = {
+    ): JavaRDD[R] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,8 +349,9 @@ object KafkaUtils {
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
   ): InputDStream[R] = {
+    val cleanedHandler = ssc.sc.clean(messageHandler)
     new DirectKafkaInputDStream[K, V, KD, VD, R](
-      ssc, kafkaParams, fromOffsets, messageHandler)
+      ssc, kafkaParams, fromOffsets, cleanedHandler)
   }
 
   /**
@@ -469,11 +471,12 @@ object KafkaUtils {
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
     implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
     createDirectStream[K, V, KD, VD, R](
       jssc.ssc,
       Map(kafkaParams.toSeq: _*),
       Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
-      messageHandler.call _
+      cleanedHandler
     )
   }
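
The `clean(...)` calls added above run Spark's closure cleaner over the handler before it is handed to the RDD or DStream: references to enclosing objects that the function does not actually use are nulled out, and the result is checked for serializability up front rather than failing later at task serialization time. A hedged sketch of the kind of capture this guards against is below; `IngestJob` is a made-up example class, and `clean` itself is Spark-internal and already invoked for you by the methods in this diff.

```scala
import kafka.message.MessageAndMetadata

// Sketch of the capture problem that closure cleaning guards against.
// IngestJob is a made-up example class, not part of this patch.
class IngestJob(tag: String) /* not Serializable */ {

  // References the field `tag`, so the returned function captures `this`
  // and cannot be shipped to executors if IngestJob is not serializable.
  def handlerCapturingThis: MessageAndMetadata[String, String] => (String, String) =
    mmd => (tag, mmd.message())

  // Copying the field into a local val first keeps the closure free of `this`.
  def handlerWithoutThis: MessageAndMetadata[String, String] => (String, String) = {
    val localTag = tag
    mmd => (localTag, mmd.message())
  }
}
```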

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala

Lines changed: 2 additions & 1 deletion
@@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
 import org.eclipse.paho.client.mqttv3.MqttTopic
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
-import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
@@ -57,6 +56,8 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
+  private[streaming] override def name: String = s"MQTT stream [$id]"
+
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 34 additions & 14 deletions
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
@@ -241,25 +241,45 @@ class StreamingContext private[streaming] (
 
   private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
 
+  /**
+   * Execute a block of code in a scope such that all new DStreams created in this body will
+   * be part of the same scope. For more detail, see the comments in `doCompute`.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)
+
+  /**
+   * Execute a block of code in a scope such that all new DStreams created in this body will
+   * be part of the same scope. For more detail, see the comments in `doCompute`.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+    RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented receiver.
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
   @deprecated("Use receiverStream", "1.0.0")
-  def networkStream[T: ClassTag](
-      receiver: Receiver[T]): ReceiverInputDStream[T] = {
-    receiverStream(receiver)
+  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("network stream") {
+      receiverStream(receiver)
+    }
   }
 
   /**
    * Create an input stream with any arbitrary user implemented receiver.
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
   */
-  def receiverStream[T: ClassTag](
-      receiver: Receiver[T]): ReceiverInputDStream[T] = {
-    new PluggableInputDStream[T](this, receiver)
+  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("receiver stream") {
+      new PluggableInputDStream[T](this, receiver)
+    }
   }
 
   /**
@@ -279,7 +299,7 @@ class StreamingContext private[streaming] (
       name: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
-    ): ReceiverInputDStream[T] = {
+    ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
     receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
@@ -296,7 +316,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = {
+    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
     socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }
 
@@ -334,7 +354,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[T] = {
+    ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
     new RawInputDStream[T](this, hostname, port, storageLevel)
   }
 
@@ -408,7 +428,7 @@ class StreamingContext private[streaming] (
    * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
-  def textFileStream(directory: String): DStream[String] = {
+  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }
 
@@ -430,7 +450,7 @@ class StreamingContext private[streaming] (
   @Experimental
   def binaryRecordsStream(
       directory: String,
-      recordLength: Int): DStream[Array[Byte]] = {
+      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -477,7 +497,7 @@ class StreamingContext private[streaming] (
   /**
    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
   */
-  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
+  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
     new UnionDStream[T](streams.toArray)
   }
 
@@ -488,7 +508,7 @@ class StreamingContext private[streaming] (
   def transform[T: ClassTag](
       dstreams: Seq[DStream[_]],
       transformFunc: (Seq[RDD[_]], Time) => RDD[T]
-    ): DStream[T] = {
+    ): DStream[T] = withScope {
     new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
   }
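
From the user's point of view nothing changes in how streaming jobs are written; the named scopes above only affect how the resulting RDDs are grouped in the DAG visualization (as in the "After" screenshot in the commit message). A small self-contained example is below: its input stream is created under the "socket text stream" scope added above, and each subsequent DStream operation shows up as its own group of RDDs in the batch's DAG view. The host and port are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Streaming word count; "localhost" and 9999 are placeholder values.
object ScopedStreamingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("dag-viz-streaming-example")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Created under the "socket text stream" scope (see the hunk above).
    val lines = ssc.socketTextStream("localhost", 9999)

    // Each operation below is grouped separately in the DAG visualization.
    val words = lines.flatMap(_.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```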
