Skip to content

Commit cac63ee

Browse files
committed
Add additional tests; fix fencepost (off-by-one) error
1 parent 37d3053 commit cac63ee

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,14 @@ class KafkaRDD[
132132
null.asInstanceOf[R]
133133
} else {
134134
val item = iter.next
135-
if (item.offset > part.untilOffset) {
135+
if (item.offset >= part.untilOffset) {
136136
finished = true
137+
null.asInstanceOf[R]
138+
} else {
139+
requestOffset = item.nextOffset
140+
messageHandler(new MessageAndMetadata(
141+
part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
137142
}
138-
requestOffset = item.nextOffset
139-
messageHandler(new MessageAndMetadata(
140-
part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
141143
}
142144
}
143145
}

external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,21 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
5555

5656
val rdd = getRdd(kc, Set(topic))
5757
assert(rdd.isDefined)
58-
assert(rdd.get.countByValue.size === sent.size)
58+
assert(rdd.get.count === sent.values.sum)
5959

6060
kc.setConsumerOffsets(kafkaParams("group.id"), rdd.get.untilOffsets)
6161

6262
val rdd2 = getRdd(kc, Set(topic))
63+
val sent2 = Map("d" -> 1)
64+
produceAndSendMessage(topic, sent2)
6365
assert(rdd2.isDefined)
6466
assert(rdd2.get.count === 0)
67+
68+
val rdd3 = getRdd(kc, Set(topic))
69+
produceAndSendMessage(topic, Map("extra" -> 22))
70+
assert(rdd3.isDefined)
71+
assert(rdd3.get.count === sent2.values.sum)
72+
6573
}
6674

6775
private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
@@ -73,7 +81,7 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
7381
until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
7482
} yield {
7583
new KafkaRDD[String, String, StringDecoder, StringDecoder, String](
76-
sc, kc.kafkaParams, from, until, mmd => mmd.message)
84+
sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
7785
}
7886
}
7987
}

0 commit comments

Comments (0)