Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -610,14 +610,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")

val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
val executorHeartbeatInterval =
getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
val executorTimeoutThresholdMs =
getTimeAsSeconds("spark.network.timeout", "120s") * 1000
val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
// If spark.executor.heartbeatInterval is bigger than spark.network.timeout,
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
}

/**
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

/**
* Interval to send heartbeats, in milliseconds.
*
* Read once from the `EXECUTOR_HEARTBEAT_INTERVAL` config entry when this value is
* initialized. Besides pacing the heartbeat schedule, the same value is used as the
* RPC timeout for each heartbeat request.
*/
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

Expand Down Expand Up @@ -832,11 +837,9 @@ private[spark] class Executor(
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val heartbeatIntervalInSec =
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
Expand All @@ -858,7 +861,7 @@ private[spark] class Executor(
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val intervalMs = HEARTBEAT_INTERVAL_MS

// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ package object config {
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional

// Config entry for "spark.executor.heartbeatInterval": the interval between executor
// heartbeats. Declared as a time config in milliseconds; the default is the string "10s".
private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
ConfigBuilder("spark.executor.heartbeatInterval")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")

private[spark] val EXECUTOR_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
Expand Down Expand Up @@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}

Expand Down