
Commit de13e0b

Andrew Or authored and jeanlyn committed
[SPARK-7237] [SPARK-7741] [CORE] [STREAMING] Clean more closures that need cleaning
SPARK-7741 is the equivalent of SPARK-7237 in streaming. This is an alternative to apache#6268.

Author: Andrew Or <[email protected]>

Closes apache#6269 from andrewor14/clean-moar and squashes the following commits:

c51c9ab [Andrew Or] Add periods (trivial)
6c686ac [Andrew Or] Merge branch 'master' of github.com:apache/spark into clean-moar
79a435b [Andrew Or] Fix tests
d18c9f9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into clean-moar
65ef07b [Andrew Or] Fix tests?
4b487a3 [Andrew Or] Add tests for closures passed to DStream operations
328139b [Andrew Or] Do not forget foreachRDD
5431f61 [Andrew Or] Clean streaming closures
72b7b73 [Andrew Or] Clean core closures
1 parent 5be24df commit de13e0b
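For background on the pattern applied throughout this commit: SparkContext.clean runs the ClosureCleaner over a user-supplied function, checking serializability eagerly on the driver and rejecting closures that contain return statements, before the function is used in code that ships to executors. Below is a minimal sketch of that pattern, not Spark code: clean() is private[spark], so the sketch only compiles under the org.apache.spark package (as the files in this diff do), and countMatching is a hypothetical helper.

package org.apache.spark.demo // must live under org.apache.spark because clean() is private[spark]

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CleanPattern {
  // Hypothetical helper illustrating the rule this commit enforces:
  // clean the user's function once, then use only the cleaned copy
  // everywhere it might be shipped to executors.
  def countMatching[T](sc: SparkContext, rdd: RDD[T], pred: T => Boolean): Long = {
    // Fails fast on the driver if the closure is unserializable or contains `return`,
    // instead of failing later during task serialization.
    val cleanedPred = sc.clean(pred)
    rdd.filter(cleanedPred).count()
  }
}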

9 files changed, +249 -37 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -1159,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
     withScope {
       assertNotStopped()
-      val kc = kcf()
-      val vc = vcf()
+      val kc = clean(kcf)()
+      val vc = clean(vcf)()
       val format = classOf[SequenceFileInputFormat[Writable, Writable]]
       val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
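The two factory closures above are the implicit WritableConverter factories behind the public sequenceFile[K, V] API; the change simply applies the commit's general rule that every user-visible closure goes through the cleaner before it is used. A small usage sketch of the API in question, with a placeholder path:

import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("seqfile-demo"))
    // sequenceFile[K, V] resolves () => WritableConverter factories implicitly
    // and hands them to the overload patched above.
    val pairs = sc.sequenceFile[String, Int]("/tmp/example.seq") // placeholder path
    pairs.take(5).foreach(println)
    sc.stop()
  }
}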

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 2 deletions
@@ -296,6 +296,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
+    val cleanedF = self.sparkContext.clean(func)

     if (keyClass.isArray) {
       throw new SparkException("reduceByKeyLocally() does not support array keys")
@@ -305,15 +306,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val map = new JHashMap[K, V]
       iter.foreach { pair =>
         val old = map.get(pair._1)
-        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]

     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
       m2.foreach { pair =>
         val old = m1.get(pair._1)
-        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
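reduceByKeyLocally previously skipped cleaning entirely: the user's function runs inside tasks to build per-partition hash maps and again on the driver when those maps are merged, and the result comes back as a plain Map rather than an RDD. After this change the function is cleaned once and the cleaned copy is used in both places. A short usage sketch, assuming a local SparkContext:

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyLocallyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("rbkl-demo"))
    // The reduce function runs in the tasks and again in the driver-side merge.
    val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
      .reduceByKeyLocally(_ + _)
    println(counts) // Map(a -> 3, b -> 1)
    sc.stop()
  }
}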

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -112,6 +112,7 @@ class ClosureCleanerSuite extends FunSuite {
     expectCorrectException { TestUserClosuresActuallyCleaned.testAggregateByKey(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testFoldByKey(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKey(pairRdd) }
+    expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKeyLocally(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testMapValues(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapValues(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testForeachAsync(rdd) }
@@ -315,6 +316,9 @@ private object TestUserClosuresActuallyCleaned {
   }
   def testFoldByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.foldByKey(0) { case (_, _) => return; 1 } }
   def testReduceByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.reduceByKey { case (_, _) => return; 1 } }
+  def testReduceByKeyLocally(rdd: RDD[(Int, Int)]): Unit = {
+    rdd.reduceByKeyLocally { case (_, _) => return; 1 }
+  }
   def testMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.mapValues { _ => return; 1 } }
   def testFlatMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.flatMapValues { _ => return; Seq() } }
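The suite's probe is a quirk of Scala closures: a return inside an anonymous function is a non-local return, implemented by throwing scala.runtime.NonLocalReturnControl, and Spark's ClosureCleaner rejects such closures with ReturnStatementInClosureException. Seeing that exception therefore proves the operation handed the user's closure to the cleaner. A plain-Scala sketch of the mechanism, no Spark required:

// Standalone sketch of why `return` works as a probe. The `return` inside the
// lambda below is a non-local return: it exits firstEven by throwing
// NonLocalReturnControl, which only works while the enclosing method is still
// on the stack. A closure like this shipped to an executor could never return
// correctly, so the ClosureCleaner rejects it up front.
object NonLocalReturnDemo {
  def firstEven(xs: Seq[Int]): Option[Int] = {
    xs.foreach { x => if (x % 2 == 0) return Some(x) } // non-local return
    None
  }

  def main(args: Array[String]): Unit = {
    println(firstEven(Seq(1, 3, 4, 5))) // Some(4)
  }
}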

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 1 deletion
@@ -255,7 +255,7 @@ class StreamingContext private[streaming] (
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+  private def withNamedScope[U](name: String)(body: => U): U = {
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 3 additions & 2 deletions
@@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] (

   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
-    new FilteredDStream(this, filterFunc)
+    new FilteredDStream(this, context.sparkContext.clean(filterFunc))
   }

   /**
@@ -624,7 +624,8 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
-    this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+    val cleanedF = context.sparkContext.clean(foreachFunc, false)
+    this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
   }

   /**
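The closure passed to the single-argument foreachRDD runs on the driver once per batch rather than inside tasks, which is presumably why it is cleaned with checkSerializable = false: it still gets the cleaner's outer-reference trimming and return-statement check, but is not forced to be serializable. A usage sketch with a placeholder socket endpoint:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("foreachRDD-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder endpoint
    lines.foreachRDD { rdd =>
      // Runs on the driver for every batch; rdd.count() triggers the actual job.
      println(s"batch size = ${rdd.count()}")
    }
    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}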

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 20 additions & 10 deletions
@@ -38,6 +38,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
 {
   private[streaming] def ssc = self.ssc

+  private[streaming] def sparkContext = self.context.sparkContext
+
   private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
     new HashPartitioner(numPartitions)
   }
@@ -98,8 +100,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def reduceByKey(
       reduceFunc: (V, V) => V,
       partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
-    combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
+    combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
   }

   /**
@@ -113,7 +114,15 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       mergeCombiner: (C, C) => C,
       partitioner: Partitioner,
       mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+    val cleanedCreateCombiner = sparkContext.clean(createCombiner)
+    val cleanedMergeValue = sparkContext.clean(mergeValue)
+    val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
+    new ShuffledDStream[K, V, C](
+      self,
+      cleanedCreateCombiner,
+      cleanedMergeValue,
+      cleanedMergeCombiner,
+      partitioner,
       mapSideCombine)
   }

@@ -264,10 +273,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, V)] = ssc.withScope {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
-    self.reduceByKey(cleanedReduceFunc, partitioner)
+    self.reduceByKey(reduceFunc, partitioner)
         .window(windowDuration, slideDuration)
-        .reduceByKey(cleanedReduceFunc, partitioner)
+        .reduceByKey(reduceFunc, partitioner)
   }

   /**
@@ -385,8 +393,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner
     ): DStream[(K, S)] = ssc.withScope {
+    val cleanedUpdateF = sparkContext.clean(updateFunc)
     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
-      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+      iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
     }
     updateStateByKey(newUpdateFunc, partitioner, true)
   }
@@ -428,8 +437,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       partitioner: Partitioner,
       initialRDD: RDD[(K, S)]
     ): DStream[(K, S)] = ssc.withScope {
+    val cleanedUpdateF = sparkContext.clean(updateFunc)
     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
-      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+      iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
     }
     updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
   }
@@ -463,7 +473,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * 'this' DStream without changing the key.
    */
   def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope {
-    new MapValuedDStream[K, V, U](self, mapValuesFunc)
+    new MapValuedDStream[K, V, U](self, sparkContext.clean(mapValuesFunc))
   }

   /**
@@ -473,7 +483,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def flatMapValues[U: ClassTag](
       flatMapValuesFunc: V => TraversableOnce[U]
     ): DStream[(K, U)] = ssc.withScope {
-    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+    new FlatMapValuedDStream[K, V, U](self, sparkContext.clean(flatMapValuesFunc))
   }

   /**
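In this file, reduceByKey and reduceByKeyAndWindow stop cleaning eagerly because they delegate to combineByKey, which now cleans all three of its functions, while updateStateByKey, mapValues and flatMapValues gain the cleaning they previously lacked. A usage sketch for updateStateByKey, with a placeholder socket source and checkpoint directory:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCountDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("stateful-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder directory

    val counts = ssc.socketTextStream("localhost", 9999) // placeholder endpoint
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      // The update function below is one of the closures this patch now cleans;
      // it is shipped inside tasks, so serializability problems are better
      // caught eagerly on the driver.
      .updateStateByKey[Int] { (newCounts: Seq[Int], state: Option[Int]) =>
        Some(state.getOrElse(0) + newCounts.sum)
      }

    counts.print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}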
streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala (new file)

Lines changed: 196 additions & 0 deletions
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.NotSerializableException
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.{HashPartitioner, SparkContext, SparkException}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.util.ReturnStatementInClosureException
+
+/**
+ * Test that closures passed to DStream operations are actually cleaned.
+ */
+class DStreamClosureSuite extends FunSuite with BeforeAndAfterAll {
+  private var ssc: StreamingContext = null
+
+  override def beforeAll(): Unit = {
+    val sc = new SparkContext("local", "test")
+    ssc = new StreamingContext(sc, Seconds(1))
+  }
+
+  override def afterAll(): Unit = {
+    ssc.stop(stopSparkContext = true)
+    ssc = null
+  }
+
+  test("user provided closures are actually cleaned") {
+    val dstream = new DummyInputDStream(ssc)
+    val pairDstream = dstream.map { i => (i, i) }
+    // DStream
+    testMap(dstream)
+    testFlatMap(dstream)
+    testFilter(dstream)
+    testMapPartitions(dstream)
+    testReduce(dstream)
+    testForeach(dstream)
+    testForeachRDD(dstream)
+    testTransform(dstream)
+    testTransformWith(dstream)
+    testReduceByWindow(dstream)
+    // PairDStreamFunctions
+    testReduceByKey(pairDstream)
+    testCombineByKey(pairDstream)
+    testReduceByKeyAndWindow(pairDstream)
+    testUpdateStateByKey(pairDstream)
+    testMapValues(pairDstream)
+    testFlatMapValues(pairDstream)
+    // StreamingContext
+    testTransform2(ssc, dstream)
+  }
+
+  /**
+   * Verify that the expected exception is thrown.
+   *
+   * We use return statements as an indication that a closure is actually being cleaned.
+   * We expect closure cleaner to find the return statements in the user provided closures.
+   */
+  private def expectCorrectException(body: => Unit): Unit = {
+    try {
+      body
+    } catch {
+      case rse: ReturnStatementInClosureException => // Success!
+      case e @ (_: NotSerializableException | _: SparkException) =>
+        throw new TestException(
+          s"Expected ReturnStatementInClosureException, but got $e.\n" +
+          "This means the closure provided by user is not actually cleaned.")
+    }
+  }
+
+  // DStream operations
+  private def testMap(ds: DStream[Int]): Unit = expectCorrectException {
+    ds.map { _ => return; 1 }
+  }
+  private def testFlatMap(ds: DStream[Int]): Unit = expectCorrectException {
+    ds.flatMap { _ => return; Seq.empty }
+  }
+  private def testFilter(ds: DStream[Int]): Unit = expectCorrectException {
+    ds.filter { _ => return; true }
+  }
+  private def testMapPartitions(ds: DStream[Int]): Unit = expectCorrectException {
+    ds.mapPartitions { _ => return; Seq.empty.toIterator }
+  }
+  private def testReduce(ds: DStream[Int]): Unit = expectCorrectException {
+    ds.reduce { case (_, _) => return; 1 }
+  }
+  private def testForeach(ds: DStream[Int]): Unit = {
+    val foreachF1 = (rdd: RDD[Int], t: Time) => return
+    val foreachF2 = (rdd: RDD[Int]) => return
+    expectCorrectException { ds.foreach(foreachF1) }
+    expectCorrectException { ds.foreach(foreachF2) }
+  }
+  private def testForeachRDD(ds: DStream[Int]): Unit = {
+    val foreachRDDF1 = (rdd: RDD[Int], t: Time) => return
+    val foreachRDDF2 = (rdd: RDD[Int]) => return
+    expectCorrectException { ds.foreachRDD(foreachRDDF1) }
+    expectCorrectException { ds.foreachRDD(foreachRDDF2) }
+  }
+  private def testTransform(ds: DStream[Int]): Unit = {
+    val transformF1 = (rdd: RDD[Int]) => { return; rdd }
+    val transformF2 = (rdd: RDD[Int], time: Time) => { return; rdd }
+    expectCorrectException { ds.transform(transformF1) }
+    expectCorrectException { ds.transform(transformF2) }
+  }
+  private def testTransformWith(ds: DStream[Int]): Unit = {
+    val transformF1 = (rdd1: RDD[Int], rdd2: RDD[Int]) => { return; rdd1 }
+    val transformF2 = (rdd1: RDD[Int], rdd2: RDD[Int], time: Time) => { return; rdd2 }
+    expectCorrectException { ds.transformWith(ds, transformF1) }
+    expectCorrectException { ds.transformWith(ds, transformF2) }
+  }
+  private def testReduceByWindow(ds: DStream[Int]): Unit = {
+    val reduceF = (_: Int, _: Int) => { return; 1 }
+    expectCorrectException { ds.reduceByWindow(reduceF, Seconds(1), Seconds(2)) }
+    expectCorrectException { ds.reduceByWindow(reduceF, reduceF, Seconds(1), Seconds(2)) }
+  }
+
+  // PairDStreamFunctions operations
+  private def testReduceByKey(ds: DStream[(Int, Int)]): Unit = {
+    val reduceF = (_: Int, _: Int) => { return; 1 }
+    expectCorrectException { ds.reduceByKey(reduceF) }
+    expectCorrectException { ds.reduceByKey(reduceF, 5) }
+    expectCorrectException { ds.reduceByKey(reduceF, new HashPartitioner(5)) }
+  }
+  private def testCombineByKey(ds: DStream[(Int, Int)]): Unit = {
+    expectCorrectException {
+      ds.combineByKey[Int](
+        { _: Int => return; 1 },
+        { case (_: Int, _: Int) => return; 1 },
+        { case (_: Int, _: Int) => return; 1 },
+        new HashPartitioner(5)
+      )
+    }
+  }
+  private def testReduceByKeyAndWindow(ds: DStream[(Int, Int)]): Unit = {
+    val reduceF = (_: Int, _: Int) => { return; 1 }
+    val filterF = (_: (Int, Int)) => { return; false }
+    expectCorrectException { ds.reduceByKeyAndWindow(reduceF, Seconds(1)) }
+    expectCorrectException { ds.reduceByKeyAndWindow(reduceF, Seconds(1), Seconds(2)) }
+    expectCorrectException { ds.reduceByKeyAndWindow(reduceF, Seconds(1), Seconds(2), 5) }
+    expectCorrectException {
+      ds.reduceByKeyAndWindow(reduceF, Seconds(1), Seconds(2), new HashPartitioner(5))
+    }
+    expectCorrectException { ds.reduceByKeyAndWindow(reduceF, reduceF, Seconds(2)) }
+    expectCorrectException {
+      ds.reduceByKeyAndWindow(
+        reduceF, reduceF, Seconds(2), Seconds(3), new HashPartitioner(5), filterF)
+    }
+  }
+  private def testUpdateStateByKey(ds: DStream[(Int, Int)]): Unit = {
+    val updateF1 = (_: Seq[Int], _: Option[Int]) => { return; Some(1) }
+    val updateF2 = (_: Iterator[(Int, Seq[Int], Option[Int])]) => { return; Seq((1, 1)).toIterator }
+    val initialRDD = ds.ssc.sparkContext.emptyRDD[Int].map { i => (i, i) }
+    expectCorrectException { ds.updateStateByKey(updateF1) }
+    expectCorrectException { ds.updateStateByKey(updateF1, 5) }
+    expectCorrectException { ds.updateStateByKey(updateF1, new HashPartitioner(5)) }
+    expectCorrectException {
+      ds.updateStateByKey(updateF1, new HashPartitioner(5), initialRDD)
+    }
+    expectCorrectException {
+      ds.updateStateByKey(updateF2, new HashPartitioner(5), true)
+    }
+    expectCorrectException {
+      ds.updateStateByKey(updateF2, new HashPartitioner(5), true, initialRDD)
+    }
+  }
+  private def testMapValues(ds: DStream[(Int, Int)]): Unit = expectCorrectException {
+    ds.mapValues { _ => return; 1 }
+  }
+  private def testFlatMapValues(ds: DStream[(Int, Int)]): Unit = expectCorrectException {
+    ds.flatMapValues { _ => return; Seq.empty }
+  }
+
+  // StreamingContext operations
+  private def testTransform2(ssc: StreamingContext, ds: DStream[Int]): Unit = {
+    val transformF = (rdds: Seq[RDD[_]], time: Time) => { return; ssc.sparkContext.emptyRDD[Int] }
+    expectCorrectException { ssc.transform(Seq(ds), transformF) }
+  }
+}
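The DummyInputDStream used above is a helper defined elsewhere in the streaming test sources. A minimal stand-in would look roughly like the sketch below (the name NoOpInputDStream and its exact shape are assumptions, not the real helper); a stream that never produces data is enough here, because the suite only needs each operation to register, and therefore clean, its closure.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical stand-in for DummyInputDStream: an input stream that never
// produces any RDDs, so no batches ever carry data.
class NoOpInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def compute(validTime: Time): Option[RDD[Int]] = None
}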
