diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index ebabedf950e39..8f98045ebead5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -354,7 +355,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {
 
     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeoutMs = env.conf.getTimeAsSeconds("spark.python.task.killTimeout",
+      "2s").seconds.toMillis
 
     setDaemon(true)
 
@@ -365,7 +367,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         Thread.sleep(2000)
       }
       if (!context.isCompleted) {
-        Thread.sleep(taskKillTimeout)
+        Thread.sleep(taskKillTimeoutMs)
         if (!context.isCompleted) {
           try {
             // Mimic the task name used in `Executor` to help the user find out the task to blame.
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 58a181128eb4d..bca9a770ff8c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import java.nio.charset.StandardCharsets
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
 
 import com.google.common.io.Files
 
@@ -58,7 +59,7 @@ private[deploy] class DriverRunner(
 
   // Timeout to wait for when trying to terminate a driver.
   private val DRIVER_TERMINATE_TIMEOUT_MS =
-    conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
+    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s").seconds.toMillis
 
   // Decoupled for testing
   def setClock(_clock: Clock): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b1856ff0f3247..ed2b5929a7820 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -613,7 +614,7 @@ private[spark] class Executor(
     private[this] val taskId: Long = taskRunner.taskId
 
     private[this] val killPollingIntervalMs: Long =
-      conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+      conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s").seconds.toMillis
 
     private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
 
@@ -820,7 +821,8 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval",
+      "10s").seconds.toMillis
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f85a1fa..3e4f68cd18ec1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration._
 import scala.util.Random
 
 import org.apache.spark._
@@ -80,7 +81,8 @@ private[spark] class TaskSchedulerImpl(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout",
+    "15s").seconds.toMillis
 
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d8794e8e551aa..4724c58fa788a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -23,6 +23,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
+import scala.concurrent.duration._
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
@@ -58,7 +59,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
+    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime",
+      "30s").seconds.toMillis
   private val createTime = System.currentTimeMillis()
 
   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -108,7 +110,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval",
+        "1s").seconds.toMillis
 
       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 3c4ee4eb6bbb9..424b121aa1370 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -34,7 +34,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
   private val CR = '\r'
   // Update period of progress bar, in milliseconds
   private val updatePeriodMSec =
-    sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
+    sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200ms")
   // Delay to show up a progress bar, in milliseconds
   private val firstDelayMSec = 500L
 
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index e5cccf39f9455..bcb97e8e70cef 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import scala.concurrent.duration._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
 
@@ -39,7 +41,13 @@ private[spark] object RpcUtils {
 
   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+    if (conf.contains("spark.rpc.retry.wait") && !conf.contains("spark.akka.retry.wait")) {
+      conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s").seconds.toMillis
+    } else {
+      // Stay compatible with the deprecated alternative `spark.akka.retry.wait`, whose
+      // default unit is milliseconds.
+      conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+    }
   }
 
   /** Returns the default Spark timeout to use for RPC ask operations. */
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 0d06b02e74e34..372c4a1a6c7c2 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -371,6 +371,30 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
       assert(thrown.getMessage.contains(key))
     }
   }
+
+  test("SPARK-24560") {
+    val conf = new SparkConf()
+    conf.set("spark.python.task.killTimeout", "3")
+    conf.set("spark.worker.driverTerminateTimeout", "12")
+    conf.set("spark.task.reaper.pollingInterval", "12")
+    conf.set("spark.executor.heartbeatInterval", "12")
+    conf.set("spark.starvation.timeout", "16")
+    conf.set("spark.scheduler.maxRegisteredResourcesWaitingTime", "32")
+    conf.set("spark.scheduler.revive.interval", "2")
+    conf.set("spark.rpc.retry.wait", "4")
+    conf.set("spark.mesos.coarse.shutdownTimeout", "12")
+    assert(conf.getTimeAsSeconds("spark.python.task.killTimeout").seconds.toMillis === 3000)
+    assert(conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.task.reaper.pollingInterval").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.executor.heartbeatInterval").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.starvation.timeout").seconds.toMillis === 16000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime")
+      .seconds.toMillis === 32000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.revive.interval").seconds.toMillis === 2000)
+    assert(conf.getTimeAsSeconds("spark.rpc.retry.wait").seconds.toMillis === 4000)
+    assert(conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout").seconds.toMillis === 12000)
+  }
+
 }
 
 class Class1 {}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index d35bea4aca311..62d10062f10e2 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
+import scala.concurrent.duration._
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -85,7 +86,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val taskLabels = conf.get("spark.mesos.task.labels", "")
 
   private[this] val shutdownTimeoutMS =
-    conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
+    conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s").seconds.toMillis
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
 
   // Synchronization protected by stateLock
@@ -634,8 +635,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         slave.hostname,
         externalShufflePort,
         sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-          s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
-        sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+          s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms"),
+        sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis)
       slave.shuffleRegistered = true
     }
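Note on the behavior this patch targets (commentary, not part of the diff above): `getTimeAsMs` parses a unit-less value such as "30" as 30 milliseconds, whereas the defaults for these settings are documented as second-valued strings like "10s". Reading the value with `getTimeAsSeconds` and converting to milliseconds keeps seconds as the default unit for unit-less input, while values carrying an explicit suffix such as "30s" resolve identically either way. A minimal sketch of the difference follows; `TimeoutUnitsDemo` and the choice of `spark.executor.heartbeatInterval` are illustrative assumptions, not code from the patch:

import scala.concurrent.duration._

import org.apache.spark.SparkConf

// Illustrative only: compares the two parsing paths for a unit-less timeout value.
object TimeoutUnitsDemo {
  def main(args: Array[String]): Unit = {
    // A user sets a unit-less value, expecting seconds as the default unit.
    val conf = new SparkConf().set("spark.executor.heartbeatInterval", "30")

    // Old reading: a unit-less string is interpreted as milliseconds -> 30 ms.
    val oldMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

    // New reading: interpret the value as seconds, then convert -> 30000 ms.
    val newMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis

    println(s"getTimeAsMs = $oldMs ms, getTimeAsSeconds.seconds.toMillis = $newMs ms")
  }
}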