
Commit 3f59a05

use DurationConversions etc
1 parent 10bf41e commit 3f59a05


8 files changed: +42 −11 lines changed

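Every file in this commit applies the same substitution: instead of multiplying the Long returned by getTimeAsSeconds by 1000L by hand, the seconds value is converted through the implicits that `import scala.concurrent.duration._` brings into scope (the DurationConversions/DurationLong enrichment). Below is a minimal, self-contained sketch of that pattern; the object and value names are illustrative, not taken from the diff.

import scala.concurrent.duration._

object SecondsToMillisSketch {
  def main(args: Array[String]): Unit = {
    // Pretend this came from SparkConf.getTimeAsSeconds(...): a plain count of seconds.
    val timeoutSeconds: Long = 2L
    // DurationLong gives Long a `.seconds` method returning a FiniteDuration.
    val timeoutMs: Long = timeoutSeconds.seconds.toMillis
    assert(timeoutMs == timeoutSeconds * 1000L)  // same value as the old hand-written multiply
    println(s"$timeoutSeconds s = $timeoutMs ms")
  }
}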

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 3 additions & 2 deletions
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -354,7 +355,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {
 
     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s") * 1000L
+    private val taskKillTimeoutMs = env.conf.getTimeAsSeconds("spark.python.task.killTimeout", "2s").seconds.toMillis
 
     setDaemon(true)
 
@@ -365,7 +366,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         Thread.sleep(2000)
       }
       if (!context.isCompleted) {
-        Thread.sleep(taskKillTimeout)
+        Thread.sleep(taskKillTimeoutMs)
         if (!context.isCompleted) {
           try {
             // Mimic the task name used in `Executor` to help the user find out the task to blame.

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 2 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.net.URI
 import java.nio.charset.StandardCharsets
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
 
 import com.google.common.io.Files
 
@@ -58,7 +59,7 @@ private[deploy] class DriverRunner(
 
   // Timeout to wait for when trying to terminate a driver.
   private val DRIVER_TERMINATE_TIMEOUT_MS =
-    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s") * 1000L
+    conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s").seconds.toMillis
 
   // Decoupled for testing
   def setClock(_clock: Clock): Unit = {

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 2 deletions
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -613,7 +614,7 @@ private[spark] class Executor(
     private[this] val taskId: Long = taskRunner.taskId
 
     private[this] val killPollingIntervalMs: Long =
-      conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s") * 1000L
+      conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s").seconds.toMillis
 
     private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
 
@@ -820,7 +821,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L
+    val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration._
 import scala.util.Random
 
 import org.apache.spark._
@@ -80,7 +81,7 @@ private[spark] class TaskSchedulerImpl(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s") * 1000L
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout", "15s").seconds.toMillis
 
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.duration._
 import scala.concurrent.Future
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
@@ -58,7 +59,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") * 1000L
+    conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s").seconds.toMillis
   private val createTime = System.currentTimeMillis()
 
   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -108,7 +109,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s") * 1000L
+      val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval", "1s").seconds.toMillis
 
       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {

core/src/main/scala/org/apache/spark/util/RpcUtils.scala

Lines changed: 3 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import scala.concurrent.duration._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
 
@@ -39,7 +41,7 @@ private[spark] object RpcUtils {
 
   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s") * 1000L
+    conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s").seconds.toMillis
   }
 
   /** Returns the default Spark timeout to use for RPC ask operations. */

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 23 additions & 0 deletions
@@ -371,6 +371,29 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
       assert(thrown.getMessage.contains(key))
     }
   }
+
+  test("SPARK-24560") {
+    val conf = new SparkConf()
+    conf.set("spark.python.task.killTimeout", "3")
+    conf.set("spark.worker.driverTerminateTimeout", "12")
+    conf.set("spark.task.reaper.pollingInterval", "12")
+    conf.set("spark.executor.heartbeatInterval", "12")
+    conf.set("spark.starvation.timeout", "16")
+    conf.set("spark.scheduler.maxRegisteredResourcesWaitingTime", "32")
+    conf.set("spark.scheduler.revive.interval", "2")
+    conf.set("spark.rpc.retry.wait", "4")
+    conf.set("spark.mesos.coarse.shutdownTimeout", "12")
+    assert(conf.getTimeAsSeconds("spark.python.task.killTimeout").seconds.toMillis === 3000)
+    assert(conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.task.reaper.pollingInterval").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.executor.heartbeatInterval").seconds.toMillis === 12000)
+    assert(conf.getTimeAsSeconds("spark.starvation.timeout").seconds.toMillis === 16000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime").seconds.toMillis === 32000)
+    assert(conf.getTimeAsSeconds("spark.scheduler.revive.interval").seconds.toMillis === 2000)
+    assert(conf.getTimeAsSeconds("spark.rpc.retry.wait").seconds.toMillis === 4000)
+    assert(conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout").seconds.toMillis === 12000)
+  }
+
 }
 
 class Class1 {}

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.duration._
 import scala.concurrent.Future
 
 import org.apache.hadoop.security.UserGroupInformation
@@ -85,7 +86,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val taskLabels = conf.get("spark.mesos.task.labels", "")
 
   private[this] val shutdownTimeoutMS =
-    (conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s") * 1000L)
+    conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s").seconds.toMillis
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
 
   // Synchronization protected by stateLock
@@ -634,7 +635,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           slave.hostname,
           externalShufflePort,
           sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-            s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
+            s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms"),
           sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") * 1000L)
         slave.shuffleRegistered = true
       }
