Closed
19 changes: 13 additions & 6 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -21,16 +21,18 @@ import java.io.File

import scala.collection.JavaConverters._
import scala.collection.Map
import scala.concurrent.duration._
import scala.io.Codec

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
import org.scalatest.concurrent.Eventually

import org.apache.spark._
import org.apache.spark.util.Utils

class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
val envCommand = if (Utils.isWindows) {
"cmd.exe /C set"
} else {
@@ -100,11 +102,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {

assert(result.collect().length === 0)

// collect stderr writer threads
val stderrWriterThread = Thread.getAllStackTraces.keySet().asScala
.find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }

assert(stderrWriterThread.isEmpty)
// SPARK-29104 PipedRDD will invoke `stdinWriterThread.interrupt()` at task completion,
// and `obj.wait` will get InterruptedException. However, there exists a possibility
Contributor

Thanks for the updated patch. Much better with artificially reproducing the issue.

I'm trying to understand the situation. As I understand it, interrupting the thread throws InterruptedException while the thread is waiting in obj.wait(); stdinWriterThread catches it, and run() finishes.

I'm also trying to understand the problematic case: if Thread.interrupt() occurs before obj.wait(), the thread keeps running until it reaches a method that checks the thread's interrupted status (such as Object.wait(), which throws InterruptedException even if the interrupt happened earlier), so the thread may finish with some delay. Could you confirm my understanding?
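
For reference, the "interrupt before wait" behaviour asked about above can be checked in isolation. The following is a minimal sketch, not code from this patch: `InterruptBeforeWait` and its method name are hypothetical, and the thread interrupts itself only to make the ordering deterministic.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical demo object; not part of PipedRDD or this patch.
object InterruptBeforeWait {
  // Returns true if lock.wait() threw InterruptedException even though the
  // interrupt was delivered before the thread started waiting.
  def waitThrowsWhenAlreadyInterrupted(): Boolean = {
    val lock = new Object
    val thrown = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // Set the interrupted status before the thread ever blocks.
        Thread.currentThread().interrupt()
        // Code here would still run normally; the status alone stops nothing.
        lock.synchronized {
          try {
            // Bounded wait so the demo cannot hang if the assumption is wrong.
            lock.wait(5000L)
          } catch {
            case _: InterruptedException => thrown.set(true)
          }
        }
      }
    })
    worker.start()
    worker.join()
    thrown.get()
  }
}
```

The thread only reacts to the interrupt once it reaches a call that checks the interrupted status, which is where the delay described above would come from.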

Member Author

The second flow is correct. However, the first one is described slightly differently from what happens. stdinWriterThread does not catch the exception. The task thread is just marked as interrupted and resumes from the bytecode after obj.wait with an InterruptedException. However, this task thread can be slow, and the test code can call Thread.getAllStackTraces before the task thread has left the run function.

Contributor

I stepped through the first case with a debugger, since the second case cannot be reproduced easily while the first can always be reproduced. It's slightly different, though not a big deal.

With the debugger, in the normal case on the current master branch, stdinWriterThread catches the InterruptedException and leaves the run() function after executing the catch/finally blocks. stderrReaderThread doesn't catch the InterruptedException because it has already been destroyed by the time we try to interrupt it.

We don't join these threads at task completion anyway, so even in the first case the test code could run its check before stdinWriterThread has been destroyed. Maybe you've already explained the same thing and I misunderstood ;( If that's the case, I'm sorry to bother you.
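
Because the thread is interrupted but never joined, a check made right after the interrupt can still see it in `Thread.getAllStackTraces`. The sketch below illustrates polling for the thread's disappearance instead of checking once. It is a hand-rolled stand-in for ScalaTest's `eventually`, not the patch itself: `ThreadExitPolling`, `pollUntil`, and the `demo-stdin-writer` prefix are hypothetical names, and the 200 ms sleep only simulates slow cleanup.

```scala
import scala.collection.JavaConverters._

// Hypothetical demo object; names and timings are assumptions for illustration.
object ThreadExitPolling {
  val Prefix = "demo-stdin-writer"

  // Finds a live thread whose name starts with the given prefix.
  def liveThreadWithPrefix(prefix: String): Option[Thread] =
    Thread.getAllStackTraces.keySet().asScala.find(_.getName.startsWith(prefix))

  // Re-evaluates `condition` every `intervalMs` until it holds or the timeout expires.
  def pollUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }

  def demo(): Boolean = {
    val lock = new Object
    val writer = new Thread(new Runnable {
      override def run(): Unit = lock.synchronized {
        try {
          lock.wait()
        } catch {
          // Simulate a thread that is slow to leave run() after the interrupt.
          case _: InterruptedException => Thread.sleep(200L)
        }
      }
    }, Prefix + "-1")
    writer.setDaemon(true)
    writer.start()
    // Interrupt without joining, as happens at task completion in this discussion.
    writer.interrupt()
    // A single immediate check could still find the thread; polling tolerates the delay.
    pollUntil(10000L, 100L)(liveThreadWithPrefix(Prefix).isEmpty)
  }
}
```

Calling `ThreadExitPolling.demo()` should return `true` once the thread has exited. `scala.collection.JavaConverters` matches the import used in the suite, though it is deprecated in Scala 2.13.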

// which the thread termination gets delayed because the thread starts from `obj.wait()`
// with that exception. To prevent test flakiness, we need to use `eventually`.
eventually(timeout(10.seconds), interval(1.second)) {
// collect stdin writer threads
val stdinWriterThread = Thread.getAllStackTraces.keySet().asScala
.find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
assert(stdinWriterThread.isEmpty)
}
}

test("advanced pipe") {