diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 6c4c5c94cfa28..854dfbd7b1cb0 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet +import scala.concurrent.duration._ import org.apache.avro.{Schema, SchemaNormalization} @@ -610,7 +611,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") - val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") + val executorHeartbeatInterval = + getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds // If spark.executor.heartbeatInterval bigger than spark.network.timeout, // it will almost always cause ExecutorLostFailure. See SPARK-22754. require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " + diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 97dfcc482b174..9ac8063739984 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.concurrent.duration._ import scala.util.control.NonFatal import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -831,9 +832,11 @@ private[spark] class Executor( } val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) + val heartbeatIntervalInSec = + conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) + message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval")) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister()