diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 5977481e1f08..5a90a2b9be0b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] (
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
-    new FilteredDStream(this, filterFunc)
+    new FilteredDStream(this, context.sparkContext.clean(filterFunc))
   }
 
   /**
@@ -576,7 +576,8 @@ abstract class DStream[T: ClassTag] (
    * of this DStream.
    */
   def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
-    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+    val cleanedReduceFunc = context.sparkContext.clean(reduceFunc)
+    this.map(x => (null, x)).reduceByKey(cleanedReduceFunc, 1).map(_._2)
   }
 
   /**
@@ -607,7 +608,7 @@ abstract class DStream[T: ClassTag] (
    */
   @deprecated("use foreachRDD", "0.9.0")
   def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
-    this.foreachRDD(foreachFunc)
+    this.foreachRDD(context.sparkContext.clean(foreachFunc))
   }
 
   /**
@@ -616,7 +617,7 @@ abstract class DStream[T: ClassTag] (
    */
   @deprecated("use foreachRDD", "0.9.0")
   def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
-    this.foreachRDD(foreachFunc)
+    this.foreachRDD(context.sparkContext.clean(foreachFunc))
   }
 
   /**
@@ -624,7 +625,8 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
-    this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+    val cleanedForeachFunc = context.sparkContext.clean(foreachFunc, false)
+    this.foreachRDD((r: RDD[T], t: Time) => cleanedForeachFunc(r))
   }
 
   /**
@@ -788,9 +790,10 @@ abstract class DStream[T: ClassTag] (
       windowDuration: Duration,
       slideDuration: Duration
     ): DStream[T] = ssc.withScope {
-    this.map(x => (1, x))
-        .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
-        .map(_._2)
+    val cleanedReduceFunc = context.sparkContext.clean(reduceFunc)
+    val cleanedInvReduceFunc = context.sparkContext.clean(invReduceFunc)
+    this.map(x => (1, x)).reduceByKeyAndWindow(
+      cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, 1).map(_._2)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 884a8e8b5228..f653bb448db9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -38,6 +38,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
 {
   private[streaming] def ssc = self.ssc
 
+  private[streaming] def sparkContext = self.context.sparkContext
+
   private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
     new HashPartitioner(numPartitions)
   }
@@ -76,7 +78,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * with Spark's default number of partitions.
    */
   def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
-    reduceByKey(reduceFunc, defaultPartitioner())
+    reduceByKey(sparkContext.clean(reduceFunc), defaultPartitioner())
   }
 
   /**
@@ -87,7 +89,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def reduceByKey(
       reduceFunc: (V, V) => V,
       numPartitions: Int): DStream[(K, V)] = ssc.withScope {
-    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
+    reduceByKey(sparkContext.clean(reduceFunc), defaultPartitioner(numPartitions))
   }
 
   /**
@@ -98,7 +100,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def reduceByKey(
       reduceFunc: (V, V) => V,
       partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    val cleanedReduceFunc = sparkContext.clean(reduceFunc)
     combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
   }
 
@@ -113,7 +115,12 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       mergeCombiner: (C, C) => C,
       partitioner: Partitioner,
       mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+    new ShuffledDStream[K, V, C](
+      self,
+      sparkContext.clean(createCombiner),
+      sparkContext.clean(mergeValue),
+      sparkContext.clean(mergeCombiner),
+      partitioner,
       mapSideCombine)
   }
 
@@ -202,7 +209,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       reduceFunc: (V, V) => V,
       windowDuration: Duration
    ): DStream[(K, V)] = ssc.withScope {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
+    reduceByKeyAndWindow(sparkContext.clean(reduceFunc),
+      windowDuration, self.slideDuration, defaultPartitioner())
   }
 
   /**
@@ -221,7 +229,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration
    ): DStream[(K, V)] = ssc.withScope {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
+    reduceByKeyAndWindow(sparkContext.clean(reduceFunc),
+      windowDuration, slideDuration, defaultPartitioner())
   }
 
   /**
@@ -242,7 +251,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       slideDuration: Duration,
       numPartitions: Int
    ): DStream[(K, V)] = ssc.withScope {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
+    reduceByKeyAndWindow(sparkContext.clean(reduceFunc), windowDuration, slideDuration,
       defaultPartitioner(numPartitions))
   }
 
@@ -264,7 +273,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       slideDuration: Duration,
       partitioner: Partitioner
    ): DStream[(K, V)] = ssc.withScope {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    val cleanedReduceFunc = sparkContext.clean(reduceFunc)
     self.reduceByKey(cleanedReduceFunc, partitioner)
         .window(windowDuration, slideDuration)
         .reduceByKey(cleanedReduceFunc, partitioner)
@@ -298,9 +307,12 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       numPartitions: Int = ssc.sc.defaultParallelism,
       filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
+    val cleanedReduceFunc = sparkContext.clean(reduceFunc)
+    val cleanedInvReduceFunc = sparkContext.clean(invReduceFunc)
+    val cleanedFilterFunc = if (filterFunc != null) sparkContext.clean(filterFunc) else null
     reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration,
-      slideDuration, defaultPartitioner(numPartitions), filterFunc
+      cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration,
+      defaultPartitioner(numPartitions), cleanedFilterFunc
     )
   }
 
@@ -463,7 +475,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * 'this' DStream without changing the key.
    */
   def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope {
-    new MapValuedDStream[K, V, U](self, mapValuesFunc)
+    new MapValuedDStream[K, V, U](self, sparkContext.clean(mapValuesFunc))
   }
 
   /**
@@ -473,7 +485,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def flatMapValues[U: ClassTag](
       flatMapValuesFunc: V => TraversableOnce[U]
    ): DStream[(K, U)] = ssc.withScope {
-    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+    new FlatMapValuedDStream[K, V, U](self, sparkContext.clean(flatMapValuesFunc))
   }
 
   /**
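
Note for reviewers (not part of the patch): the sketch below is a made-up example of the user-facing effect of cleaning these closures at the point where the DStream operation is declared. `WordCountApp` and `DriverSideHelper` are hypothetical names; the assumption is a streaming job whose closure accidentally captures a non-serializable driver-side object.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver-side helper that is not Serializable.
class DriverSideHelper {
  def isInteresting(word: String): Boolean = word.startsWith("spark")
}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    val helper = new DriverSideHelper  // captured by the filter closure below

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // With filter() now passing its closure through sparkContext.clean(), the
    // serializability check runs right here, so this line should fail immediately
    // with a "Task not serializable" SparkException that points at the offending
    // operation, instead of surfacing later once the first batch job is generated,
    // with a stack trace deep inside the DStream machinery.
    val interesting = words.filter(word => helper.isInteresting(word))

    interesting.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

One design note on the diff itself: `foreachRDD` passes `false` as the second argument to `clean`, which, if I read `ClosureCleaner` correctly, skips the eager serializability check; that seems intentional, since the `foreachRDD` function runs on the driver and does not itself need to be serializable.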