@@ -19,26 +19,28 @@ package org.apache.spark.streaming.kafka
 
 import java.io.File
 import java.util.Arrays
-import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
 
 class DirectKafkaStreamSuite
   extends SparkFunSuite
@@ -115,8 +117,8 @@ class DirectKafkaStreamSuite
         logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
       }
       val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
-      // For each partition, get size of the range in the partition,
-      // and the number of items in the partition
+        // For each partition, get size of the range in the partition,
+        // and the number of items in the partition
         val off = offsetRanges(i)
         val all = iter.toSeq
         val partSize = all.size
@@ -411,7 +413,7 @@ class DirectKafkaStreamSuite
       .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
 
     new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
-        ssc, kafkaParams, m, messageHandler) {
+      ssc, kafkaParams, m, messageHandler) {
       override protected[streaming] val rateController =
         Some(new DirectKafkaRateController(id, estimator))
     }
@@ -436,8 +438,8 @@ class DirectKafkaStreamSuite
     Seq(100, 50, 20).foreach { rate =>
       collectedData.clear() // Empty this buffer on each pass.
       estimator.updateRate(rate) // Set a new rate.
-        // Expect blocks of data equal to "rate", scaled by the interval length in secs.
-        val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
@@ -451,7 +453,7 @@ class DirectKafkaStreamSuite
 
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
-    kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
+      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
     kafkaStream.generatedRDDs.mapValues { rdd =>
       rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
     }.toSeq.sortBy { _._1 }
@@ -477,172 +479,6 @@ class DirectKafkaStreamSuite
   }
 }
 
-
-object DirectKafkaWordCountLocal {
-
-  import org.apache.spark.streaming._
-
-  def main(args: Array[String]) {
-
-    // Create context with 2 second batch interval
-    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    ssc.checkpoint("/tmp/checkpoint")
-    val listener = new LatencyListener(ssc)
-    ssc.addStreamingListener(listener)
-    val lines = ssc.socketTextStream("localhost", 9998)
-
-    val words = lines.flatMap(_.split(" "))
-
-    val pairs = words.map(word => (word, 1))
-
-    val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10))
-
-    wordCounts.print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-object DirectKafkaWordCountKafka {
-
-  import org.apache.spark.streaming._
-
-  def main(args: Array[String]) {
-    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount")
-
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    // ssc.checkpoint(checkPointPath)
-
-    val listener = new LatencyListener(ssc)
-    ssc.addStreamingListener(listener)
-    val kafkaBrokers = "localhost"
-    val kafkaPort = "9092"
-    val topic = "test"
-
-    val topicsSet = Set(topic)
-
-    val brokerListString = kafkaBrokers + ":" + kafkaPort
-
-    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString())
-    System.err.println(
-      "Trying to connect to Kafka at " + brokerListString.toString())
-    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topicsSet)
-    ssc.checkpoint("/tmp/checkPoint/")
-
-    val words = messages.map(x => x._2).flatMap(_.split(" "))
-
-    val pairs = words.map(word => (word, 1))
-
-    val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10))
-
-    wordCounts.print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.scheduler._
-
-class StopContextThread(ssc: StreamingContext) extends Runnable {
-  def run {
-    ssc.stop(true, true)
-  }
-}
-
-
-class LatencyListener(ssc: StreamingContext) extends StreamingListener {
-
-  var metricMap: scala.collection.mutable.Map[String, Object] = _
-  var startTime = 0L
-  var startTime1 = 0L
-  var endTime = 0L
-  var endTime1 = 0L
-  var totalDelay = 0L
-  var hasStarted = false
-  var batchCount = 0
-  var totalRecords = 0L
-  val thread: Thread = new Thread(new StopContextThread(ssc))
-
-
-  def getMap(): scala.collection.mutable.Map[String, Object] = synchronized {
-    if (metricMap == null) metricMap = scala.collection.mutable.Map()
-    metricMap
-  }
-
-  def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized {
-    this.metricMap = metricMap
-  }
-
-  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-    val batchInfo = batchCompleted.batchInfo
-    println("job generate delay =" + batchCompleted.batchInfo.batchJobSetCreationDelay)
-
-    val prevCount = totalRecords
-    var recordThisBatch = batchInfo.numRecords
-    if (!thread.isAlive) {
-      totalRecords += recordThisBatch
-      val imap = getMap
-      imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime +
-        ", batch Count so far," + batchCount +
-        ", total Records so far," + totalRecords +
-        ", record This Batch," + recordThisBatch +
-        ", submission Time," + batchInfo.submissionTime +
-        ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) +
-        ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) +
-        ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) +
-        ", processing Delay," + batchInfo.processingDelay.getOrElse(0L)
-      setMap(imap)
-    }
-
-    if (totalRecords >= 10000) {
-      if (hasStarted && !thread.isAlive) {
-        // not receiving any data more, finish
-        endTime = System.currentTimeMillis()
-        endTime1 = batchInfo.processingEndTime.getOrElse(0L)
-        var warning = ""
-        val totalTime = (endTime - startTime).toDouble / 1000
-        // This is weighted avg of every batch process time. The weight is records processed int the batch
-        val recordThroughput = totalRecords / totalTime
-
-        val imap = getMap
-
-        imap("Final Metric") = "Total Batch count," + batchCount +
-          ", startTime based on submissionTime," + startTime +
-          ", startTime based on System," + startTime1 +
-          ", endTime based on System," + endTime +
-          ", endTime based on processingEndTime," + endTime1 +
-          ", Total Records," + totalRecords +
-          // ", Total processing delay = " + totalDelay + " ms "+
-          ", Total Consumed time in sec," + totalTime +
-          ", Avg records/sec," + recordThroughput
-
-        imap.foreach {case (key, value) => println(key + "-->" + value)}
-
-        thread.start
-      }
-    } else if (!hasStarted) {
-      if (batchInfo.numRecords > 0) {
-        startTime = batchCompleted.batchInfo.submissionTime
-        startTime1 = System.currentTimeMillis()
-        hasStarted = true
-      }
-    }
-
-    if (hasStarted) {
-      // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms")
-      batchCompleted.batchInfo.processingDelay match {
-        case Some(value) => totalDelay += value * recordThisBatch
-        case None => // Nothing
-      }
-      batchCount = batchCount + 1
-    }
-  }
-
-}
 object DirectKafkaStreamSuite {
   val collectedData = new ConcurrentLinkedQueue[String]()
   @volatile var total = -1L
@@ -674,14 +510,14 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
   }
 
   def compute(
-    time: Long,
-    elements: Long,
-    processingDelay: Long,
-    schedulingDelay: Long): Option[Double] = Some(rate)
+      time: Long,
+      elements: Long,
+      processingDelay: Long,
+      schedulingDelay: Long): Option[Double] = Some(rate)
 }
 
 private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
   extends RateController(id, estimator) {
   override def publish(rate: Long): Unit = ()
   override def getLatestRate(): Long = rate
-}
+}