Skip to content

Commit 00589bd

Browse files
committed
[SPARK-29104][CORE][TESTS] Fix PipedRDDSuite to use eventually to check thread termination
### What changes were proposed in this pull request? `PipedRDD` will invoke `stdinWriterThread.interrupt()` at task completion, and `obj.wait` will get `InterruptedException`. However, there exists a possibility which the thread termination gets delayed because the thread starts from `obj.wait()` with that exception. To prevent test flakiness, we need to use `eventually`. Also, This PR fixes the typo in code comment and variable name. ### Why are the changes needed? ``` - stdin writer thread should be exited when task is finished *** FAILED *** Some(Thread[stdin writer for List(cat),5,]) was not empty (PipedRDDSuite.scala:107) ``` - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6867/testReport/junit/org.apache.spark.rdd/PipedRDDSuite/stdin_writer_thread_should_be_exited_when_task_is_finished/ ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Manual. We can reproduce the same failure like Jenkins if we catch `InterruptedException` and sleep longer than the `eventually` timeout inside the test code. The following is the example to reproduce it. ```scala val nums = sc.makeRDD(Array(1, 2, 3, 4), 1).map { x => try { obj.synchronized { obj.wait() // make the thread waits here. } } catch { case ie: InterruptedException => Thread.sleep(15000) throw ie } x } ``` Closes #25808 from dongjoon-hyun/SPARK-29104. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: HyukjinKwon <[email protected]> (cherry picked from commit 34915b2) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4dedd39 commit 00589bd

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@ import java.io.File
2121

2222
import scala.collection.JavaConverters._
2323
import scala.collection.Map
24+
import scala.concurrent.duration._
2425
import scala.io.Codec
2526

2627
import org.apache.hadoop.fs.Path
2728
import org.apache.hadoop.io.{LongWritable, Text}
2829
import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
30+
import org.scalatest.concurrent.Eventually
2931

3032
import org.apache.spark._
3133
import org.apache.spark.util.Utils
3234

33-
class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
35+
class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
3436
val envCommand = if (Utils.isWindows) {
3537
"cmd.exe /C set"
3638
} else {
@@ -100,11 +102,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
100102

101103
assert(result.collect().length === 0)
102104

103-
// collect stderr writer threads
104-
val stderrWriterThread = Thread.getAllStackTraces.keySet().asScala
105-
.find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
106-
107-
assert(stderrWriterThread.isEmpty)
105+
// SPARK-29104 PipedRDD will invoke `stdinWriterThread.interrupt()` at task completion,
106+
// and `obj.wait` will get InterruptedException. However, there exists a possibility
107+
// which the thread termination gets delayed because the thread starts from `obj.wait()`
108+
// with that exception. To prevent test flakiness, we need to use `eventually`.
109+
eventually(timeout(10.seconds), interval(1.second)) {
110+
// collect stdin writer threads
111+
val stdinWriterThread = Thread.getAllStackTraces.keySet().asScala
112+
.find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
113+
assert(stdinWriterThread.isEmpty)
114+
}
108115
}
109116

110117
test("advanced pipe") {

0 commit comments

Comments
 (0)