
Commit 729952a

mubarak authored and andrewor14 committed
[SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
This is a refactored version of the original PR apache#1723 by mubarak. Please take a look andrewor14, mubarak.

Author: Mubarak Seyed <[email protected]>
Author: Tathagata Das <[email protected]>

Closes apache#2464 from tdas/streaming-callsite and squashes the following commits:

dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI
1 parent b3fef50 commit 729952a

File tree

6 files changed: +153 −58 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 22 additions & 10 deletions
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }

   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }

   /**
    * Capture the current user callsite and return a formatted version for printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
   }

   /**
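For context, a minimal usage sketch (not part of the diff) of the reworked thread-local override, assuming a live SparkContext named `sc`; the label string is made up:

    // Hedged sketch: override the call site recorded for RDDs created on this thread.
    sc.setCallSite("myApp: load events")      // stores the short form under "callSite.short"
    val events = sc.textFile("events.txt")    // this RDD's creation site reads "myApp: load events"
    sc.clearCallSite()                        // later RDDs fall back to stack-trace inference

Unlike the old single "externalCallSite" property, the override now carries both a short and a long form, so `getCallSite()` can return a complete `CallSite` either way.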

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 4 additions & 3 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.rdd

-import java.util.Random
+import java.util.{Properties, Random}

 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE

   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSite = Utils.getCallSite
+  @transient private[spark] val creationSite = sc.getCallSite()
+
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")

   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
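One consequence worth spelling out (an illustrative sketch, not from the commit): `creationSite` is a `val` captured when the RDD object is constructed, so an override has to be in place before the transformation runs, not at action time. Assuming a driver with a live `sc` and hypothetical labels:

    sc.setCallSite("etl: parse")
    val parsed = sc.textFile("in.txt").map(_.trim)  // both RDDs record "etl: parse"
    sc.clearCallSite()
    parsed.count()  // clearing afterwards does not change what was already recorded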

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 20 additions & 7 deletions
@@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
 /** CallSite represents a place in user code. It can have a short and a long form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)

+private[spark] object CallSite {
+  val SHORT_FORM = "callSite.short"
+  val LONG_FORM = "callSite.long"
+}
+
 /**
  * Various utility methods used by Spark.
  */
@@ -859,18 +864,26 @@ private[spark] object Utils extends Logging {
     }
   }

-  /**
-   * A regular expression to match classes of the "core" Spark API that we want to skip when
-   * finding the call site of a method.
-   */
-  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+  /** Default filtering function for finding call sites using `getCallSite`. */
+  private def coreExclusionFunction(className: String): Boolean = {
+    // A regular expression to match classes of the "core" Spark API that we want to skip when
+    // finding the call site of a method.
+    val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+    val SCALA_CLASS_REGEX = """^scala""".r
+    val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
+    val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
+    // If the class is a Spark internal class or a Scala class, then exclude.
+    isSparkCoreClass || isScalaClass
+  }

   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
+   *
+   * @param skipClass Function that is used to exclude non-user-code classes.
    */
-  def getCallSite: CallSite = {
+  def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot { ste:StackTraceElement =>
         // When running under some profilers, the current stack trace might contain some bogus
@@ -891,7 +904,7 @@ private[spark] object Utils extends Logging {

     for (el <- trace) {
       if (insideSpark) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+        if (skipClass(el.getClassName)) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
             el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
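To illustrate the new `skipClass` hook, a hedged sketch of a custom predicate (note `Utils` is `private[spark]`, so only Spark-internal code can call this; the wrapper package name is invented):

    // Skip Spark frames, Scala frames, and a hypothetical wrapper library's
    // frames when locating the user call site.
    val site = org.apache.spark.util.Utils.getCallSite { className =>
      className.startsWith("org.apache.spark") ||
      className.startsWith("scala") ||
      className.startsWith("com.example.mylib")
    }
    println(site.shortForm)  // e.g. "someMethod at UserApp.scala:42"

This is the same hook that `DStream.getCreationSite()` uses below to apply streaming-specific exclusion rules.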

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 2 additions & 2 deletions
@@ -35,10 +35,9 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
+import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.MetadataCleaner

 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -448,6 +447,7 @@ class StreamingContext private[streaming] (
       throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
+    sparkContext.setCallSite(DStream.getCreationSite())
     scheduler.start()
     state = Started
   }
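The net effect for applications: once `start()` copies the creation site into the SparkContext's thread-local properties, stages are attributed to user code rather than Spark internals. A minimal, hedged example driver (class name, file name, and port are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits in this era

    object WordCountApp {
      def main(args: Array[String]) {
        val ssc = new StreamingContext(new SparkConf().setAppName("WordCountApp"), Seconds(1))
        // With this patch, the Stages UI can show lines like
        // "socketTextStream at WordCountApp.scala:10" for the resulting stages.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }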

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 62 additions & 34 deletions
@@ -23,14 +23,15 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
+import scala.util.matching.Regex

 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{CallSite, MetadataCleaner}

 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -106,6 +107,9 @@ abstract class DStream[T: ClassTag] (
   /** Return the StreamingContext associated with this DStream */
   def context = ssc

+  /* Set the creation call site */
+  private[streaming] val creationSite = DStream.getCreationSite()
+
   /** Persist the RDDs of this DStream with the given storage level */
   def persist(level: StorageLevel): DStream[T] = {
     if (this.isInitialized) {
@@ -272,43 +276,41 @@ abstract class DStream[T: ClassTag] (
   }

   /**
-   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
-   * method that should not be called directly.
+   * Get the RDD corresponding to the given time; either retrieve it from cache
+   * or compute-and-cache it.
    */
   private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
-    // If this DStream was not initialized (i.e., zeroTime not set), then do it
-    // If RDD was already generated, then retrieve it from HashMap
-    generatedRDDs.get(time) match {
-
-      // If an RDD was already generated and is being reused, then
-      // probably all RDDs in this DStream will be reused and hence should be cached
-      case Some(oldRDD) => Some(oldRDD)
-
-      // if RDD was not generated, and if the time is valid
-      // (based on sliding time of this DStream), then generate the RDD
-      case None => {
-        if (isTimeValid(time)) {
-          compute(time) match {
-            case Some(newRDD) =>
-              if (storageLevel != StorageLevel.NONE) {
-                newRDD.persist(storageLevel)
-                logInfo("Persisting RDD " + newRDD.id + " for time " +
-                  time + " to " + storageLevel + " at time " + time)
-              }
-              if (checkpointDuration != null &&
-                (time - zeroTime).isMultipleOf(checkpointDuration)) {
-                newRDD.checkpoint()
-                logInfo("Marking RDD " + newRDD.id + " for time " + time +
-                  " for checkpointing at time " + time)
-              }
-              generatedRDDs.put(time, newRDD)
-              Some(newRDD)
-            case None =>
-              None
+    // If RDD was already generated, then retrieve it from HashMap,
+    // or else compute the RDD
+    generatedRDDs.get(time).orElse {
+      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
+      // of RDD generation, else generate nothing.
+      if (isTimeValid(time)) {
+        // Set the thread-local property for call sites to this DStream's creation site
+        // such that RDDs generated by compute gets that as their creation site.
+        // Note that this `getOrCompute` may get called from another DStream which may have
+        // set its own call site. So we store its call site in a temporary variable,
+        // set this DStream's creation site, generate RDDs and then restore the previous call site.
+        val prevCallSite = ssc.sparkContext.getCallSite()
+        ssc.sparkContext.setCallSite(creationSite)
+        val rddOption = compute(time)
+        ssc.sparkContext.setCallSite(prevCallSite)
+
+        rddOption.foreach { case newRDD =>
+          // Register the generated RDD for caching and checkpointing
+          if (storageLevel != StorageLevel.NONE) {
+            newRDD.persist(storageLevel)
+            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
           }
-        } else {
-          None
+          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+            newRDD.checkpoint()
+            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
+          }
+          generatedRDDs.put(time, newRDD)
         }
+        rddOption
+      } else {
+        None
       }
     }
   }
@@ -799,3 +801,29 @@ abstract class DStream[T: ClassTag] (
     this
   }
 }
+
+private[streaming] object DStream {
+
+  /** Get the creation site of a DStream from the stack trace of when the DStream is created. */
+  def getCreationSite(): CallSite = {
+    val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
+    val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
+    val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
+    val SCALA_CLASS_REGEX = """^scala""".r
+
+    /** Filtering function that excludes non-user classes for a streaming application */
+    def streamingExclustionFunction(className: String): Boolean = {
+      def doesMatch(r: Regex) = r.findFirstIn(className).isDefined
+      val isSparkClass = doesMatch(SPARK_CLASS_REGEX)
+      val isSparkExampleClass = doesMatch(SPARK_EXAMPLES_CLASS_REGEX)
+      val isSparkStreamingTestClass = doesMatch(SPARK_STREAMING_TESTCLASS_REGEX)
+      val isScalaClass = doesMatch(SCALA_CLASS_REGEX)
+
+      // If the class is a spark example class or a streaming test class then it is considered
+      // as a streaming application class and don't exclude. Otherwise, exclude any
+      // non-Spark and non-Scala class, as the rest would streaming application classes.
+      (isSparkClass || isScalaClass) && !isSparkExampleClass && !isSparkStreamingTestClass
+    }
+    org.apache.spark.util.Utils.getCallSite(streamingExclustionFunction)
+  }
+}
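To make the exclusion rule concrete: since `streamingExclustionFunction` is local to `getCreationSite()`, the sketch below re-states its logic with `String.startsWith`, which is equivalent for these `^`-anchored regexes (the last class name is invented):

    // true = the stack frame is considered internal and skipped
    def excludes(className: String): Boolean = {
      val isSparkClass = className.startsWith("org.apache.spark")
      val isScalaClass = className.startsWith("scala")
      val isExample = className.startsWith("org.apache.spark.examples")
      val isStreamingTest = className.startsWith("org.apache.spark.streaming.test")
      (isSparkClass || isScalaClass) && !isExample && !isStreamingTest
    }
    assert(excludes("org.apache.spark.streaming.dstream.MappedDStream"))      // internal: skipped
    assert(excludes("scala.collection.immutable.List"))                       // Scala library: skipped
    assert(!excludes("org.apache.spark.examples.streaming.NetworkWordCount")) // example app: kept
    assert(!excludes("com.example.MyStreamingApp"))                           // user code: kept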

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 43 additions & 2 deletions
@@ -19,13 +19,16 @@ package org.apache.spark.streaming

 import java.util.concurrent.atomic.AtomicInteger

+import scala.language.postfixOps
+
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.{MetadataCleaner, Utils}
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.Utils
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
 import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._

@@ -257,6 +260,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
   }

+  test("DStream and generated RDD creation sites") {
+    testPackage.test()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => (1 to i))
     val inputStream = new TestInputStream(s, input, 1)
@@ -293,3 +300,37 @@ class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging
 object TestReceiver {
   val counter = new AtomicInteger(1)
 }
+
+/** Streaming application for testing DStream and RDD creation sites */
+package object testPackage extends Assertions {
+  def test() {
+    val conf = new SparkConf().setMaster("local").setAppName("CreationSite test")
+    val ssc = new StreamingContext(conf, Milliseconds(100))
+    try {
+      val inputStream = ssc.receiverStream(new TestReceiver)
+
+      // Verify creation site of DStream
+      val creationSite = inputStream.creationSite
+      assert(creationSite.shortForm.contains("receiverStream") &&
+        creationSite.shortForm.contains("StreamingContextSuite")
+      )
+      assert(creationSite.longForm.contains("testPackage"))
+
+      // Verify creation site of generated RDDs
+      var rddGenerated = false
+      var rddCreationSiteCorrect = true
+
+      inputStream.foreachRDD { rdd =>
+        rddCreationSiteCorrect = rdd.creationSite == creationSite
+        rddGenerated = true
+      }
+      ssc.start()
+
+      eventually(timeout(10000 millis), interval(10 millis)) {
+        assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+}
