From 83351b8f8dcb7767700157a17e513149740ab6d1 Mon Sep 17 00:00:00 2001
From: mwws
Date: Mon, 23 May 2016 14:54:41 +0800
Subject: [PATCH 1/3] show missed input info in streaming info page

---
 .../spark/streaming/scheduler/JobScheduler.scala   | 12 ++++++++++++
 .../apache/spark/streaming/scheduler/JobSet.scala  |  2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index ac18f73ea86aa..3684b57475d70 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.HashSet
 import scala.util.Failure
 
 import org.apache.commons.lang.SerializationUtils
@@ -64,6 +65,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
+  private val inputInfoMissedTimes = HashSet[Time]()
+
   def start(): Unit = synchronized {
     if (eventLoop != null) return // scheduler has already been started
 
@@ -139,6 +142,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   def submitJobSet(jobSet: JobSet) {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
+      inputInfoMissedTimes.add(jobSet.time)
     } else {
       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
@@ -193,6 +197,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
+      // submit fake BatchCompleted event to show missing inputInfo on Streaming UI
+      inputInfoMissedTimes.foreach (time => {
+        val streamIdToInputInfos = inputInfoTracker.getInfo(time)
+        val fakeJobSet = JobSet(time, Seq(), streamIdToInputInfos)
+        listenerBus.post(StreamingListenerBatchCompleted(fakeJobSet.toBatchInfo))
+      })
+      inputInfoMissedTimes.clear()
+
       jobSets.remove(jobSet.time)
       jobGenerator.onBatchCompletion(jobSet.time)
       logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 0baedaf275d67..351f3a3fa030e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -50,7 +50,7 @@ case class JobSet(
 
   def hasStarted: Boolean = processingStartTime > 0
 
-  def hasCompleted: Boolean = incompleteJobs.isEmpty
+  def hasCompleted: Boolean = incompleteJobs.isEmpty && processingStartTime >= 0
 
   // Time taken to process all the jobs from the time they started processing
   // (i.e. not including the time they wait in the streaming scheduler queue)
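The JobSet.hasCompleted change above is easy to miss but load-bearing: an empty JobSet has no incomplete jobs from the moment it is constructed, so the old predicate reported it as "completed" before it was ever processed. Below is a minimal, self-contained sketch of the difference; FakeJobSet is a hypothetical stand-in, not Spark's JobSet, which tracks more state than these two fields.

object HasCompletedDemo extends App {
  // Stand-in for Spark's JobSet, reduced to the fields the predicate uses.
  // processingStartTime = -1 means processing has not started yet.
  case class FakeJobSet(jobs: Seq[String], processingStartTime: Long = -1L) {
    private val incompleteJobs = scala.collection.mutable.Set(jobs: _*)
    def hasCompletedOld: Boolean = incompleteJobs.isEmpty
    def hasCompletedNew: Boolean = incompleteJobs.isEmpty && processingStartTime >= 0
  }

  // An empty JobSet that was never processed: the old predicate already
  // reports "completed"; the new one waits until processing has started.
  val emptyNotStarted = FakeJobSet(Seq.empty)
  println(emptyNotStarted.hasCompletedOld) // true  (misleading)
  println(emptyNotStarted.hasCompletedNew) // false (correct)

  // Once a processing start time is recorded, both predicates agree.
  val emptyStarted = FakeJobSet(Seq.empty, processingStartTime = 1000L)
  println(emptyStarted.hasCompletedNew)    // true
}

Guarding on processingStartTime >= 0 means a JobSet only counts as completed once it has actually been handed over for processing, which keeps the synthetic empty JobSets from being treated as finished batches prematurely.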
From a3d05e08c9889bc7a2d4df110430b2889858943f Mon Sep 17 00:00:00 2001
From: mwws
Date: Wed, 1 Jun 2016 15:33:50 +0800
Subject: [PATCH 2/3] revert previous change and fix the bug another way

1. revert the previous change
2. add a batchTimesWithNoJob set to record batch times with no jobs
3. add a merge (aggregate) method to StreamInputInfo
---
 .../streaming/scheduler/InputInfoTracker.scala     | 28 +++++++++++++++++++
 .../streaming/scheduler/JobGenerator.scala         | 15 +++++++++-
 .../streaming/scheduler/JobScheduler.scala         | 12 --------
 3 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index 4f124a1356b5a..85c4043495da0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -39,6 +39,12 @@ case class StreamInputInfo(
 
   def metadataDescription: Option[String] =
     metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
+
+  def merge(other: StreamInputInfo): StreamInputInfo = {
+    require(other.inputStreamId == inputStreamId,
+      "Cannot merge two StreamInputInfos with different input stream ids")
+    StreamInputInfo(inputStreamId, numRecords + other.numRecords, metadata ++ other.metadata)
+  }
 }
 
 @DeveloperApi
@@ -79,6 +85,28 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
     inputInfos.map(_.toMap).getOrElse(Map[Int, StreamInputInfo]())
   }
 
+  /**
+   * Get the input stream information for all the specified batch times and
+   * merge the results together.
+   */
+  def getInfo(batchTimes: Iterable[Time]): Map[Int, StreamInputInfo] = synchronized {
+    val inputInfosSet = batchTimes.map { batchTime =>
+      val inputInfos = batchTimeToInputInfos.get(batchTime)
+      inputInfos.getOrElse(mutable.Map[Int, StreamInputInfo]())
+    }
+
+    val aggregatedInputInfos = mutable.Map[Int, StreamInputInfo]()
+    inputInfosSet.foreach(inputInfos => inputInfos.foreach { case (id, info) =>
+      val currentInfo = aggregatedInputInfos.get(id)
+      if (currentInfo.isEmpty) {
+        aggregatedInputInfos(id) = info
+      } else {
+        aggregatedInputInfos(id) = currentInfo.get.merge(info)
+      }
+    })
+    aggregatedInputInfos.toMap
+  }
+
   /** Cleanup the tracked input information older than threshold batch time */
   def cleanup(batchThreshTime: Time): Unit = synchronized {
     val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8f9421fc098ba..8c6816071a05b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.scheduler
 
+import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.SparkEnv
@@ -77,6 +78,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   // last batch whose completion,checkpointing and metadata cleanup has been completed
   private var lastProcessedBatch: Time = null
 
+  // For some batch times a JobSet with no jobs is submitted. We record such batch times here
+  // so that their input info can be folded into the next JobSet that does have jobs.
+  private val batchTimesWithNoJob = mutable.HashSet[Time]()
+
   /** Start generation of jobs */
   def start(): Unit = synchronized {
     if (eventLoop != null) return // generator has already been started
@@ -249,7 +254,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       graph.generateJobs(time) // generate jobs using allocated block
     } match {
       case Success(jobs) =>
-        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
+        val streamIdToInputInfos = if (jobs.isEmpty) {
+          batchTimesWithNoJob.add(time)
+          Map.empty[Int, StreamInputInfo]
+        } else {
+          batchTimesWithNoJob.add(time)
+          val inputInfo = jobScheduler.inputInfoTracker.getInfo(batchTimesWithNoJob)
+          batchTimesWithNoJob.clear()
+          inputInfo
+        }
         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 3684b57475d70..ac18f73ea86aa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.HashSet
 import scala.util.Failure
 
 import org.apache.commons.lang.SerializationUtils
@@ -65,8 +64,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
-  private val inputInfoMissedTimes = HashSet[Time]()
-
   def start(): Unit = synchronized {
     if (eventLoop != null) return // scheduler has already been started
 
@@ -142,7 +139,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   def submitJobSet(jobSet: JobSet) {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
-      inputInfoMissedTimes.add(jobSet.time)
     } else {
       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
@@ -197,14 +193,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
-      // submit fake BatchCompleted event to show missing inputInfo on Streaming UI
-      inputInfoMissedTimes.foreach (time => {
-        val streamIdToInputInfos = inputInfoTracker.getInfo(time)
-        val fakeJobSet = JobSet(time, Seq(), streamIdToInputInfos)
-        listenerBus.post(StreamingListenerBatchCompleted(fakeJobSet.toBatchInfo))
-      })
-      inputInfoMissedTimes.clear()
-
       jobSets.remove(jobSet.time)
       jobGenerator.onBatchCompletion(jobSet.time)
       logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
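The merge method added to StreamInputInfo is what makes the new getInfo(batchTimes) overload possible: record counts are summed, metadata maps are unioned, and merging infos from different streams is rejected. A standalone sketch of those semantics follows; InfoSketch is a hypothetical stand-in mirroring the shape of the real case class, not Spark's StreamInputInfo itself.

// InfoSketch mirrors StreamInputInfo's fields but is not the Spark class.
case class InfoSketch(inputStreamId: Int, numRecords: Long,
    metadata: Map[String, Any] = Map.empty) {
  def merge(other: InfoSketch): InfoSketch = {
    // Merging only makes sense for the same input stream.
    require(other.inputStreamId == inputStreamId,
      "cannot merge info for two different input streams")
    InfoSketch(inputStreamId, numRecords + other.numRecords, metadata ++ other.metadata)
  }
}

object MergeDemo extends App {
  val a = InfoSketch(1, 100L, Map("offsets" -> "0-99"))
  val b = InfoSketch(1, 200L, Map("offsets" -> "100-299"))
  println(a.merge(b)) // InfoSketch(1,300,Map(offsets -> 100-299))
}

Note the metadata ++ other.metadata: on duplicate keys the right-hand (more recently merged) value wins. That is an acceptable loss here, since the Streaming UI mainly consumes the summed record counts.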
From ffd178779c09b31e6e09e92b1b611d23fe90df4a Mon Sep 17 00:00:00 2001
From: mwws
Date: Tue, 7 Jun 2016 10:15:42 +0800
Subject: [PATCH 3/3] add unit test

---
 .../scheduler/InputInfoTrackerSuite.scala          | 30 +++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
index a7e365649d3e8..e6cee5cfa121e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
@@ -76,4 +76,34 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(inputInfoTracker.getInfo(Time(0)).get(streamId1) === None)
     assert(inputInfoTracker.getInfo(Time(1))(streamId1) === inputInfo2)
   }
+
+  test("merge two InputInfos") {
+    val inputInfo1_1 = StreamInputInfo(1, 100L, Map("ID" -> 1))
+    val inputInfo1_2 = StreamInputInfo(1, 200L, Map("ID" -> 1))
+    val inputInfo2 = StreamInputInfo(2, 200L, Map("ID" -> 2))
+
+    val mergedInfo = inputInfo1_1.merge(inputInfo1_2)
+    assert(mergedInfo.inputStreamId === 1)
+    assert(mergedInfo.numRecords === 300L)
+    assert(mergedInfo.metadata === Map("ID" -> 1))
+
+    intercept[IllegalArgumentException] {
+      inputInfo1_1.merge(inputInfo2)
+    }
+  }
+
+  test("get merged InputInfo of all specified batch times") {
+    val inputInfoTracker = new InputInfoTracker(ssc)
+
+    val streamId1 = 0
+    val inputInfo1 = StreamInputInfo(streamId1, 100L)
+    val inputInfo2 = StreamInputInfo(streamId1, 300L)
+    inputInfoTracker.reportInfo(Time(0), inputInfo1)
+    inputInfoTracker.reportInfo(Time(1), inputInfo2)
+
+    val times = Seq(Time(0), Time(1))
+    val mergedInfo = inputInfoTracker.getInfo(times)(streamId1)
+    assert(mergedInfo.inputStreamId === 0)
+    assert(mergedInfo.numRecords === 400L)
+  }
 }
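The second test pins down the aggregation contract of getInfo(batchTimes): infos reported for the same stream across several batch times are folded together with merge. The same contract can be sketched as a plain group-and-reduce, independent of the tracker; this is a self-contained illustration, with Info as a hypothetical stand-in type, not the tracker's actual implementation.

object AggregateDemo extends App {
  // Hypothetical stand-in for StreamInputInfo, reduced to id and count.
  case class Info(streamId: Int, numRecords: Long) {
    def merge(other: Info): Info = {
      require(other.streamId == streamId, "stream ids must match")
      Info(streamId, numRecords + other.numRecords)
    }
  }

  // Input info reported for two batch times; stream 0 appears in both.
  val batch0 = Map(0 -> Info(0, 100L))                    // Time(0)
  val batch1 = Map(0 -> Info(0, 300L), 1 -> Info(1, 50L)) // Time(1)

  val aggregated: Map[Int, Info] =
    Seq(batch0, batch1)
      .flatten                 // every (streamId, info) pair across batches
      .groupBy(_._1)           // group the pairs by stream id
      .map { case (id, pairs) => id -> pairs.map(_._2).reduce(_ merge _) }

  println(aggregated(0).numRecords) // 400, the value the unit test asserts
}

Batches with no entry for a stream simply contribute nothing to that stream's group, which is why a JobSet covering several previously job-less batch times ends up showing their combined input info on the Streaming UI.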