
Commit 074c00b

Author: Andrew Or (committed)
Commit message: Review comments
1 parent d25a324 commit 074c00b

File tree: 17 files changed, +30 -31 lines changed

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 4 additions & 4 deletions

@@ -43,8 +43,8 @@ import org.apache.spark.{Logging, SparkContext}
 @JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    val id: String = RDDOperationScope.nextScopeId().toString,
-    val parent: Option[RDDOperationScope] = None) {
+    val parent: Option[RDDOperationScope] = None,
+    val id: String = RDDOperationScope.nextScopeId().toString) {

   def toJson: String = {
     RDDOperationScope.jsonMapper.writeValueAsString(this)

@@ -96,7 +96,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
     val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName
+    val ourMethodName = stackTrace(1).getMethodName //
     // Climb upwards to find the first method that's called something different
     val callerMethodName = stackTrace
       .find(_.getMethodName != ourMethodName)

@@ -139,7 +139,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
     } else if (sc.getLocalProperty(noOverrideKey) == null) {
       // Otherwise, set the scope only if the higher level caller allows us to do so
-      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, parent = oldScope).toJson)
+      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
     }
     // Optionally disallow the child body to override our scope
     if (!allowNesting) {
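Note: the parameter swap above lets callers pass a parent scope positionally, while an explicit id (normally auto-generated) must now be supplied by name. A minimal sketch of both call styles, not taken from the commit itself; the scope names are illustrative, and the snippet assumes it lives under an org.apache.spark package since the class is private[spark]:

import org.apache.spark.rdd.RDDOperationScope

// id defaults to RDDOperationScope.nextScopeId().toString; parent defaults to None
val parentScope = new RDDOperationScope("parent")

// parent is now the second positional parameter, so no named argument is needed
val childScope = new RDDOperationScope("child", Some(parentScope))

// supplying a specific id requires a named argument, since id is now the last parameter
val batchScope = new RDDOperationScope("child @ batch", id = s"${childScope.id}_1000")

println(childScope.toJson) // JSON fields serialized in the order ("id", "name", "parent")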

core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -27,8 +27,8 @@ import org.apache.spark.{TaskContext, Partition, SparkContext}
 class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
   private var sc: SparkContext = null
   private val scope1 = new RDDOperationScope("scope1")
-  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
-  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+  private val scope2 = new RDDOperationScope("scope2", Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", Some(scope2))

   before {
     sc = new SparkContext("local", "test")
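Not part of this commit, but as an illustration of what the positional constructions above imply (assuming nextScopeId yields a distinct id per scope), a test case along these lines could assert:

// scope2 wraps scope1 and scope3 wraps scope2, so parents chain accordingly
assert(scope2.parent === Some(scope1))
assert(scope3.parent === Some(scope2))
assert(scope3.parent.flatMap(_.parent) === Some(scope1))

// ids are auto-generated per scope, so the two scopes get distinct ids
assert(scope1.id !== scope2.id)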

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ class FlumeInputDStream[T: ClassTag](
     enableDecompression: Boolean
   ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

-  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")

   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
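The same one-line change, protected to protected[streaming], is applied to each input stream below; protected[streaming] makes the member visible throughout the org.apache.spark.streaming package and its subpackages, where these external connectors live. A hedged sketch of the override pattern, with a made-up class name and scope label, following the doc comment on customScopeName ("custom name for all scopes generated by this DStream"):

package org.apache.spark.streaming.custom  // hypothetical subpackage of org.apache.spark.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical input stream following the same pattern as the connectors in this commit:
// override customScopeName so every scope generated by this DStream carries a readable label.
class CustomInputDStream(ssc_ : StreamingContext) extends ReceiverInputDStream[String](ssc_) {

  protected[streaming] override val customScopeName: Option[String] = Some(s"custom stream [$id]")

  override def getReceiver(): Receiver[String] = ???  // receiver wiring omitted in this sketch
}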

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

-  protected override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")

   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -65,7 +65,7 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)

-  protected override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")

   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

-  protected override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka stream [$id]")

   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {

-  protected override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")

   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)

external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -45,7 +45,7 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {

-  protected override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"twitter stream [$id]")

   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())

streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {

-  protected override val customScopeName: Option[String] = Some(s"constant stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"constant stream [$id]")

   override def start() {}

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 10 additions & 11 deletions

@@ -129,7 +129,7 @@ abstract class DStream[T: ClassTag] (
    * An optional custom name for all scopes generated by this DStream.
    * If None, the name of the operation that created this DStream will be used.
    */
-  protected val customScopeName: Option[String] = None
+  protected[streaming] val customScopeName: Option[String] = None

   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.

@@ -152,7 +152,7 @@ abstract class DStream[T: ClassTag] (
         s"$baseName @ $formattedBatchTime"
       }
       val scopeId = s"${bscope.id}_${time.milliseconds}"
-      new RDDOperationScope(scopeName, scopeId)
+      new RDDOperationScope(scopeName, id = scopeId)
     }
   }

@@ -347,22 +347,22 @@ abstract class DStream[T: ClassTag] (
     // Compute the RDD if time is valid (e.g. correct time in a sliding window)
     // of RDD generation, else generate nothing.
     if (isTimeValid(time)) {
-      val newRDD = doCompute(time)
+      val rddOption = doCompute(time)

       // Register the generated RDD for caching and checkpointing
-      newRDD.foreach { case rdd =>
+      rddOption.foreach { case newRDD =>
         if (storageLevel != StorageLevel.NONE) {
-          rdd.persist(storageLevel)
-          logDebug(s"Persisting RDD ${rdd.id} for time $time to $storageLevel")
+          newRDD.persist(storageLevel)
+          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
         }
         if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
-          rdd.checkpoint()
-          logInfo(s"Marking RDD ${rdd.id} for time $time for checkpointing")
+          newRDD.checkpoint()
+          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
         }
-        generatedRDDs.put(time, rdd)
+        generatedRDDs.put(time, newRDD)
       }

-      newRDD
+      rddOption
     } else {
       None
     }

@@ -380,7 +380,6 @@ abstract class DStream[T: ClassTag] (
     // thread-local properties in our SparkContext. Since this method may be called from another
     // DStream, we need to temporarily store any old scope and creation site information to
    // restore them later after setting our own.
-    // TODO: this won't work if multiple StreamingContexts share the same SparkContext
     val prevCallSite = ssc.sparkContext.getCallSite()
     val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
     val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
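The removed TODO aside, the surrounding code stashes the caller's call site and scope properties before overwriting them and restores them afterwards. A minimal standalone sketch of that save-and-restore pattern on SparkContext local properties; the helper name is hypothetical and this is not the method's actual body:

import org.apache.spark.SparkContext

// Hypothetical helper illustrating the pattern: remember the previous value of a
// thread-local property, set a new one for the duration of `body`, then restore it.
def withLocalProperty[T](sc: SparkContext, key: String, value: String)(body: => T): T = {
  val previous = sc.getLocalProperty(key)  // null if the property was never set
  sc.setLocalProperty(key, value)
  try {
    body
  } finally {
    sc.setLocalProperty(key, previous)     // restore the old value; a null value clears the key
  }
}

// e.g. withLocalProperty(sc, scopeKey, new RDDOperationScope(name).toJson) { body }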
