diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java
index 7e29a6710e..cfe22848fb 100644
--- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java
+++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java
@@ -18,6 +18,8 @@
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -36,6 +38,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -239,6 +242,39 @@ public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String
 		}
 	}
 
+	/**
+	 * Return the end offsets of the requested topic/partitions.
+	 * @param <K> the key type.
+	 * @param <V> the value type.
+	 * @param consumer the consumer.
+	 * @param topic the topic.
+	 * @param partitions the partitions, or null for all partitions.
+	 * @return the map of end offsets.
+	 * @since 2.6.5
+	 * @see Consumer#endOffsets(Collection, Duration)
+	 */
+	public static <K, V> Map<TopicPartition, Long> getEndOffsets(Consumer<K, V> consumer, String topic,
+			Integer... partitions) {
+
+		Collection<TopicPartition> tps;
+		if (partitions == null || partitions.length == 0) {
+			Map<String, List<PartitionInfo>> parts = consumer.listTopics(Duration.ofSeconds(10));
+			tps = parts.entrySet()
+					.stream()
+					.filter(entry -> entry.getKey().equals(topic))
+					.flatMap(entry -> entry.getValue().stream())
+					.map(pi -> new TopicPartition(topic, pi.partition()))
+					.collect(Collectors.toList());
+		}
+		else {
+			Assert.noNullElements(partitions, "'partitions' cannot have null elements");
+			tps = Arrays.stream(partitions)
+					.map(part -> new TopicPartition(topic, part))
+					.collect(Collectors.toList());
+		}
+		return consumer.endOffsets(tps, Duration.ofSeconds(10));
+	}
+
 	/**
 	 * Poll the consumer for records.
 	 * @param consumer the consumer.
diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java
index 42f870a6b1..adc9c6bfff 100644
--- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java
+++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -44,14 +45,18 @@ public class KafkaTestUtilsTests {
 	void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
 		Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
 		KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
-		producer.send(new ProducerRecord<>("singleTopic1", 1, "foo"));
-		producer.send(new ProducerRecord<>("singleTopic2", 1, "foo"));
+		producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo"));
+		producer.send(new ProducerRecord<>("singleTopic2", 0, 1, "foo"));
 		producer.close();
 		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests1", "false", broker);
 		KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
 		broker.consumeFromAllEmbeddedTopics(consumer);
 		KafkaTestUtils.getSingleRecord(consumer, "singleTopic1");
 		KafkaTestUtils.getSingleRecord(consumer, "singleTopic2");
+		Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "singleTopic1");
+		assertThat(endOffsets).hasSize(2);
+		assertThat(endOffsets.get(new TopicPartition("singleTopic1", 0))).isEqualTo(1L);
+		assertThat(endOffsets.get(new TopicPartition("singleTopic1", 1))).isEqualTo(0L);
 		consumer.close();
 	}
 
@@ -59,7 +64,7 @@ void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
 	void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) {
 		Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
 		KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
-		producer.send(new ProducerRecord<>("singleTopic4", 1, "foo"));
+		producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo"));
 		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests2", "false", broker);
 		KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
 		broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
@@ -71,6 +76,10 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
 		producer.close();
 		KafkaTestUtils.getSingleRecord(consumer, "singleTopic4");
 		KafkaTestUtils.getSingleRecord(consumer, "singleTopic5");
+		Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "singleTopic4", 0, 1);
+		assertThat(endOffsets).hasSize(2);
+		assertThat(endOffsets.get(new TopicPartition("singleTopic4", 0))).isEqualTo(1L);
+		assertThat(endOffsets.get(new TopicPartition("singleTopic4", 1))).isEqualTo(0L);
 		consumer.close();
 	}
 
diff --git a/src/reference/asciidoc/testing.adoc b/src/reference/asciidoc/testing.adoc
index 083ec74379..9f430887ee 100644
--- a/src/reference/asciidoc/testing.adoc
+++ b/src/reference/asciidoc/testing.adoc
@@ -3,10 +3,16 @@
 
 The `spring-kafka-test` jar contains some useful utilities to assist with testing your applications.
 
+[[ktu]]
+==== KafkaTestUtils
+
+`o.s.kafka.test.utils.KafkaTestUtils` provides a number of static helper methods to consume records, retrieve various record offsets, and others.
+Refer to its https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/test/utils/KafkaTestUtils.html[Javadocs] for complete details.
+
 [[junit]]
 ==== JUnit
 
-`o.s.kafka.test.utils.KafkaTestUtils` provides some static methods to set up producer and consumer properties.
+`o.s.kafka.test.utils.KafkaTestUtils` also provides some static methods to set up producer and consumer properties.
 The following listing shows those method signatures:
 
 ====