Commit 5aedfa2

zsxwing authored and tdas committed
[SPARK-8404] [STREAMING] [TESTS] Use thread-safe collections to make the tests more reliable
KafkaStreamSuite, DirectKafkaStreamSuite, JavaKafkaStreamSuite and JavaDirectKafkaStreamSuite use non-thread-safe collections to collect data in one thread and check it in another thread, which can make the tests fail intermittently. This PR changes them to thread-safe collections.

Note: I cannot reproduce the test failures in my environment, but at least this PR should make the tests more reliable.

Author: zsxwing <[email protected]>

Closes #6852 from zsxwing/fix-KafkaStreamSuite and squashes the following commits:

d464211 [zsxwing] Use thread-safe collections to make the tests more reliable

(cherry picked from commit a06d9c8)
Signed-off-by: Tathagata Das <[email protected]>
1 parent: 5e7973d · commit: 5aedfa2
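The change is the same in all four suites: the collection that the foreachRDD callback fills from the streaming thread, and that the test thread later inspects inside eventually, is swapped for a synchronized variant (Collections.synchronizedSet / synchronizedMap in the Java suites, the mutable.SynchronizedBuffer / SynchronizedMap mixins in the Scala suites). Below is a minimal, standalone Scala sketch of that pattern; it does not use Spark, and the writer thread, record names, and counts are made up purely for illustration.

// Standalone sketch of the thread-safe collection pattern used by these suites
// (pre-Scala-2.13 SynchronizedBuffer idiom). Illustrative only, not Spark code.
import scala.collection.mutable

object ThreadSafeCollectionSketch {
  def main(args: Array[String]): Unit = {
    // Before the fix the suites shared a plain ArrayBuffer/HashMap across threads,
    // which can lose updates or expose inconsistent state. After the fix:
    val collected =
      new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]

    // Stand-in for the streaming job thread that runs the foreachRDD callback.
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        (1 to 100).foreach { i => collected += s"record-$i" }
      }
    })
    writer.start()

    // Stand-in for ScalaTest's eventually { ... }: the test thread polls the
    // shared collection until the expected data arrives or a timeout expires.
    val deadline = System.currentTimeMillis() + 10000
    while (collected.size < 100 && System.currentTimeMillis() < deadline) {
      Thread.sleep(100)
    }
    assert(collected.size == 100, s"expected 100 records, got ${collected.size}")
    println("all records observed")
  }
}

On the Java side the equivalent is wrapping the collection, e.g. Collections.synchronizedSet(new HashSet<String>()), as the two Java suites below do. The DirectKafkaStreamSuite diff also marks the shared var total as @volatile so that a value written on one thread is visible when another thread reads it.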

File tree

4 files changed (+14 lines, -19 lines)

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

Lines changed: 2 additions & 4 deletions

@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Arrays;
+import java.util.*;
 
 import scala.Tuple2;
 
@@ -116,7 +114,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
     );
     JavaDStream<String> unifiedStream = stream1.union(stream2);
 
-    final HashSet<String> result = new HashSet<String>();
+    final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
     unifiedStream.foreachRDD(
         new Function<JavaRDD<String>, Void>() {
           @Override

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 2 additions & 4 deletions

@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
 
 import scala.Tuple2;
 
@@ -94,7 +92,7 @@ public void testKafkaStream() throws InterruptedException {
         topics,
         StorageLevel.MEMORY_ONLY_SER());
 
-    final HashMap<String, Long> result = new HashMap<String, Long>();
+    final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
 
    JavaDStream<String> words = stream.map(
      new Function<Tuple2<String, String>, String>() {

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

Lines changed: 8 additions & 6 deletions

@@ -99,7 +99,8 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, topics)
     }
 
-    val allReceived = new ArrayBuffer[(String, String)]
+    val allReceived =
+      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
       // Get the offset ranges in the RDD
@@ -162,7 +163,7 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]()
+    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
     stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
@@ -208,7 +209,7 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]()
+    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
     stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
@@ -324,7 +325,8 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, Set(topic))
     }
 
-    val allReceived = new ArrayBuffer[(String, String)]
+    val allReceived =
+      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
 
     stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
@@ -350,8 +352,8 @@
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new mutable.ArrayBuffer[String]()
-  var total = -1L
+  val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+  @volatile var total = -1L
 
   class InputInfoCollector extends StreamingListener {
     val numRecordsSubmitted = new AtomicLong(0L)

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 2 additions & 5 deletions

@@ -65,7 +65,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
 
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
+    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
     stream.map(_._2).countByValue().foreachRDD { r =>
       val ret = r.collect()
       ret.toMap.foreach { kv =>
@@ -77,10 +77,7 @@
     ssc.start()
 
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent.size === result.size)
-      sent.keys.foreach { k =>
-        assert(sent(k) === result(k).toInt)
-      }
+      assert(sent === result)
     }
   }
 }
