diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index e23edfa50651..fc991c494451 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -24,15 +24,13 @@ import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
-
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
-import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.scheduler.{BatchController, Job}
 import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{CallSite, Utils}
 
@@ -69,13 +67,13 @@ abstract class DStream[T: ClassTag] (
   // Methods that should be implemented by subclasses of DStream
   // =======================================================================
 
-  /** Time interval after which the DStream generates an RDD */
+  /** Time interval after which the DStream generates a RDD */
   def slideDuration: Duration
 
   /** List of parent DStreams on which this DStream depends on */
   def dependencies: List[DStream[_]]
 
-  /** Method that generates an RDD for the given time */
+  /** Method that generates a RDD for the given time */
   def compute(validTime: Time): Option[RDD[T]]
 
   // =======================================================================
@@ -311,7 +309,9 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def isTimeValid(time: Time): Boolean = {
     if (!isInitialized) {
       throw new SparkException (this + " has not been initialized")
-    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+    } else if (time <= zeroTime ||
+      ! ((time - zeroTime).isMultipleOf(slideDuration) ||
+        BatchController.getBatchIntervalEnabled())) {
       logInfo(s"Time $time is invalid as zeroTime is $zeroTime"
         + s" , slideDuration is $slideDuration and difference is ${time - zeroTime}")
       false
@@ -338,7 +338,7 @@ abstract class DStream[T: ClassTag] (
       // scheduler, since we may need to write output to an existing directory during checkpoint
       // recovery; see SPARK-4835 for more details. We need to have this call here because
       // compute() might cause Spark jobs to be launched.
-      SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
         compute(time)
       }
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchController.scala
new file mode 100644
index 000000000000..1de4348d86e0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchController.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.scheduler.batch.BatchEstimator
+import org.apache.spark.util.ThreadUtils
+
+abstract class BatchController(val streamUID: Int, batchEstimator: BatchEstimator)
+  extends StreamingListener with Serializable with Logging {
+
+  init()
+
+  protected def publish(newBatchInterval: Long): Unit
+
+  @transient
+  implicit private var executionContext: ExecutionContext = _
+
+  @transient
+  private var batchInterval: AtomicLong = _
+
+  var totalDelay: Long = -1L
+  var schedulerDelay: Long = -1L
+  var numRecords: Long = -1L
+  var processingDelay: Long = -1L
+  var latestBatchInterval: Long = -1L
+
+  /**
+   * An initialization method called both from the constructor and Serialization code.
+   */
+  private def init() {
+    executionContext = ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonSingleThreadExecutor("stream-batch-interval-update"))
+    batchInterval = new AtomicLong(-1L)
+  }
+
+  /**
+   * Compute the new batch interval and publish it.
+   */
+  private def computeAndPublish(processTime: Long, batchIntervalMs: Long): Unit =
+    Future[Unit] {
+      val newBatchInterval = batchEstimator.compute(processTime, batchIntervalMs)
+      logInfo(s" ##### the newBatchInterval is $newBatchInterval")
+      newBatchInterval.foreach { s =>
+        batchInterval.set(s)
+        logInfo(s" ##### after setting newBatchInterval is $batchInterval")
+        publish(getLatestBatchInterval())
+      }
+    }
+
+  def getLatestBatchInterval(): Long = batchInterval.get()
+
+  /**
+   * Compute the batch interval after a batch has completed.
+   */
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+    totalDelay = batchCompleted.batchInfo.totalDelay.get
+    schedulerDelay = batchCompleted.batchInfo.schedulingDelay.get
+    numRecords = batchCompleted.batchInfo.numRecords
+    processingDelay = batchCompleted.batchInfo.processingDelay.get
+    latestBatchInterval = batchCompleted.batchInfo.batchInterval
+
+    logInfo(s"processingDelay $processingDelay | batchInterval $latestBatchInterval " +
+      s"| totalDelay $totalDelay | schedulerDelay $schedulerDelay | numRecords $numRecords")
+
+    for {
+      processingTime <- batchCompleted.batchInfo.processingDelay
+      batchInterval <- Option(batchCompleted.batchInfo.batchInterval)
+    } computeAndPublish(processingTime, batchInterval)
+
+    logInfo(s" ##### Heard onBatchCompleted msg and begin to compute.")
+  }
+}
+
+/**
+ * Companion object holding the dynamic batch interval configuration.
+ */
+object BatchController {
+  // whether the dynamic batch interval is enabled
+  var isEnable: Boolean = false
+
+  def isDynamicBatchIntervalEnabled(conf: SparkConf): Boolean =
+    conf.getBoolean("spark.streaming.dynamicBatchInterval.enabled", false)
+
+  def getBatchIntervalEnabled(): Boolean = isEnable
+
+  def setBatchIntervalEnabled(enabled: Boolean): Unit = {
+    isEnable = enabled
+  }
+}
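Note: the controller above is only created when `spark.streaming.dynamicBatchInterval.enabled` is set on the driver's SparkConf. The following is a minimal sketch (not part of the patch) of how the configuration keys introduced here might be set; the application name and the two-second seed interval are illustrative, and any additional wiring needed end-to-end is outside the scope of this diff.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only: enable the dynamic batch interval added by this patch and
    // select the list-driven BasicBatchEstimator; all values are illustrative.
    val conf = new SparkConf()
      .setAppName("dynamic-batch-interval-demo")  // illustrative name
      .set("spark.streaming.dynamicBatchInterval.enabled", "true")
      .set("spark.streaming.BatchEstimator", "basic")
      // entries are seconds; a 0 entry keeps the previous interval (see BasicBatchEstimator)
      .set("spark.streaming.BatchEstimator.basic.batchList", "2,4,6,0")

    // Seed batch interval; the JobGenerator may later adjust it via the controller.
    val ssc = new StreamingContext(conf, Seconds(2))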
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 5b2b959f8138..e694ba62bcad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -23,6 +23,7 @@ import org.apache.spark.streaming.Time
 /**
  * :: DeveloperApi ::
  * Class having information on completed batches.
+ * @param batchInterval Batch interval of the batch, in milliseconds
  * @param batchTime Time of the batch
  * @param streamIdToInputInfo A map of input stream id to its input info
  * @param submissionTime Clock time of when jobs of this batch was submitted to
@@ -33,6 +34,7 @@ import org.apache.spark.streaming.Time
  */
 @DeveloperApi
 case class BatchInfo(
+    batchInterval: Long,
     batchTime: Time,
     streamIdToInputInfo: Map[Int, StreamInputInfo],
     submissionTime: Long,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8d83dc8a8fc0..52be5c1ad1fb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.util.{Failure, Success, Try}
-
 import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.scheduler.batch.BatchEstimator
+
+import scala.util.{Failure, Success, Try}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Duration, Time}
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
@@ -34,6 +35,8 @@ private[scheduler] case class DoCheckpoint(
     time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
 private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
 
+case class UpdateBatchInterval(newBatchInterval: Long) extends JobGeneratorEvent
+
 /**
  * This class generates jobs from DStreams as well as drives checkpointing and cleaning
  * up DStream metadata.
@@ -77,6 +80,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   // last batch whose completion,checkpointing and metadata cleanup has been completed
   private var lastProcessedBatch: Time = null
 
+  protected[streaming] val batchController: Option[BatchController] = {
+    if (BatchController.isDynamicBatchIntervalEnabled(ssc.conf)) {
+      logInfo(s" ##### init batchController....")
+      Some(new ReceiverBatchController(
+        ssc.getNewInputStreamId(), BatchEstimator.create(ssc.conf, ssc.graph.batchDuration)))
+    } else {
+      None
+    }
+  }
   /** Start generation of jobs */
   def start(): Unit = synchronized {
     if (eventLoop != null) return // generator has already been started
@@ -185,6 +197,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       case DoCheckpoint(time, clearCheckpointDataLater) =>
         doCheckpoint(time, clearCheckpointDataLater)
       case ClearCheckpointData(time) => clearCheckpointData(time)
+      case UpdateBatchInterval(newBatchInterval) => updateBatchInterval(newBatchInterval)
     }
   }
 
@@ -231,7 +244,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // added but not allocated, are dangling in the queue after recovering, we have to allocate
     // those blocks to the next batch, which is the batch they were supposed to go.
     jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
-    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
+    jobScheduler.submitJobSet(JobSet(
+      graph.batchDuration.milliseconds, time, graph.generateJobs(time)))
   }
 
   // Restart the timer
@@ -250,7 +264,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     } match {
       case Success(jobs) =>
         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
-        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
+        jobScheduler.submitJobSet(JobSet(
+          graph.batchDuration.milliseconds, time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
@@ -289,6 +304,21 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     markBatchFullyProcessed(time)
   }
 
+  /**
+   * Update the batch interval.
+   */
+  private def updateBatchInterval(newBatchInterval: Long) {
+    this.synchronized {
+      logInfo(s" ##### updateBatchInterval. before update, the batchDuration is " +
+        s"${graph.batchDuration} and the period is ${timer.period}")
+      graph.batchDuration = new Duration(newBatchInterval)
+      timer.period = newBatchInterval
+      logInfo(s" ##### after update, the batchDuration is " +
+        s"${graph.batchDuration} and the period is ${timer.period}")
+    }
+  }
+
   /** Perform checkpoint for the given `time`. */
   private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
@@ -303,4 +333,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   private def markBatchFullyProcessed(time: Time) {
     lastProcessedBatch = time
   }
+
+  private[streaming] class ReceiverBatchController(id: Int, estimator: BatchEstimator)
+    extends BatchController(id, estimator) with Logging {
+    override def publish(newBatchInterval: Long): Unit = {
+      logInfo(s"##### Begin to publish new batch interval $newBatchInterval")
+      eventLoop.post(UpdateBatchInterval(newBatchInterval))
+    }
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 2fa3bf7d5230..8277e7ba1326 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -26,8 +26,7 @@ import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
@@ -83,6 +82,11 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       rateController <- inputDStream.rateController
     } ssc.addStreamingListener(rateController)
 
+    // attach the batch controller (if enabled) to receive batch completion updates
+    for {
+      batchController <- jobGenerator.batchController
+    } ssc.addStreamingListener(batchController)
+
     listenerBus.start()
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
@@ -201,20 +205,18 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
+      jobSets.remove(jobSet.time)
+      jobGenerator.onBatchCompletion(jobSet.time)
+      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+        jobSet.totalDelay / 1000.0, jobSet.time.toString,
+        jobSet.processingDelay / 1000.0
+      ))
       listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
     }
     job.result match {
       case Failure(e) =>
         reportError("Error running job " + job, e)
       case _ =>
-        if (jobSet.hasCompleted) {
-          jobSets.remove(jobSet.time)
-          jobGenerator.onBatchCompletion(jobSet.time)
-          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-            jobSet.totalDelay / 1000.0, jobSet.time.toString,
-            jobSet.processingDelay / 1000.0
-          ))
-        }
     }
   }
@@ -253,7 +255,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       // Disable checks for existing output directories in jobs launched by the streaming
       // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
-      SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 0baedaf275d6..60fcd9789088 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -27,6 +27,7 @@ import org.apache.spark.streaming.Time
  */
 private[streaming]
 case class JobSet(
+    batchInterval: Long,
     time: Time,
     jobs: Seq[Job],
     streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
@@ -62,6 +63,7 @@ case class JobSet(
 
   def toBatchInfo: BatchInfo = {
     BatchInfo(
+      batchInterval,
       time,
       streamIdToInputInfo,
       submissionTime,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/BasicBatchEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/BasicBatchEstimator.scala
new file mode 100644
index 000000000000..0de817f75d82
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/BasicBatchEstimator.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.scheduler.batch
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+class BasicBatchEstimator(conf: SparkConf)
+  extends BatchEstimator with Logging {
+
+  // The batch interval list
+  private var batchList: Array[Long] = null
+
+  // The current index into the batch interval list
+  private var id: Int = 0
+
+  // The length of the batch interval list
+  private var length: Int = 0
+
+  init(conf)
+
+  /** Initialize the batchList from the configuration */
+  def init(conf: SparkConf): Unit = {
+    conf.get("spark.streaming.BatchEstimator", "basic") match {
+      case "basic" =>
+        val batchInfo = conf.get(
+          "spark.streaming.BatchEstimator.basic.batchList", "0")
+        val batchStr = batchInfo.split(",")
+        length = batchStr.length
+        batchList = new Array[Long](length)
+        for (i <- 0 until length) {
+          batchList(i) = toMaybeLong(batchStr(i))
+        }
+      case estimator =>
+        throw new IllegalArgumentException(s"Unknown batch estimator: $estimator")
+    }
+  }
+
+  /** Convert a String to a Long value, defaulting to 0 */
+  def toMaybeLong(s: String): Long = {
+    scala.util.Try(s.toLong).toOption.getOrElse(0L)
+  }
+
+  /** Algorithm to get the next batch interval */
+  def compute(latestProcessingTime: Long, latestBatchInterval: Long): Option[Long] = {
+    this.synchronized {
+      if (id >= length) {
+        id = 0
+      }
+      var batch = batchList(id) * 1000
+      id = id + 1
+      if (batch == 0) {
+        batch = latestBatchInterval
+      }
+      Some(batch)
+    }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/BatchEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/BatchEstimator.scala
new file mode 100644
index 000000000000..4dd52b3de5bc
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/BatchEstimator.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.scheduler.batch
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Duration
+
+trait BatchEstimator extends Serializable {
+  def compute(processingTime: Long, batchInterval: Long): Option[Long]
+}
+
+object BatchEstimator extends Logging {
+
+  /** Create a BatchEstimator based on the configuration. */
+  def create(conf: SparkConf, batchInterval: Duration): BatchEstimator = {
+    if (isDefaultBatchIntervalEnabled(conf)) {
+      conf.get("spark.streaming.BatchEstimator", "default") match {
+        case "default" =>
+          val proportional = conf.getDouble(
+            "spark.streaming.BatchEstimator.default.proportional", 0.85)
+          val converge = conf.getDouble("spark.streaming.BatchEstimator.default.converge", 30)
+          val gradient = conf.getDouble("spark.streaming.BatchEstimator.default.gradient", 0.85)
+          val stateThreshold = conf.getDouble(
+            "spark.streaming.BatchEstimator.default.stateThreshold", 1.4)
+
+          new DefaultBatchEstimator(proportional, converge, gradient, stateThreshold)
+
+        case estimator =>
+          throw new IllegalArgumentException(s"Unknown batch estimator: $estimator")
+      }
+    } else {
+      conf.get("spark.streaming.BatchEstimator", "basic") match {
+        case "basic" =>
+          new BasicBatchEstimator(conf)
+
+        case estimator =>
+          throw new IllegalArgumentException(s"Unknown batch estimator: $estimator")
+      }
+    }
+  }
+
+  def isDefaultBatchIntervalEnabled(conf: SparkConf): Boolean =
+    conf.getBoolean("spark.streaming.DefaultBatchInterval.enabled", false)
+
+  def isBasicBatchIntervalEnabled(conf: SparkConf): Boolean =
+    conf.getBoolean("spark.streaming.BasicBatchInterval.enabled", false)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/DefaultBatchEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/DefaultBatchEstimator.scala
new file mode 100644
index 000000000000..a9f32e64d479
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/batch/DefaultBatchEstimator.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.scheduler.batch
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.internal.Logging
+
+class DefaultBatchEstimator(
+    proportional: Double,   // decreasing factor
+    converge: Double,       // convergence factor (in degrees)
+    gradient: Double,       // slope of the stability line
+    stateThreshold: Double  // state judgment threshold
+  ) extends BatchEstimator with Logging {
+
+  private var firstRun: Boolean = true  // marks the first run
+  private var latestSecondProcessingTime: Long = -1L
+  private var latestSecondBatchInterval: Long = -1L
+  private var PLarge: Long = -1L  // larger processing time
+  private var PSmall: Long = -1L  // smaller processing time
+  private var BLarge: Long = -1L  // larger batch interval
+  private var BSmall: Long = -1L  // smaller batch interval
+  private var currentSlope: Double = -1.0
+  private var slopeDeviation: Double = -1.0
+  private var angleDeviation: Double = -1.0
+  private var nextBatchInterval: Long = -1L
+  private var tmpGrd: Double = -1.0  // temporary gradient
+
+  var count: AtomicInteger = _  // counter of computed batches
+  var latestStable: Boolean = true
+  var latencyCount: AtomicInteger = _
+
+  init()
+
+  def init() {
+    logInfo(s"the estimator arguments: proportional $proportional, converge $converge, " +
+      s"gradient $gradient")
+    count = new AtomicInteger(0)
+    latencyCount = new AtomicInteger(0)
+  }
+
+  /**
+   * Algorithm to get the next batch interval.
+   *
+   * @param latestProcessingTime processing time of the latest batch
+   * @param latestBatchInterval batch interval of the latest batch
+   */
+  def compute(latestProcessingTime: Long, latestBatchInterval: Long): Option[Long] = {
+    logInfo(s"Compute the $count batch interval by " +
+      s"latestSecondProcessingTime $latestSecondProcessingTime " +
+      s"latestSecondBatchInterval $latestSecondBatchInterval " +
+      s"latestProcessingTime $latestProcessingTime " +
+      s"and latestBatchInterval $latestBatchInterval tmpGrd $tmpGrd")
+
+    BLarge = Math.max(latestSecondBatchInterval, latestBatchInterval)
+    BSmall = Math.min(latestSecondBatchInterval, latestBatchInterval)
+
+    PLarge = Math.max(latestSecondProcessingTime, latestProcessingTime)
+    PSmall = Math.min(latestSecondProcessingTime, latestProcessingTime)
+
+    currentSlope = latestProcessingTime.toDouble / latestBatchInterval.toDouble
+    // compute the slope difference
+    slopeDeviation = Math.atan(gradient) - Math.atan(currentSlope)
+    angleDeviation = slopeDeviation * 180 / Math.PI
+    logInfo(s"latestStable $latestStable | currentSlope $currentSlope | " +
+      s"latencyCount $latencyCount | slopeDeviation $slopeDeviation | " +
+      s"angleDeviation $angleDeviation stateThreshold $stateThreshold")
+
+    /** Update the latest processing time and the latest batch interval */
+    def end {
+      latestSecondProcessingTime = latestProcessingTime
+      latestSecondBatchInterval = latestBatchInterval
+    }
+
+    this.synchronized {
+      count.incrementAndGet()
+
+      // the feature is enabled, but do not adjust very short batches
+      if (latestProcessingTime < 200) {
+        return Some(latestBatchInterval)
+      }
+
+      // the first run: keep the batch interval unchanged
+      if (firstRun) {
+        end
+        firstRun = false
+        return Some(latestBatchInterval)
+      }
+      if (currentSlope < gradient && angleDeviation < converge) {
+        // the system has converged
+        nextBatchInterval = latestBatchInterval
+        end
+        latestStable = true
+        return Some(nextBatchInterval)
+      } else if (currentSlope < gradient && angleDeviation > converge) {
+        // reduce the batch interval
+        nextBatchInterval = (Math.floor(proportional * BSmall / 200) * 200).toLong
+        latestStable = true
+        end
+        return Some(nextBatchInterval)
+      } else {
+        // the system is not stable: require two consecutive unstable observations
+        if (latestStable == false) {
+          if (BLarge == BSmall) {
+            // the batch interval has not changed
+            latencyCount.incrementAndGet()
+            if (latencyCount.get() >= 2) {
+              tmpGrd = currentSlope
+            } else {
+              tmpGrd = (latestProcessingTime + latestSecondProcessingTime).toDouble /
+                (latestSecondBatchInterval + latestBatchInterval).toDouble
+            }
+          } else {
+            latencyCount.set(0)  // reset
+            tmpGrd = (PLarge - PSmall).toDouble / (BLarge - BSmall).toDouble
+          }
+
+          // judge the state
+          if (tmpGrd > stateThreshold) {
+            // reduce the batch interval
+            nextBatchInterval = (Math.floor(proportional * BSmall / 200) * 200).toLong
+            end
+            return Some(nextBatchInterval)
+          } else {
+            // increase the batch interval
+            nextBatchInterval = (Math.floor(BLarge / gradient / 200) * 200).toLong
+            end
+            return Some(nextBatchInterval)
+          }
+        } else {
+          latestStable = false
+          end
+          return Some(latestBatchInterval)
+        }
+      }
+    }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 62e681e3e964..521db12bca4a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.util.{Clock, SystemClock}
 
 private[streaming]
-class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
+class RecurringTimer(clock: Clock, var period: Long, callback: (Long) => Unit, name: String)
   extends Logging {
 
   private val thread = new Thread("RecurringTimer - " + name) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 56b400850fdd..ea74bd16bf68 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -62,12 +62,8 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       0 -> StreamInputInfo(0, 300L),
       1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
 
-    // onStreamingStarted
-    listener.onStreamingStarted(StreamingListenerStreamingStarted(100L))
-    listener.startTime should be (100)
-
     // onBatchSubmitted
-    val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
+    val batchInfoSubmitted = BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
     listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
     listener.runningBatches should be (Nil)
@@ -81,7 +77,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchStarted
     val batchInfoStarted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
@@ -124,7 +120,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchCompleted
     val batchInfoCompleted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (Nil)
@@ -166,7 +162,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
 
     val batchInfoCompleted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
 
     for(_ <- 0 until (limit + 10)) {
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -184,7 +180,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     // fulfill completedBatchInfos
     for(i <- 0 until limit) {
       val batchInfoCompleted = BatchInfo(
-        Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
+        1000, Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
       val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
       listener.onJobStart(jobStart)
@@ -195,7 +191,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.onJobStart(jobStart)
 
     val batchInfoSubmitted =
-      BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty)
+      BatchInfo(1000, Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
     // We still can see the info retrieved from onJobStart
@@ -212,7 +208,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     // A lot of "onBatchCompleted"s happen before "onJobStart"
     for(i <- limit + 1 to limit * 2) {
       val batchInfoCompleted = BatchInfo(
-        Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
+        1000, Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
 
@@ -238,12 +234,12 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchSubmitted
     val batchInfoSubmitted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
+      BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
     // onBatchStarted
     val batchInfoStarted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
 
     // onJobStart
@@ -261,7 +257,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchCompleted
     val batchInfoCompleted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(1000, Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
   }
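As a rough illustration of the estimator contract exercised above, the sketch below drives BasicBatchEstimator directly, outside of any StreamingContext; it is not part of the patch, and the processing times passed in are made up (BasicBatchEstimator ignores them and simply cycles through the configured list, multiplying each entry by 1000 to get milliseconds).

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.scheduler.batch.BasicBatchEstimator

    // Sketch only: entries in batchList are seconds, compute() returns milliseconds,
    // and a 0 entry keeps the latest batch interval.
    val conf = new SparkConf()
      .set("spark.streaming.BatchEstimator", "basic")
      .set("spark.streaming.BatchEstimator.basic.batchList", "2,4,0")

    val estimator = new BasicBatchEstimator(conf)
    // latestProcessingTime (first argument) is unused by this estimator
    println(estimator.compute(1500L, 2000L))  // Some(2000): first list entry, 2 s
    println(estimator.compute(1500L, 2000L))  // Some(4000): second list entry, 4 s
    println(estimator.compute(1500L, 4000L))  // Some(4000): 0 keeps the latest interval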