From 10bf41ec86c0af59a791fa02b5efaedc7a164a3c Mon Sep 17 00:00:00 2001
From: xueyu <278006819@qq.com>
Date: Thu, 14 Jun 2018 19:01:29 +0800
Subject: [PATCH 1/5] fix getTimeAs method

---
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +-
 .../scala/org/apache/spark/deploy/worker/DriverRunner.scala | 2 +-
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 ++--
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++--
 .../main/scala/org/apache/spark/ui/ConsoleProgressBar.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/RpcUtils.scala | 2 +-
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 ++--
 8 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index ebabedf950e3..b96cb04da16c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -354,7 +354,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {

     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeout = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s") * 1000L

     setDaemon(true)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 58a181128eb4..f4366a5ca251 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -58,7 +58,7 @@ private[deploy] class DriverRunner(

   // Timeout to wait for when trying to terminate a driver.
   private val DRIVER_TERMINATE_TIMEOUT_MS =
-    conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
+    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s") * 1000L

   // Decoupled for testing
   def setClock(_clock: Clock): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b1856ff0f324..e3483d273c80 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -613,7 +613,7 @@ private[spark] class Executor(
     private[this] val taskId: Long = taskRunner.taskId

     private[this] val killPollingIntervalMs: Long =
-      conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+      conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s") * 1000L

     private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
@@ -820,7 +820,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L

     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f85a1f..7bff11174c95 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -80,7 +80,7 @@ private[spark] class TaskSchedulerImpl(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s") * 1000L

   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d8794e8e551a..29f96af1af82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -58,7 +58,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
+    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") * 1000L
   private val createTime = System.currentTimeMillis()

   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -108,7 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s") * 1000L

       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 3c4ee4eb6bbb..424b121aa137 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -34,7 +34,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
   private val CR = '\r'
   // Update period of progress bar, in milliseconds
   private val updatePeriodMSec =
-    sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
+    sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200ms")
   // Delay to show up a progress bar, in milliseconds
   private val firstDelayMSec = 500L
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index e5cccf39f945..f68b70caa121 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -39,7 +39,7 @@ private[spark] object RpcUtils {

   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s") * 1000L
   }

   /** Returns the default Spark timeout to use for RPC ask operations. */
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index d35bea4aca31..cd9f6933679a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -85,7 +85,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val taskLabels = conf.get("spark.mesos.task.labels", "")

   private[this] val shutdownTimeoutMS =
-    conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
+    (conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s") * 1000L)
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")

   // Synchronization protected by stateLock
@@ -635,7 +635,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         externalShufflePort,
         sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
           s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
-        sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+        sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L)
       slave.shuffleRegistered = true
     }

From 3f59a0542d67613073c788f0bbeb7c885b0ab451 Mon Sep 17 00:00:00 2001
From: xueyu <278006819@qq.com>
Date: Fri, 15 Jun 2018 19:03:14 +0800
Subject: [PATCH 2/5] use DurationConvertions etc

---
 .../spark/api/python/PythonRunner.scala | 5 ++--
 .../spark/deploy/worker/DriverRunner.scala | 3 ++-
 .../org/apache/spark/executor/Executor.scala | 5 ++--
 .../spark/scheduler/TaskSchedulerImpl.scala | 3 ++-
 .../CoarseGrainedSchedulerBackend.scala | 5 ++--
 .../org/apache/spark/util/RpcUtils.scala | 4 +++-
 .../org/apache/spark/SparkConfSuite.scala | 23 +++++++++++++++++++
 .../MesosCoarseGrainedSchedulerBackend.scala | 5 ++--
 8 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index b96cb04da16c..74c068b2650a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicBoolean

 import scala.collection.JavaConverters._
+import scala.concurrent.duration._

 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -354,7 +355,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {

     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s") * 1000L
+    private val taskKillTimeoutMs = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s").seconds.toMillis

     setDaemon(true)
@@ -365,7 +366,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         Thread.sleep(2000)
       }
       if (!context.isCompleted) {
-        Thread.sleep(taskKillTimeout)
+        Thread.sleep(taskKillTimeoutMs)
         if (!context.isCompleted) {
           try {
             // Mimic the task name used in `Executor` to help the user find out the task to blame.
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index f4366a5ca251..bca9a770ff8c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import java.nio.charset.StandardCharsets

 import scala.collection.JavaConverters._
+import scala.concurrent.duration._

 import com.google.common.io.Files

@@ -58,7 +59,7 @@ private[deploy] class DriverRunner(

   // Timeout to wait for when trying to terminate a driver.
   private val DRIVER_TERMINATE_TIMEOUT_MS =
-    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s") * 1000L
+    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s").seconds.toMillis

   // Decoupled for testing
   def setClock(_clock: Clock): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e3483d273c80..f0e7deb512d4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal

 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -613,7 +614,7 @@ private[spark] class Executor(
     private[this] val taskId: Long = taskRunner.taskId

     private[this] val killPollingIntervalMs: Long =
-      conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s") * 1000L
+      conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s").seconds.toMillis

     private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
@@ -820,7 +821,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis

     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7bff11174c95..0129753e2c68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong

 import scala.collection.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration._
 import scala.util.Random

 import org.apache.spark._
@@ -80,7 +81,7 @@ private[spark] class TaskSchedulerImpl(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s") * 1000L
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s").seconds.toMillis

   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 29f96af1af82..b16c74c63c51 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration._
 import scala.concurrent.Future

 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
@@ -58,7 +59,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") * 1000L
+    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s").seconds.toMillis
   private val createTime = System.currentTimeMillis()

   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -108,7 +109,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s") * 1000L
+      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s").seconds.toMillis

       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index f68b70caa121..a64bbd8d2081 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.util

+import scala.concurrent.duration._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}

@@ -39,7 +41,7 @@ private[spark] object RpcUtils {

   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s") * 1000L
+    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s").seconds.toMillis
   }

   /** Returns the default Spark timeout to use for RPC ask operations. */
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 0d06b02e74e3..30085aedc8d3 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -371,6 +371,29 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
       assert(thrown.getMessage.contains(key))
     }
   }
+
+  test("SPARK-24560") {
+    val conf = new SparkConf()
+    conf.set("spark.python.task.killTimeout", "3")
+    conf.set("spark.worker.driverTerminateTimeout", "12")
+    conf.set("spark.task.reaper.pollingInterval", "12")
+    conf.set("spark.executor.heartbeatInterval", "12")
+    conf.set("spark.starvation.timeout", "16")
+    conf.set("spark.scheduler.maxRegisteredResourcesWaitingTime", "32")
+    conf.set("spark.scheduler.revive.interval", "2")
+    conf.set("spark.rpc.retry.wait", "4")
+    conf.set("spark.mesos.coarse.shutdownTimeout", "12")
+    assert(conf.getTimeAsSeconds("spark.python.task.killTimeout").seconds.toMillis === 3000)
+    assert(conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.task.reaper.pollingInterval").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.executor.heartbeatInterval").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.starvation.timeout").seconds.toMillis === 16000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime").seconds.toMillis === 32000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.revive.interval").seconds.toMillis === 2000)
+    assert(conf.getTimeAsSeconds("spark.rpc.retry.wait").seconds.toMillis === 4000)
+    assert(conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout").seconds.toMillis === 12000)
+  }
+
 }

 class Class1 {}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index cd9f6933679a..6051077bd369 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock

 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.duration._
 import scala.concurrent.Future

 import org.apache.hadoop.security.UserGroupInformation
@@ -85,7 +86,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val taskLabels = conf.get("spark.mesos.task.labels", "")

   private[this] val shutdownTimeoutMS =
-    (conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s") * 1000L)
+    conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s").seconds.toMillis
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")

   // Synchronization protected by stateLock
@@ -634,7 +635,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         slave.hostname,
         externalShufflePort,
         sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-          s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
+          s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms"),
         sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L)
       slave.shuffleRegistered = true
     }

From a8dc241d0d9a0c0a2fef1b09bad2ac52fe4f9dd9 Mon Sep 17 00:00:00 2001
From: xueyu <278006819@qq.com>
Date: Fri, 15 Jun 2018 19:14:40 +0800
Subject: [PATCH 3/5] use DurationConvertions etc

---
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 6051077bd369..7e0b68fbe792 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -636,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         externalShufflePort,
         sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
           s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms"),
-        sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L)
+        sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis)
       slave.shuffleRegistered = true
     }

From a69df1137f9918670523120a2a0516b855833e84 Mon Sep 17 00:00:00 2001
From: xueyu <278006819@qq.com>
Date: Sat, 16 Jun 2018 13:19:13 +0800
Subject: [PATCH 4/5] fix check style error

---
 .../scala/org/apache/spark/api/python/PythonRunner.scala | 3 ++-
 .../main/scala/org/apache/spark/executor/Executor.scala | 3 ++-
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 3 ++-
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 +++++---
 core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 3 ++-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +-
 6 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 74c068b2650a..8f98045ebead 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -355,7 +355,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {

     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeoutMs = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s").seconds.toMillis
+    private val taskKillTimeoutMs = env.conf.getTimeAsSeconds("spark.python.task.killTimeout",
+      "2s").seconds.toMillis

     setDaemon(true)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f0e7deb512d4..ed2b5929a782 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -821,7 +821,8 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval",
+      "10s").seconds.toMillis

     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0129753e2c68..3e4f68cd18ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -81,7 +81,8 @@ private[spark] class TaskSchedulerImpl(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s").seconds.toMillis
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout",
+    "15s").seconds.toMillis

   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b16c74c63c51..4724c58fa788 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.duration._
 import scala.concurrent.Future
+import scala.concurrent.duration._

 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
@@ -59,7 +59,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s").seconds.toMillis
+    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime",
+      "30s").seconds.toMillis
   private val createTime = System.currentTimeMillis()

   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -109,7 +110,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s").seconds.toMillis
+      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval",
+        "1s").seconds.toMillis

       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 30085aedc8d3..372c4a1a6c7c 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -388,7 +388,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(conf.getTimeAsSeconds("spark.task.reaper.pollingInterval").seconds.toMillis === 12000)
     assert(conf.getTimeAsSeconds("spark.executor.heartbeatInterval").seconds.toMillis === 12000)
     assert(conf.getTimeAsSeconds("spark.starvation.timeout").seconds.toMillis === 16000)
-    assert(conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime").seconds.toMillis === 32000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime")
+      .seconds.toMillis === 32000)
     assert(conf.getTimeAsSeconds("spark.scheduler.revive.interval").seconds.toMillis === 2000)
     assert(conf.getTimeAsSeconds("spark.rpc.retry.wait").seconds.toMillis === 4000)
     assert(conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout").seconds.toMillis === 12000)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 7e0b68fbe792..62d10062f10e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -24,8 +24,8 @@ import java.util.concurrent.locks.ReentrantLock

 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.duration._
 import scala.concurrent.Future
+import scala.concurrent.duration._

 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}

From 0dd57ea193cc4c3282961cea63ecf36b1d6a7e95 Mon Sep 17 00:00:00 2001
From: xueyu <278006819@qq.com>
Date: Sun, 17 Jun 2018 12:17:02 +0800
Subject: [PATCH 5/5] compatible with spark.akka.retry.wait deprecated config

---
 core/src/main/scala/org/apache/spark/util/RpcUtils.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index a64bbd8d2081..bcb97e8e70ce 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -41,7 +41,13 @@ private[spark] object RpcUtils {

   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s").seconds.toMillis
+    if (conf.contains("spark.rpc.retry.wait") && !conf.contains("spark.akka.retry.wait")) {
+      conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s").seconds.toMillis
+    } else {
+      // compatible with deprecated alternative `spark.akka.retry.wait` which has default
+      // unit as millisecond
+      conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+    }
   }

   /** Returns the default Spark timeout to use for RPC ask operations. */
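
A minimal sketch (not part of the patches above, assuming only the standard SparkConf
time getters) of why these configs need getTimeAsSeconds rather than getTimeAsMs: both
getters accept suffixed values such as "10s", but a bare number is interpreted in the
getter's default unit, so a config documented in seconds (for example
spark.executor.heartbeatInterval) is read 1000x too small by getTimeAsMs:

    import scala.concurrent.duration._
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    // A user follows the docs, which describe the interval in seconds.
    conf.set("spark.executor.heartbeatInterval", "10")

    // Interpreted as 10 milliseconds.
    conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")        // 10
    // Interpreted as 10 seconds, then converted to milliseconds.
    conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
      .seconds.toMillis                                                // 10000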