Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 46 additions & 30 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging {
}

/**
* Terminates a process waiting for at most the specified duration. Returns whether
* the process terminated.
* Terminates a process waiting for at most the specified duration.
*
* @return the process exit value if it was successfully terminated, else None
*/
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
try {
// Java8 added a new API which will more forcibly kill the process. Use that if available.
val destroyMethod = process.getClass().getMethod("destroyForcibly");
destroyMethod.setAccessible(true)
destroyMethod.invoke(process)
} catch {
case NonFatal(e) =>
if (!e.isInstanceOf[NoSuchMethodException]) {
logWarning("Exception when attempting to kill process", e)
}
process.destroy()
}
// Politely destroy first
process.destroy()

if (waitForProcess(process, timeoutMs)) {
// Successful exit
Option(process.exitValue())
} else {
None
// Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
classOf[Process].getMethod("destroyForcibly").invoke(process)
} catch {
case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
if (waitForProcess(process, timeoutMs)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
None
}
}
}

/**
* Wait for a process to terminate for at most the specified duration.
* Return whether the process actually terminated after the given timeout.
*
* @return whether the process actually terminated before the given timeout.
*/
def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
var terminated = false
val startTime = System.currentTimeMillis
while (!terminated) {
try {
process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>
// Process not terminated yet
if (System.currentTimeMillis - startTime > timeoutMs) {
return false
try {
// Use Java 8 method if available
classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
.invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
.asInstanceOf[Boolean]
} catch {
case _: NoSuchMethodError =>
// Otherwise implement it manually
var terminated = false
val startTime = System.currentTimeMillis
while (!terminated) {
try {
process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>
// Process not terminated yet
if (System.currentTimeMillis - startTime > timeoutMs) {
return false
}
Thread.sleep(100)
}
Thread.sleep(100)
}
}
true
}
true
}

/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(terminated.isDefined)
Utils.waitForProcess(process, 5000)
val duration = System.currentTimeMillis() - start
assert(duration < 5000)
assert(duration < 6000) // add a little extra time to allow a force kill to finish
assert(!pidExists(pid))
} finally {
signal(pid, "SIGKILL")
Expand Down