Commit 86bf93e

zsxwing authored and Andrew Or committed
[SPARK-13522][CORE] Executor should kill itself when it's unable to heartbeat to driver more than N times
## What changes were proposed in this pull request?

Sometimes a network disconnection event won't be triggered, due to other potential race conditions that we may not have thought of, and the executor will then keep sending heartbeats to the driver and never exit. This PR adds a new configuration, `spark.executor.heartbeat.maxFailures`, to kill the executor when it is unable to heartbeat to the driver more than `spark.executor.heartbeat.maxFailures` times.

## How was this patch tested?

Unit tests.

Author: Shixiong Zhu <[email protected]>

Closes #11401 from zsxwing/SPARK-13522.
1 parent c433c0a commit 86bf93e

File tree

2 files changed: +29 -1 lines changed

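For anyone wanting to tune the new threshold, here is a minimal sketch (not part of this commit) of how the setting could be supplied through a standard `SparkConf`; the property name and its default of 60 come from the diff below, while the app name and the chosen value of 30 are illustrative:

```scala
import org.apache.spark.SparkConf

object HeartbeatMaxFailuresExample {
  def main(args: Array[String]): Unit = {
    // Exit the executor after 30 consecutive failed heartbeats instead of the
    // default 60 (roughly 5 minutes at the default 10s heartbeat interval).
    val conf = new SparkConf()
      .setAppName("heartbeat-max-failures-demo")
      .set("spark.executor.heartbeat.maxFailures", "30")

    // The executor reads the value back the same way the diff does.
    println(conf.getInt("spark.executor.heartbeat.maxFailures", 60))
  }
}
```

The same property could also be passed on the command line, e.g. `spark-submit --conf spark.executor.heartbeat.maxFailures=30 ...`.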

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 21 additions & 1 deletion
@@ -114,6 +114,19 @@ private[spark] class Executor(
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
+  /**
+   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+   * times, it should kill itself. The default value is 60. It means we will retry to send
+   * heartbeats about 10 minutes because the heartbeat interval is 10s.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+
+  /**
+   * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
+   * successful heartbeat will reset it to 0.
+   */
+  private var heartbeatFailures = 0
+
   startDriverHeartbeater()
 
   def launchTask(
@@ -464,8 +477,15 @@
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
+      heartbeatFailures = 0
     } catch {
-      case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
+      case NonFatal(e) =>
+        logWarning("Issue communicating with driver in heartbeater", e)
+        logError(s"Unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times")
+        heartbeatFailures += 1
+        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
+          System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+        }
     }
   }
 
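To see the retry-then-exit logic from the hunk above in isolation, here is a condensed, self-contained sketch; `HeartbeatRetrySketch`, `sendHeartbeat`, and the simulated failure are illustrative stand-ins rather than Spark APIs, and only the counting pattern mirrors the committed code:

```scala
import scala.util.control.NonFatal

object HeartbeatRetrySketch {
  // Stand-in for conf.getInt("spark.executor.heartbeat.maxFailures", 60).
  private val HeartbeatMaxFailures = 60

  // Mirrors the diff: mutated only from the single heartbeat thread.
  private var heartbeatFailures = 0

  // Placeholder for the RPC to the driver's HeartbeatReceiver; fails randomly here.
  private def sendHeartbeat(): Unit =
    if (scala.util.Random.nextDouble() < 0.01) throw new RuntimeException("driver unreachable")

  def reportHeartbeat(): Unit =
    try {
      sendHeartbeat()
      heartbeatFailures = 0 // any successful heartbeat resets the counter
    } catch {
      case NonFatal(_) =>
        heartbeatFailures += 1
        if (heartbeatFailures >= HeartbeatMaxFailures) {
          // The real executor exits with ExecutorExitCode.HEARTBEAT_FAILURE (56).
          sys.exit(56)
        }
    }
}
```

Because only consecutive failures accumulate, a single successful heartbeat keeps a temporarily flaky executor alive; only a sustained outage (about 10 minutes at the defaults) triggers the exit.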
core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 8 additions & 0 deletions
@@ -39,6 +39,12 @@ object ExecutorExitCode {
   /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
   val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
 
+  /**
+   * Executor is unable to send heartbeats to the driver more than
+   * "spark.executor.heartbeat.maxFailures" times.
+   */
+  val HEARTBEAT_FAILURE = 56
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@ object ExecutorExitCode {
       // TODO: replace external block store with concrete implementation name
       case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
         "ExternalBlockStore failed to create a local temporary directory."
+      case HEARTBEAT_FAILURE =>
+        "Unable to send heartbeats to driver."
       case _ =>
         "Unknown executor exit code (" + exitCode + ")" + (
           if (exitCode > 128) {
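The new constant mainly matters when mapping an executor's exit status back to a human-readable reason. Below is a small sketch of that mapping in isolation, using only the values shown in the diff; the object and method names here are hypothetical, not the Spark API:

```scala
object ExitCodeDemo {
  // The value this commit introduces for heartbeat failures.
  val HEARTBEAT_FAILURE = 56

  // Uppercase identifiers in patterns match against the val, as explainExitCode does.
  def explain(exitCode: Int): String = exitCode match {
    case HEARTBEAT_FAILURE => "Unable to send heartbeats to driver."
    case other             => s"Unknown executor exit code ($other)"
  }

  def main(args: Array[String]): Unit =
    println(explain(56)) // prints: Unable to send heartbeats to driver.
}
```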
