Skip to content

Commit 160863a

Browse files
committed
Consolidate Streaming Clock class into common util Clock
1 parent 7c956b2 commit 160863a

File tree

33 files changed

+195
-239
lines changed

33 files changed

+195
-239
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
124124
private val intervalMillis: Long = 100
125125

126126
// Clock used to schedule when executors should be added and removed
127-
private var clock: Clock = SystemClock
127+
private var clock: Clock = new SystemClock()
128128

129129
// Listener for Spark events that impact the allocation policy
130130
private val listener = new ExecutorAllocationListener

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private[spark] class DriverRunner(
5858
// Decoupled for testing
5959
private[deploy] def setClock(_clock: Clock) = clock = _clock
6060
private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
61-
private var clock: Clock = SystemClock
61+
private var clock: Clock = new SystemClock()
6262
private var sleeper = new Sleeper {
6363
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
6464
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class DAGScheduler(
6363
mapOutputTracker: MapOutputTrackerMaster,
6464
blockManagerMaster: BlockManagerMaster,
6565
env: SparkEnv,
66-
clock: org.apache.spark.util.Clock = SystemClock)
66+
clock: Clock = new SystemClock())
6767
extends Logging {
6868

6969
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
5151
sched: TaskSchedulerImpl,
5252
val taskSet: TaskSet,
5353
val maxTaskFailures: Int,
54-
clock: Clock = SystemClock)
54+
clock: Clock = new SystemClock())
5555
extends Schedulable with Logging {
5656

5757
val conf = sched.sc.conf

core/src/main/scala/org/apache/spark/util/Clock.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,35 @@ package org.apache.spark.util
2222
*/
2323
private[spark] trait Clock {
2424
def getTime(): Long
25+
def waitTillTime(targetTime: Long): Long
2526
}
2627

27-
private[spark] object SystemClock extends Clock {
28+
private[spark] class SystemClock extends Clock {
29+
30+
val minPollTime = 25L
31+
2832
def getTime(): Long = System.currentTimeMillis()
33+
34+
def waitTillTime(targetTime: Long): Long = {
35+
var currentTime = 0L
36+
currentTime = System.currentTimeMillis()
37+
38+
var waitTime = targetTime - currentTime
39+
if (waitTime <= 0) {
40+
return currentTime
41+
}
42+
43+
val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
44+
45+
while (true) {
46+
currentTime = System.currentTimeMillis()
47+
waitTime = targetTime - currentTime
48+
if (waitTime <= 0) {
49+
return currentTime
50+
}
51+
val sleepTime = math.min(waitTime, pollTime)
52+
Thread.sleep(sleepTime)
53+
}
54+
-1
55+
}
2956
}

core/src/test/scala/org/apache/spark/util/FakeClock.scala renamed to core/src/main/scala/org/apache/spark/util/FakeClock.scala

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,33 @@
1717

1818
package org.apache.spark.util
1919

20-
class FakeClock(private var time: Long = 0L) extends Clock {
20+
class FakeClock(private var time: Long) extends Clock {
2121

22-
def advance(millis: Long): Unit = time += millis
22+
def this() = this(0L)
23+
24+
def getTime(): Long =
25+
synchronized {
26+
time
27+
}
28+
29+
def setTime(timeToSet: Long) =
30+
synchronized {
31+
time = timeToSet
32+
notifyAll()
33+
}
34+
35+
def advance(timeToAdd: Long) =
36+
this.synchronized {
37+
time += timeToAdd
38+
notifyAll()
39+
}
40+
41+
def waitTillTime(targetTime: Long): Long =
42+
synchronized {
43+
while (time < targetTime) {
44+
wait(100)
45+
}
46+
getTime()
47+
}
2348

24-
def getTime(): Long = time
2549
}

external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void setUp() {
3131
SparkConf conf = new SparkConf()
3232
.setMaster("local[2]")
3333
.setAppName("test")
34-
.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
34+
.set("spark.streaming.clock", "org.apache.spark.util.FakeClock");
3535
ssc = new JavaStreamingContext(conf, new Duration(1000));
3636
ssc.checkpoint("checkpoint");
3737
}

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
3434
import org.apache.spark.{SparkConf, Logging}
3535
import org.apache.spark.storage.StorageLevel
3636
import org.apache.spark.streaming.dstream.ReceiverInputDStream
37-
import org.apache.spark.streaming.util.ManualClock
3837
import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
3938
import org.apache.spark.streaming.flume.sink._
40-
import org.apache.spark.util.Utils
39+
import org.apache.spark.util.{FakeClock, Utils}
4140

4241
class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {
4342

@@ -54,7 +53,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
5453

5554
def beforeFunction() {
5655
logInfo("Using manual clock")
57-
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
56+
conf.set("spark.streaming.clock", "org.apache.spark.util.FakeClock")
5857
}
5958

6059
before(beforeFunction())
@@ -171,7 +170,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
171170

172171
def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
173172
outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
174-
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
173+
val clock = ssc.scheduler.clock.asInstanceOf[FakeClock]
175174
val executor = Executors.newCachedThreadPool()
176175
val executorCompletion = new ExecutorCompletionService[Void](executor)
177176
channels.map(channel => {
@@ -221,7 +220,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
221220
assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
222221
}
223222

224-
private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
223+
private class TxnSubmitter(channel: MemoryChannel, clock: FakeClock) extends Callable[Void] {
225224
override def call(): Void = {
226225
var t = 0
227226
for (i <- 0 until batchCount) {
@@ -236,7 +235,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
236235
tx.commit()
237236
tx.close()
238237
Thread.sleep(500) // Allow some time for the events to reach
239-
clock.addToTime(batchDuration.milliseconds)
238+
clock.advance(batchDuration.milliseconds)
240239
}
241240
null
242241
}

external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void setUp() {
3131
SparkConf conf = new SparkConf()
3232
.setMaster("local[2]")
3333
.setAppName("test")
34-
.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
34+
.set("spark.streaming.clock", "org.apache.spark.util.FakeClock");
3535
ssc = new JavaStreamingContext(conf, new Duration(1000));
3636
ssc.checkpoint("checkpoint");
3737
}

external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void setUp() {
3131
SparkConf conf = new SparkConf()
3232
.setMaster("local[2]")
3333
.setAppName("test")
34-
.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
34+
.set("spark.streaming.clock", "org.apache.spark.util.FakeClock");
3535
ssc = new JavaStreamingContext(conf, new Duration(1000));
3636
ssc.checkpoint("checkpoint");
3737
}

0 commit comments

Comments (0)