Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val genStartTime=System.currentTimeMillis()
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption.foreach(_.setGenDelay(System.currentTimeMillis()-genStartTime))
jobOption
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,5 @@ private[streaming] case class JavaOutputOperationInfo(
description: String,
startTime: Long,
endTime: Long,
jobGenTime: Long,
failureReason: String)
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
outputOperationInfo.description: String,
outputOperationInfo.startTime.getOrElse(-1),
outputOperationInfo.endTime.getOrElse(-1),
outputOperationInfo.jobGenTime.getOrElse(-1),
outputOperationInfo.failureReason.orNull
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import org.apache.spark.streaming.Time
@DeveloperApi
case class BatchInfo(
batchTime: Time,
jobSetCreationDelay: Option[Long],
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long],
outputOperationInfos: Map[Int, OutputOperationInfo]
) {

def batchJobSetCreationDelay = jobSetCreationDelay.getOrElse(0L)
/**
* Time taken for the first job of this batch to start processing from the time this batch
* was submitted to the streaming scheduler. Essentially, it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Job(val time: Time, func: () => _) {
private var _callSite: CallSite = null
private var _startTime: Option[Long] = None
private var _endTime: Option[Long] = None
private var _jobGenTime: Option[Long] = None

def run() {
_result = Try(func())
Expand Down Expand Up @@ -85,6 +86,14 @@ class Job(val time: Time, func: () => _) {
_startTime = Some(startTime)
}

def setGenDelay(jobGenTime: Long): Unit = {
_jobGenTime = Some(jobGenTime)
}

def getGenDelay(): Option[Long] = {
_jobGenTime
}

def setEndTime(endTime: Long): Unit = {
_endTime = Some(endTime)
}
Expand All @@ -96,7 +105,7 @@ class Job(val time: Time, func: () => _) {
None
}
OutputOperationInfo(
time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, _jobGenTime, failureReason)
}

override def toString: String = id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// added but not allocated, are dangling in the queue after recovering, we have to allocate
// those blocks to the next batch, which is the batch they were supposed to go.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
jobScheduler.submitJobSet(JobSet(time, None, graph.generateJobs(time)))
}

// Restart the timer
Expand All @@ -241,6 +241,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
val jobSetCreationStartTime=clock.getTimeMillis()
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
Expand All @@ -249,8 +250,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val jobSetCreationEndTime=clock.getTimeMillis()
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
jobScheduler.submitJobSet(JobSet(time, Option(jobSetCreationEndTime-jobSetCreationStartTime), jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.streaming.Time
private[streaming]
case class JobSet(
time: Time,
jobSetCreationDelay: Option[Long],
jobs: Seq[Job],
streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {

Expand Down Expand Up @@ -63,6 +64,7 @@ case class JobSet(
def toBatchInfo: BatchInfo = {
BatchInfo(
time,
jobSetCreationDelay,
streamIdToInputInfo,
submissionTime,
if (hasStarted) Some(processingStartTime) else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ case class OutputOperationInfo(
description: String,
startTime: Option[Long],
endTime: Option[Long],
jobGenTime: Option[Long],
failureReason: Option[String]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private[ui] case class OutputOperationUIData(
description: String,
startTime: Option[Long],
endTime: Option[Long],
jobGenTime: Option[Long],
failureReason: Option[String]) {

def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
Expand All @@ -130,6 +131,7 @@ private[ui] object OutputOperationUIData {
outputOperationInfo.description,
outputOperationInfo.startTime,
outputOperationInfo.endTime,
outputOperationInfo.jobGenTime,
outputOperationInfo.failureReason
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)

val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
batchTime = Time(1000L),
batchTime = Time(1000L), None,
streamIdToInputInfo = Map(
0 -> StreamInputInfo(
inputStreamId = 0,
Expand All @@ -84,6 +84,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation1",
startTime = None,
endTime = None,
jobGenTime = None,
failureReason = None),
1 -> OutputOperationInfo(
batchTime = Time(1000L),
Expand All @@ -92,13 +93,14 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation2",
startTime = None,
endTime = None,
jobGenTime = None,
failureReason = None))
))
listenerWrapper.onBatchSubmitted(batchSubmitted)
assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)

val batchStarted = StreamingListenerBatchStarted(BatchInfo(
batchTime = Time(1000L),
batchTime = Time(1000L), None,
streamIdToInputInfo = Map(
0 -> StreamInputInfo(
inputStreamId = 0,
Expand All @@ -119,6 +121,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation1",
startTime = Some(1003L),
endTime = None,
jobGenTime = None,
failureReason = None),
1 -> OutputOperationInfo(
batchTime = Time(1000L),
Expand All @@ -127,13 +130,14 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation2",
startTime = Some(1005L),
endTime = None,
jobGenTime = None,
failureReason = None))
))
listenerWrapper.onBatchStarted(batchStarted)
assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)

val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
batchTime = Time(1000L),
batchTime = Time(1000L), None,
streamIdToInputInfo = Map(
0 -> StreamInputInfo(
inputStreamId = 0,
Expand All @@ -154,6 +158,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation1",
startTime = Some(1003L),
endTime = Some(1004L),
jobGenTime = None,
failureReason = None),
1 -> OutputOperationInfo(
batchTime = Time(1000L),
Expand All @@ -162,6 +167,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation2",
startTime = Some(1005L),
endTime = Some(1010L),
jobGenTime = None,
failureReason = None))
))
listenerWrapper.onBatchCompleted(batchCompleted)
Expand All @@ -174,6 +180,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation1",
startTime = Some(1003L),
endTime = None,
jobGenTime = None,
failureReason = None
))
listenerWrapper.onOutputOperationStarted(outputOperationStarted)
Expand All @@ -187,6 +194,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
description = "operation1",
startTime = Some(1003L),
endTime = Some(1004L),
jobGenTime = None,
failureReason = None
))
listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
Expand Down Expand Up @@ -243,6 +251,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
assert(javaOutputOperationInfo.description === outputOperationInfo.description)
assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
assert(javaOutputOperationInfo.jobGenTime === outputOperationInfo.jobGenTime.getOrElse(-1))
assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))

// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
val batchInfoSubmitted = BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil)
Expand All @@ -76,7 +76,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {

// onBatchStarted
val batchInfoStarted =
BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
Expand Down Expand Up @@ -118,7 +118,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {

// onBatchCompleted
val batchInfoCompleted =
BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil)
Expand Down Expand Up @@ -159,7 +159,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))

val batchInfoCompleted =
BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)

for(_ <- 0 until (limit + 10)) {
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
Expand All @@ -177,7 +177,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
// fulfill completedBatchInfos
for(i <- 0 until limit) {
val batchInfoCompleted = BatchInfo(
Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
Time(1000 + i * 100), None, Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart)
Expand All @@ -188,7 +188,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.onJobStart(jobStart)

val batchInfoSubmitted =
BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty)
BatchInfo(Time(1000 + limit * 100), None, Map.empty, (1000 + limit * 100), None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))

// We still can see the info retrieved from onJobStart
Expand All @@ -205,7 +205,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
// A lot of "onBatchCompleted"s happen before "onJobStart"
for(i <- limit + 1 to limit * 2) {
val batchInfoCompleted = BatchInfo(
Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
Time(1000 + i * 100), None, Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}

Expand All @@ -231,12 +231,12 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {

// onBatchSubmitted
val batchInfoSubmitted =
BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))

// onBatchStarted
val batchInfoStarted =
BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))

// onJobStart
Expand All @@ -254,7 +254,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {

// onBatchCompleted
val batchInfoCompleted =
BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}

Expand Down