DStream.scala

@@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] (

  /** Return a new DStream containing only the elements that satisfy a predicate. */
  def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
-    new FilteredDStream(this, filterFunc)
+    new FilteredDStream(this, context.sparkContext.clean(filterFunc))
  }
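The motivation is the same as for the corresponding RDD operations: SparkContext.clean runs the ClosureCleaner over the function, nulling out references in the closure's outer chain that the body never uses and, by default, checking that the cleaned closure is serializable. A minimal sketch of the failure mode this surfaces early (the Parser class, host, and port below are made up for illustration, not taken from this PR):

import org.apache.spark.streaming.StreamingContext

class Parser {                                     // note: not Serializable
  def isError(line: String): Boolean = line.contains("ERROR")
}

def keepErrors(ssc: StreamingContext): Unit = {
  val parser = new Parser
  val lines = ssc.socketTextStream("localhost", 9999)
  // The predicate closes over `parser`, which cannot be serialized and is
  // genuinely needed, so the cleaner cannot strip it. With filter() now
  // cleaning (and, by default, serializability-checking) its closure, this
  // fails fast at the call site with "Task not serializable" instead of
  // surfacing only later, when jobs for the first batch are generated.
  lines.filter(line => parser.isError(line)).print()
}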
@@ -576,7 +576,8 @@ abstract class DStream[T: ClassTag] (
   * of this DStream.
   */
  def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
-    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+    val cleanedReduceFunc = context.sparkContext.clean(reduceFunc)
+    this.map(x => (null, x)).reduceByKey(cleanedReduceFunc, 1).map(_._2)
  }

Contributor: This one's actually not necessary because it will be cleaned by reduceByKey later.
@@ -607,7 +608,7 @@ abstract class DStream[T: ClassTag] (
   */
  @deprecated("use foreachRDD", "0.9.0")
  def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
-    this.foreachRDD(foreachFunc)
+    this.foreachRDD(context.sparkContext.clean(foreachFunc))
  }

Contributor: not needed.
@@ -616,15 +617,16 @@ abstract class DStream[T: ClassTag] (
   */
  @deprecated("use foreachRDD", "0.9.0")
  def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
-    this.foreachRDD(foreachFunc)
+    this.foreachRDD(context.sparkContext.clean(foreachFunc))
  }

Contributor: not needed.

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
-    this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+    val cleanedForeachFunc = context.sparkContext.clean(foreachFunc, false)
+    this.foreachRDD((r: RDD[T], t: Time) => cleanedForeachFunc(r))
  }
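Note the two-argument form clean(foreachFunc, false) above: the boolean is SparkContext.clean's checkSerializable flag, so the user function is still run through the ClosureCleaner but the eager serializability check is skipped; my reading is that the eager check is less useful here because the function is invoked on the driver for each batch and is immediately wrapped in the (RDD, Time) closure that actually gets registered. A usage sketch of such a driver-side closure (the DStream contents and names are hypothetical):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// `counts` is assumed to be a DStream[(String, Long)] of (word, count) pairs.
def publishTop(counts: DStream[(String, Long)]): Unit = {
  val topN = 10                                  // driver-side local captured by the closure
  counts.foreachRDD { rdd: RDD[(String, Long)] =>
    // This block runs on the driver once per batch; only the RDD actions inside
    // ship work to executors. It is this closure that the diff above now cleans
    // before wrapping it in the (RDD, Time) form.
    rdd.top(topN)(Ordering.by(_._2)).foreach(println)
  }
}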
@@ -788,9 +790,10 @@ abstract class DStream[T: ClassTag] (
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
-    this.map(x => (1, x))
-      .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
-      .map(_._2)
+    val cleanedReduceFunc = context.sparkContext.clean(reduceFunc)
+    val cleanedInvReduceFunc = context.sparkContext.clean(invReduceFunc)
+    this.map(x => (1, x)).reduceByKeyAndWindow(
+      cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, 1).map(_._2)
  }

Contributor: not needed.
PairDStreamFunctions.scala

@@ -38,6 +38,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
{
  private[streaming] def ssc = self.ssc

+  private[streaming] def sparkContext = self.context.sparkContext
+
  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
    new HashPartitioner(numPartitions)
  }
@@ -76,7 +78,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   * with Spark's default number of partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
-    reduceByKey(reduceFunc, defaultPartitioner())
+    reduceByKey(sparkContext.clean(reduceFunc), defaultPartitioner())
  }
@@ -87,7 +89,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
  def reduceByKey(
      reduceFunc: (V, V) => V,
      numPartitions: Int): DStream[(K, V)] = ssc.withScope {
-    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
+    reduceByKey(sparkContext.clean(reduceFunc), defaultPartitioner(numPartitions))
  }
@@ -98,7 +100,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
  def reduceByKey(
      reduceFunc: (V, V) => V,
      partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    val cleanedReduceFunc = sparkContext.clean(reduceFunc)
    combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
  }

Contributor: cleaning in all of these reduceByKeys is not needed because we clean them later in combineByKey.
@@ -113,7 +115,12 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+    new ShuffledDStream[K, V, C](
+      self,
+      sparkContext.clean(createCombiner),
+      sparkContext.clean(mergeValue),
+      sparkContext.clean(mergeCombiner),
+      partitioner,
      mapSideCombine)
  }
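Since the reduceByKey overloads all funnel into combineByKey (which is why the reviewer calls their extra cleaning redundant), cleaning createCombiner, mergeValue, and mergeCombiner here covers those paths as well. A usage sketch with the three closures that now get cleaned (names and types are hypothetical):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// `prices` is assumed to be a DStream[(String, Double)] of (symbol, price) pairs.
// Produces a per-key (sum, count) pair per batch, from which an average can be derived.
def sumAndCount(prices: DStream[(String, Double)]): DStream[(String, (Double, Long))] = {
  prices.combineByKey[(Double, Long)](
    createCombiner = (v: Double) => (v, 1L),                       // first value seen for a key
    mergeValue = (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1),
    mergeCombiner = (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
    partitioner = new HashPartitioner(4)
  )
}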

@@ -202,7 +209,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
      reduceFunc: (V, V) => V,
      windowDuration: Duration
    ): DStream[(K, V)] = ssc.withScope {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
+    reduceByKeyAndWindow(sparkContext.clean(reduceFunc),
+      windowDuration, self.slideDuration, defaultPartitioner())
  }
@@ -221,7 +229,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[(K, V)] = ssc.withScope {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
+    reduceByKeyAndWindow(sparkContext.clean(reduceFunc),
+      windowDuration, slideDuration, defaultPartitioner())
  }
@@ -242,7 +251,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
      slideDuration: Duration,
      numPartitions: Int
    ): DStream[(K, V)] = ssc.withScope {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
+    reduceByKeyAndWindow(sparkContext.clean(reduceFunc), windowDuration, slideDuration,
      defaultPartitioner(numPartitions))
  }
@@ -264,7 +273,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
      slideDuration: Duration,
      partitioner: Partitioner
    ): DStream[(K, V)] = ssc.withScope {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    val cleanedReduceFunc = sparkContext.clean(reduceFunc)
    self.reduceByKey(cleanedReduceFunc, partitioner)
        .window(windowDuration, slideDuration)
        .reduceByKey(cleanedReduceFunc, partitioner)
@@ -298,9 +307,12 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
+    val cleanedReduceFunc = sparkContext.clean(reduceFunc)
+    val cleanedInvReduceFunc = sparkContext.clean(invReduceFunc)
+    val cleanedFilterFunc = if (filterFunc != null) sparkContext.clean(filterFunc) else null
    reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration,
-      slideDuration, defaultPartitioner(numPartitions), filterFunc
+      cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration,
+      defaultPartitioner(numPartitions), cleanedFilterFunc
    )
  }
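This overload takes three user functions (reduce, inverse reduce, and an optional filter), and all three are now cleaned before delegating to the partitioner-based variant. A usage sketch of incremental windowed counting (names, durations, and the partition count are made up; note that the inverse-function form also requires checkpointing to be enabled):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// `pageViews` is assumed to be a DStream[(String, Long)] of (url, 1L) pairs.
def windowedCounts(pageViews: DStream[(String, Long)]): DStream[(String, Long)] = {
  pageViews.reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,                  // reduceFunc: add counts entering the window
    (a: Long, b: Long) => a - b,                  // invReduceFunc: subtract counts leaving the window
    Seconds(300),                                 // window length
    Seconds(10),                                  // slide interval
    8,                                            // numPartitions
    (kv: (String, Long)) => kv._2 > 0             // filterFunc: drop keys whose count has fallen to zero
  )
}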

@@ -463,7 +475,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   * 'this' DStream without changing the key.
   */
  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope {
-    new MapValuedDStream[K, V, U](self, mapValuesFunc)
+    new MapValuedDStream[K, V, U](self, sparkContext.clean(mapValuesFunc))
  }
@@ -473,7 +485,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
  def flatMapValues[U: ClassTag](
      flatMapValuesFunc: V => TraversableOnce[U]
    ): DStream[(K, U)] = ssc.withScope {
-    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+    new FlatMapValuedDStream[K, V, U](self, sparkContext.clean(flatMapValuesFunc))
  }
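mapValues and flatMapValues transform only the values and keep the keys, and in application code their closures are usually small lambdas defined inside some enclosing class; the cleaning added above strips any outer references those lambdas do not actually use before the MapValued/FlatMapValued DStreams are built. A short usage sketch (the sensor data and names are hypothetical):

import org.apache.spark.streaming.dstream.DStream

// `temps` is assumed to be a DStream[(String, Double)] of (sensorId, celsius) pairs.
def toFahrenheit(temps: DStream[(String, Double)]): DStream[(String, Double)] = {
  val scale = 9.0 / 5.0   // actually used by the closure below, so cleaning keeps it
  temps
    .mapValues(c => c * scale + 32.0)                                 // convert; key unchanged
    .flatMapValues(f => if (f.isNaN) Seq.empty[Double] else Seq(f))   // drop NaN readings (0 or 1 outputs per value)
}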