Commit b49856a

nongli authored and Andrew Or committed
[SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat interval
Previously, the rpc timeout was the default network timeout, which is the same value the driver uses to determine dead executors. This means if there is a network issue, the executor is determined dead after one heartbeat attempt. There is a separate config for the heartbeat interval which is a better value to use for the heartbeat RPC. With this change, the executor will make multiple heartbeat attempts even with RPC issues.

Author: Nong Li <[email protected]>

Closes #10365 from nongli/spark-12411.
1 parent 5987b16 commit b49856a

1 file changed: +3 -1 lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 1 deletion
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -445,7 +446,8 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
     try {
-      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
+      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
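The reasoning in the commit message can be checked with some simple arithmetic. The sketch below is not Spark code and uses no Spark APIs. The object `HeartbeatTimeoutSketch` and the helper `attemptsBeforeDeclaredDead` are hypothetical names introduced only for illustration. It assumes the driver's dead-executor deadline equals `spark.network.timeout` (120s by default) and that every heartbeat RPC blocks for its full timeout. It ignores the wait between retries and the normal sleep between heartbeats, so the numbers are a rough upper bound rather than a description of actual behavior.

```scala
import scala.concurrent.duration._

object HeartbeatTimeoutSketch {
  // Assumed values for illustration: 120s is the documented default of
  // spark.network.timeout; 10s is the heartbeat interval default in the diff.
  val networkTimeout: FiniteDuration = 120.seconds
  val heartbeatInterval: FiniteDuration = 10.seconds

  /** Hypothetical helper: number of RPC attempts that can each run to their
    * full per-attempt timeout within the driver's dead-executor deadline. */
  def attemptsBeforeDeclaredDead(
      perAttemptTimeout: FiniteDuration,
      driverDeadline: FiniteDuration): Long =
    driverDeadline.toMillis / perAttemptTimeout.toMillis

  def main(args: Array[String]): Unit = {
    // Before the change: each attempt may block for the whole network timeout.
    val before = attemptsBeforeDeclaredDead(networkTimeout, networkTimeout)
    // After the change: each attempt is bounded by the heartbeat interval.
    val after = attemptsBeforeDeclaredDead(heartbeatInterval, networkTimeout)
    println(s"attempts when the RPC timeout is the network timeout: $before")
    println(s"attempts when the RPC timeout is the heartbeat interval: $after")
  }
}
```

Under these assumptions a single stalled heartbeat uses up the entire deadline before the change, while after it roughly a dozen attempts fit in the same window. This matches the commit's claim that the executor gets multiple attempts even with RPC issues.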
