diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 69739a2e58481..7d419579a36d0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -21,16 +21,18 @@ import java.io.File
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.concurrent.duration._
 import scala.io.Codec
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
 import org.apache.spark.util.Utils
 
-class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
+class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   val envCommand = if (Utils.isWindows) {
     "cmd.exe /C set"
   } else {
@@ -100,11 +102,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
 
     assert(result.collect().length === 0)
 
-    // collect stderr writer threads
-    val stderrWriterThread = Thread.getAllStackTraces.keySet().asScala
-      .find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
-
-    assert(stderrWriterThread.isEmpty)
+    // SPARK-29104: PipedRDD invokes `stdinWriterThread.interrupt()` at task completion,
+    // and the thread blocked in `obj.wait` receives an InterruptedException. However,
+    // thread termination may be delayed because the thread must first resume from
+    // `obj.wait()` with that exception. To prevent test flakiness, use `eventually`.
+    eventually(timeout(10.seconds), interval(1.second)) {
+      // collect stdin writer threads
+      val stdinWriterThread = Thread.getAllStackTraces.keySet().asScala
+        .find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
+      assert(stdinWriterThread.isEmpty)
+    }
   }
 
   test("advanced pipe") {
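
For readers unfamiliar with ScalaTest's `Eventually` trait, below is a minimal, self-contained sketch of the retry semantics the test now relies on. It is not part of the patch: the suite name, thread, and sleep duration are made up for illustration, and it assumes ScalaTest 3.1+ (for `AnyFunSuite`).

```scala
import org.scalatest.concurrent.Eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

// Illustrative only: demonstrates how `eventually` retries a flaky assertion.
class EventuallySketchSuite extends AnyFunSuite with Eventually {
  test("an interrupted worker thread eventually dies") {
    // A worker parked in sleep(), loosely analogous to PipedRDD's stdin
    // writer thread parked in obj.wait().
    val worker = new Thread(() =>
      try Thread.sleep(60000)
      catch { case _: InterruptedException => () })
    worker.start()
    worker.interrupt() // wakes the thread, but termination is not instantaneous

    // `eventually` re-runs its body every `interval` until it stops throwing,
    // or rethrows the last failure once `timeout` elapses. A plain assert
    // here could race the thread's shutdown and fail intermittently.
    eventually(timeout(10.seconds), interval(1.second)) {
      assert(!worker.isAlive)
    }
  }
}
```

The same pattern appears in the patch: the assertion that no `stdin writer` threads remain is wrapped in `eventually`, trading a bounded wait (at most 10 seconds, polled once per second) for immunity to the interrupt-delivery delay described in the comment.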