diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 86f069b0bd609..f47cab582ddae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -58,7 +58,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
 
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
+    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator",
+    // Start-time offset in ms; defaults to 0, i.e. no jitter.
+    ssc.sparkContext.getConf.getLong("spark.streaming.starttime.jitter", 0L))
 
   // This is marked lazy so that this is initialized after checkpoint duration has been set
   // in the context and the generator has been started.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 62e681e3e9646..012d252e5a9e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -21,7 +21,18 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.util.{Clock, SystemClock}
 
+/**
+ * Recurring timer driven by `clock`; `callback` receives the time of each tick.
+ *
+ * @param jitter offset in milliseconds added to the period-aligned start time returned by
+ *               `getStartTime()`. Defaults to 0, which leaves the start time unchanged.
+ */
 private[streaming]
-class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
+class RecurringTimer(
+    clock: Clock,
+    period: Long,
+    callback: (Long) => Unit,
+    name: String,
+    jitter: Long = 0)
   extends Logging {
 
   private val thread = new Thread("RecurringTimer - " + name) {
@@ -39,7 +50,8 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
    * current system time.
    */
   def getStartTime(): Long = {
-    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
+    // Next period boundary strictly after the current clock time, shifted by `jitter`.
+    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period + jitter
   }
 
   /**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
index 25b70a3d089ee..bc4b6fd4c3d0d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
@@ -82,4 +82,39 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
     assert(results.asScala.toSeq === Seq(0L, 100L, 200L))
     assert(lastTime === 200L)
   }
+
+  test("SPARK-14230: add a start time jitter for the RecurringTimer") {
+    // Longs, like the timer's API and the expectations in the test above.
+    val jitter = 10L
+    val period = 100L
+    val clock = new ManualClock()
+    val results = new ConcurrentLinkedQueue[Long]()
+    val timer = new RecurringTimer(clock, period, time => {
+      results.add(time)
+    }, "RecurringTimerSuite-jitter", jitter)
+
+    // The first tick is due at the next period boundary plus the jitter.
+    assert(timer.getStartTime() === period + jitter)
+    timer.start()
+    clock.advance(jitter)
+    clock.advance(period)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      assert(results.asScala.toSeq === Seq(jitter + period))
+    }
+    clock.advance(period)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      assert(results.asScala.toSeq === Seq(jitter + period, jitter + period * 2))
+    }
+    // Advancing two periods at once must still yield one tick per period.
+    clock.advance(period * 2)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      assert(results.asScala.toSeq === Seq(
+        jitter + period,
+        jitter + period * 2,
+        jitter + period * 3,
+        jitter + period * 4))
+    }
+
+    assert(timer.stop(interruptTimer = true) === (jitter + period * 4))
+  }
 }