
Commit 857f109

Marcelo Vanzin authored and Dongjoon Hyun committed
[SPARK-10614][CORE] Add monotonic time to Clock interface
This change adds a new method to the Clock interface that returns the time from a monotonic time source, so that code that needs this feature can also mock the Clock in tests.

The original getTimeMillis and waitTillTime methods are unchanged, since streaming code that uses the Clock interface seems to rely on wall-clock semantics, not a monotonic clock. So, in a way, this doesn't directly address the problem raised in the bug, that waitTillTime can be affected by drift; but the places being modified to use the new API don't really rely on that behavior anyway.

The dynamic allocation code was modified to use the new time source, since it shouldn't be basing its decisions on wall-clock time.

For a longer discussion of how monotonic clocks work on Linux/x64, the following blog post (and links within) sheds a lot of light on the safety of `System.nanoTime()`:

http://btorpey.github.io/blog/2014/02/18/clock-sources-in-linux/

Tested with unit tests and also by running apps with dynamic allocation on.

Closes #26058 from vanzin/SPARK-10614.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 9407fba commit 857f109
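As an editorial illustration of the distinction this commit relies on (this sketch is not code from the patch; `timeoutSeconds` is a made-up value): a wall-clock deadline moves when the system time is stepped, while a monotonic deadline depends only on elapsed time.

    import java.util.concurrent.TimeUnit

    val timeoutSeconds = 60L

    // Wall-clock deadline: an NTP step of the system time shifts when
    // System.currentTimeMillis() crosses this value, so the effective
    // timeout can be much shorter or longer than intended.
    val wallDeadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds)

    // Monotonic deadline: System.nanoTime() is unaffected by wall-clock
    // adjustments, so the timeout fires after roughly the intended interval.
    val monoDeadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds)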

File tree

6 files changed: +79 −36 lines

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 3 additions & 3 deletions
@@ -288,7 +288,7 @@ private[spark] class ExecutorAllocationManager(
     }

     // Update executor target number only after initializing flag is unset
-    updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    updateAndSyncNumExecutorsTarget(clock.nanoTime())
     if (executorIdsToBeRemoved.nonEmpty) {
       removeExecutors(executorIdsToBeRemoved)
     }
@@ -336,7 +336,7 @@ private[spark] class ExecutorAllocationManager(
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
         s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
+      addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
       delta
     } else {
       0
@@ -481,7 +481,7 @@ private[spark] class ExecutorAllocationManager(
     if (addTime == NOT_SET) {
       logDebug(s"Starting timer to add executors because pending tasks " +
         s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
-      addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
+      addTime = clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeoutS)
     }
   }
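The pattern in all three hunks is the same: the deadline is stored in the nanoTime() timebase and armed once, then compared against later readings from the same source. A minimal sketch of that arm-and-check cycle, with hypothetical names (BacklogTimer is not part of the patch):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.Clock

    // Hypothetical sketch of the deadline pattern used above.
    class BacklogTimer(clock: Clock, timeoutSeconds: Long) {
      private val NOT_SET = Long.MaxValue
      private var addTime: Long = NOT_SET

      // Arm the timer once; the deadline lives in the nanoTime() timebase.
      def arm(): Unit = if (addTime == NOT_SET) {
        addTime = clock.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds)
      }

      // Compare against a reading from the same monotonic timebase.
      def expired(): Boolean = clock.nanoTime() >= addTime
    }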

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 9 additions & 8 deletions
@@ -39,11 +39,12 @@ private[spark] class ExecutorMonitor(
     listenerBus: LiveListenerBus,
     clock: Clock) extends SparkListener with CleanerListener with Logging {

-  private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(
+  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
     conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
-  private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(
+  private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(
     conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
-  private val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT)
+  private val shuffleTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+    conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT))

   private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
     conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
@@ -100,7 +101,7 @@ private[spark] class ExecutorMonitor(
    * Should only be called from the EAM thread.
    */
   def timedOutExecutors(): Seq[String] = {
-    val now = clock.getTimeMillis()
+    val now = clock.nanoTime()
     if (now >= nextTimeout.get()) {
       // Temporarily set the next timeout at Long.MaxValue. This ensures that after
       // scanning all executors below, we know when the next timeout for non-timed out
@@ -437,23 +438,23 @@ private[spark] class ExecutorMonitor(

     def updateRunningTasks(delta: Int): Unit = {
       runningTasks = math.max(0, runningTasks + delta)
-      idleStart = if (runningTasks == 0) clock.getTimeMillis() else -1L
+      idleStart = if (runningTasks == 0) clock.nanoTime() else -1L
       updateTimeout()
     }

     def updateTimeout(): Unit = {
       val oldDeadline = timeoutAt
       val newDeadline = if (idleStart >= 0) {
         val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
-          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue
+          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue
           val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
-            shuffleTimeoutMs
+            shuffleTimeoutNs
           } else {
             Long.MaxValue
           }
           math.min(_cacheTimeout, _shuffleTimeout)
         } else {
-          idleTimeoutMs
+          idleTimeoutNs
         }
         val deadline = idleStart + timeout
         if (deadline >= 0) deadline else Long.MaxValue
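One subtlety in updateTimeout() above: since nanoTime() has an arbitrary origin, idleStart can sit near the top of the Long range, and idleStart + timeout can overflow to a negative number; the deadline >= 0 check maps that case to "no deadline" (Long.MaxValue). An editorial illustration of the guard, not code from the patch:

    import java.util.concurrent.TimeUnit

    // A monotonic reading near the top of the Long range.
    val idleStart = Long.MaxValue - 1000L
    val timeout = TimeUnit.SECONDS.toNanos(60)
    val deadline = idleStart + timeout           // overflows to a negative Long
    val effective = if (deadline >= 0) deadline else Long.MaxValue
    assert(effective == Long.MaxValue)           // treated as "never times out"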

core/src/main/scala/org/apache/spark/util/Clock.scala

Lines changed: 38 additions & 4 deletions
@@ -21,7 +21,37 @@ package org.apache.spark.util
  * An interface to represent clocks, so that they can be mocked out in unit tests.
  */
 private[spark] trait Clock {
+  /** @return Current system time, in ms. */
   def getTimeMillis(): Long
+
+  // scalastyle:off line.size.limit
+  /**
+   * Current value of high resolution time source, in ns.
+   *
+   * This method abstracts the call to the JRE's `System.nanoTime()` call. As with that method, the
+   * value here is not guaranteed to be monotonically increasing, but rather a higher resolution
+   * time source for use in the calculation of time intervals. The characteristics of the values
+   * returned may vary from JVM to JVM (or even the same JVM running on different OSes or CPUs), but
+   * in general it should be preferred over [[getTimeMillis()]] when calculating time differences.
+   *
+   * Specifically for Linux on x64 architecture, the following links provide useful information
+   * about the characteristics of the value returned:
+   *
+   * http://btorpey.github.io/blog/2014/02/18/clock-sources-in-linux/
+   * https://stackoverflow.com/questions/10921210/cpu-tsc-fetch-operation-especially-in-multicore-multi-processor-environment
+   *
+   * TL;DR: on modern (2.6.32+) Linux kernels with modern (AMD K8+) CPUs, the values returned by
+   * `System.nanoTime()` are consistent across CPU cores *and* packages, and provide always
+   * increasing values (although it may not be completely monotonic when the system clock is
+   * adjusted by NTP daemons using time slew).
+   */
+  // scalastyle:on line.size.limit
+  def nanoTime(): Long
+
+  /**
+   * Wait until the wall clock reaches at least the given time. Note this may not actually wait for
+   * the actual difference between the current and target times, since the wall clock may drift.
+   */
   def waitTillTime(targetTime: Long): Long
 }

@@ -36,15 +66,19 @@ private[spark] class SystemClock extends Clock {
   * @return the same time (milliseconds since the epoch)
   *         as is reported by `System.currentTimeMillis()`
   */
-  def getTimeMillis(): Long = System.currentTimeMillis()
+  override def getTimeMillis(): Long = System.currentTimeMillis()
+
+  /**
+   * @return value reported by `System.nanoTime()`.
+   */
+  override def nanoTime(): Long = System.nanoTime()

   /**
    * @param targetTime block until the current time is at least this value
    * @return current system time when wait has completed
    */
-  def waitTillTime(targetTime: Long): Long = {
-    var currentTime = 0L
-    currentTime = System.currentTimeMillis()
+  override def waitTillTime(targetTime: Long): Long = {
+    var currentTime = System.currentTimeMillis()

     var waitTime = targetTime - currentTime
     if (waitTime <= 0) {
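With nanoTime() on the trait, code that receives a Clock can measure intervals without touching the wall clock, and tests can still substitute a mock. A hedged usage sketch (the timed helper is hypothetical, not part of this change):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.{Clock, SystemClock}

    // Hypothetical helper: measure how long a block takes using the
    // injected Clock, so a test can pass a ManualClock instead.
    def timed[T](clock: Clock)(body: => T): (T, Long) = {
      val start = clock.nanoTime()
      val result = body
      val elapsedMs = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - start)
      (result, elapsedMs)
    }

    val (_, elapsed) = timed(new SystemClock()) { Thread.sleep(10) }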

core/src/main/scala/org/apache/spark/util/ManualClock.scala

Lines changed: 11 additions & 5 deletions
@@ -17,11 +17,16 @@

 package org.apache.spark.util

+import java.util.concurrent.TimeUnit
+
 /**
  * A `Clock` whose time can be manually set and modified. Its reported time does not change
  * as time elapses, but only as its time is modified by callers. This is mainly useful for
  * testing.
  *
+ * For this implementation, `getTimeMillis()` and `nanoTime()` always return the same value
+ * (adjusted for the correct unit).
+ *
  * @param time initial time (in milliseconds since the epoch)
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
@@ -31,10 +36,11 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    */
   def this() = this(0L)

-  def getTimeMillis(): Long =
-    synchronized {
-      time
-    }
+  override def getTimeMillis(): Long = synchronized {
+    time
+  }
+
+  override def nanoTime(): Long = TimeUnit.MILLISECONDS.toNanos(getTimeMillis())

   /**
    * @param timeToSet new time (in milliseconds) that the clock should represent
@@ -56,7 +62,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    * @param targetTime block until the clock time is set or advanced to at least this time
    * @return current time reported by the clock when waiting finishes
    */
-  def waitTillTime(targetTime: Long): Long = synchronized {
+  override def waitTillTime(targetTime: Long): Long = synchronized {
     while (time < targetTime) {
       wait(10)
     }
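Because nanoTime() here is derived from the same manually set value, a test can drive both timebases in lockstep. A small sketch of the behavior documented above (editorial, not from the patch):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(0L)
    clock.advance(1500L)  // advance the clock by 1500 ms

    // Both views move together: nanoTime() is just getTimeMillis()
    // converted to nanoseconds.
    assert(clock.getTimeMillis() == 1500L)
    assert(clock.nanoTime() == TimeUnit.MILLISECONDS.toNanos(1500L))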

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark

+import java.util.concurrent.TimeUnit
+
 import scala.collection.mutable

 import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -541,7 +543,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val firstAddTime = addTime(manager)
-    assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+    assert(firstAddTime === clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeout))
     clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === firstAddTime) // timer is already started
@@ -555,7 +557,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val secondAddTime = addTime(manager)
-    assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+    assert(secondAddTime === clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeout))
     clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === secondAddTime) // timer is already started
@@ -936,7 +938,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
     clock.advance(1000)
-    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     assert(numExecutorsTarget(manager) === 2)
     val taskInfo0 = createTaskInfo(0, 0, "executor-1")
     post(SparkListenerTaskStart(0, 0, taskInfo0))
@@ -952,7 +954,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(maxNumExecutorsNeeded(manager) === 1)
     assert(numExecutorsTarget(manager) === 2)
     clock.advance(1000)
-    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     assert(numExecutorsTarget(manager) === 1)
     verify(client, never).killExecutors(any(), any(), any(), any())

core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala

Lines changed: 12 additions & 12 deletions
@@ -33,9 +33,9 @@ import org.apache.spark.util.ManualClock

 class ExecutorMonitorSuite extends SparkFunSuite {

-  private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L)
-  private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L)
-  private val shuffleTimeoutMs = TimeUnit.SECONDS.toMillis(240L)
+  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(60L)
+  private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(120L)
+  private val shuffleTimeoutNs = TimeUnit.SECONDS.toNanos(240L)

   private val conf = new SparkConf()
     .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
@@ -111,8 +111,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("1", 1),
       new ExecutorMetrics, null))
     assert(monitor.isExecutorIdle("1"))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()).isEmpty)
-    assert(monitor.timedOutExecutors(clock.getTimeMillis() + idleTimeoutMs + 1) === Seq("1"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()).isEmpty)
+    assert(monitor.timedOutExecutors(clock.nanoTime() + idleTimeoutNs + 1) === Seq("1"))
   }

   test("use appropriate time out depending on whether blocks are stored") {
@@ -166,7 +166,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     // originally went idle.
     clock.setTime(idleDeadline)
     monitor.onUnpersistRDD(SparkListenerUnpersistRDD(2))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))
   }

   test("handle timeouts correctly with multiple executors") {
@@ -186,7 +186,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     // start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
     clock.setTime(TimeUnit.SECONDS.toMillis(60))
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))

     // store block on exec 3 (should now idle time out at 180s)
     monitor.onBlockUpdated(rddUpdate(1, 0, "3"))
@@ -196,11 +196,11 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     // advance to 140s, remove block from exec 3 (time out immediately)
     clock.setTime(TimeUnit.SECONDS.toMillis(140))
     monitor.onBlockUpdated(rddUpdate(1, 0, "3", level = StorageLevel.NONE))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "3"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "3"))

     // advance to 150s, now exec 2 should time out
     clock.setTime(TimeUnit.SECONDS.toMillis(150))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "2", "3"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "2", "3"))
   }

   test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
@@ -410,9 +410,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
   }

-  private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
-  private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
-  private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
+  private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
+  private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
+  private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1

   private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = {
     new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "",
