Commit b5e53df

committed
FakeClock -> ManualClock; getTime() -> getTimeMillis()
1 parent 160863a commit b5e53df

32 files changed (+141, -141 lines)
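For quick reference, the renamed clock API as it stands after this commit (assembled from the Clock.scala and ManualClock.scala hunks below; this is only a summary of the diff, not new code):

    private[spark] trait Clock {
      def getTimeMillis(): Long                // was getTime()
      def waitTillTime(targetTime: Long): Long
    }

SystemClock keeps backing getTimeMillis() with System.currentTimeMillis(), and the test clock formerly named FakeClock becomes ManualClock with the same method rename.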


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 3 additions & 3 deletions

@@ -227,7 +227,7 @@ private[spark] class ExecutorAllocationManager(
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
-    val now = clock.getTime()
+    val now = clock.getTimeMillis()

     addOrCancelExecutorRequests(now)

@@ -410,7 +410,7 @@ private[spark] class ExecutorAllocationManager(
     if (addTime == NOT_SET) {
       logDebug(s"Starting timer to add executors because pending tasks " +
         s"are building up (to expire in $schedulerBacklogTimeout seconds)")
-      addTime = clock.getTime() + schedulerBacklogTimeout * 1000
+      addTime = clock.getTimeMillis() + schedulerBacklogTimeout * 1000
     }
   }

@@ -434,7 +434,7 @@ private[spark] class ExecutorAllocationManager(
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
           s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
-        removeTimes(executorId) = clock.getTime() + executorIdleTimeout * 1000
+        removeTimes(executorId) = clock.getTimeMillis() + executorIdleTimeout * 1000
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")
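The timer arithmetic above mixes units: schedulerBacklogTimeout and executorIdleTimeout are configured in seconds while the clock ticks in milliseconds, hence the multiplication by 1000. A hypothetical, self-contained illustration using the ManualClock introduced in this commit (the timeout value below is made up for the example):

    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(8888L)                 // start at an arbitrary millisecond value
    val schedulerBacklogTimeout = 5L                   // seconds (illustrative value only)
    val addTime = clock.getTimeMillis() + schedulerBacklogTimeout * 1000  // 8888 + 5000 = 13888
    clock.advance(4999L)
    assert(clock.getTimeMillis() < addTime)            // 13887 < 13888: timer not yet due
    clock.advance(1L)
    assert(clock.getTimeMillis() >= addTime)           // 13888 >= 13888: the add timer would fire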

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 2 additions & 2 deletions

@@ -187,9 +187,9 @@ private[spark] class DriverRunner(
         initialize(process.get)
       }

-      val processStart = clock.getTime()
+      val processStart = clock.getTimeMillis()
       val exitCode = process.get.waitFor()
-      if (clock.getTime() - processStart > successfulRunDuration * 1000) {
+      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
         waitSeconds = 1
       }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 9 deletions

@@ -650,7 +650,7 @@ class DAGScheduler(
         // completion events or stage abort
         stageIdToStage -= s.id
         jobIdToStageIds -= job.jobId
-        listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
+        listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
       }
     }

@@ -699,7 +699,7 @@ class DAGScheduler(
         stage.latestInfo.stageFailed(stageFailedMessage)
         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }

@@ -738,7 +738,7 @@ class DAGScheduler(
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
     val shouldRunLocally =
       localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
-    val jobSubmissionTime = clock.getTime()
+    val jobSubmissionTime = clock.getTimeMillis()
     if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
       listenerBus.post(
@@ -864,7 +864,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTime())
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
@@ -933,12 +933,12 @@ class DAGScheduler(

     def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
       val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
         case _ => "Unknown"
       }
       if (errorMessage.isEmpty) {
         logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTime())
+        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
       } else {
         stage.latestInfo.stageFailed(errorMessage.get)
         logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -964,7 +964,7 @@ class DAGScheduler(
           markStageAsFinished(stage)
           cleanupStateForJobAndIndependentStages(job)
           listenerBus.post(
-            SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
+            SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
         }

         // taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1180,7 +1180,7 @@ class DAGScheduler(
     }
     val dependentJobs: Seq[ActiveJob] =
       activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
-    failedStage.latestInfo.completionTime = Some(clock.getTime())
+    failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
     for (job <- dependentJobs) {
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
     }
@@ -1235,7 +1235,7 @@ class DAGScheduler(
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 7 additions & 7 deletions

@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
   var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
+  var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level

   override def schedulableQueue = null

@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
       val failed = failedExecutors.get(taskId).get

       return failed.contains(execId) &&
-        clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
     }

     false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
     : Option[TaskDescription] =
   {
     if (!isZombie) {
-      val curTime = clock.getTime()
+      val curTime = clock.getTimeMillis()

       var allowedLocality = maxLocality

@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
             lastLaunchTime = curTime
           }
           // Serialize and return the task
-          val startTime = clock.getTime()
+          val startTime = clock.getTimeMillis()
           val serializedTask: ByteBuffer = try {
             Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
           } catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
           return
         }
         val key = ef.description
-        val now = clock.getTime()
+        val now = clock.getTimeMillis()
        val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
             val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-      put(info.executorId, clock.getTime())
+      put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
-      val time = clock.getTime()
+      val time = clock.getTimeMillis()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
       val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))

core/src/main/scala/org/apache/spark/util/Clock.scala

Lines changed: 2 additions & 2 deletions

@@ -21,15 +21,15 @@ package org.apache.spark.util
  * An interface to represent clocks, so that they can be mocked out in unit tests.
  */
 private[spark] trait Clock {
-  def getTime(): Long
+  def getTimeMillis(): Long
   def waitTillTime(targetTime: Long): Long
 }

 private[spark] class SystemClock extends Clock {

   val minPollTime = 25L

-  def getTime(): Long = System.currentTimeMillis()
+  def getTimeMillis(): Long = System.currentTimeMillis()

   def waitTillTime(targetTime: Long): Long = {
     var currentTime = 0L

core/src/main/scala/org/apache/spark/util/FakeClock.scala renamed to core/src/main/scala/org/apache/spark/util/ManualClock.scala

Lines changed: 3 additions & 3 deletions

@@ -17,11 +17,11 @@

 package org.apache.spark.util

-class FakeClock(private var time: Long) extends Clock {
+class ManualClock(private var time: Long) extends Clock {

   def this() = this(0L)

-  def getTime(): Long =
+  def getTimeMillis(): Long =
     synchronized {
       time
     }
@@ -43,7 +43,7 @@ class FakeClock(private var time: Long) extends Clock {
     while (time < targetTime) {
       wait(100)
     }
-    getTime()
+    getTimeMillis()
   }

 }
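Below is a minimal sketch of how the renamed class is meant to be driven from a test. Only ManualClock, getTimeMillis() and advance() come from this commit and the suite further down; the standalone assertions are illustrative:

    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(14444L)      // start the clock at a chosen time
    assert(clock.getTimeMillis() == 14444L)
    clock.advance(100L)                      // move time forward deterministically
    assert(clock.getTimeMillis() == 14544L)
    // waitTillTime(t), shown in the hunk above, blocks until the clock reaches t.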

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 12 additions & 12 deletions

@@ -23,7 +23,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.util.FakeClock
+import org.apache.spark.util.ManualClock

 /**
  * Test add and remove behavior of ExecutorAllocationManager.
@@ -322,15 +322,15 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {

   test("starting/canceling add timer") {
     sc = createSparkContext(2, 10)
-    val clock = new FakeClock(8888L)
+    val clock = new ManualClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

     // Starting add timer is idempotent
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val firstAddTime = addTime(manager)
-    assert(firstAddTime === clock.getTime() + schedulerBacklogTimeout * 1000)
+    assert(firstAddTime === clock.getTimeMillis() + schedulerBacklogTimeout * 1000)
     clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === firstAddTime) // timer is already started
@@ -344,7 +344,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val secondAddTime = addTime(manager)
-    assert(secondAddTime === clock.getTime() + schedulerBacklogTimeout * 1000)
+    assert(secondAddTime === clock.getTimeMillis() + schedulerBacklogTimeout * 1000)
     clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === secondAddTime) // timer is already started
@@ -354,7 +354,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {

   test("starting/canceling remove timers") {
     sc = createSparkContext(2, 10)
-    val clock = new FakeClock(14444L)
+    val clock = new ManualClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

@@ -366,7 +366,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).size === 1)
     assert(removeTimes(manager).contains("1"))
     val firstRemoveTime = removeTimes(manager)("1")
-    assert(firstRemoveTime === clock.getTime() + executorIdleTimeout * 1000)
+    assert(firstRemoveTime === clock.getTimeMillis() + executorIdleTimeout * 1000)
     clock.advance(100L)
     onExecutorIdle(manager, "1")
     assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
@@ -376,11 +376,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     clock.advance(300L)
     onExecutorIdle(manager, "2")
     assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
-    assert(removeTimes(manager)("2") === clock.getTime() + executorIdleTimeout * 1000)
+    assert(removeTimes(manager)("2") === clock.getTimeMillis() + executorIdleTimeout * 1000)
     clock.advance(400L)
     onExecutorIdle(manager, "3")
     assert(removeTimes(manager)("3") !== firstRemoveTime)
-    assert(removeTimes(manager)("3") === clock.getTime() + executorIdleTimeout * 1000)
+    assert(removeTimes(manager)("3") === clock.getTimeMillis() + executorIdleTimeout * 1000)
     assert(removeTimes(manager).size === 3)
     assert(removeTimes(manager).contains("2"))
     assert(removeTimes(manager).contains("3"))
@@ -393,7 +393,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).size === 3)
     assert(removeTimes(manager).contains("1"))
     val secondRemoveTime = removeTimes(manager)("1")
-    assert(secondRemoveTime === clock.getTime() + executorIdleTimeout * 1000)
+    assert(secondRemoveTime === clock.getTimeMillis() + executorIdleTimeout * 1000)
     assert(removeTimes(manager)("1") === secondRemoveTime) // timer is already started
     assert(removeTimes(manager)("1") !== firstRemoveTime)
     assert(firstRemoveTime !== secondRemoveTime)
@@ -402,7 +402,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   test("mock polling loop with no events") {
     sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
-    val clock = new FakeClock(2020L)
+    val clock = new ManualClock(2020L)
     manager.setClock(clock)

     // No events - we should not be adding or removing
@@ -427,7 +427,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {

   test("mock polling loop add behavior") {
     sc = createSparkContext(1, 20)
-    val clock = new FakeClock(2020L)
+    val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
@@ -477,7 +477,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {

   test("mock polling loop remove behavior") {
     sc = createSparkContext(1, 20)
-    val clock = new FakeClock(2020L)
+    val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala

Lines changed: 1 addition & 1 deletion

@@ -130,7 +130,7 @@ class DriverRunnerTest extends FunSuite {
       .thenReturn(-1) // fail 3
       .thenReturn(-1) // fail 4
       .thenReturn(0) // success
-    when(clock.getTime())
+    when(clock.getTimeMillis())
       .thenReturn(0).thenReturn(1000) // fail 1 (short)
       .thenReturn(1000).thenReturn(2000) // fail 2 (short)
       .thenReturn(2000).thenReturn(10000) // fail 3 (long)
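In the stubbing above, consecutive getTimeMillis() calls return the start and end of each simulated driver run, so each pair of values encodes one run's duration: (0, 1000) and (1000, 2000) are short runs, while (2000, 10000) is a long run that resets the retry backoff (see the DriverRunner hunk earlier). A stand-alone sketch of the same Mockito pattern, with the first few values copied from the test:

    import org.mockito.Mockito.{mock, when}
    import org.apache.spark.util.Clock

    val clock = mock(classOf[Clock])
    when(clock.getTimeMillis())
      .thenReturn(0L).thenReturn(1000L)        // fail 1: 1 s elapsed (short run)
      .thenReturn(1000L).thenReturn(2000L)     // fail 2: 1 s elapsed (short run)
      .thenReturn(2000L).thenReturn(10000L)    // fail 3: 8 s elapsed (long run)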
