
Commit 420cb22

Made changes to capture the job-level jobCreateDelay metric.
1 parent 79c2c63 commit 420cb22

File tree

8 files changed: +230 −12 lines changed
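
The commit threads a per-job generation-delay measurement (jobGenTime) from DStreamGraph through Job and OutputOperationInfo out to the listener API and the UI data. A minimal sketch of how an application could observe the new field once this patch is applied (the listener name JobGenTimeListener is illustrative, not part of the commit):

import org.apache.spark.streaming.scheduler._

// Logs the new per-output-operation jobGenTime as each operation completes.
class JobGenTimeListener extends StreamingListener {
  override def onOutputOperationCompleted(
      event: StreamingListenerOutputOperationCompleted): Unit = {
    val info = event.outputOperationInfo
    // jobGenTime is the Option[Long] added by this commit; None if never set.
    println(s"batch=${info.batchTime} op=${info.id} jobGenTimeMs=${info.jobGenTime.getOrElse(-1L)}")
  }
}

// Registered like any other listener:
//   ssc.addStreamingListener(new JobGenTimeListener)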

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

Lines changed: 204 additions & 11 deletions
@@ -19,28 +19,26 @@ package org.apache.spark.streaming.kafka
 
 import java.io.File
 import java.util.Arrays
-import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import java.util.concurrent.atomic.AtomicLong
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.{Seconds, Milliseconds, StreamingContext, Time}
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 class DirectKafkaStreamSuite
   extends SparkFunSuite
@@ -479,6 +477,201 @@ class DirectKafkaStreamSuite
   }
 }
 
+object DirectKafkaWordCountLocal {
+
+  import org.apache.spark.streaming._
+
+  case class Tick(symbol: String, price: Int, ts: Long)
+
+  def main(args: Array[String]) {
+
+    // Create context with 2 second batch interval
+    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    ssc.checkpoint("/tmp/checkpoint")
+    val listener = new LatencyListener(ssc)
+    ssc.addStreamingListener(listener)
+    val lines = ssc.socketTextStream("localhost", 8888)
+
+    val words = lines.flatMap(_.split(" "))
+
+    val pairs = words.map(word => (word, 1))
+
+    val wordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))
+
+    val wordCountNew = wordCounts.filter(_._1.startsWith("sac")).reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))
+    wordCountNew.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+object DirectKafkaWordCountKafka {
+
+  import org.apache.spark.streaming._
+
+  def main(args: Array[String]) {
+    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount")
+
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    // ssc.checkpoint(checkPointPath)
+
+    val listener = new LatencyListener(ssc)
+    ssc.addStreamingListener(listener)
+    val kafkaBrokers = "localhost"
+    val kafkaPort = "9092"
+    val topic = "test"
+
+    val topicsSet = Set(topic)
+
+    val brokerListString = new StringBuilder()
+    brokerListString.append(kafkaBrokers).append(":").append(kafkaPort)
+
+    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString())
+    System.err.println("Trying to connect to Kafka at " + brokerListString.toString())
+    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topicsSet)
+    ssc.checkpoint("/tmp/checkPoint/")
+
+    // val lines = ssc.socketTextStream("localhost", 9998)
+
+    val words = messages.map(x => x._2).flatMap(_.split(" "))
+
+    val pairs = words.map(word => (word, 1))
+    pairs.print()
+
+    val wordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))
+
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+
+class StopContextThread(ssc: StreamingContext) extends Runnable {
+  def run(): Unit = {
+    ssc.stop(true, true)
+  }
+}
+
+
+class LatencyListener(ssc: StreamingContext) extends StreamingListener {
+
+  var metricMap: scala.collection.mutable.Map[String, Object] = _
+  var startTime = 0L
+  var startTime1 = 0L
+  var endTime = 0L
+  var endTime1 = 0L
+  var totalDelay = 0L
+  var hasStarted = false
+  var batchCount = 0
+  var totalRecords = 0L
+  val thread: Thread = new Thread(new StopContextThread(ssc))
+
+  def getMap(): scala.collection.mutable.Map[String, Object] = synchronized {
+    if (metricMap == null) metricMap = scala.collection.mutable.Map()
+    metricMap
+  }
+
+  def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized {
+    this.metricMap = metricMap
+  }
+
+  /** Called when processing of a job of a batch has started. */
+  override def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
+    println("job creation delay reported in onOutputOperationStarted ==>" +
+      outputOperationStarted.outputOperationInfo.batchTime + "==>" +
+      outputOperationStarted.outputOperationInfo.id + "==>" +
+      outputOperationStarted.outputOperationInfo.jobGenTime)
+  }
+
+  // Batch interval in milliseconds, used as the latency baseline below.
+  val batchSize = Seconds(2).milliseconds
+  val recordLimitPerThread = 1000
+  val loaderThreads = 10
+
+  val recordLimit = loaderThreads * recordLimitPerThread
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    val batchInfo = batchCompleted.batchInfo
+    val prevCount = totalRecords
+    var recordThisBatch = batchInfo.numRecords
+
+    println("job creation delay reported in onBatchCompleted ==>" +
+      batchInfo.batchTime + "==>" + batchInfo.batchJobSetCreationDelay)
+
+    if (!thread.isAlive) {
+      totalRecords += recordThisBatch
+      // val imap = getMap
+      // imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime +
+      //   ", batch Count so far," + batchCount +
+      //   ", total Records so far," + totalRecords +
+      //   ", record This Batch," + recordThisBatch +
+      //   ", submission Time," + batchInfo.submissionTime +
+      //   ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) +
+      //   ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) +
+      //   ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) +
+      //   ", processing Delay," + batchInfo.processingDelay.getOrElse(0L)
+      //
+      // setMap(imap)
+    }
+
+    if (totalRecords >= recordLimit) {
+      if (hasStarted && !thread.isAlive) {
+        // not receiving any more data, finish
+        endTime = System.currentTimeMillis()
+        endTime1 = batchInfo.processingEndTime.getOrElse(0L)
+        var warning = ""
+        val totalTime = (endTime - startTime).toDouble / 1000
+        // This is the weighted avg of every batch's process time; the weight is
+        // the number of records processed in the batch.
+        val avgLatency = totalDelay.toDouble / totalRecords
+        if (avgLatency > batchSize.toDouble)
+          warning = "WARNING: SPARK CLUSTER IN UNSTABLE STATE. TRY REDUCING INPUT SPEED"
+
+        val avgLatencyAdjust = avgLatency + batchSize.toDouble
+        val recordThroughput = totalRecords / totalTime
+
+        val imap = getMap
+
+        imap("Final Metric") = " Total Batch count," + batchCount +
+          ", startTime based on submissionTime," + startTime +
+          ", startTime based on System," + startTime1 +
+          ", endTime based on System," + endTime +
+          ", endTime based on processingEndTime," + endTime1 +
+          ", Total Records," + totalRecords +
+          // ", Total processing delay = " + totalDelay + " ms " +
+          ", Total Consumed time in sec," + totalTime +
+          ", Avg latency/batchInterval in ms," + avgLatencyAdjust +
+          ", Avg records/sec," + recordThroughput +
+          ", WARNING," + warning
+
+        imap.foreach { case (key, value) => println(key + "-->" + value) }
+
+        thread.start()
+      }
+    } else if (!hasStarted) {
+      if (batchInfo.numRecords > 0) {
+        startTime = batchCompleted.batchInfo.submissionTime
+        startTime1 = System.currentTimeMillis()
+        hasStarted = true
+      }
+    }
+
+    if (hasStarted) {
+      // println("This delay:" + batchCompleted.batchInfo.processingDelay + "ms")
+      batchCompleted.batchInfo.processingDelay match {
+        case Some(value) => totalDelay += value * recordThisBatch
+        case None => // Nothing
+      }
+      batchCount = batchCount + 1
+    }
+  }
+}
+
 object DirectKafkaStreamSuite {
   val collectedData = new ConcurrentLinkedQueue[String]()
   @volatile var total = -1L
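
The LatencyListener above reports a record-weighted average of batch processing delays: each batch's processingDelay is multiplied by the records it processed, summed into totalDelay, and divided by totalRecords at the end. A worked example of the same formula with illustrative numbers:

// batch A: 1000 records with 400 ms processing delay
// batch B: 3000 records with 800 ms processing delay
val totalDelay   = 400L * 1000 + 800L * 3000           // 2,800,000 record-milliseconds
val totalRecords = 1000 + 3000
val avgLatency   = totalDelay.toDouble / totalRecords  // 700.0 ms, weighted toward batch B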

streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala

Lines changed: 2 additions & 0 deletions
@@ -114,8 +114,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("Generating jobs for time " + time)
     val jobs = this.synchronized {
       outputStreams.flatMap { outputStream =>
+        val genStartTime = System.currentTimeMillis()
         val jobOption = outputStream.generateJob(time)
         jobOption.foreach(_.setCallSite(outputStream.creationSite))
+        jobOption.foreach(_.setGenDelay(System.currentTimeMillis() - genStartTime))
         jobOption
       }
     }
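
The measurement itself is just two wall-clock reads bracketing generateJob, with the difference stored on the job. The same pattern in isolation, as a hypothetical helper (timedMillis is not part of the patch):

// Run a block and return its result together with the elapsed milliseconds.
def timedMillis[A](body: => A): (A, Long) = {
  val start = System.currentTimeMillis()
  val result = body
  (result, System.currentTimeMillis() - start)
}

// e.g. val (jobOption, genDelay) = timedMillis(outputStream.generateJob(time))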

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala

Lines changed: 1 addition & 0 deletions
@@ -241,4 +241,5 @@ private[streaming] case class JavaOutputOperationInfo(
     description: String,
     startTime: Long,
     endTime: Long,
+    jobGenTime: Long,
     failureReason: String)

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
       outputOperationInfo.description: String,
       outputOperationInfo.startTime.getOrElse(-1),
       outputOperationInfo.endTime.getOrElse(-1),
+      outputOperationInfo.jobGenTime.getOrElse(-1),
       outputOperationInfo.failureReason.orNull
     )
   }
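
The wrapper keeps the existing convention for optional longs: an Option[Long] is flattened to a plain long with -1 meaning "not set", so the Java-facing JavaOutputOperationInfo needs no Option type. The convention in isolation:

// A missing value surfaces to the Java API as -1, matching startTime and endTime.
def toJavaLong(v: Option[Long]): Long = v.getOrElse(-1L)

toJavaLong(Some(42L))  // 42
toJavaLong(None)       // -1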

streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala

Lines changed: 10 additions & 1 deletion
@@ -34,6 +34,7 @@ class Job(val time: Time, func: () => _) {
   private var _callSite: CallSite = null
   private var _startTime: Option[Long] = None
   private var _endTime: Option[Long] = None
+  private var _jobGenTime: Option[Long] = None
 
   def run() {
     _result = Try(func())
@@ -85,6 +86,14 @@ class Job(val time: Time, func: () => _) {
     _startTime = Some(startTime)
   }
 
+  def setGenDelay(jobGenTime: Long): Unit = {
+    _jobGenTime = Some(jobGenTime)
+  }
+
+  def getGenDelay(): Option[Long] = {
+    _jobGenTime
+  }
+
   def setEndTime(endTime: Long): Unit = {
     _endTime = Some(endTime)
   }
@@ -96,7 +105,7 @@ class Job(val time: Time, func: () => _) {
       None
     }
     OutputOperationInfo(
-      time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
+      time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, _jobGenTime, failureReason)
   }
 
   override def toString: String = id
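
A sketch of how the new accessors pair up (Job is private to the streaming package, so this only compiles there; the constructor arguments are illustrative):

import org.apache.spark.streaming.Time

val job = new Job(Time(1000L), () => ())
job.setGenDelay(42L)                    // millis DStreamGraph spent generating this job
assert(job.getGenDelay() == Some(42L))  // later surfaced as OutputOperationInfo.jobGenTime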

streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ case class OutputOperationInfo(
     description: String,
     startTime: Option[Long],
     endTime: Option[Long],
+    jobGenTime: Option[Long],
     failureReason: Option[String]) {
 
   /**

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala

Lines changed: 2 additions & 0 deletions
@@ -116,6 +116,7 @@ private[ui] case class OutputOperationUIData(
     description: String,
     startTime: Option[Long],
     endTime: Option[Long],
+    jobGenTime: Option[Long],
     failureReason: Option[String]) {
 
   def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
@@ -130,6 +131,7 @@ private[ui] object OutputOperationUIData {
       outputOperationInfo.description,
       outputOperationInfo.startTime,
       outputOperationInfo.endTime,
+      outputOperationInfo.jobGenTime,
       outputOperationInfo.failureReason
     )
   }
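
OutputOperationUIData.duration combines its two optional timestamps with a for-comprehension, yielding a value only when both are present; jobGenTime rides alongside without affecting it. The same logic in isolation:

def duration(startTime: Option[Long], endTime: Option[Long]): Option[Long] =
  for (s <- startTime; e <- endTime) yield e - s

duration(Some(1003L), Some(1010L))  // Some(7)
duration(Some(1003L), None)         // None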

streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -84,6 +84,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
         description = "operation1",
         startTime = None,
         endTime = None,
+        jobGenTime = None,
         failureReason = None),
       1 -> OutputOperationInfo(
         batchTime = Time(1000L),
@@ -92,6 +93,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
         description = "operation2",
         startTime = None,
         endTime = None,
+        jobGenTime = None,
         failureReason = None))
     ))
     listenerWrapper.onBatchSubmitted(batchSubmitted)
@@ -119,6 +121,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
         description = "operation1",
         startTime = Some(1003L),
         endTime = None,
+        jobGenTime = None,
         failureReason = None),
       1 -> OutputOperationInfo(
         batchTime = Time(1000L),
@@ -127,6 +130,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
         description = "operation2",
         startTime = Some(1005L),
         endTime = None,
+        jobGenTime = None,
         failureReason = None))
     ))
     listenerWrapper.onBatchStarted(batchStarted)
@@ -154,6 +158,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
         description = "operation1",
         startTime = Some(1003L),
         endTime = Some(1004L),
+        jobGenTime = None,
         failureReason = None),
       1 -> OutputOperationInfo(
         batchTime = Time(1000L),
@@ -162,6 +167,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
         description = "operation2",
         startTime = Some(1005L),
         endTime = Some(1010L),
+        jobGenTime = None,
         failureReason = None))
     ))
     listenerWrapper.onBatchCompleted(batchCompleted)
@@ -174,6 +180,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
       description = "operation1",
       startTime = Some(1003L),
       endTime = None,
+      jobGenTime = None,
       failureReason = None
     ))
     listenerWrapper.onOutputOperationStarted(outputOperationStarted)
@@ -187,6 +194,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
       description = "operation1",
       startTime = Some(1003L),
       endTime = Some(1004L),
+      jobGenTime = None,
       failureReason = None
     ))
     listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
@@ -243,6 +251,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
     assert(javaOutputOperationInfo.description === outputOperationInfo.description)
     assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
     assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
+    assert(javaOutputOperationInfo.jobGenTime === outputOperationInfo.jobGenTime.getOrElse(-1))
     assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
   }
 }
