@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting offset") {
+  test("basic stream receiving with multiple topics and smallest starting offset") {
     val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
       sendMessages(t, data)
     }
+    val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
       "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics)
     }
-    var total = 0L
+
+    val allReceived = new ArrayBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
       // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       collected.foreach { case (partSize, rangeSize) =>
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
-      total += collected.size  // Add up all the collected items
     }
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
     }
     ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("creating stream by offset") {
+  test("creating stream by offset") {
     val topic = "offset"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints
-  ignore("offset recovery") {
+  test("offset recovery") {
     val topic = "recovery"
     createTopic(topic)
     testDir = Utils.createTempDir()
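
For readers who want to run the pattern these tests exercise outside the suite, below is a minimal, standalone sketch of a direct (receiver-less) Kafka stream together with the HasOffsetRanges cast that the foreachRDD blocks above rely on. It is an illustration, not part of the patch: the broker address, topic name, app name, and batch interval are placeholder values, and it assumes the Spark 1.x spark-streaming-kafka (Kafka 0.8) artifact is on the classpath.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamOffsetRangesSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder broker address and topic; substitute real values.
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "auto.offset.reset" -> "smallest")
    val topics = Set("example-topic")

    val conf = new SparkConf().setMaster("local[2]").setAppName("direct-kafka-sketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // In the direct approach each RDD partition corresponds 1:1 to a Kafka
    // (topic, partition) offset range, which is what the suite asserts on.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // The RDD produced by the direct stream exposes its ranges via HasOffsetRanges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        println(s"${range.topic} ${range.partition}: " +
          s"${range.fromOffset} -> ${range.untilOffset} " +
          s"(${range.untilOffset - range.fromOffset} records)")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}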