Commit d466d75

Changes for spark streaming UI
1 parent 9c24974 commit d466d75

7 files changed, +40 -11 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 0 deletions
@@ -1296,6 +1296,9 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
+  private[spark] val SPARK_JOB_CALL_SITE_SHORT = "spark.job.callSiteShort"
+  private[spark] val SPARK_JOB_CALL_SITE_LONG = "spark.job.callSiteLong"
+
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
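The two new keys name thread-local job properties rather than SparkConf entries. A minimal sketch of the intended round trip (not part of this commit; assumes a live SparkContext `sc` on the driver, and the call-site strings shown are hypothetical):

  // store the call site for the current thread; RDDs built on this thread can read it back
  sc.setLocalProperty("spark.job.callSiteShort", "foreachRDD at MyApp.scala:42")
  sc.setLocalProperty("spark.job.callSiteLong", "full example stack trace ...")
  // later, e.g. while an RDD is being constructed on the same thread
  val shortSite = sc.getLocalProperty("spark.job.callSiteShort")  // "foreachRDD at MyApp.scala:42"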

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 12 additions & 2 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -1213,7 +1213,17 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSite = Utils.getCallSite
+  @transient private[spark] val creationSite = {
+    val short: String = sc.getLocalProperty("spark.job.callSiteShort")
+    if (short != null) {
+      CallSite(short, sc.getLocalProperty("spark.job.callSiteLong"))
+    } else {
+      val callSite: CallSite = Utils.getCallSite
+      // sc.setLocalProperty("spark.job.callSiteShort", callSite.short)
+      // sc.setLocalProperty("spark.job.callSiteLong", callSite.long)
+      callSite
+    }
+  }
   private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
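The new creationSite block prefers a call site handed down through the thread-local job properties and only falls back to walking the stack trace. For reference, CallSite is the small helper in org.apache.spark.util used by both branches; a sketch of its assumed shape:

  // assumed shape, matching the short/long fields used above
  private[spark] case class CallSite(short: String, long: String)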

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 9 additions & 3 deletions
@@ -800,7 +800,10 @@ private[spark] object Utils extends Logging {
   * A regular expression to match classes of the "core" Spark API that we want to skip when
   * finding the call site of a method.
   */
-  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?\.[A-Z]""".r
+  private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r
+  private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r
+  private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r
 
   /**
    * When called inside a class in the spark package, returns the name of the user code class
@@ -828,7 +831,10 @@
 
     for (el <- trace) {
       if (insideSpark) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
+            SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
+            AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
+            JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
             el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
@@ -846,7 +852,7 @@
         callStack += el.toString
       }
     }
-    val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
+    val callStackDepth = System.getProperty("spark.callstack.depth", "10").toInt
     CallSite(
       short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
       long = callStack.take(callStackDepth).mkString("\n"))
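The additional regexes widen the set of stack frames that getCallSite treats as framework code, so a streaming job's call site resolves to the user's program rather than to the Scala collection, Akka dispatch, or java.util.concurrent frames that sit between them. A small self-contained check of the matching logic (illustrative only):

  val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r
  def isInternalFrame(className: String): Boolean =
    SCALA_CLASS_REGEX.findFirstIn(className).isDefined
  assert(isInternalFrame("scala.collection.TraversableLike"))  // skipped as framework code
  assert(!isInternalFrame("com.example.MyStreamingApp"))       // treated as user code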

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 2 additions & 1 deletion
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.ui.StreamingTab
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{Utils, MetadataCleaner}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -112,6 +112,7 @@ class StreamingContext private[streaming] (
     if (isCheckpointPresent) {
       new SparkContext(cp_.sparkConf)
     } else {
+      sc_.setCallSite(Utils.getCallSite.short)
       sc_
     }
   }
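When the StreamingContext wraps a SparkContext the user created earlier (the else branch), that context's recorded call site points at wherever it was originally constructed, not at the streaming program, so the added line re-records it; this is also why the Utils import is needed. A minimal sketch of the same call done by hand (hypothetical value):

  // overrides the call site reported for jobs subsequently submitted through sc_
  sc_.setCallSite("StreamingContext at MyStreamingApp.scala:20")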

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 2 additions & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{Utils, MetadataCleaner}
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -322,6 +322,7 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def generateJob(time: Time): Option[Job] = {
     getOrCompute(time) match {
       case Some(rdd) => {
+        // ssc.sc.setJobGroup("g", "d")
         val jobFunc = () => {
           val emptyFunc = { (iterator: Iterator[T]) => {} }
           context.sparkContext.runJob(rdd, emptyFunc)
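The commented-out line is a placeholder for an alternative approach: tagging each batch's jobs with a job group so they show up grouped in the UI. A hedged sketch of what that could look like with meaningful values (hypothetical id and description; not what this commit ultimately does):

  // group all jobs of one batch under a shared id and human-readable description
  ssc.sparkContext.setJobGroup("streaming-batch-" + time.milliseconds, "jobs for batch time " + time)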

streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala

Lines changed: 3 additions & 0 deletions
@@ -34,9 +34,12 @@ class ForEachDStream[T: ClassTag] (
 
   override def compute(validTime: Time): Option[RDD[Unit]] = None
 
+  // TODO: where to clean up the thread-local values?
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
+        parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
+        parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
         val jobFunc = () => {
           foreachFunc(rdd, time)
         }
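The TODO flags that the two properties stick to the generating thread once set. One possible clean-up pattern (a sketch only, not what this commit does) is to clear them after the job has been built, relying on the fact that setting a local property to null removes it:

  // set, use, then clear the thread-local call-site properties
  parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
  parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
  try {
    // ... build the Job for this batch ...
  } finally {
    parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", null)
    parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", null)
  }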

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 9 additions & 4 deletions
@@ -22,10 +22,11 @@ import org.apache.spark.{SparkException, SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 import scala.util.{Failure, Success, Try}
+import org.apache.spark.util.{CallSite, Utils}
 
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
-private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
+private[scheduler] case class GenerateJobs(time: Time, callSite: CallSite) extends JobGeneratorEvent
 private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
 private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
 private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
@@ -47,8 +48,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     Class.forName(clockClass).newInstance().asInstanceOf[Clock]
   }
 
+  val callSite: CallSite = Utils.getCallSite
+
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
+    longTime => eventActor ! GenerateJobs(new Time(longTime), callSite), "JobGenerator")
 
   // This is marked lazy so that this is initialized after checkpoint duration has been set
   // in the context and the generator has been started.
@@ -162,7 +165,7 @@
   private def processEvent(event: JobGeneratorEvent) {
     logDebug("Got event " + event)
     event match {
-      case GenerateJobs(time) => generateJobs(time)
+      case GenerateJobs(time, callSite) => generateJobs(time, callSite)
      case ClearMetadata(time) => clearMetadata(time)
       case DoCheckpoint(time) => doCheckpoint(time)
       case ClearCheckpointData(time) => clearCheckpointData(time)
@@ -216,7 +219,9 @@
   }
 
   /** Generate jobs and perform checkpoint for the given `time`. */
-  private def generateJobs(time: Time) {
+  private def generateJobs(time: Time, callSite: CallSite) {
+    ssc.sc.setLocalProperty("spark.job.callSiteShort", callSite.short)
+    ssc.sc.setLocalProperty("spark.job.callSiteLong", callSite.long)
     SparkEnv.set(ssc.env)
     Try(graph.generateJobs(time)) match {
       case Success(jobs) =>
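Taken together: the call site is captured once, on the thread that constructs the JobGenerator (normally the thread that builds the StreamingContext), shipped inside every GenerateJobs event, and re-installed as thread-local properties before the DStream graph materializes RDDs for the batch. A compressed sketch of that flow (identifiers follow the diff; the actor wiring is elided):

  val callSite: CallSite = Utils.getCallSite              // captured on the constructing thread
  // ... later, on the thread that handles GenerateJobs(time, callSite):
  ssc.sc.setLocalProperty("spark.job.callSiteShort", callSite.short)
  ssc.sc.setLocalProperty("spark.job.callSiteLong", callSite.long)
  // RDDs created while generating this batch now pick up the streaming call site (see RDD.creationSite)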
