From 31ad69b8aa15a0f167d7a02c93985694082ea407 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 1 Dec 2016 11:23:20 -0800 Subject: [PATCH] Fix flaky test: StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 35eeb9dfa5ef..5645996de5a6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc = new StreamingContext(conf, Milliseconds(100)) val input = ssc.receiverStream(new TestReceiver) val latch = new CountDownLatch(1) + @volatile var stopping = false input.count().foreachRDD { rdd => // Make sure we can read from BlockRDD - if (rdd.collect().headOption.getOrElse(0L) > 0) { + if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) { // Stop StreamingContext to unblock "awaitTerminationOrTimeout" + stopping = true new Thread() { setDaemon(true) override def run(): Unit = {