diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 610d253e02..fcc8f1fb04 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -2578,6 +2578,10 @@ See `monitorInterval`. |`false` |Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`. +|[[pauseImmediate]]<> +|`false` +|When the container is paused, stop processing after the current record instead of after processing all the records from the previous poll; the remaining records are retained in memory and will be passed to the listener when the container is resumed. + |[[pollTimeout]]<> |5000 |The timeout passed into `Consumer.poll()`. @@ -3825,6 +3829,10 @@ However, the consumers might not have actually paused yet. In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property. +Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed. +By default, the pause takes effect when all of the records from the previous poll have been processed. +See <>. + The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events: ==== diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index c36b110905..3a612f44eb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -280,6 +280,8 @@ public enum EOSMode { private boolean asyncAcks; + private boolean pauseImmediate; + /** * Create properties for a container that will subscribe to the specified topics. * @param topics the topics. @@ -873,6 +875,27 @@ public void setAsyncAcks(boolean asyncAcks) { this.asyncAcks = asyncAcks; } + /** + * When pausing the container with a record listener, whether the pause takes effect + * immediately, when the current record has been processed, or after all records from + * the previous poll have been processed. Default false. + * @return whether to pause immediately. + * @since 2.9 + */ + public boolean isPauseImmediate() { + return this.pauseImmediate; + } + + /** + * Set to true to pause the container after the current record has been processed, rather + * than after all the records from the previous poll have been processed. + * @param pauseImmediate true to pause immediately. + * @since 2.9 + */ + public void setPauseImmediate(boolean pauseImmediate) { + this.pauseImmediate = pauseImmediate; + } + private void adviseListenerIfNeeded() { if (!CollectionUtils.isEmpty(this.adviceChain)) { if (AopUtils.isAopProxy(this.messageListener)) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 4cc02acd7e..09f74740b7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -746,6 +746,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Set pausedForNack = new HashSet<>(); + private final boolean pauseImmediate = this.containerProperties.isPauseImmediate(); + private Map definedPartitions; private int count; @@ -782,7 +784,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private boolean receivedSome; - private ConsumerRecords pendingRecordsAfterError; + private ConsumerRecords remainingRecords; private boolean pauseForPending; @@ -1381,7 +1383,7 @@ protected void pollAndInvoke() { debugRecords(records); invokeIfHaveRecords(records); - if (this.pendingRecordsAfterError == null) { + if (this.remainingRecords == null) { resumeConsumerIfNeccessary(); if (!this.consumerPaused) { resumePartitionsIfNecessary(); @@ -1395,9 +1397,9 @@ private void doProcessCommits() { processCommits(); } catch (CommitFailedException cfe) { - if (this.pendingRecordsAfterError != null && !this.isBatchListener) { - ConsumerRecords pending = this.pendingRecordsAfterError; - this.pendingRecordsAfterError = null; + if (this.remainingRecords != null && !this.isBatchListener) { + ConsumerRecords pending = this.remainingRecords; + this.remainingRecords = null; List> records = new ArrayList<>(); Iterator> iterator = pending.iterator(); while (iterator.hasNext()) { @@ -1563,19 +1565,19 @@ private ConsumerRecords doPoll() { } else { records = pollConsumer(); - if (this.pendingRecordsAfterError != null) { + if (this.remainingRecords != null) { int howManyRecords = records.count(); if (howManyRecords > 0) { this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused " + "after an error; emergency stop invoked to avoid message loss", howManyRecords)); KafkaMessageListenerContainer.this.emergencyStop.run(); } - TopicPartition firstPart = this.pendingRecordsAfterError.partitions().iterator().next(); + TopicPartition firstPart = this.remainingRecords.partitions().iterator().next(); boolean isPaused = isPaused() || isPartitionPauseRequested(firstPart); this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused); if (!isPaused) { - records = this.pendingRecordsAfterError; - this.pendingRecordsAfterError = null; + records = this.remainingRecords; + this.remainingRecords = null; } } captureOffsets(records); @@ -2225,8 +2227,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor private void commitOffsetsIfNeeded(final ConsumerRecords records) { if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle()) || this.producer != null) { - if (this.pendingRecordsAfterError != null) { - ConsumerRecord firstUncommitted = this.pendingRecordsAfterError.iterator().next(); + if (this.remainingRecords != null) { + ConsumerRecord firstUncommitted = this.remainingRecords.iterator().next(); Iterator> it = records.iterator(); while (it.hasNext()) { ConsumerRecord next = it.next(); @@ -2392,7 +2394,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords records, records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, () -> invokeBatchOnMessageWithRecordsOrList(records, list)); if (!afterHandling.isEmpty()) { - this.pendingRecordsAfterError = afterHandling; + this.remainingRecords = afterHandling; this.pauseForPending = true; } } @@ -2444,7 +2446,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords records) { handleNack(records, record); break; } - + if (checkImmediatePause(iterator)) { + break; + } } } @@ -2523,9 +2527,28 @@ private void doInvokeWithRecords(final ConsumerRecords records) { handleNack(records, record); break; } + if (checkImmediatePause(iterator)) { + break; + } } } + private boolean checkImmediatePause(Iterator> iterator) { + if (isPaused() && this.pauseImmediate) { + Map>> remaining = new HashMap<>(); + while (iterator.hasNext()) { + ConsumerRecord next = iterator.next(); + remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()), + tp -> new ArrayList>()).add(next); + } + if (remaining.size() > 0) { + this.remainingRecords = new ConsumerRecords<>(remaining); + return true; + } + } + return false; + } + @Nullable private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) { ConsumerRecords next = nextArg; @@ -2669,8 +2692,8 @@ private void commitOffsetsIfNeeded(final ConsumerRecord record) { if (this.isManualAck) { this.commitRecovered = true; } - if (this.pendingRecordsAfterError == null - || !record.equals(this.pendingRecordsAfterError.iterator().next())) { + if (this.remainingRecords == null + || !record.equals(this.remainingRecords.iterator().next())) { ackCurrent(record); } if (this.isManualAck) { @@ -2787,7 +2810,7 @@ private void invokeErrorHandler(final ConsumerRecord record, tp -> new ArrayList>()).add(next); } if (records.size() > 0) { - this.pendingRecordsAfterError = new ConsumerRecords<>(records); + this.remainingRecords = new ConsumerRecords<>(records); this.pauseForPending = true; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerPauseImmediateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerPauseImmediateTests.java new file mode 100644 index 0000000000..2953570774 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerPauseImmediateTests.java @@ -0,0 +1,217 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.9 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class ContainerPauseImmediateTests { + + @SuppressWarnings("rawtypes") + @Autowired + private Consumer consumer; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + @SuppressWarnings("unchecked") + @Test + public void pausesImmediately() throws Exception { + assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); + InOrder inOrder = inOrder(this.consumer); + inOrder.verify(this.consumer).assign(any(Collection.class)); + inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + inOrder.verify(this.consumer).pause(any()); + inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + assertThat(this.config.count).isEqualTo(4); + assertThat(this.config.contents).contains("foo", "bar", "baz", "qux"); + this.registry.getListenerContainer("id").resume(); + assertThat(this.config.deliveryLatch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "fiz", "buz"); + } + + @Configuration + @EnableKafka + public static class Config { + + final List contents = new ArrayList<>(); + + final CountDownLatch pollLatch = new CountDownLatch(4); + + final CountDownLatch deliveryLatch = new CountDownLatch(4); + + final CountDownLatch deliveryLatch2 = new CountDownLatch(6); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + final CountDownLatch commitLatch = new CountDownLatch(4); + + int count; + + @Autowired + KafkaListenerEndpointRegistry registry; + + @KafkaListener(id = "id", groupId = "grp", + topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo", + partitions = "#{'0,1,2'.split(',')}")) + public void foo(String in) { + this.contents.add(in); + this.deliveryLatch.countDown(); + this.deliveryLatch2.countDown(); + if (++this.count == 4) { + registry.getListenerContainer("id").pause(); + } + } + + @SuppressWarnings({ "rawtypes" }) + @Bean + public ConsumerFactory consumerFactory(KafkaListenerEndpointRegistry registry) { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = consumer(registry); + given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + return consumerFactory; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Consumer consumer(KafkaListenerEndpointRegistry registry) { + final Consumer consumer = mock(Consumer.class); + final TopicPartition topicPartition0 = new TopicPartition("foo", 0); + final TopicPartition topicPartition1 = new TopicPartition("foo", 1); + final TopicPartition topicPartition2 = new TopicPartition("foo", 2); + Map> records1 = new LinkedHashMap<>(); + records1.put(topicPartition0, Arrays.asList( + new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition1, Arrays.asList( + new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition2, Arrays.asList( + new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz", + new RecordHeaders(), Optional.empty()))); + final AtomicInteger which = new AtomicInteger(); + willAnswer(i -> { + this.pollLatch.countDown(); + switch (which.getAndIncrement()) { + case 0: + return new ConsumerRecords(records1); + default: + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return new ConsumerRecords(Collections.emptyMap()); + } + }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + List paused = new ArrayList<>(); + willAnswer(i -> { + this.commitLatch.countDown(); + return null; + }).given(consumer).commitSync(anyMap(), any()); + willAnswer(i -> { + this.closeLatch.countDown(); + return null; + }).given(consumer).close(); + willAnswer(i -> { + paused.addAll(i.getArgument(0)); + return null; + }).given(consumer).pause(any()); + willAnswer(i -> { + return new HashSet<>(paused); + }).given(consumer).paused(); + willAnswer(i -> { + paused.removeAll(i.getArgument(0)); + return null; + }).given(consumer).resume(any()); + return consumer; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListenerEndpointRegistry registry) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory(registry)); + factory.getContainerProperties().setAckMode(AckMode.RECORD); + factory.getContainerProperties().setPauseImmediate(true); + return factory; + } + + } + +}