Skip to content

Commit 2075bf8

Browse files
committed
[SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Process.destroyForcibly() if and only if Process.destroy() fails
## What changes were proposed in this pull request? Utils.terminateProcess should `destroy()` first and only fall back to `destroyForcibly()` if it fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See JIRA for an example of the impact: no shutdown While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation. ## How was this patch tested? Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However I tested locally on Java 8 and the PR builder will try Java 7 here. Author: Sean Owen <[email protected]> Closes #13973 from srowen/SPARK-16182.
1 parent fbfd0ab commit 2075bf8

File tree

2 files changed

+47
-31
lines changed

2 files changed

+47
-31
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging {
17721772
}
17731773

17741774
/**
1775-
* Terminates a process waiting for at most the specified duration. Returns whether
1776-
* the process terminated.
1775+
* Terminates a process waiting for at most the specified duration.
1776+
*
1777+
* @return the process exit value if it was successfully terminated, else None
17771778
*/
17781779
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
1779-
try {
1780-
// Java8 added a new API which will more forcibly kill the process. Use that if available.
1781-
val destroyMethod = process.getClass().getMethod("destroyForcibly");
1782-
destroyMethod.setAccessible(true)
1783-
destroyMethod.invoke(process)
1784-
} catch {
1785-
case NonFatal(e) =>
1786-
if (!e.isInstanceOf[NoSuchMethodException]) {
1787-
logWarning("Exception when attempting to kill process", e)
1788-
}
1789-
process.destroy()
1790-
}
1780+
// Politely destroy first
1781+
process.destroy()
1782+
17911783
if (waitForProcess(process, timeoutMs)) {
1784+
// Successful exit
17921785
Option(process.exitValue())
17931786
} else {
1794-
None
1787+
// Java 8 added a new API which will more forcibly kill the process. Use that if available.
1788+
try {
1789+
classOf[Process].getMethod("destroyForcibly").invoke(process)
1790+
} catch {
1791+
case _: NoSuchMethodException => return None // Not available; give up
1792+
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
1793+
}
1794+
// Wait, again, although this really should return almost immediately
1795+
if (waitForProcess(process, timeoutMs)) {
1796+
Option(process.exitValue())
1797+
} else {
1798+
logWarning("Timed out waiting to forcibly kill process")
1799+
None
1800+
}
17951801
}
17961802
}
17971803

17981804
/**
17991805
* Wait for a process to terminate for at most the specified duration.
1800-
* Return whether the process actually terminated after the given timeout.
1806+
*
1807+
* @return whether the process actually terminated before the given timeout.
18011808
*/
18021809
def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
1803-
var terminated = false
1804-
val startTime = System.currentTimeMillis
1805-
while (!terminated) {
1806-
try {
1807-
process.exitValue()
1808-
terminated = true
1809-
} catch {
1810-
case e: IllegalThreadStateException =>
1811-
// Process not terminated yet
1812-
if (System.currentTimeMillis - startTime > timeoutMs) {
1813-
return false
1810+
try {
1811+
// Use Java 8 method if available
1812+
classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
1813+
.invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
1814+
.asInstanceOf[Boolean]
1815+
} catch {
1816+
case _: NoSuchMethodError =>
1817+
// Otherwise implement it manually
1818+
var terminated = false
1819+
val startTime = System.currentTimeMillis
1820+
while (!terminated) {
1821+
try {
1822+
process.exitValue()
1823+
terminated = true
1824+
} catch {
1825+
case e: IllegalThreadStateException =>
1826+
// Process not terminated yet
1827+
if (System.currentTimeMillis - startTime > timeoutMs) {
1828+
return false
1829+
}
1830+
Thread.sleep(100)
18141831
}
1815-
Thread.sleep(100)
1816-
}
1832+
}
1833+
true
18171834
}
1818-
true
18191835
}
18201836

18211837
/**

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
863863
assert(terminated.isDefined)
864864
Utils.waitForProcess(process, 5000)
865865
val duration = System.currentTimeMillis() - start
866-
assert(duration < 5000)
866+
assert(duration < 6000) // add a little extra time to allow a force kill to finish
867867
assert(!pidExists(pid))
868868
} finally {
869869
signal(pid, "SIGKILL")

0 commit comments

Comments
 (0)