Skip to content

Commit af6f3ec

Browse files
committed
[SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value
1 parent 9edab4c commit af6f3ec

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
3030

3131
import kafka.admin.AdminUtils
3232
import kafka.api.Request
33+
import kafka.common.TopicAndPartition
3334
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
3435
import kafka.serializer.StringEncoder
3536
import kafka.server.{KafkaConfig, KafkaServer}
@@ -228,6 +229,20 @@ private class KafkaTestUtils extends Logging {
228229
tryAgain(1)
229230
}
230231

232+
def waitUntilLeaderOffset(
233+
kc: KafkaCluster,
234+
topic: String,
235+
partition: Int,
236+
offset: Long): Unit = {
237+
eventually(Time(10000), Time(100)) {
238+
val tp = TopicAndPartition(topic, partition)
239+
val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
240+
assert(
241+
llo == offset,
242+
s"$topic $partition $offset not reached after timeout")
243+
}
244+
}
245+
231246
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
232247
eventually(Time(10000), Time(100)) {
233248
assert(

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
5858
val messages = Set("the", "quick", "brown", "fox")
5959
kafkaTestUtils.sendMessages(topic, messages.toArray)
6060

61-
6261
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
6362
"group.id" -> s"test-consumer-${Random.nextInt}")
6463

64+
val kc = new KafkaCluster(kafkaParams)
65+
kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, messages.size)
66+
6567
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
6668

6769
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
@@ -84,14 +86,16 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
8486

8587
// this is the "lots of messages" case
8688
kafkaTestUtils.sendMessages(topic, sent)
89+
val sentCount = sent.values.sum
90+
kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount)
91+
8792
// rdd defined from leaders after sending messages, should get the number sent
8893
val rdd = getRdd(kc, Set(topic))
8994

9095
assert(rdd.isDefined)
9196

9297
val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
9398
val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
94-
val sentCount = sent.values.sum
9599

96100
assert(rangeCount === sentCount, "offset range didn't include all sent messages")
97101
assert(rdd.get.count === sentCount, "didn't get all sent messages")

0 commit comments

Comments
 (0)