From 12b53ee967449285b90ab1d75f89152d7711181a Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 19 Mar 2019 10:39:15 +0530
Subject: [PATCH 01/10] fix interval to be uniform when no unit is mentioned

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 97dfcc482b174..34d683f9cda4a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -855,7 +855,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

From 5755b3110ce8a1c1d16fac6edbbfc4927c4c2df8 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 19 Mar 2019 15:58:24 +0530
Subject: [PATCH 02/10] handle magic number as per review comment

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 34d683f9cda4a..7e0fa0a276c80 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -855,7 +855,9 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000
+    val intervalMs = conf
+      .getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") *
+      TimeUnit.SECONDS.toMillis(1)
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
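
Note: to make the unit inconsistency behind patches 01 and 02 concrete, here is a
minimal, self-contained Scala sketch of suffix-based time parsing in the spirit of
JavaUtils.timeStringAs (the parsing is simplified and the object name is illustrative;
this is not the Spark implementation). A bare number falls back to the caller-supplied
unit, so before this series the same unit-less config value meant different things to
getTimeAsMs and to getTimeAsSeconds:

  import java.util.concurrent.TimeUnit

  // Simplified model of suffix-based parsing: a bare number is interpreted
  // in the caller-supplied unit rather than in one fixed default unit.
  object TimeParseDemo {
    private val suffixes = Map(
      "us" -> TimeUnit.MICROSECONDS, "ms" -> TimeUnit.MILLISECONDS,
      "s" -> TimeUnit.SECONDS, "m" -> TimeUnit.MINUTES,
      "min" -> TimeUnit.MINUTES, "h" -> TimeUnit.HOURS, "d" -> TimeUnit.DAYS)

    def timeStringAs(str: String, unit: TimeUnit): Long = {
      val m = "([0-9]+)([a-z]*)".r.findFirstMatchIn(str.toLowerCase.trim).get
      val suffix = m.group(2)
      unit.convert(m.group(1).toLong, if (suffix.isEmpty) unit else suffixes(suffix))
    }

    def main(args: Array[String]): Unit = {
      println(timeStringAs("30s", TimeUnit.MILLISECONDS)) // 30000: suffix is unambiguous
      println(timeStringAs("30", TimeUnit.MILLISECONDS))  // 30: bare number read as ms
      println(timeStringAs("30", TimeUnit.SECONDS))       // 30: bare number read as s, i.e. 30000 ms
    }
  }

Patch 01 therefore routes the heartbeat interval through the seconds-based getter and
multiplies back to milliseconds, which keeps a bare "30" from silently meaning 30 ms.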

From bc396957d809280a6db64fd33522f863d6e5c0cf Mon Sep 17 00:00:00 2001
From: Ajith
Date: Thu, 21 Mar 2019 20:04:28 +0530
Subject: [PATCH 03/10] handle unit-less time as milliseconds

---
 .../org/apache/spark/network/util/JavaUtils.java | 10 +++++++++-
 .../main/scala/org/apache/spark/SparkConf.scala  | 16 ++++++++++++++--
 .../org/apache/spark/executor/Executor.scala     | 13 +++++++++----
 3 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index b5497087634ce..d41804f9d8820 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -226,6 +226,14 @@ private static boolean isSymlink(File file) throws IOException {
    * The unit is also considered the default if the given string does not specify a unit.
    */
   public static long timeStringAs(String str, TimeUnit unit) {
+    return timeStringAs(str, unit, unit);
+  }
+
+  /**
+   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
+   * defaultUnit is used for strings that contain only a number and no unit.
+   */
+  public static long timeStringAs(String str, TimeUnit unit, TimeUnit defaultUnit) {
     String lower = str.toLowerCase(Locale.ROOT).trim();
 
     try {
@@ -243,7 +251,7 @@ public static long timeStringAs(String str, TimeUnit unit, TimeUnit defaultUnit) {
       }
 
       // If suffix is valid use that, otherwise none was provided and use the default passed
-      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
+      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : defaultUnit);
     } catch (NumberFormatException e) {
       String timeError = "Time must be specified as seconds (s), " +
         "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 6c4c5c94cfa28..7eedcc48be8cf 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import java.util.{Map => JMap}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -280,6 +281,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     Utils.timeStringAsSeconds(get(key, defaultValue))
   }
 
+  /**
+   * Get a time parameter as seconds, falling back to a default if not set. If no
+   * suffix is provided then defaultUnit is assumed.
+   * @throws NumberFormatException If the value cannot be interpreted as seconds
+   */
+  def getTimeAsSeconds(key: String, defaultValue: String, defaultUnit: TimeUnit): Long =
+    catchIllegalValue(key) {
+      JavaUtils.timeStringAs(get(key, defaultValue), TimeUnit.SECONDS, defaultUnit)
+    }
+
   /**
    * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then milliseconds are assumed.
@@ -610,7 +621,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorHeartbeatInterval =
+      getTimeAsSeconds("spark.executor.heartbeatInterval", "10s", TimeUnit.MILLISECONDS)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7e0fa0a276c80..e3a267d3977ed 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -71,6 +72,11 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
+  private val HEARTBEAT_INTERVAL_KEY = "spark.executor.heartbeatInterval"
+
+  private val heartbeatIntervalInSec =
+    conf.getTimeAsSeconds(HEARTBEAT_INTERVAL_KEY, "10s", TimeUnit.MILLISECONDS).seconds
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -832,8 +838,9 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
     try {
+
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+        message, new RpcTimeout(heartbeatIntervalInSec, HEARTBEAT_INTERVAL_KEY))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
@@ -855,9 +862,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf
-      .getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") *
-      TimeUnit.SECONDS.toMillis(1)
+    val intervalMs = heartbeatIntervalInSec.toMillis
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
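
Note: the semantics of the three-argument overload introduced in patch 03 can be
summarised with the following hedged sketch (simplified parsing and an illustrative
object name; not the Spark code). The result is always expressed in unit, while
defaultUnit applies only to bare numbers that carry no suffix:

  import java.util.concurrent.TimeUnit

  object DefaultUnitDemo {
    // Result is in `unit`; `defaultUnit` is assumed only when no suffix is given.
    def timeStringAs(str: String, unit: TimeUnit, defaultUnit: TimeUnit): Long = {
      val m = "([0-9]+)([a-z]*)".r.findFirstMatchIn(str.toLowerCase.trim).get
      val source = m.group(2) match {
        case ""    => defaultUnit
        case "ms"  => TimeUnit.MILLISECONDS
        case "s"   => TimeUnit.SECONDS
        case "m"   => TimeUnit.MINUTES
        case other => sys.error(s"unsupported suffix in sketch: $other")
      }
      unit.convert(m.group(1).toLong, source)
    }

    def main(args: Array[String]): Unit = {
      // An explicit suffix wins; defaultUnit is irrelevant.
      assert(timeStringAs("10s", TimeUnit.SECONDS, TimeUnit.MILLISECONDS) == 10)
      // A bare "10000" is read as 10000 ms -> 10 s, matching getTimeAsMs.
      assert(timeStringAs("10000", TimeUnit.SECONDS, TimeUnit.MILLISECONDS) == 10)
      // Passing the same unit twice preserves the old two-argument behaviour.
      assert(timeStringAs("10", TimeUnit.SECONDS, TimeUnit.SECONDS) == 10)
      println("defaultUnit semantics hold")
    }
  }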

From fb9ea5c9d2ce5f11ce8934031a2bcdf4bf9b6eb9 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Fri, 22 Mar 2019 11:47:55 +0530
Subject: [PATCH 04/10] standardize heartbeat interval config

---
 .../org/apache/spark/network/util/JavaUtils.java | 10 +---------
 .../main/scala/org/apache/spark/SparkConf.scala  | 14 ++------------
 .../org/apache/spark/executor/Executor.scala     | 16 +++++++---------
 .../apache/spark/internal/config/package.scala   |  9 +++++++++
 .../MesosCoarseGrainedSchedulerBackend.scala     |  2 +-
 5 files changed, 20 insertions(+), 31 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index d41804f9d8820..b5497087634ce 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -226,14 +226,6 @@ private static boolean isSymlink(File file) throws IOException {
    * The unit is also considered the default if the given string does not specify a unit.
    */
   public static long timeStringAs(String str, TimeUnit unit) {
-    return timeStringAs(str, unit, unit);
-  }
-
-  /**
-   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
-   * defaultUnit is used for strings that contain only a number and no unit.
-   */
-  public static long timeStringAs(String str, TimeUnit unit, TimeUnit defaultUnit) {
     String lower = str.toLowerCase(Locale.ROOT).trim();
 
     try {
@@ -251,7 +243,7 @@ public static long timeStringAs(String str, TimeUnit unit, TimeUnit defaultUnit) {
       }
 
       // If suffix is valid use that, otherwise none was provided and use the default passed
-      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : defaultUnit);
+      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
     } catch (NumberFormatException e) {
       String timeError = "Time must be specified as seconds (s), " +
         "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 7eedcc48be8cf..ec68e7a31b86a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
+import scala.concurrent.duration._
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
@@ -281,16 +282,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     Utils.timeStringAsSeconds(get(key, defaultValue))
   }
 
-  /**
-   * Get a time parameter as seconds, falling back to a default if not set. If no
-   * suffix is provided then defaultUnit is assumed.
-   * @throws NumberFormatException If the value cannot be interpreted as seconds
-   */
-  def getTimeAsSeconds(key: String, defaultValue: String, defaultUnit: TimeUnit): Long =
-    catchIllegalValue(key) {
-      JavaUtils.timeStringAs(get(key, defaultValue), TimeUnit.SECONDS, defaultUnit)
-    }
-
   /**
    * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then milliseconds are assumed.
@@ -621,8 +612,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval =
-      getTimeAsSeconds("spark.executor.heartbeatInterval", "10s", TimeUnit.MILLISECONDS)
+    val executorHeartbeatInterval = get(EXECUTOR_HEARTBEAT_INTERVAL).millis.toSeconds
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e3a267d3977ed..88d47a2611f2d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -72,10 +72,7 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
-  private val HEARTBEAT_INTERVAL_KEY = "spark.executor.heartbeatInterval"
-
-  private val heartbeatIntervalInSec =
-    conf.getTimeAsSeconds(HEARTBEAT_INTERVAL_KEY, "10s", TimeUnit.MILLISECONDS).seconds
+  private val heartbeatInterval = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
 
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
@@ -840,7 +837,8 @@ private[spark] class Executor(
     try {
 
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, new RpcTimeout(heartbeatIntervalInSec, HEARTBEAT_INTERVAL_KEY))
+        message, new RpcTimeout(heartbeatInterval.millis.toSeconds.seconds,
+          EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
@@ -862,15 +860,15 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = heartbeatIntervalInSec.toMillis
-
     // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+    val initialDelay = heartbeatInterval + (math.random * heartbeatInterval).asInstanceOf[Int]
 
     val heartbeatTask = new Runnable() {
       override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
     }
-    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
+    heartbeater
+      .scheduleAtFixedRate(heartbeatTask, initialDelay,
+        heartbeatInterval, TimeUnit.MILLISECONDS)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5c17b9b3a3207..c000a64ad10ee 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -646,4 +646,13 @@ package object config {
     .stringConf
     .toSequence
     .createWithDefault(Nil)
+
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.executor.heartbeatInterval")
+      .doc("Interval between each executor's heartbeats to the driver. Heartbeats let " +
+        "the driver know that the executor is still alive and update it with metrics for " +
+        "in-progress tasks. spark.executor.heartbeatInterval should be significantly less than " +
+        "spark.network.timeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 178de30f0f381..80cbd24c77fc9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -635,7 +635,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         externalShufflePort,
         sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
           s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
-        sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+        sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
       slave.shuffleRegistered = true
     }
 

From 76570b7f1dad8cb1e5f58dd1718a21e770148e53 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Fri, 22 Mar 2019 11:51:04 +0530
Subject: [PATCH 05/10] update

---
 core/src/main/scala/org/apache/spark/SparkConf.scala         | 3 +--
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 1 -
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ec68e7a31b86a..5c223058bc69e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import java.util.{Map => JMap}
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -29,7 +29,6 @@ import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 88d47a2611f2d..d592fdb6a1072 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -835,7 +835,6 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
     try {
-
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
         message, new RpcTimeout(heartbeatInterval.millis.toSeconds.seconds,
           EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
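
Note: from patch 04 onwards the series leans on the .millis.toSeconds.seconds idiom to
turn a millisecond count into a whole-second FiniteDuration for RpcTimeout. A sketch
that can be pasted into a Scala REPL (the 10000L literal is an assumption standing in
for the conf.getTimeAsMs lookup):

  import scala.concurrent.duration._

  val intervalMs: Long = 10000L // stand-in for getTimeAsMs("spark.executor.heartbeatInterval", "10s")
  val timeout: FiniteDuration = intervalMs.millis.toSeconds.seconds
  println(timeout)                       // 10 seconds
  println(900L.millis.toSeconds.seconds) // 0 seconds: sub-second remainders truncate

The truncation in the last line is worth noting: intervals below one second collapse to
a zero-second timeout under this conversion.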

From 596c9b287dd972be27b9cd8895d4d0a8fcca1e57 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sun, 24 Mar 2019 12:48:03 +0530
Subject: [PATCH 06/10] update

---
 core/src/main/scala/org/apache/spark/SparkConf.scala          |  3 ++-
 core/src/main/scala/org/apache/spark/executor/Executor.scala  | 16 +++++++++-------
 core/src/main/scala/org/apache/spark/internal/config/package.scala | 9 ---------
 3 files changed, 11 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 5c223058bc69e..854dfbd7b1cb0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -611,7 +611,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = get(EXECUTOR_HEARTBEAT_INTERVAL).millis.toSeconds
+    val executorHeartbeatInterval =
+      getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d592fdb6a1072..358bda39c938c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -72,8 +72,6 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
-  private val heartbeatInterval = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
-
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -834,10 +832,12 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
     try {
-      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, new RpcTimeout(heartbeatInterval.millis.toSeconds.seconds,
-          EXECUTOR_HEARTBEAT_INTERVAL.key))
+      val response =
+        heartbeatReceiverRef.askSync[HeartbeatResponse](
+          message,
+          new RpcTimeout(intervalMs.millis.toSeconds.seconds, "spark.executor.heartbeatInterval"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
@@ -859,15 +859,17 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
+    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+
     // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = heartbeatInterval + (math.random * heartbeatInterval).asInstanceOf[Int]
+    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
 
     val heartbeatTask = new Runnable() {
       override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
     }
     heartbeater
       .scheduleAtFixedRate(heartbeatTask, initialDelay,
-        heartbeatInterval, TimeUnit.MILLISECONDS)
+        intervalMs, TimeUnit.MILLISECONDS)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c000a64ad10ee..5c17b9b3a3207 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -646,13 +646,4 @@ package object config {
     .stringConf
     .toSequence
     .createWithDefault(Nil)
-
-  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
-    ConfigBuilder("spark.executor.heartbeatInterval")
-      .doc("Interval between each executor's heartbeats to the driver. Heartbeats let " +
-        "the driver know that the executor is still alive and update it with metrics for " +
-        "in-progress tasks. spark.executor.heartbeatInterval should be significantly less than " +
-        "spark.network.timeout")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("10s")
 }

From c62330a26a977cad657d0c15a3e7874ae9028877 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sun, 24 Mar 2019 12:53:29 +0530
Subject: [PATCH 07/10] update

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 +---
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala   | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 358bda39c938c..5b2806653dd82 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -867,9 +867,7 @@ private[spark] class Executor(
     val heartbeatTask = new Runnable() {
       override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
     }
-    heartbeater
-      .scheduleAtFixedRate(heartbeatTask, initialDelay,
-        intervalMs, TimeUnit.MILLISECONDS)
+    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
   }
 }
 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 80cbd24c77fc9..178de30f0f381 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -635,7 +635,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         externalShufflePort,
         sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
           s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
-        sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
+        sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
       slave.shuffleRegistered = true
     }
 

From 9941e93d8d7f8327629ab3e6a7b6bf6229fb7dbf Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sun, 24 Mar 2019 12:55:46 +0530
Subject: [PATCH 08/10] update

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5b2806653dd82..c8ef7ff328d76 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -832,12 +832,13 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    val heartbeatIntervalInSec = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+      .millis.toSeconds.seconds
     try {
       val response =
         heartbeatReceiverRef.askSync[HeartbeatResponse](
           message,
-          new RpcTimeout(intervalMs.millis.toSeconds.seconds, "spark.executor.heartbeatInterval"))
+          new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
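
Note: the startDriverHeartbeater scheduling that patches 06 and 07 keep reflowing can
be exercised in isolation with the sketch below (illustrative object name and a
shortened interval; not Spark code, although the jitter line mirrors the one in the
diffs). The first beat is delayed by the interval plus a random fraction of it, so
executors that start together do not heartbeat the driver in lockstep:

  import java.util.concurrent.{Executors, TimeUnit}

  object HeartbeatJitterDemo {
    def main(args: Array[String]): Unit = {
      val intervalMs = 1000L // shortened from the 10s default for demonstration
      // Wait a random interval so the heartbeats don't end up in sync
      val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
      val heartbeater = Executors.newSingleThreadScheduledExecutor()
      heartbeater.scheduleAtFixedRate(
        () => println(s"heartbeat at ${System.currentTimeMillis()} ms"),
        initialDelay, intervalMs, TimeUnit.MILLISECONDS)
      Thread.sleep(5000) // observe a few beats, then stop
      heartbeater.shutdown()
    }
  }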

From 39d9b592c7bde55a10520e560b740d6ba35d737b Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sun, 24 Mar 2019 12:58:33 +0530
Subject: [PATCH 09/10] updated

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c8ef7ff328d76..13b93af6e60af 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -832,13 +832,12 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
-    val heartbeatIntervalInSec = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-      .millis.toSeconds.seconds
+    val heartbeatIntervalInSec =
+      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
     try {
       val response =
         heartbeatReceiverRef.askSync[HeartbeatResponse](
-          message,
-          new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
+          message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()

From 6c17bf8f1a004fbe0efadadeb0fd9bda4eb2b1f3 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sun, 24 Mar 2019 13:03:16 +0530
Subject: [PATCH 10/10] update

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 13b93af6e60af..9ac8063739984 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -835,8 +835,7 @@ private[spark] class Executor(
     val heartbeatIntervalInSec =
       conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
     try {
-      val response =
-        heartbeatReceiverRef.askSync[HeartbeatResponse](
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")