From d466d75341bf246bf57341ca6b9ea3ed11679040 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Thu, 17 Jul 2014 18:15:05 -0700 Subject: [PATCH 01/14] Changes for spark streaming UI --- .../main/scala/org/apache/spark/SparkContext.scala | 3 +++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 14 ++++++++++++-- .../main/scala/org/apache/spark/util/Utils.scala | 12 +++++++++--- .../apache/spark/streaming/StreamingContext.scala | 3 ++- .../apache/spark/streaming/dstream/DStream.scala | 3 ++- .../spark/streaming/dstream/ForEachDStream.scala | 3 +++ .../spark/streaming/scheduler/JobGenerator.scala | 13 +++++++++---- 7 files changed, 40 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8052499ab7526..9d027d80e7119 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1296,6 +1296,9 @@ object SparkContext extends Logging { private[spark] val SPARK_UNKNOWN_USER = "" + private[spark] val SPARK_JOB_CALL_SITE_SHORT = "spark.job.callSiteShort" + private[spark] val SPARK_JOB_CALL_SITE_LONG = "spark.job.callSiteLong" + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a25f263bea5c1..7672c2b82737e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.util.Random +import java.util.{Properties, Random} import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer @@ -1213,7 +1213,17 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ - @transient private[spark] val creationSite = Utils.getCallSite + @transient private[spark] val creationSite = { + val short: String = sc.getLocalProperty("spark.job.callSiteShort") + if (short != null) { + CallSite(short, sc.getLocalProperty("spark.job.callSiteLong")) + } else { + val callSite: CallSite = Utils.getCallSite + //sc.setLocalProperty("spark.job.callSiteShort", callSite.short) + //sc.setLocalProperty("spark.job.callSiteLong", callSite.long) + callSite + } + } private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("") private[spark] def elementClassTag: ClassTag[T] = classTag[T] diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 10c33d67e7683..fd1aac162f2fc 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -800,7 +800,10 @@ private[spark] object Utils extends Logging { * A regular expression to match classes of the "core" Spark API that we want to skip when * finding the call site of a method. 
*/ - private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r + private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?\.[A-Z]""".r + private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r + private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r + private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r /** * When called inside a class in the spark package, returns the name of the user code class @@ -828,7 +831,10 @@ private[spark] object Utils extends Logging { for (el <- trace) { if (insideSpark) { - if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { + if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || + SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || + AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || + JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { lastSparkMethod = if (el.getMethodName == "") { // Spark method is a constructor; get its class name el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) @@ -846,7 +852,7 @@ private[spark] object Utils extends Logging { callStack += el.toString } } - val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt + val callStackDepth = System.getProperty("spark.callstack.depth", "10").toInt CallSite( short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine), long = callStack.take(callStackDepth).mkString("\n")) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index e0677b795cb94..a4028c016a904 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -38,7 +38,7 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver} import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.ui.StreamingTab -import org.apache.spark.util.MetadataCleaner +import org.apache.spark.util.{Utils, MetadataCleaner} /** * Main entry point for Spark Streaming functionality. 
It provides methods used to create @@ -112,6 +112,7 @@ class StreamingContext private[streaming] ( if (isCheckpointPresent) { new SparkContext(cp_.sparkConf) } else { + sc_.setCallSite(Utils.getCallSite.short) sc_ } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index e05db236addca..51575219b0fe9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.util.MetadataCleaner +import org.apache.spark.util.{Utils, MetadataCleaner} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -322,6 +322,7 @@ abstract class DStream[T: ClassTag] ( private[streaming] def generateJob(time: Time): Option[Job] = { getOrCompute(time) match { case Some(rdd) => { + //ssc.sc.setJobGroup("g","d") val jobFunc = () => { val emptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 905bc723f69a9..67c3b67d8ac3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -34,9 +34,12 @@ class ForEachDStream[T: ClassTag] ( override def compute(validTime: Time): Option[RDD[Unit]] = None + //TODO: where to clear up the threadlocal values? 
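The TODO added just above marks the mechanism this first patch experiments with. The per-batch RDDs of a streaming job are constructed on the scheduler thread, where a stack walk (Utils.getCallSite) finds only Spark, Scala, Akka and Java frames and never the user's streaming code; the widened frame-skipping regexes in Utils.scala above keep those internal frames out of the reported site when the stack is walked. The hand-off itself goes through SparkContext thread-local properties: the two setLocalProperty calls added just below publish the RDD's creation site before the batch job is generated, and the RDD.scala change at the top of this patch consumes them. A compressed sketch of the round trip, using the property keys defined in SparkContext above (details simplified):

  // Producer side (streaming, ForEachDStream.generateJob below): publish the call site
  // of the user's streaming operation for the current thread.
  sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
  sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)

  // Consumer side (core, RDD.scala above): a new RDD prefers the published call site
  // over walking its own construction stack.
  @transient private[spark] val creationSite = {
    val short = sc.getLocalProperty("spark.job.callSiteShort")
    if (short != null) {
      CallSite(short, sc.getLocalProperty("spark.job.callSiteLong"))
    } else {
      Utils.getCallSite
    }
  }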
override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => + parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short) + parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long) val jobFunc = () => { foreachFunc(rdd, time) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 374848358e700..2897be44ff96b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -22,10 +22,11 @@ import org.apache.spark.{SparkException, SparkEnv, Logging} import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} import scala.util.{Failure, Success, Try} +import org.apache.spark.util.{CallSite, Utils} /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent -private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent +private[scheduler] case class GenerateJobs(time: Time, callSite: CallSite) extends JobGeneratorEvent private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent @@ -47,8 +48,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { Class.forName(clockClass).newInstance().asInstanceOf[Clock] } + val callSite: CallSite = Utils.getCallSite + private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator") + longTime => eventActor ! GenerateJobs(new Time(longTime), callSite), "JobGenerator") // This is marked lazy so that this is initialized after checkpoint duration has been set // in the context and the generator has been started. @@ -162,7 +165,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { private def processEvent(event: JobGeneratorEvent) { logDebug("Got event " + event) event match { - case GenerateJobs(time) => generateJobs(time) + case GenerateJobs(time, callSite) => generateJobs(time, callSite) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time) => doCheckpoint(time) case ClearCheckpointData(time) => clearCheckpointData(time) @@ -216,7 +219,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } /** Generate jobs and perform checkpoint for the given `time`. 
*/ - private def generateJobs(time: Time) { + private def generateJobs(time: Time, callSite: CallSite) { + ssc.sc.setLocalProperty("spark.job.callSiteShort", callSite.short) + ssc.sc.setLocalProperty("spark.job.callSiteLong", callSite.long) SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => From 9d38d3cecedc37775cb5bd70c661c3acd2dd7f43 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Thu, 17 Jul 2014 18:30:46 -0700 Subject: [PATCH 02/14] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- .../scala/org/apache/spark/streaming/dstream/DStream.scala | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9d027d80e7119..8052499ab7526 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1296,9 +1296,6 @@ object SparkContext extends Logging { private[spark] val SPARK_UNKNOWN_USER = "" - private[spark] val SPARK_JOB_CALL_SITE_SHORT = "spark.job.callSiteShort" - private[spark] val SPARK_JOB_CALL_SITE_LONG = "spark.job.callSiteLong" - implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fd1aac162f2fc..0d86206e2e843 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -852,7 +852,7 @@ private[spark] object Utils extends Logging { callStack += el.toString } } - val callStackDepth = System.getProperty("spark.callstack.depth", "10").toInt + val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt CallSite( short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine), long = callStack.take(callStackDepth).mkString("\n")) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 51575219b0fe9..e93b1065904b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -322,7 +322,6 @@ abstract class DStream[T: ClassTag] ( private[streaming] def generateJob(time: Time): Option[Job] = { getOrCompute(time) match { case Some(rdd) => { - //ssc.sc.setJobGroup("g","d") val jobFunc = () => { val emptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) From 1500deb44c7c484fae725144c288240b5b477fdb Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Fri, 18 Jul 2014 08:11:58 -0700 Subject: [PATCH 03/14] Changes in Spark Streaming UI --- .../main/scala/org/apache/spark/rdd/RDD.scala | 13 +++++------- .../scala/org/apache/spark/util/Utils.scala | 5 ++++- .../spark/streaming/dstream/DStream.scala | 20 ++++++++++++++++++- .../streaming/dstream/FileInputDStream.scala | 3 +++ .../streaming/dstream/FilteredDStream.scala | 7 ++++++- .../dstream/FlatMapValuedDStream.scala | 7 ++++++- .../streaming/dstream/FlatMappedDStream.scala | 7 ++++++- .../streaming/dstream/ForEachDStream.scala | 5 ++--- 
.../streaming/dstream/GlommedDStream.scala | 7 ++++++- .../dstream/MapPartitionedDStream.scala | 7 ++++++- .../streaming/dstream/MapValuedDStream.scala | 7 ++++++- .../streaming/dstream/MappedDStream.scala | 7 ++++++- .../streaming/dstream/QueueInputDStream.scala | 4 ++++ .../dstream/ReceiverInputDStream.scala | 6 ++++++ .../dstream/ReducedWindowedDStream.scala | 3 +++ .../streaming/dstream/ShuffledDStream.scala | 7 ++++++- .../streaming/dstream/StateDStream.scala | 4 +++- .../dstream/TransformedDStream.scala | 7 ++++++- .../streaming/dstream/UnionDStream.scala | 4 ++++ .../streaming/dstream/WindowedDStream.scala | 4 ++++ .../streaming/scheduler/JobGenerator.scala | 12 ++++------- 21 files changed, 115 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 7672c2b82737e..9be15f1eaf644 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils} +import org.apache.spark.util.{Utils, BoundedPriorityQueue, CallSite} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils} @@ -124,7 +124,7 @@ abstract class RDD[T: ClassTag]( val id: Int = sc.newRddId() /** A friendly name for this RDD */ - @transient var name: String = null + @transient var name: String = sc.getLocalProperty("rddName") /** Assign a name to this RDD */ def setName(_name: String): this.type = { @@ -1214,14 +1214,11 @@ abstract class RDD[T: ClassTag]( /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ @transient private[spark] val creationSite = { - val short: String = sc.getLocalProperty("spark.job.callSiteShort") + val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT) if (short != null) { - CallSite(short, sc.getLocalProperty("spark.job.callSiteLong")) + CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG)) } else { - val callSite: CallSite = Utils.getCallSite - //sc.setLocalProperty("spark.job.callSiteShort", callSite.short) - //sc.setLocalProperty("spark.job.callSiteLong", callSite.long) - callSite + Utils.getCallSite } } private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0d86206e2e843..1ef662f421558 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -52,6 +52,9 @@ private[spark] case class CallSite(val short: String, val long: String) private[spark] object Utils extends Logging { val random = new Random() + private[spark] val CALL_SITE_SHORT: String = ".callSite.short" + private[spark] val CALL_SITE_LONG: String = ".callSite.long" + def sparkBin(sparkHome: String, which: String): File = { val suffix = if (isWindows) ".cmd" else "" new File(sparkHome + File.separator + "bin", which + suffix) @@ -800,7 +803,7 @@ private[spark] object Utils extends Logging { * A regular expression to match classes of the "core" Spark API that we want to skip when * finding the call site of a method. 
*/ - private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?\.[A-Z]""".r + private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""".r private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index e93b1065904b4..ada01cae93f0b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.util.{Utils, MetadataCleaner} +import org.apache.spark.util.{CallSite, Utils, MetadataCleaner} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -106,6 +106,24 @@ abstract class DStream[T: ClassTag] ( /** Return the StreamingContext associated with this DStream */ def context = ssc + private[streaming] val RDD_NAME: String = "rddName"; + + @transient var name: String = null + + /** Assign a name to this DStream */ + def setName(_name: String) = { + name = _name + } + + /* Find the creation callSite */ + val creationSite = Utils.getCallSite + + /* Store the creation callSite in threadlocal */ + private[streaming] def setCallSite = { + ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short) + ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long) + } + /** Persist the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { if (this.isInitialized) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 9eecbfaef363f..dc0e30a5999eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -57,6 +57,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def start() { } override def stop() { } + setName("UnionRDD") /** * Finds the files that were modified since the last time this method was called and makes @@ -71,6 +72,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas assert(validTime.milliseconds >= ignoreTime, "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]") + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) // Find new files val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index c81534ae584ea..1ed7186044517 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -27,12 +27,17 @@ class FilteredDStream[T: ClassTag]( filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { + setName("FilteredRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { - parent.getOrCompute(validTime).map(_.filter(filterFunc)) + setCallSite + val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index 658623455498c..1e2e3053a5bc3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -28,11 +28,16 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( flatMapValueFunc: V => TraversableOnce[U] ) extends DStream[(K, U)](parent.ssc) { + setName("FlatMappedValuesRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { - parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) + setCallSite + val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index c7bb2833eabb8..e3223ccf1be6c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -27,12 +27,17 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag]( flatMapFunc: T => Traversable[U] ) extends DStream[U](parent.ssc) { + setName("FlatMappedRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) + setCallSite + val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 67c3b67d8ac3d..5d4c9b153b18a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -34,12 +34,11 @@ class ForEachDStream[T: ClassTag] ( override def compute(validTime: Time): Option[RDD[Unit]] = None - //TODO: where to clear up the threadlocal values? 
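The removed TODO ("where to clear up the threadlocal values?") is answered in the next patch of the series (04/14): instead of leaving the properties set, every compute first saves whatever call site is currently in the thread-local properties, overwrites it with its own creation site, and restores the saved value on the way out, so a nested getOrCompute cannot leave its call site behind for the enclosing DStream's RDDs. A rough sketch of that final shape, shown on FilteredDStream.compute and using the getCallSite / setCreationCallSite / setCallSite helpers that patch 04 adds to DStream:

  override def compute(validTime: Time): Option[RDD[T]] = {
    val prevCallSite = getCallSite   // remember the call site set by an enclosing compute, if any
    setCreationCallSite              // expose this DStream's creation site to RDDs built below
    val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
    setCallSite(prevCallSite)        // put the previous call site back before returning
    rdd
  }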
override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => - parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short) - parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long) + //parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short) + //parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long) val jobFunc = () => { foreachFunc(rdd, time) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index a9bb51f054048..215626f9416c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -25,11 +25,16 @@ private[streaming] class GlommedDStream[T: ClassTag](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { + setName("GlommedRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Array[T]]] = { - parent.getOrCompute(validTime).map(_.glom()) + setCallSite + val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom()) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 3d8ee29df1e82..d179bfc0a0974 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -28,12 +28,17 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag]( preservePartitioning: Boolean ) extends DStream[U](parent.ssc) { + setName("MapPartitionsRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) + setCallSite + val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 7aea1f945d9db..6431dd0424a9b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -28,12 +28,17 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( mapValueFunc: V => U ) extends DStream[(K, U)](parent.ssc) { + setName("MappedValuesRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { - parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) + setCallSite + val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index 02704a8d1c2e0..55e9fad3e98eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -27,12 +27,17 @@ class MappedDStream[T: ClassTag, U: ClassTag] ( mapFunc: T => U ) extends DStream[U](parent.ssc) { + setName("MappedRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.map[U](mapFunc)) + setCallSite + val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index ed7da6dc1315e..667c68fc3727c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -32,11 +32,15 @@ class QueueInputDStream[T: ClassTag]( defaultRDD: RDD[T] ) extends InputDStream[T](ssc) { + setName("UnionRDD") + override def start() { } override def stop() { } override def compute(validTime: Time): Option[RDD[T]] = { + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) val buffer = new ArrayBuffer[RDD[T]]() if (oneAtATime && queue.size > 0) { buffer += queue.dequeue() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 391e40924f38a..8954e0e4506f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -25,6 +25,7 @@ import org.apache.spark.storage.BlockId import org.apache.spark.streaming._ import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.ReceivedBlockInfo +import org.apache.spark.util.{Utils, CallSite} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -45,6 +46,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** This is an unique identifier for the network input stream. */ val id = ssc.getNewReceiverStreamId() + setName("BlockRDD") + /** * Gets the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation @@ -59,6 +62,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure @@ -70,6 +75,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont } else { Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } + } /** Get information on received blocks. 
*/ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index 40da31318942e..4efac61b98f23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -52,6 +52,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) + setName("MappedValuesRDD") // Reduce each batch of data using reduceByKey which will be further reduced by window // by ReducedWindowedDStream val reducedStream = parent.reduceByKey(reduceFunc, partitioner) @@ -83,6 +84,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( } override def compute(validTime: Time): Option[RDD[(K, V)]] = { + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) val reduceF = reduceFunc val invReduceF = invReduceFunc diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index 880a89bc36895..777a87ef20978 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -33,15 +33,20 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( mapSideCombine: Boolean = true ) extends DStream[(K,C)] (parent.ssc) { + setName("ShuffledRDD") + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K,C)]] = { - parent.getOrCompute(validTime) match { + setCallSite + val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match { case Some(rdd) => Some(rdd.combineByKey[C]( createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)) case None => None } + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 7e22268767de7..a7dd7d81b4e85 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -34,7 +34,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( ) extends DStream[(K, S)](parent.ssc) { super.persist(StorageLevel.MEMORY_ONLY_SER) - + setName("MapPartitionsRDD") override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration @@ -42,6 +42,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( override val mustCheckpoint = true override def compute(validTime: Time): Option[RDD[(K, S)]] = { + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) // Try to get the previous state RDD getOrCompute(validTime - slideDuration) match { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 7cd4554282ca1..9bbf9a24deaba 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ 
-32,12 +32,17 @@ class TransformedDStream[U: ClassTag] ( require(parents.map(_.slideDuration).distinct.size == 1, "Some of the DStreams have different slide durations") + setName("TransformedRDD") + override def dependencies = parents.toList override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { + setCallSite val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq - Some(transformFunc(parentRDDs, validTime)) + val rdd: Option[RDD[U]] = Some(transformFunc(parentRDDs, validTime)) + ssc.sparkContext.setLocalProperty(RDD_NAME, name) + return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 57429a15329a1..12d3c5bdd3dea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -40,11 +40,15 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) throw new IllegalArgumentException("Array of parents have different slide times") } + setName("UnionRDD") + override def dependencies = parents.toList override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 775b6bfd065c0..ff715306316f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -44,6 +44,8 @@ class WindowedDStream[T: ClassTag]( // Persist parent level by default, as those RDDs are going to be obviously reused. 
parent.persist(StorageLevel.MEMORY_ONLY_SER) + setName("PartitionerAwareUnionRDD") + def windowDuration: Duration = _windowDuration override def dependencies = List(parent) @@ -61,6 +63,8 @@ class WindowedDStream[T: ClassTag]( } override def compute(validTime: Time): Option[RDD[T]] = { + setCallSite + ssc.sparkContext.setLocalProperty(RDD_NAME, name) val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 2897be44ff96b..35706be90c378 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.{CallSite, Utils} /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent -private[scheduler] case class GenerateJobs(time: Time, callSite: CallSite) extends JobGeneratorEvent +private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent @@ -48,10 +48,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { Class.forName(clockClass).newInstance().asInstanceOf[Clock] } - val callSite: CallSite = Utils.getCallSite - private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => eventActor ! GenerateJobs(new Time(longTime), callSite), "JobGenerator") + longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator") // This is marked lazy so that this is initialized after checkpoint duration has been set // in the context and the generator has been started. @@ -165,7 +163,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { private def processEvent(event: JobGeneratorEvent) { logDebug("Got event " + event) event match { - case GenerateJobs(time, callSite) => generateJobs(time, callSite) + case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time) => doCheckpoint(time) case ClearCheckpointData(time) => clearCheckpointData(time) @@ -219,9 +217,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } /** Generate jobs and perform checkpoint for the given `time`. 
*/ - private def generateJobs(time: Time, callSite: CallSite) { - ssc.sc.setLocalProperty("spark.job.callSiteShort", callSite.short) - ssc.sc.setLocalProperty("spark.job.callSiteLong", callSite.long) + private def generateJobs(time: Time) { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => From 70f494fc01d112d81371d77d6d54d10a9b54eeca Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Sun, 10 Aug 2014 04:11:26 -0700 Subject: [PATCH 04/14] Changes for SPARK-1853 --- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +-- .../scala/org/apache/spark/util/Utils.scala | 2 +- .../spark/streaming/StreamingContext.scala | 1 - .../spark/streaming/dstream/DStream.scala | 27 ++++++++++--------- .../streaming/dstream/FileInputDStream.scala | 7 +++-- .../streaming/dstream/FilteredDStream.scala | 7 +++-- .../dstream/FlatMapValuedDStream.scala | 7 +++-- .../streaming/dstream/FlatMappedDStream.scala | 7 +++-- .../streaming/dstream/ForEachDStream.scala | 4 +-- .../streaming/dstream/GlommedDStream.scala | 7 +++-- .../dstream/MapPartitionedDStream.scala | 7 +++-- .../streaming/dstream/MapValuedDStream.scala | 7 +++-- .../streaming/dstream/MappedDStream.scala | 8 +++--- .../streaming/dstream/QueueInputDStream.scala | 7 +++-- .../dstream/ReceiverInputDStream.scala | 15 +++++------ .../dstream/ReducedWindowedDStream.scala | 15 ++++++----- .../streaming/dstream/ShuffledDStream.scala | 7 +++-- .../streaming/dstream/StateDStream.scala | 9 ++++--- .../dstream/TransformedDStream.scala | 7 +++-- .../streaming/dstream/UnionDStream.scala | 8 +++--- .../streaming/dstream/WindowedDStream.scala | 7 +++-- 21 files changed, 82 insertions(+), 88 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9be15f1eaf644..b88934bc1f7d6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1214,9 +1214,9 @@ abstract class RDD[T: ClassTag]( /** User code that created this RDD (e.g. `textFile`, `parallelize`). 
*/ @transient private[spark] val creationSite = { - val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT) + val short: String = sc.getLocalProperty(Utils.CALL_SITE_SHORT) if (short != null) { - CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG)) + CallSite(short, sc.getLocalProperty(Utils.CALL_SITE_LONG)) } else { Utils.getCallSite } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1ef662f421558..bd39c10b50e9d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -838,7 +838,7 @@ private[spark] object Utils extends Logging { SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { - lastSparkMethod = if (el.getMethodName == "") { + lastSparkMethod = if (el.getMethodName == "") { // Spark method is a constructor; get its class name el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) } else { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index a4028c016a904..b287662ff1a7a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -112,7 +112,6 @@ class StreamingContext private[streaming] ( if (isCheckpointPresent) { new SparkContext(cp_.sparkConf) } else { - sc_.setCallSite(Utils.getCallSite.short) sc_ } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index ada01cae93f0b..16da46a15de15 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -106,22 +106,25 @@ abstract class DStream[T: ClassTag] ( /** Return the StreamingContext associated with this DStream */ def context = ssc - private[streaming] val RDD_NAME: String = "rddName"; - - @transient var name: String = null - - /** Assign a name to this DStream */ - def setName(_name: String) = { - name = _name - } - /* Find the creation callSite */ val creationSite = Utils.getCallSite /* Store the creation callSite in threadlocal */ - private[streaming] def setCallSite = { - ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short) - ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long) + private[streaming] def setCreationCallSite() = { + ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.short) + ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.long) + } + + /* Store the supplied callSite in threadlocal */ + private[streaming] def setCallSite(callSite: CallSite) = { + ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.short) + ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.long) + } + + /* Return the current callSite */ + private[streaming] def getCallSite() = { + CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT), + ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG)) } /** Persist the RDDs of this DStream with the given storage level */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index dc0e30a5999eb..6dcb07e679ccb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -57,7 +57,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def start() { } override def stop() { } - setName("UnionRDD") /** * Finds the files that were modified since the last time this method was called and makes @@ -71,9 +70,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def compute(validTime: Time): Option[RDD[(K, V)]] = { assert(validTime.milliseconds >= ignoreTime, "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]") - - setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite // Find new files val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) @@ -83,6 +81,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas ignoreTime = minNewFileModTime } files += ((validTime, newFiles.toArray)) + setCallSite(prevCallSite) Some(filesToRDD(newFiles)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index 1ed7186044517..c49f620533d85 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -27,16 +27,15 @@ class FilteredDStream[T: ClassTag]( filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { - setName("FilteredRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index 1e2e3053a5bc3..ef9cead0e9bcb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -28,16 +28,15 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( flatMapValueFunc: V => TraversableOnce[U] ) extends DStream[(K, U)](parent.ssc) { - setName("FlatMappedValuesRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index e3223ccf1be6c..898cf82f5a3ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -27,16 +27,15 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag]( flatMapFunc: T => Traversable[U] ) extends DStream[U](parent.ssc) { - setName("FlatMappedRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 5d4c9b153b18a..d696d57fac52f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -35,10 +35,8 @@ class ForEachDStream[T: ClassTag] ( override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = { - parent.getOrCompute(time) match { + return parent.getOrCompute(time) match { case Some(rdd) => - //parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short) - //parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long) val jobFunc = () => { foreachFunc(rdd, time) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 215626f9416c7..dac1ba817386a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -25,16 +25,15 @@ private[streaming] class GlommedDStream[T: ClassTag](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { - setName("GlommedRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Array[T]]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom()) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index d179bfc0a0974..a6350bfbf918b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -28,16 +28,15 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag]( preservePartitioning: Boolean ) extends DStream[U](parent.ssc) { - setName("MapPartitionsRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: 
Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 6431dd0424a9b..5095aba74995f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -28,16 +28,15 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( mapValueFunc: V => U ) extends DStream[(K, U)](parent.ssc) { - setName("MappedValuesRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index 55e9fad3e98eb..cad0ee5116700 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag +import org.apache.spark.util.Utils private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( @@ -27,16 +28,15 @@ class MappedDStream[T: ClassTag, U: ClassTag] ( mapFunc: T => U ) extends DStream[U](parent.ssc) { - setName("MappedRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 667c68fc3727c..6e39712eba5eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -32,21 +32,20 @@ class QueueInputDStream[T: ClassTag]( defaultRDD: RDD[T] ) extends InputDStream[T](ssc) { - setName("UnionRDD") - override def start() { } override def stop() { } override def compute(validTime: Time): Option[RDD[T]] = { - setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite val buffer = new ArrayBuffer[RDD[T]]() if (oneAtATime && queue.size > 0) { buffer += queue.dequeue() } else { buffer ++= queue.dequeueAll(_ => true) } + setCallSite(prevCallSite) if (buffer.size > 0) { if (oneAtATime) { Some(buffer.head) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 8954e0e4506f3..f69f11e7f9f74 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -25,7 +25,6 @@ import org.apache.spark.storage.BlockId import org.apache.spark.streaming._ import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.ReceivedBlockInfo -import org.apache.spark.util.{Utils, CallSite} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -46,8 +45,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** This is an unique identifier for the network input stream. */ val id = ssc.getNewReceiverStreamId() - setName("BlockRDD") - /** * Gets the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation @@ -62,20 +59,22 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { - setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure + var blockRDD: Option[RDD[T]] = None if (validTime >= graph.startTime) { val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) receivedBlockInfo(validTime) = blockInfo val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) - Some(new BlockRDD[T](ssc.sc, blockIds)) + blockRDD = Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) + blockRDD = Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } - + setCallSite(prevCallSite) + blockRDD } /** Get information on received blocks. 
*/ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index 4efac61b98f23..bcb4c0e33f4c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -52,7 +52,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) - setName("MappedValuesRDD") + + // Reduce each batch of data using reduceByKey which will be further reduced by window // by ReducedWindowedDStream val reducedStream = parent.reduceByKey(reduceFunc, partitioner) @@ -84,8 +85,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( } override def compute(validTime: Time): Option[RDD[(K, V)]] = { - setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite val reduceF = reduceFunc val invReduceF = invReduceFunc @@ -170,11 +171,13 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( } val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) - + var returnRDD: Option[RDD[(K, V)]] = None if (filterFunc.isDefined) { - Some(mergedValuesRDD.filter(filterFunc.get)) + returnRDD = Some(mergedValuesRDD.filter(filterFunc.get)) } else { - Some(mergedValuesRDD) + returnRDD = Some(mergedValuesRDD) } + setCallSite(prevCallSite) + returnRDD } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index 777a87ef20978..e6be835b5fa04 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -33,20 +33,19 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( mapSideCombine: Boolean = true ) extends DStream[(K,C)] (parent.ssc) { - setName("ShuffledRDD") - override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K,C)]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match { case Some(rdd) => Some(rdd.combineByKey[C]( createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)) case None => None } - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index a7dd7d81b4e85..b027e376d4e73 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -34,7 +34,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( ) extends DStream[(K, S)](parent.ssc) { super.persist(StorageLevel.MEMORY_ONLY_SER) - setName("MapPartitionsRDD") override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration @@ -42,8 +41,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( override val mustCheckpoint = true override def compute(validTime: Time): Option[RDD[(K, S)]] = { - 
setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite // Try to get the previous state RDD getOrCompute(validTime - slideDuration) match { @@ -71,6 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) + setCallSite(prevCallSite) Some(stateRDD) } case None => { // If parent RDD does not exist @@ -82,6 +82,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( updateFuncLocal(i) } val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) + setCallSite(prevCallSite) Some(stateRDD) } } @@ -104,10 +105,12 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) // logDebug("Generating state RDD for time " + validTime + " (first)") + setCallSite(prevCallSite) Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! // logDebug("Not generating state RDD (no previous state, no parent)") + setCallSite(prevCallSite) None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 9bbf9a24deaba..57aea89216b20 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -32,17 +32,16 @@ class TransformedDStream[U: ClassTag] ( require(parents.map(_.slideDuration).distinct.size == 1, "Some of the DStreams have different slide durations") - setName("TransformedRDD") - override def dependencies = parents.toList override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - setCallSite + val prevCallSite = getCallSite + setCreationCallSite val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq val rdd: Option[RDD[U]] = Some(transformFunc(parentRDDs, validTime)) - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + setCallSite(prevCallSite) return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 12d3c5bdd3dea..c159fc9223850 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -40,15 +40,13 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) throw new IllegalArgumentException("Array of parents have different slide times") } - setName("UnionRDD") - override def dependencies = parents.toList override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { - setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd @@ -56,8 +54,10 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) + validTime) }) if (rdds.size > 0) { + setCallSite(prevCallSite) Some(new UnionRDD(ssc.sc, rdds)) } else { + setCallSite(prevCallSite) None } } diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index ff715306316f0..2cf493312cf77 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -44,8 +44,6 @@ class WindowedDStream[T: ClassTag]( // Persist parent level by default, as those RDDs are going to be obviously reused. parent.persist(StorageLevel.MEMORY_ONLY_SER) - setName("PartitionerAwareUnionRDD") - def windowDuration: Duration = _windowDuration override def dependencies = List(parent) @@ -63,8 +61,8 @@ class WindowedDStream[T: ClassTag]( } override def compute(validTime: Time): Option[RDD[T]] = { - setCallSite - ssc.sparkContext.setLocalProperty(RDD_NAME, name) + val prevCallSite = getCallSite + setCreationCallSite val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { @@ -74,6 +72,7 @@ class WindowedDStream[T: ClassTag]( logDebug("Using normal union for windowing at " + validTime) new UnionRDD(ssc.sc,rddsInWindow) } + setCallSite(prevCallSite) Some(windowRDD) } } From 1d90cc31d1f55d35e4806639d667e2c1ac52e012 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Sun, 10 Aug 2014 18:57:49 -0700 Subject: [PATCH 05/14] Changes for SPARK-1853 --- conf/log4j.properties.template | 1 - core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../org/apache/spark/streaming/dstream/ForEachDStream.scala | 2 +- .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template index ab8c65f6fd67b..89eec7d4b7f61 100644 --- a/conf/log4j.properties.template +++ b/conf/log4j.properties.template @@ -4,7 +4,6 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n -log4j.logger.org.apache.spark.rdd.RDD=INFO # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 82571cbf597ff..67289ef60b616 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -125,7 +125,7 @@ abstract class RDD[T: ClassTag]( val id: Int = sc.newRddId() /** A friendly name for this RDD */ - @transient var name: String = sc.getLocalProperty("rddName") + @transient var name: String = null /** Assign a name to this RDD */ def setName(_name: String): this.type = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index d696d57fac52f..905bc723f69a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -35,7 +35,7 @@ class ForEachDStream[T: ClassTag] ( override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = { - return 
parent.getOrCompute(time) match { + parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => { foreachFunc(rdd, time) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 35706be90c378..374848358e700 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -22,7 +22,6 @@ import org.apache.spark.{SparkException, SparkEnv, Logging} import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} import scala.util.{Failure, Success, Try} -import org.apache.spark.util.{CallSite, Utils} /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent From 2a09ad6620e380efb0a0bbc7daac0340ea6e497a Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Sun, 10 Aug 2014 19:04:01 -0700 Subject: [PATCH 06/14] Changes in Utils.scala for SPARK-1853 --- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c25db7eb44c0d..eedc8807603cd 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -52,8 +52,8 @@ private[spark] case class CallSite(shortForm: String, longForm: String) private[spark] object Utils extends Logging { val random = new Random() - private[spark] val CALL_SITE_SHORT: String = ".callSite.short" - private[spark] val CALL_SITE_LONG: String = ".callSite.long" + private[spark] val CALL_SITE_SHORT: String = "callSite.short" + private[spark] val CALL_SITE_LONG: String = "callSite.long" def sparkBin(sparkHome: String, which: String): File = { val suffix = if (isWindows) ".cmd" else "" From ccde038f664c46f2c48e1f073bcfc6d8917ecef1 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Sun, 10 Aug 2014 19:10:54 -0700 Subject: [PATCH 07/14] Removing Utils import from MappedDStream --- .../scala/org/apache/spark/streaming/dstream/MappedDStream.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index cad0ee5116700..6aae7658a7e0a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -20,7 +20,6 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag -import org.apache.spark.util.Utils private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( From a207eb7ba14e4f51011dea7d2ba0651c5466f218 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Sun, 17 Aug 2014 20:45:06 -0700 Subject: [PATCH 08/14] Fixing code review comments --- core/src/main/scala/org/apache/spark/util/Utils.scala | 6 +----- .../org/apache/spark/streaming/StreamingContext.scala | 1 - .../org/apache/spark/streaming/dstream/DStream.scala | 6 +++++- .../spark/streaming/dstream/FileInputDStream.scala | 4 +--- .../spark/streaming/dstream/FilteredDStream.scala | 6 +----- .../streaming/dstream/FlatMapValuedDStream.scala | 6 +----- 
.../spark/streaming/dstream/FlatMappedDStream.scala | 6 +----- .../spark/streaming/dstream/GlommedDStream.scala | 6 +----- .../streaming/dstream/MapPartitionedDStream.scala | 6 +----- .../spark/streaming/dstream/MapValuedDStream.scala | 6 +----- .../spark/streaming/dstream/MappedDStream.scala | 6 +----- .../spark/streaming/dstream/QueueInputDStream.scala | 3 --- .../streaming/dstream/ReceiverInputDStream.scala | 9 ++------- .../streaming/dstream/ReducedWindowedDStream.scala | 11 ++--------- .../spark/streaming/dstream/ShuffledDStream.scala | 6 +----- .../apache/spark/streaming/dstream/StateDStream.scala | 7 +------ .../spark/streaming/dstream/TransformedDStream.scala | 6 +----- .../apache/spark/streaming/dstream/UnionDStream.scala | 4 ---- .../spark/streaming/dstream/WindowedDStream.scala | 3 --- 19 files changed, 21 insertions(+), 87 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index eedc8807603cd..693aff141743a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -818,8 +818,6 @@ private[spark] object Utils extends Logging { */ private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""".r private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r - private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r - private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r /** * When called inside a class in the spark package, returns the name of the user code class @@ -848,9 +846,7 @@ private[spark] object Utils extends Logging { for (el <- trace) { if (insideSpark) { if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || - SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || - AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || - JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { + SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { lastSparkMethod = if (el.getMethodName == "") { // Spark method is a constructor; get its class name el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b287662ff1a7a..29ea2583275b1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -38,7 +38,6 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver} import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.ui.StreamingTab -import org.apache.spark.util.{Utils, MetadataCleaner} /** * Main entry point for Spark Streaming functionality. 
It provides methods used to create diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 440166b6537b2..876c6568ac72d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -122,7 +122,7 @@ abstract class DStream[T: ClassTag] ( } /* Return the current callSite */ - private[streaming] def getCallSite() = { + private[streaming] def getCallSite(): CallSite = { CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT), ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG)) } @@ -309,6 +309,8 @@ abstract class DStream[T: ClassTag] ( // (based on sliding time of this DStream), then generate the RDD case None => { if (isTimeValid(time)) { + val prevCallSite = getCallSite + setCreationCallSite compute(time) match { case Some(newRDD) => if (storageLevel != StorageLevel.NONE) { @@ -323,8 +325,10 @@ abstract class DStream[T: ClassTag] ( " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) + setCallSite(prevCallSite) Some(newRDD) case None => + setCallSite(prevCallSite) None } } else { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 6dcb07e679ccb..9eecbfaef363f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -70,8 +70,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def compute(validTime: Time): Option[RDD[(K, V)]] = { assert(validTime.milliseconds >= ignoreTime, "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]") - val prevCallSite = getCallSite - setCreationCallSite + // Find new files val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) @@ -81,7 +80,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas ignoreTime = minNewFileModTime } files += ((validTime, newFiles.toArray)) - setCallSite(prevCallSite) Some(filesToRDD(newFiles)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index c49f620533d85..c81534ae584ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -32,11 +32,7 @@ class FilteredDStream[T: ClassTag]( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc)) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.filter(filterFunc)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index ef9cead0e9bcb..658623455498c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -33,10 +33,6 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index 898cf82f5a3ab..c7bb2833eabb8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -32,11 +32,7 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag]( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index dac1ba817386a..a9bb51f054048 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -30,10 +30,6 @@ class GlommedDStream[T: ClassTag](parent: DStream[T]) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Array[T]]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom()) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.glom()) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index a6350bfbf918b..3d8ee29df1e82 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -33,11 +33,7 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag]( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 5095aba74995f..7aea1f945d9db 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -33,11 +33,7 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index 6aae7658a7e0a..02704a8d1c2e0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -32,11 +32,7 @@ class MappedDStream[T: ClassTag, U: ClassTag] ( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.map[U](mapFunc)) - setCallSite(prevCallSite) - return rdd + parent.getOrCompute(validTime).map(_.map[U](mapFunc)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 6e39712eba5eb..ed7da6dc1315e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -37,15 +37,12 @@ class QueueInputDStream[T: ClassTag]( override def stop() { } override def compute(validTime: Time): Option[RDD[T]] = { - val prevCallSite = getCallSite - setCreationCallSite val buffer = new ArrayBuffer[RDD[T]]() if (oneAtATime && queue.size > 0) { buffer += queue.dequeue() } else { buffer ++= queue.dequeueAll(_ => true) } - setCallSite(prevCallSite) if (buffer.size > 0) { if (oneAtATime) { Some(buffer.head) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index f69f11e7f9f74..391e40924f38a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -59,22 +59,17 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { - val prevCallSite = getCallSite - setCreationCallSite // If this is called for any time before the start time of the context, // then this returns an empty RDD. 
This may happen when recovering from a // master failure - var blockRDD: Option[RDD[T]] = None if (validTime >= graph.startTime) { val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) receivedBlockInfo(validTime) = blockInfo val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) - blockRDD = Some(new BlockRDD[T](ssc.sc, blockIds)) + Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - blockRDD = Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) + Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } - setCallSite(prevCallSite) - blockRDD } /** Get information on received blocks. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index a9c884f846973..1a47089e513c4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -52,8 +52,6 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) - - // Reduce each batch of data using reduceByKey which will be further reduced by window // by ReducedWindowedDStream val reducedStream = parent.reduceByKey(reduceFunc, partitioner) @@ -85,8 +83,6 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( } override def compute(validTime: Time): Option[RDD[(K, V)]] = { - val prevCallSite = getCallSite - setCreationCallSite val reduceF = reduceFunc val invReduceF = invReduceFunc @@ -170,16 +166,13 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( } } - var returnRDD: Option[RDD[(K, V)]] = None val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]] .mapValues(mergeValues) if (filterFunc.isDefined) { - returnRDD = Some(mergedValuesRDD.filter(filterFunc.get)) + Some(mergedValuesRDD.filter(filterFunc.get)) } else { - returnRDD = Some(mergedValuesRDD) + Some(mergedValuesRDD) } - setCallSite(prevCallSite) - returnRDD } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index e6be835b5fa04..880a89bc36895 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -38,14 +38,10 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K,C)]] = { - val prevCallSite = getCallSite - setCreationCallSite - val rdd: Option[RDD[(K,C)]] = parent.getOrCompute(validTime) match { + parent.getOrCompute(validTime) match { case Some(rdd) => Some(rdd.combineByKey[C]( createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)) case None => None } - setCallSite(prevCallSite) - return rdd } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index b027e376d4e73..7e22268767de7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -34,6 +34,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( ) extends DStream[(K, 
S)](parent.ssc) { super.persist(StorageLevel.MEMORY_ONLY_SER) + override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration @@ -41,8 +42,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( override val mustCheckpoint = true override def compute(validTime: Time): Option[RDD[(K, S)]] = { - val prevCallSite = getCallSite - setCreationCallSite // Try to get the previous state RDD getOrCompute(validTime - slideDuration) match { @@ -70,7 +69,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) - setCallSite(prevCallSite) Some(stateRDD) } case None => { // If parent RDD does not exist @@ -82,7 +80,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( updateFuncLocal(i) } val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) - setCallSite(prevCallSite) Some(stateRDD) } } @@ -105,12 +102,10 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) // logDebug("Generating state RDD for time " + validTime + " (first)") - setCallSite(prevCallSite) Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! // logDebug("Not generating state RDD (no previous state, no parent)") - setCallSite(prevCallSite) None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 57aea89216b20..7cd4554282ca1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -37,11 +37,7 @@ class TransformedDStream[U: ClassTag] ( override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - val prevCallSite = getCallSite - setCreationCallSite val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq - val rdd: Option[RDD[U]] = Some(transformFunc(parentRDDs, validTime)) - setCallSite(prevCallSite) - return rdd + Some(transformFunc(parentRDDs, validTime)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index c159fc9223850..57429a15329a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -45,8 +45,6 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { - val prevCallSite = getCallSite - setCreationCallSite val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd @@ -54,10 +52,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) + validTime) }) if (rdds.size > 0) { - setCallSite(prevCallSite) Some(new UnionRDD(ssc.sc, rdds)) } else { - setCallSite(prevCallSite) None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 2cf493312cf77..775b6bfd065c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -61,8 +61,6 @@ class WindowedDStream[T: ClassTag]( } override def compute(validTime: Time): Option[RDD[T]] = { - val prevCallSite = getCallSite - setCreationCallSite val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { @@ -72,7 +70,6 @@ class WindowedDStream[T: ClassTag]( logDebug("Using normal union for windowing at " + validTime) new UnionRDD(ssc.sc,rddsInWindow) } - setCallSite(prevCallSite) Some(windowRDD) } } From 5051c58c9fce1e6821a35b5ffe7cb670a07912ed Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Sun, 17 Aug 2014 21:20:56 -0700 Subject: [PATCH 09/14] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None) --- .../org/apache/spark/streaming/dstream/DStream.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 876c6568ac72d..69f08742e2b0b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -311,7 +311,7 @@ abstract class DStream[T: ClassTag] ( if (isTimeValid(time)) { val prevCallSite = getCallSite setCreationCallSite - compute(time) match { + val rddOption = compute(time) match { case Some(newRDD) => if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) @@ -325,14 +325,14 @@ abstract class DStream[T: ClassTag] ( " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) - setCallSite(prevCallSite) Some(newRDD) case None => - setCallSite(prevCallSite) - None + return None } + setCallSite(prevCallSite) + return rddOption } else { - None + return None } } } From f51fd9f62ed25611c7cd078ed5207f8c5392e60a Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Tue, 19 Aug 2014 00:11:52 -0700 Subject: [PATCH 10/14] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream --- core/src/main/scala/org/apache/spark/util/Utils.scala | 8 +++++--- .../org/apache/spark/streaming/dstream/DStream.scala | 10 +++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 693aff141743a..ae3c9db71c29d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -816,8 +816,9 @@ private[spark] object Utils extends Logging { * A regular expression to match classes of the "core" Spark API that we want to skip when * finding the call site of a method. 
*/ - private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""".r - private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r + private val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r + private val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r + private val SCALA_CLASS_REGEX = """^scala""".r /** * When called inside a class in the spark package, returns the name of the user code class @@ -845,7 +846,8 @@ private[spark] object Utils extends Logging { for (el <- trace) { if (insideSpark) { - if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined || + if ((SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined && + !SPARK_EXAMPLES_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) || SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { lastSparkMethod = if (el.getMethodName == "") { // Spark method is a constructor; get its class name diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 69f08742e2b0b..71cbd3ee1cf8c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -109,14 +109,14 @@ abstract class DStream[T: ClassTag] ( /* Find the creation callSite */ val creationSite = Utils.getCallSite - /* Store the creation callSite in threadlocal */ - private[streaming] def setCreationCallSite() = { + /* Store the RDD creation callSite in threadlocal */ + private def setRDDCreationCallSite() = { ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.shortForm) ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.longForm) } /* Store the supplied callSite in threadlocal */ - private[streaming] def setCallSite(callSite: CallSite) = { + private def setRDDCallSite(callSite: CallSite) = { ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.shortForm) ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.longForm) } @@ -310,7 +310,7 @@ abstract class DStream[T: ClassTag] ( case None => { if (isTimeValid(time)) { val prevCallSite = getCallSite - setCreationCallSite + setRDDCreationCallSite val rddOption = compute(time) match { case Some(newRDD) => if (storageLevel != StorageLevel.NONE) { @@ -329,7 +329,7 @@ abstract class DStream[T: ClassTag] ( case None => return None } - setCallSite(prevCallSite) + setRDDCallSite(prevCallSite) return rddOption } else { return None From 33a72952caa31cb218b2dc3e0ecf188ee0ccdfb5 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Wed, 20 Aug 2014 01:00:29 -0700 Subject: [PATCH 11/14] Fixing review comments: Merging both setCallSite methods --- .../apache/spark/streaming/dstream/DStream.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 71cbd3ee1cf8c..caca59fb25871 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -110,19 +110,13 @@ abstract class 
DStream[T: ClassTag] ( val creationSite = Utils.getCallSite /* Store the RDD creation callSite in threadlocal */ - private def setRDDCreationCallSite() = { - ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.shortForm) - ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.longForm) - } - - /* Store the supplied callSite in threadlocal */ - private def setRDDCallSite(callSite: CallSite) = { + private def setRDDCreationCallSite(callSite: CallSite = creationSite) = { ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.shortForm) ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.longForm) } /* Return the current callSite */ - private[streaming] def getCallSite(): CallSite = { + private[streaming] def getRDDCreationCallSite(): CallSite = { CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT), ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG)) } @@ -309,8 +303,8 @@ abstract class DStream[T: ClassTag] ( // (based on sliding time of this DStream), then generate the RDD case None => { if (isTimeValid(time)) { - val prevCallSite = getCallSite - setRDDCreationCallSite + val prevCallSite = getRDDCreationCallSite + setRDDCreationCallSite() val rddOption = compute(time) match { case Some(newRDD) => if (storageLevel != StorageLevel.NONE) { @@ -329,7 +323,7 @@ abstract class DStream[T: ClassTag] ( case None => return None } - setRDDCallSite(prevCallSite) + setRDDCreationCallSite(prevCallSite) return rddOption } else { return None From 491a1eb23aa94919a9a06a84a9cf3cab118fa628 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Wed, 20 Aug 2014 01:07:05 -0700 Subject: [PATCH 12/14] Removing streaming visibility from getRDDCreationCallSite in DStream --- .../main/scala/org/apache/spark/streaming/dstream/DStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index caca59fb25871..8c418faaf72a0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -116,7 +116,7 @@ abstract class DStream[T: ClassTag] ( } /* Return the current callSite */ - private[streaming] def getRDDCreationCallSite(): CallSite = { + private def getRDDCreationCallSite(): CallSite = { CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT), ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG)) } From ceb43daeb0947acab5629031c6e35d29f96f6376 Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Fri, 5 Sep 2014 03:00:38 -0700 Subject: [PATCH 13/14] Changing default regex function name --- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 552a7e378ced1..172f447023e8d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -863,7 +863,7 @@ private[spark] object Utils extends Logging { private val SPARK_STREAMING_CLASS_REGEX = """^org\.apache\.spark""".r private val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r - private def defaultRegex(className: String): Boolean = { + private def defaultRegexFunc(className: String): Boolean = { SPARK_CLASS_REGEX.findFirstIn(className).isDefined || 
SCALA_CLASS_REGEX.findFirstIn(className).isDefined } @@ -879,7 +879,7 @@ private[spark] object Utils extends Logging { * (outside the spark package) that called into Spark, as well as which Spark method they called. * This is used, for example, to tell users where in their code each RDD got created. */ - def getCallSite(regexFunc: String => Boolean = defaultRegex(_)): CallSite = { + def getCallSite(regexFunc: String => Boolean = defaultRegexFunc(_)): CallSite = { val trace = Thread.currentThread.getStackTrace() .filterNot { ste:StackTraceElement => // When running under some profilers, the current stack trace might contain some bogus From b9ed945863806a657af31febbf63c53e57828b4f Mon Sep 17 00:00:00 2001 From: Mubarak Seyed Date: Fri, 5 Sep 2014 20:48:16 -0700 Subject: [PATCH 14/14] Adding streaming utils --- .../scala/org/apache/spark/util/Utils.scala | 10 +----- .../spark/streaming/StreamingContext.scala | 4 ++- .../spark/streaming/dstream/DStream.scala | 2 +- .../apache/spark/streaming/util/Utils.scala | 34 +++++++++++++++++++ 4 files changed, 39 insertions(+), 11 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/Utils.scala diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 172f447023e8d..46828bfec0593 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -859,21 +859,13 @@ private[spark] object Utils extends Logging { * finding the call site of a method. */ private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r - private val SCALA_CLASS_REGEX = """^scala""".r - private val SPARK_STREAMING_CLASS_REGEX = """^org\.apache\.spark""".r - private val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r + val SCALA_CLASS_REGEX = """^scala""".r private def defaultRegexFunc(className: String): Boolean = { SPARK_CLASS_REGEX.findFirstIn(className).isDefined || SCALA_CLASS_REGEX.findFirstIn(className).isDefined } - def streamingRegexFunc(className: String): Boolean = { - (SPARK_STREAMING_CLASS_REGEX.findFirstIn(className).isDefined && - !SPARK_EXAMPLES_CLASS_REGEX.findFirstIn(className).isDefined) || - SCALA_CLASS_REGEX.findFirstIn(className).isDefined - } - /** * When called inside a class in the spark package, returns the name of the user code class * (outside the spark package) that called into Spark, as well as which Spark method they called. 
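
The predicate argument added to getCallSite above is the hook that lets each module decide which stack frames count as framework-internal. A standalone sketch of that frame-walking idea follows; it is not part of the patch, the object name CallSiteSketch and the helper shortCallSite are purely illustrative, and only the regex patterns mirror the ones in this diff.

object CallSiteSketch {
  private val sparkClassRegex = """^org\.apache\.spark""".r
  private val sparkExamplesClassRegex = """^org\.apache\.spark\.examples""".r
  private val scalaClassRegex = """^scala""".r

  // Mirrors the streamingRegexFunc idea: Spark classes (minus the examples
  // package) and the Scala library are treated as internal frames to skip.
  private def isInternal(className: String): Boolean = {
    className.startsWith("CallSiteSketch") || // skip this illustrative helper itself
    (sparkClassRegex.findFirstIn(className).isDefined &&
      sparkExamplesClassRegex.findFirstIn(className).isEmpty) ||
    scalaClassRegex.findFirstIn(className).isDefined
  }

  // Walk the stack: the last internal frame supplies the API method name, and
  // the first user frame supplies the file and line shown in the UI.
  def shortCallSite(): String = {
    val trace = Thread.currentThread.getStackTrace.toSeq
      .filterNot(_.getMethodName == "getStackTrace")
    val method = trace.takeWhile(el => isInternal(el.getClassName))
      .lastOption.map(_.getMethodName).getOrElse("<unknown>")
    trace.dropWhile(el => isInternal(el.getClassName)).headOption match {
      case Some(el) => s"$method at ${el.getFileName}:${el.getLineNumber}"
      case None => method
    }
  }
}

Called from application code, this would produce a short form in the same shape the patch reports, e.g. "shortCallSite at MyApp.scala:42".
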
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6361e29fb1d4a..b1fbbe354cc96 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -441,7 +441,9 @@ class StreamingContext private[streaming] ( throw new SparkException("StreamingContext has already been stopped") } validate() - sc.setCallSite(Utils.getCallSite(Utils.streamingRegexFunc).shortForm) + sc.setCallSite( + Utils.getCallSite(org.apache.spark.streaming.util.Utils.streamingRegexFunc).shortForm + ) scheduler.start() state = Started } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index ef7556f13e883..cdaaa55978cf0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -107,7 +107,7 @@ abstract class DStream[T: ClassTag] ( def context = ssc /* Find the creation callSite */ - val creationSite = Utils.getCallSite(Utils.streamingRegexFunc) + val creationSite = Utils.getCallSite(org.apache.spark.streaming.util.Utils.streamingRegexFunc) /* Store the RDD creation callSite in threadlocal */ private def setRDDCreationCallSite(callSite: CallSite = creationSite) = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Utils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Utils.scala new file mode 100644 index 0000000000000..e0458ea41b9c7 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Utils.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import org.apache.spark.util.Utils.SCALA_CLASS_REGEX + +/** + * Utility method used by Spark Streaming. + */ +private[streaming] object Utils { + private val SPARK_STREAMING_CLASS_REGEX = """^org\.apache\.spark""".r + private val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r + + def streamingRegexFunc(className: String): Boolean = { + (SPARK_STREAMING_CLASS_REGEX.findFirstIn(className).isDefined && + !SPARK_EXAMPLES_CLASS_REGEX.findFirstIn(className).isDefined) || + SCALA_CLASS_REGEX.findFirstIn(className).isDefined + } +}
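
Taken together with the DStream changes earlier in the series, the pattern is: getOrCompute remembers the caller's call-site properties, installs the DStream's creationSite while compute() materializes its RDDs, and then restores the previous values. Below is a minimal, self-contained sketch of that save/restore discipline; it is not part of the patch, the CallSiteScope object and withCallSite helper are invented for illustration, and a plain mutable map stands in for SparkContext's thread-local properties (the keys mirror the "callSite.short"/"callSite.long" values used in the diffs).

import scala.collection.mutable

final case class CallSite(shortForm: String, longForm: String)

object CallSiteScope {
  // Stand-in for sc.setLocalProperty / sc.getLocalProperty in this sketch.
  private val props = mutable.Map.empty[String, String]
  private val CALL_SITE_SHORT = "callSite.short"
  private val CALL_SITE_LONG = "callSite.long"

  private def current: CallSite =
    CallSite(props.getOrElse(CALL_SITE_SHORT, ""), props.getOrElse(CALL_SITE_LONG, ""))

  private def install(cs: CallSite): Unit = {
    props(CALL_SITE_SHORT) = cs.shortForm
    props(CALL_SITE_LONG) = cs.longForm
  }

  // Run `body` with `creationSite` installed, then put the previous site back,
  // so anything created inside `body` picks up the DStream's creation site.
  def withCallSite[T](creationSite: CallSite)(body: => T): T = {
    val prev = current
    install(creationSite)
    try body finally install(prev)
  }
}

The patch itself restores the previous site with explicit setRDDCreationCallSite(prevCallSite) calls rather than a try/finally wrapper; the wrapper form above is only meant to make the scoping visible.
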