From 25e9f991c61e0f52886a9c90638de31f7ecbd3dc Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 16 Sep 2019 16:16:17 -0700
Subject: [PATCH 1/2] [SPARK-29104][CORE][TESTS] Fix PipedRDDSuite to wait for
 messages before checking thread termination

---
 .../test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 69739a2e58481..a5f0ccd511f94 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -99,12 +99,13 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
 
     val result = piped.mapPartitions(_ => Array.emptyIntArray.iterator)
     assert(result.collect().length === 0)
+    sc.listenerBus.waitUntilEmpty(10000)
 
-    // collect stderr writer threads
-    val stderrWriterThread = Thread.getAllStackTraces.keySet().asScala
+    // collect stdin writer threads
+    val stdinWriterThread = Thread.getAllStackTraces.keySet().asScala
       .find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
 
-    assert(stderrWriterThread.isEmpty)
+    assert(stdinWriterThread.isEmpty)
   }
 
   test("advanced pipe") {

From 918cbd795a00771660569d51004070ffc7206691 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 16 Sep 2019 21:50:41 -0700
Subject: [PATCH 2/2] Use eventually.

---
 .../org/apache/spark/rdd/PipedRDDSuite.scala | 20 ++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index a5f0ccd511f94..7d419579a36d0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -21,16 +21,18 @@ import java.io.File
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.concurrent.duration._
 import scala.io.Codec
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
 import org.apache.spark.util.Utils
 
-class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
+class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   val envCommand = if (Utils.isWindows) {
     "cmd.exe /C set"
   } else {
@@ -99,13 +101,17 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
 
     val result = piped.mapPartitions(_ => Array.emptyIntArray.iterator)
     assert(result.collect().length === 0)
-    sc.listenerBus.waitUntilEmpty(10000)
 
-    // collect stdin writer threads
-    val stdinWriterThread = Thread.getAllStackTraces.keySet().asScala
-      .find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
-
-    assert(stdinWriterThread.isEmpty)
+    // SPARK-29104 PipedRDD will invoke `stdinWriterThread.interrupt()` at task completion,
+    // and `obj.wait` will get InterruptedException. However, the thread termination can
+    // still be delayed, because the thread only resumes from `obj.wait()` once that
+    // exception is delivered. To prevent test flakiness, we need to use `eventually`.
+    eventually(timeout(10.seconds), interval(1.second)) {
+      // collect stdin writer threads
+      val stdinWriterThread = Thread.getAllStackTraces.keySet().asScala
+        .find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
+      assert(stdinWriterThread.isEmpty)
+    }
   }
 
   test("advanced pipe") {
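
For context, a minimal self-contained sketch of the ScalaTest `Eventually` pattern the
second commit adopts: retry an assertion at a fixed interval until it passes or the
timeout expires, instead of asserting once at a racy instant. This is an illustration,
not part of the patch; the suite and thread names are hypothetical, and it extends plain
ScalaTest `AnyFunSuite` rather than Spark's `SparkFunSuite`. As in the patch,
`eventually`, `timeout`, and `interval` come from `org.scalatest.concurrent.Eventually`,
and `scala.concurrent.duration._` supplies the duration syntax.

    import scala.concurrent.duration._

    import org.scalatest.concurrent.Eventually
    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical suite illustrating the retry pattern used in the patch.
    class ThreadTerminationSuite extends AnyFunSuite with Eventually {

      test("interrupted worker thread eventually terminates") {
        // A worker that blocks until interrupted, like PipedRDD's stdin writer
        // thread blocking in `obj.wait()`.
        val worker = new Thread(
          () => try Thread.sleep(60000) catch { case _: InterruptedException => () },
          "example-worker")
        worker.start()
        worker.interrupt()

        // The interrupt is only a request: the thread may not have died yet when
        // execution reaches this point. Poll every 100 ms, for at most 5 seconds,
        // until the thread is gone; fail only if it never terminates in time.
        eventually(timeout(5.seconds), interval(100.milliseconds)) {
          assert(!worker.isAlive)
        }
      }
    }

This is exactly the race the patch closes: between `interrupt()` and the thread actually
exiting there is an unbounded delay, so a one-shot thread-liveness assertion is flaky
while a polled one is not.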