@@ -53,14 +53,14 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
5353 }
5454
5555 test(" basic usage" ) {
56- val topic = " topicbasic"
56+ val topic = s " topicbasic- ${ Random .nextInt} "
5757 kafkaTestUtils.createTopic(topic)
5858 val messages = Set (" the" , " quick" , " brown" , " fox" )
5959 kafkaTestUtils.sendMessages(topic, messages.toArray)
6060
6161
6262 val kafkaParams = Map (" metadata.broker.list" -> kafkaTestUtils.brokerAddress,
63- " group.id" -> s " test-consumer- ${Random .nextInt( 10000 ) }" )
63+ " group.id" -> s " test-consumer- ${Random .nextInt}" )
6464
6565 val offsetRanges = Array (OffsetRange (topic, 0 , 0 , messages.size))
6666
@@ -73,12 +73,12 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
7373
7474 test(" iterator boundary conditions" ) {
7575 // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
76- val topic = " topic1 "
76+ val topic = s " topicboundary- ${ Random .nextInt} "
7777 val sent = Map (" a" -> 5 , " b" -> 3 , " c" -> 10 )
7878 kafkaTestUtils.createTopic(topic)
7979
8080 val kafkaParams = Map (" metadata.broker.list" -> kafkaTestUtils.brokerAddress,
81- " group.id" -> s " test-consumer- ${Random .nextInt( 10000 ) }" )
81+ " group.id" -> s " test-consumer- ${Random .nextInt}" )
8282
8383 val kc = new KafkaCluster (kafkaParams)
8484
@@ -88,12 +88,17 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
8888 val rdd = getRdd(kc, Set (topic))
8989
9090 assert(rdd.isDefined)
91- assert(rdd.get.count === sent.values.sum, " didn't get all sent messages" )
9291
93- val ranges = rdd.get.asInstanceOf [HasOffsetRanges ]
94- .offsetRanges.map(o => TopicAndPartition (o.topic, o.partition) -> o.untilOffset).toMap
92+ val ranges = rdd.get.asInstanceOf [HasOffsetRanges ].offsetRanges
93+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
94+ val sentCount = sent.values.sum
9595
96- kc.setConsumerOffsets(kafkaParams(" group.id" ), ranges).fold(
96+ assert(rangeCount === sentCount, " offset range didn't include all sent messages" )
97+ assert(rdd.get.count === sentCount, " didn't get all sent messages" )
98+
99+ val rangesMap = ranges.map(o => TopicAndPartition (o.topic, o.partition) -> o.untilOffset).toMap
100+
101+ kc.setConsumerOffsets(kafkaParams(" group.id" ), rangesMap).fold(
97102 err => throw new Exception (err.mkString(" \n " )),
98103 _ => ()
99104 )
0 commit comments