Skip to content

Commit 35d71dd

Browse files
srowen authored and zzcclp committed
[SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Process.destroyForcibly() if and only if Process.destroy() fails
## What changes were proposed in this pull request?

Utils.terminateProcess should call `destroy()` first and only fall back to `destroyForcibly()` if that fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See the JIRA for an example of the impact: no shutdown.

While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation.

## How was this patch tested?

Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However, I tested locally on Java 8, and the PR builder will try Java 7 here.

Author: Sean Owen <[email protected]>

Closes apache#13973 from srowen/SPARK-16182.

(cherry picked from commit 2075bf8)
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 83f8604)
1 parent ca4898e commit 35d71dd

File tree

2 files changed

+47
-31
lines changed

2 files changed

+47
-31
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,50 +1732,66 @@ private[spark] object Utils extends Logging {
17321732
}
17331733

17341734
/**
1735-
* Terminates a process waiting for at most the specified duration. Returns whether
1736-
* the process terminated.
1735+
* Terminates a process waiting for at most the specified duration.
1736+
*
1737+
* @return the process exit value if it was successfully terminated, else None
17371738
*/
17381739
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
1739-
try {
1740-
// Java8 added a new API which will more forcibly kill the process. Use that if available.
1741-
val destroyMethod = process.getClass().getMethod("destroyForcibly");
1742-
destroyMethod.setAccessible(true)
1743-
destroyMethod.invoke(process)
1744-
} catch {
1745-
case NonFatal(e) =>
1746-
if (!e.isInstanceOf[NoSuchMethodException]) {
1747-
logWarning("Exception when attempting to kill process", e)
1748-
}
1749-
process.destroy()
1750-
}
1740+
// Politely destroy first
1741+
process.destroy()
1742+
17511743
if (waitForProcess(process, timeoutMs)) {
1744+
// Successful exit
17521745
Option(process.exitValue())
17531746
} else {
1754-
None
1747+
// Java 8 added a new API which will more forcibly kill the process. Use that if available.
1748+
try {
1749+
classOf[Process].getMethod("destroyForcibly").invoke(process)
1750+
} catch {
1751+
case _: NoSuchMethodException => return None // Not available; give up
1752+
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
1753+
}
1754+
// Wait, again, although this really should return almost immediately
1755+
if (waitForProcess(process, timeoutMs)) {
1756+
Option(process.exitValue())
1757+
} else {
1758+
logWarning("Timed out waiting to forcibly kill process")
1759+
None
1760+
}
17551761
}
17561762
}
17571763

17581764
/**
17591765
* Wait for a process to terminate for at most the specified duration.
1760-
* Return whether the process actually terminated after the given timeout.
1766+
*
1767+
* @return whether the process actually terminated before the given timeout.
17611768
*/
17621769
def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
1763-
var terminated = false
1764-
val startTime = System.currentTimeMillis
1765-
while (!terminated) {
1766-
try {
1767-
process.exitValue()
1768-
terminated = true
1769-
} catch {
1770-
case e: IllegalThreadStateException =>
1771-
// Process not terminated yet
1772-
if (System.currentTimeMillis - startTime > timeoutMs) {
1773-
return false
1770+
try {
1771+
// Use Java 8 method if available
1772+
classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
1773+
.invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
1774+
.asInstanceOf[Boolean]
1775+
} catch {
1776+
case _: NoSuchMethodError =>
1777+
// Otherwise implement it manually
1778+
var terminated = false
1779+
val startTime = System.currentTimeMillis
1780+
while (!terminated) {
1781+
try {
1782+
process.exitValue()
1783+
terminated = true
1784+
} catch {
1785+
case e: IllegalThreadStateException =>
1786+
// Process not terminated yet
1787+
if (System.currentTimeMillis - startTime > timeoutMs) {
1788+
return false
1789+
}
1790+
Thread.sleep(100)
17741791
}
1775-
Thread.sleep(100)
1776-
}
1792+
}
1793+
true
17771794
}
1778-
true
17791795
}
17801796

17811797
/**

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
831831
assert(terminated.isDefined)
832832
Utils.waitForProcess(process, 5000)
833833
val duration = System.currentTimeMillis() - start
834-
assert(duration < 5000)
834+
assert(duration < 6000) // add a little extra time to allow a force kill to finish
835835
assert(!pidExists(pid))
836836
} finally {
837837
signal(pid, "SIGKILL")

0 commit comments

Comments
 (0)