From 3afe07783ef4746c6f0a0a0b027c5524c5cf99a3 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Tue, 20 Feb 2024 01:22:03 +0800 Subject: [PATCH] Align RecordInterceptor and BatchInterceptor lifecycle. Resolves https://github.com/spring-projects/spring-kafka/issues/2287 * Align lifecycle for earlyRecordInterceptor(intercept + failure/success + afterRecord) and earlyBatchInterceptor(intercept + failure/success). * Fix unit test KafkaMessageListenerContainerTests, see https://github.com/spring-projects/spring-kafka/issues/2722 and https://github.com/spring-projects/spring-kafka/issues/2287. --- .../KafkaMessageListenerContainer.java | 3 +++ .../KafkaMessageListenerContainerTests.java | 22 +++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 44e3fb21db..09b3ab912c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2658,6 +2658,7 @@ private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) catch (InterruptedException e) { Thread.currentThread().interrupt(); } + this.earlyBatchInterceptor.success(nextArg, this.consumer); } } return next; @@ -2673,6 +2674,8 @@ private ConsumerRecord checkEarlyIntercept(ConsumerRecord recordArg) this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + KafkaUtils.format(recordArg)); ackCurrent(recordArg); + this.earlyRecordInterceptor.success(recordArg, this.consumer); + this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer); } } return cRecord; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 7909189e78..953b7d0515 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -3898,6 +3898,9 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early containerProps.setMessageListener((MessageListener) msg -> { }); containerProps.setClientId("clientId"); + if (early) { + containerProps.setTransactionManager(mock(PlatformTransactionManager.class)); + } RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { @@ -3922,7 +3925,7 @@ public ConsumerRecord intercept(ConsumerRecord inOrder.verify(recordInterceptor).setupThreadState(eq(consumer)); inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer)); - if (ackMode.equals(AckMode.RECORD)) { + if (AckMode.RECORD.equals(ackMode)) { inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any(Duration.class)); } @@ -3930,9 +3933,19 @@ public ConsumerRecord intercept(ConsumerRecord verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any(Duration.class)); } + inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer)); inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer)); - inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), - any(Duration.class)); + if (AckMode.RECORD.equals(ackMode)) { + inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), + any(Duration.class)); + } + inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer)); + if (AckMode.BATCH.equals(ackMode)) { + inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), + any(Duration.class)); + } container.stop(); } @@ -3968,7 +3981,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception containerProps.setMessageListener((BatchMessageListener) msgs -> { }); containerProps.setClientId("clientId"); - if (!early) { + if (early) { containerProps.setTransactionManager(mock(PlatformTransactionManager.class)); } @@ -3995,6 +4008,7 @@ public ConsumerRecords intercept(ConsumerRecords