Commit 70f494f
Changes for SPARK-1853
1 parent: 1500deb

21 files changed: +82 -88 lines
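For orientation: the values threaded through SparkContext local properties in this commit are call sites, each carrying a short form (shown in the web UI) and a long form (the fuller stack trace), as the creationSite.short / callSite.long accesses in the hunks below imply. A minimal sketch of that shape, consistent with the diff; the real definition lives elsewhere in Spark and is not part of this commit:

    // Sketch of the CallSite shape implied by this diff; the actual
    // definition is in Spark's Utils, not in this commit.
    private[spark] case class CallSite(short: String, long: String)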

core/src/main/scala/org/apache/spark/rdd/RDD.scala
Lines changed: 2 additions & 2 deletions

@@ -1214,9 +1214,9 @@ abstract class RDD[T: ClassTag](
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = {
-    val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT)
+    val short: String = sc.getLocalProperty(Utils.CALL_SITE_SHORT)
     if (short != null) {
-      CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG))
+      CallSite(short, sc.getLocalProperty(Utils.CALL_SITE_LONG))
     } else {
       Utils.getCallSite
     }
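With the per-DStream name prefix gone, RDD.creationSite reads the call-site keys directly from the SparkContext's thread-local properties, so whatever code sets those properties before constructing an RDD controls the stage description shown in the UI. A rough, user-level sketch of that mechanism; the literal key strings here are hypothetical stand-ins for the private Utils.CALL_SITE_SHORT / Utils.CALL_SITE_LONG constants, whose values this diff does not show:

    import org.apache.spark.{SparkConf, SparkContext}

    object CallSiteDemo {
      // Hypothetical stand-ins for the private[spark] constants
      // Utils.CALL_SITE_SHORT / Utils.CALL_SITE_LONG (assumed values).
      val CALL_SITE_SHORT = "callSite.short"
      val CALL_SITE_LONG = "callSite.long"

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("callsite-demo"))

        // What a DStream's compute() now does before building its RDDs:
        sc.setLocalProperty(CALL_SITE_SHORT, "socketTextStream at MyApp.scala:42")
        sc.setLocalProperty(CALL_SITE_LONG, "<full stack trace of the creation site>")

        // RDD.creationSite (private[spark]) reads these keys at construction
        // time, so the job's stages are labelled with the streaming operation's
        // creation site in the web UI rather than an internal compute() site.
        sc.parallelize(1 to 10).map(_ + 1).count()

        sc.stop()
      }
    }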

core/src/main/scala/org/apache/spark/util/Utils.scala
Lines changed: 1 addition & 1 deletion

@@ -838,7 +838,7 @@ private[spark] object Utils extends Logging {
           SCALA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
           AKKA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined ||
           JAVA_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
-        lastSparkMethod = if (el.getMethodName == "<init>") {
+        lastSparkMethod = if (el.getMethodName == "<init>") {
           // Spark method is a constructor; get its class name
           el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
         } else {

(The deleted and added lines read identically here; the change is apparently in leading whitespace only, which this extraction does not preserve.)

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
Lines changed: 0 additions & 1 deletion

@@ -112,7 +112,6 @@ class StreamingContext private[streaming] (
     if (isCheckpointPresent) {
       new SparkContext(cp_.sparkConf)
     } else {
-      sc_.setCallSite(Utils.getCallSite.short)
       sc_
     }
   }

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
Lines changed: 15 additions & 12 deletions

@@ -106,22 +106,25 @@ abstract class DStream[T: ClassTag] (
   /** Return the StreamingContext associated with this DStream */
   def context = ssc
 
-  private[streaming] val RDD_NAME: String = "rddName";
-
-  @transient var name: String = null
-
-  /** Assign a name to this DStream */
-  def setName(_name: String) = {
-    name = _name
-  }
-
   /* Find the creation callSite */
   val creationSite = Utils.getCallSite
 
   /* Store the creation callSite in threadlocal */
-  private[streaming] def setCallSite = {
-    ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short)
-    ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long)
+  private[streaming] def setCreationCallSite() = {
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.short)
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.long)
+  }
+
+  /* Store the supplied callSite in threadlocal */
+  private[streaming] def setCallSite(callSite: CallSite) = {
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.short)
+    ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.long)
+  }
+
+  /* Return the current callSite */
+  private[streaming] def getCallSite() = {
+    CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT),
+      ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG))
   }
 
   /** Persist the RDDs of this DStream with the given storage level */
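Every compute() override in the hunks below now repeats the same three steps with these helpers: save the current call site, install this DStream's creation site, and put the saved one back once its RDD has been built. As a sketch only, not part of this commit, that discipline could be factored into a single method on DStream:

    // Hypothetical helper (not in this commit): wraps the save/install/restore
    // pattern that each compute() below spells out by hand.
    private[streaming] def withCreationCallSite[U](body: => U): U = {
      val prevCallSite = getCallSite    // save whatever call site is current
      setCreationCallSite()             // tag RDDs created in body with this DStream's site
      try body
      finally setCallSite(prevCallSite) // restore even if body throws
    }

Unlike the hand-written versions below, the finally clause also restores the call site when the computation fails.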

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
Lines changed: 3 additions & 4 deletions

@@ -57,7 +57,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   override def start() { }
 
   override def stop() { }
-  setName("UnionRDD")
 
   /**
    * Finds the files that were modified since the last time this method was called and makes

@@ -71,9 +70,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
     assert(validTime.milliseconds >= ignoreTime,
       "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
-
-    setCallSite
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    val prevCallSite = getCallSite
+    setCreationCallSite
     // Find new files
     val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))

@@ -83,6 +81,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       ignoreTime = minNewFileModTime
     }
     files += ((validTime, newFiles.toArray))
+    setCallSite(prevCallSite)
     Some(filesToRDD(newFiles))
   }

streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
Lines changed: 3 additions & 4 deletions

@@ -27,16 +27,15 @@ class FilteredDStream[T: ClassTag](
     filterFunc: T => Boolean
   ) extends DStream[T](parent.ssc) {
 
-  setName("FilteredRDD")
-
   override def dependencies = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
Lines changed: 3 additions & 4 deletions

@@ -28,16 +28,15 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     flatMapValueFunc: V => TraversableOnce[U]
   ) extends DStream[(K, U)](parent.ssc) {
 
-  setName("FlatMappedValuesRDD")
-
   override def dependencies = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
Lines changed: 3 additions & 4 deletions

@@ -27,16 +27,15 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
     flatMapFunc: T => Traversable[U]
   ) extends DStream[U](parent.ssc) {
 
-  setName("FlatMappedRDD")
-
   override def dependencies = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
Lines changed: 1 addition & 3 deletions

@@ -35,10 +35,8 @@ class ForEachDStream[T: ClassTag] (
   override def compute(validTime: Time): Option[RDD[Unit]] = None
 
   override def generateJob(time: Time): Option[Job] = {
-    parent.getOrCompute(time) match {
+    return parent.getOrCompute(time) match {
       case Some(rdd) =>
-        //parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
-        //parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
         val jobFunc = () => {
           foreachFunc(rdd, time)
         }

streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
Lines changed: 3 additions & 4 deletions

@@ -25,16 +25,15 @@ private[streaming]
 class GlommedDStream[T: ClassTag](parent: DStream[T])
   extends DStream[Array[T]](parent.ssc) {
 
-  setName("GlommedRDD")
-
   override def dependencies = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[Array[T]]] = {
-    setCallSite
+    val prevCallSite = getCallSite
+    setCreationCallSite
     val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom())
-    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    setCallSite(prevCallSite)
     return rdd
  }
 }
