
Commit 34b7c35

srowen authored and Andrew Or committed
SPARK-4682 [CORE] Consolidate various 'Clock' classes
Another one from JoshRosen's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names.

Author: Sean Owen <[email protected]>

Closes apache#4514 from srowen/SPARK-4682 and squashes the following commits:

5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark]
169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names
277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way
b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis()
160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock
7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock
1 parent ad6b169 commit 34b7c35
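In short: the separate clock implementations collapse into the single org.apache.spark.util.Clock trait with SystemClock and ManualClock implementations, and getTime() becomes getTimeMillis(). A minimal, hypothetical before/after for test code affected by the rename (the old helpers are removed by this commit):

import org.apache.spark.util.ManualClock

// The old FakeClock/TestClock test helpers and getTime() calls become:
val clock = new ManualClock(0L)   // manually driven time for tests
val now = clock.getTimeMillis()   // was getTime() on the old util.Clock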

File tree

37 files changed: +301 -337 lines changed


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 26 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable

 import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}

 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private val intervalMillis: Long = 100

   // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new RealClock
+  private var clock: Clock = new SystemClock()

   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
 }
-
-/**
- * An abstract clock for measuring elapsed time.
- */
-private trait Clock {
-  def getTimeMillis: Long
-}
-
-/**
- * A clock backed by a monotonically increasing time source.
- * The time returned by this clock does not correspond to any notion of wall-clock time.
- */
-private class RealClock extends Clock {
-  override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
-}
-
-/**
- * A clock that allows the caller to customize the time.
- * This is used mainly for testing.
- */
-private class TestClock(startTimeMillis: Long) extends Clock {
-  private var time: Long = startTimeMillis
-  override def getTimeMillis: Long = time
-  def tick(ms: Long): Unit = { time += ms }
-}
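The deleted TestClock maps directly onto the shared ManualClock added later in this commit. Note one real behavioral change: RealClock derived milliseconds from the monotonic System.nanoTime, while the replacement SystemClock reports wall-clock System.currentTimeMillis(). A minimal sketch of the equivalent test-side usage, assuming test code that previously held a TestClock:

import org.apache.spark.util.ManualClock

val clock = new ManualClock(4200L)      // was: new TestClock(4200L)
clock.advance(100L)                     // was: clock.tick(100)
assert(clock.getTimeMillis() == 4300L)  // was: clock.getTimeMillis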

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 5 additions & 12 deletions
@@ -20,19 +20,18 @@ package org.apache.spark.deploy.worker
 import java.io._

 import scala.collection.JavaConversions._
-import scala.collection.Map

 import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}

 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
+import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
+import org.apache.spark.util.{Clock, SystemClock}

 /**
  * Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -59,9 +58,7 @@ private[spark] class DriverRunner(
   // Decoupled for testing
   private[deploy] def setClock(_clock: Clock) = clock = _clock
   private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
-  private var clock = new Clock {
-    def currentTimeMillis(): Long = System.currentTimeMillis()
-  }
+  private var clock: Clock = new SystemClock()
   private var sleeper = new Sleeper {
     def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
   }
@@ -190,9 +187,9 @@ private[spark] class DriverRunner(
         initialize(process.get)
       }

-      val processStart = clock.currentTimeMillis()
+      val processStart = clock.getTimeMillis()
       val exitCode = process.get.waitFor()
-      if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
         waitSeconds = 1
       }

@@ -208,10 +205,6 @@ private[spark] class DriverRunner(
     }
   }

-private[deploy] trait Clock {
-  def currentTimeMillis(): Long
-}
-
 private[deploy] trait Sleeper {
   def sleep(seconds: Int)
 }
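DriverRunner already exposed setClock for tests; it now takes the shared Clock trait, so the restart-timing check can be driven without real waiting. A sketch, assuming a DriverRunner instance named runner constructed elsewhere:

import org.apache.spark.util.ManualClock

val manualClock = new ManualClock(0L)
runner.setClock(manualClock)           // runner: a DriverRunner under test (assumed)
// The run loop now reads manualClock.getTimeMillis() instead of wall-clock time,
// so the successfulRunDuration comparison can be exercised deterministically:
manualClock.advance(60 * 1000 + 1)     // step past a hypothetical 60 s successfulRunDuration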

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 10 deletions
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: org.apache.spark.util.Clock = SystemClock)
+    clock: Clock = new SystemClock())
   extends Logging {

   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -657,7 +657,7 @@ class DAGScheduler(
       // completion events or stage abort
       stageIdToStage -= s.id
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
     }
   }
@@ -706,7 +706,7 @@ class DAGScheduler(
         stage.latestInfo.stageFailed(stageFailedMessage)
         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }
@@ -745,7 +745,7 @@ class DAGScheduler(
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
     val shouldRunLocally =
       localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
-    val jobSubmissionTime = clock.getTime()
+    val jobSubmissionTime = clock.getTimeMillis()
     if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
       listenerBus.post(
@@ -871,7 +871,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTime())
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
@@ -940,12 +940,12 @@ class DAGScheduler(

     def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
       val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
         case _ => "Unknown"
       }
       if (errorMessage.isEmpty) {
         logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTime())
+        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
       } else {
         stage.latestInfo.stageFailed(errorMessage.get)
         logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -971,7 +971,7 @@ class DAGScheduler(
       markStageAsFinished(stage)
       cleanupStateForJobAndIndependentStages(job)
       listenerBus.post(
-        SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
+        SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
     }

     // taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1187,7 +1187,7 @@ class DAGScheduler(
     }
     val dependentJobs: Seq[ActiveJob] =
       activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
-    failedStage.latestInfo.completionTime = Some(clock.getTime())
+    failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
     for (job <- dependentJobs) {
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
     }
@@ -1242,7 +1242,7 @@ class DAGScheduler(
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
   }
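Since clock is now a plain constructor parameter typed against the shared trait, a test can pin every listener-event timestamp. A sketch only; the leading constructor arguments are abbreviated and assumed:

import org.apache.spark.util.ManualClock

val clock = new ManualClock(1000L)
// val scheduler = new DAGScheduler(..., mapOutputTracker, blockManagerMaster, env, clock)
// Every SparkListenerJobEnd posted before the clock moves carries time 1000.
clock.advance(50L)  // later events would report 1050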

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 8 additions & 8 deletions
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
     sched: TaskSchedulerImpl,
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
-    clock: Clock = SystemClock)
+    clock: Clock = new SystemClock())
   extends Schedulable with Logging {

   val conf = sched.sc.conf
@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
   var currentLocalityIndex = 0  // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level
+  var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task at this level

   override def schedulableQueue = null

@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
       val failed = failedExecutors.get(taskId).get

       return failed.contains(execId) &&
-        clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
     }

     false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
     : Option[TaskDescription] =
   {
     if (!isZombie) {
-      val curTime = clock.getTime()
+      val curTime = clock.getTimeMillis()

       var allowedLocality = maxLocality

@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
           lastLaunchTime = curTime
         }
         // Serialize and return the task
-        val startTime = clock.getTime()
+        val startTime = clock.getTimeMillis()
         val serializedTask: ByteBuffer = try {
           Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
         } catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
           return
         }
         val key = ef.description
-        val now = clock.getTime()
+        val now = clock.getTimeMillis()
         val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
             val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-      put(info.executorId, clock.getTime())
+      put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
-      val time = clock.getTime()
+      val time = clock.getTimeMillis()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
       val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
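The default argument clock: Clock = new SystemClock() leaves production behavior unchanged while letting tests substitute a ManualClock to step through time-based logic such as the executor blacklist. A sketch, with the scheduler and task set assumed to be set up elsewhere:

import org.apache.spark.util.ManualClock

val clock = new ManualClock(0L)
val manager = new TaskSetManager(sched, taskSet, maxTaskFailures = 4, clock)
// After recording a failure, advance past the blacklist window so the executor
// becomes eligible again (timeoutMs is a hypothetical stand-in for the internal
// EXECUTOR_TASK_BLACKLIST_TIMEOUT value):
clock.advance(timeoutMs + 1)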

core/src/main/scala/org/apache/spark/util/Clock.scala

Lines changed: 41 additions & 3 deletions
@@ -21,9 +21,47 @@ package org.apache.spark.util
  * An interface to represent clocks, so that they can be mocked out in unit tests.
  */
 private[spark] trait Clock {
-  def getTime(): Long
+  def getTimeMillis(): Long
+  def waitTillTime(targetTime: Long): Long
 }

-private[spark] object SystemClock extends Clock {
-  def getTime(): Long = System.currentTimeMillis()
+/**
+ * A clock backed by the actual time from the OS as reported by the `System` API.
+ */
+private[spark] class SystemClock extends Clock {
+
+  val minPollTime = 25L
+
+  /**
+   * @return the same time (milliseconds since the epoch)
+   *         as is reported by `System.currentTimeMillis()`
+   */
+  def getTimeMillis(): Long = System.currentTimeMillis()
+
+  /**
+   * @param targetTime block until the current time is at least this value
+   * @return current system time when wait has completed
+   */
+  def waitTillTime(targetTime: Long): Long = {
+    var currentTime = 0L
+    currentTime = System.currentTimeMillis()
+
+    var waitTime = targetTime - currentTime
+    if (waitTime <= 0) {
+      return currentTime
+    }
+
+    val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+
+    while (true) {
+      currentTime = System.currentTimeMillis()
+      waitTime = targetTime - currentTime
+      if (waitTime <= 0) {
+        return currentTime
+      }
+      val sleepTime = math.min(waitTime, pollTime)
+      Thread.sleep(sleepTime)
+    }
+    -1
+  }
 }
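SystemClock.waitTillTime polls rather than sleeping once: it computes a poll interval of max(initialRemaining/10, 25 ms) up front, then repeatedly re-reads the wall clock and sleeps min(remaining, pollInterval), which keeps the wake-up close to targetTime even if Thread.sleep overshoots. The -1 after while (true) is unreachable and only gives the method a return value the compiler accepts. A usage sketch (the class is private[spark], so this runs from Spark-internal code):

import org.apache.spark.util.SystemClock

val clock = new SystemClock()
val start = clock.getTimeMillis()
val woke = clock.waitTillTime(start + 200)  // blocks roughly 200 ms in bounded polls
assert(woke >= start + 200)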
core/src/main/scala/org/apache/spark/util/ManualClock.scala (new file)

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * A `Clock` whose time can be manually set and modified. Its reported time does not change
+ * as time elapses, but only as its time is modified by callers. This is mainly useful for
+ * testing.
+ *
+ * @param time initial time (in milliseconds since the epoch)
+ */
+private[spark] class ManualClock(private var time: Long) extends Clock {
+
+  /**
+   * @return `ManualClock` with initial time 0
+   */
+  def this() = this(0L)
+
+  def getTimeMillis(): Long =
+    synchronized {
+      time
+    }
+
+  /**
+   * @param timeToSet new time (in milliseconds) that the clock should represent
+   */
+  def setTime(timeToSet: Long) =
+    synchronized {
+      time = timeToSet
+      notifyAll()
+    }
+
+  /**
+   * @param timeToAdd time (in milliseconds) to add to the clock's time
+   */
+  def advance(timeToAdd: Long) =
+    synchronized {
+      time += timeToAdd
+      notifyAll()
+    }
+
+  /**
+   * @param targetTime block until the clock time is set or advanced to at least this time
+   * @return current time reported by the clock when waiting finishes
+   */
+  def waitTillTime(targetTime: Long): Long =
+    synchronized {
+      while (time < targetTime) {
+        wait(100)
+      }
+      getTimeMillis()
+    }
+
+}
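ManualClock synchronizes on its own monitor: setTime and advance call notifyAll() so that threads parked in waitTillTime re-check the condition, and the wait(100) bound makes the re-check periodic even without a notify. A sketch of the cross-thread handshake:

import org.apache.spark.util.ManualClock

val clock = new ManualClock(0L)
val waiter = new Thread(new Runnable {
  override def run(): Unit = {
    val t = clock.waitTillTime(1000L)  // parks until the clock reaches 1000
    assert(t >= 1000L)
  }
})
waiter.start()
clock.advance(1000L)  // notifyAll() wakes the waiting thread
waiter.join()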
