Commit 10bf41e

fix getTimeAs method
1 parent 3bf7691 commit 10bf41e
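
For context (assuming Spark's standard SparkConf time getters): getTimeAsMs and getTimeAsSeconds differ in how they treat a value with no unit suffix — getTimeAsMs reads a bare number as milliseconds, while getTimeAsSeconds reads it as seconds. The settings touched here are second-granularity timeouts and intervals, so getTimeAsSeconds(...) * 1000L lets a user-supplied bare number keep meaning seconds while callers still receive milliseconds. A minimal Scala sketch of the difference, using one of the keys changed below:

    import org.apache.spark.SparkConf

    // A user sets a bare number, intending 5 seconds.
    val conf = new SparkConf().set("spark.python.task.killTimeout", "5")

    // getTimeAsMs parses the suffix-less "5" as 5 milliseconds.
    conf.getTimeAsMs("spark.python.task.killTimeout", "2s")              // 5

    // getTimeAsSeconds parses "5" as 5 seconds; * 1000L converts to ms.
    conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s") * 1000L // 5000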

8 files changed: +11 -11 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 1 addition & 1 deletion

@@ -354,7 +354,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {

     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeout = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s") * 1000L

     setDaemon(true)

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ private[deploy] class DriverRunner(

   // Timeout to wait for when trying to terminate a driver.
   private val DRIVER_TERMINATE_TIMEOUT_MS =
-    conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
+    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s") * 1000L

   // Decoupled for testing
   def setClock(_clock: Clock): Unit = {

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 2 deletions

@@ -613,7 +613,7 @@ private[spark] class Executor(
   private[this] val taskId: Long = taskRunner.taskId

   private[this] val killPollingIntervalMs: Long =
-    conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+    conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s") * 1000L

   private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")

@@ -820,7 +820,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L

     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -80,7 +80,7 @@ private[spark] class TaskSchedulerImpl(
   ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s") * 1000L

   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions

@@ -58,7 +58,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
+    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") * 1000L
   private val createTime = System.currentTimeMillis()

   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any

@@ -108,7 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   override def onStart() {
     // Periodically revive offers to allow delay scheduling to work
-    val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+    val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s") * 1000L

     reviveThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryLogNonFatalError {

core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
   private val CR = '\r'
   // Update period of progress bar, in milliseconds
   private val updatePeriodMSec =
-    sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
+    sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200ms")
   // Delay to show up a progress bar, in milliseconds
   private val firstDelayMSec = 500L
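
This hunk differs from the other seven: the call stays getTimeAsMs, since the setting is documented in milliseconds, and only the default string gains an explicit unit suffix. Under the parsing rule sketched above, "200" and "200ms" are equivalent defaults here, so the change makes the unit explicit rather than altering behavior:

    import org.apache.spark.SparkConf

    val conf = new SparkConf() // key left unset, so the default string is parsed
    // getTimeAsMs reads a suffix-less default as milliseconds: both yield 200.
    conf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")   // 200
    conf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200ms") // 200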

core/src/main/scala/org/apache/spark/util/RpcUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ private[spark] object RpcUtils {

   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s") * 1000L
   }

   /** Returns the default Spark timeout to use for RPC ask operations. */

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions

@@ -85,7 +85,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val taskLabels = conf.get("spark.mesos.task.labels", "")

   private[this] val shutdownTimeoutMS =
-    conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
+    (conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s") * 1000L)
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")

   // Synchronization protected by stateLock

@@ -635,7 +635,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       externalShufflePort,
       sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
         s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
-      sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+      sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L)
     slave.shuffleRegistered = true
   }
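
The unchanged context lines in the second hunk show the companion idiom: a millisecond default derived from the second-granularity spark.network.timeout carries an explicit "ms" suffix, so getTimeAsMs parses the interpolated number in the intended unit. A sketch of that derivation, assuming the same getters and defaults as in the diff:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    // 120s -> 120000, tagged "ms" so getTimeAsMs reads it unambiguously.
    val defaultMs = s"${conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"
    conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", defaultMs) // 120000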
