From a6df08d4f6bd1737caf613276d49ec779fa87dcb Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 5 Jul 2023 12:20:39 -0400 Subject: [PATCH] GH-2722: Only Consider interceptBeforeTx with TX Resolves https://github.com/spring-projects/spring-kafka/issues/2722 Prior to 2.8.x, `interceptBeforeTx` was default `false`; it is now `true`. However, it should only be considered if there is a `TransactionManager` present. This allows an interceptor (when no transactions) to throw an exception, causing error handling to be invoked. Also tested with user's reproducer. **cherry-pick to 2.9.x** --- .../springframework/kafka/listener/BatchInterceptor.java | 4 +++- .../kafka/listener/KafkaMessageListenerContainer.java | 9 ++++----- .../kafka/listener/RecordInterceptor.java | 9 ++++++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java index da922d16b4..c36e0868e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,8 @@ public interface BatchInterceptor extends ThreadStateProcessor { /** * Perform some action on the records or return a different one. If null is returned * the records will be skipped. Invoked before the listener. + * IMPORTANT: If transactions are being used, and this method throws an exception, it + * cannot be used with the container's {@code interceptBeforeTx} property set to true. * @param records the records. * @param consumer the consumer. * @return the records or null. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6272330601..95f90548d2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -714,24 +714,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Duration syncCommitTimeout; private final RecordInterceptor recordInterceptor = - !isInterceptBeforeTx() && this.transactionManager != null + !isInterceptBeforeTx() || this.transactionManager == null ? getRecordInterceptor() : null; private final RecordInterceptor earlyRecordInterceptor = - isInterceptBeforeTx() || this.transactionManager == null + isInterceptBeforeTx() && this.transactionManager != null ? getRecordInterceptor() : null; private final RecordInterceptor commonRecordInterceptor = getRecordInterceptor(); private final BatchInterceptor batchInterceptor = - !isInterceptBeforeTx() && this.transactionManager != null + !isInterceptBeforeTx() || this.transactionManager == null ? getBatchInterceptor() : null; private final BatchInterceptor earlyBatchInterceptor = - isInterceptBeforeTx() || this.transactionManager == null + isInterceptBeforeTx() && this.transactionManager != null ? getBatchInterceptor() : null; @@ -2915,7 +2915,6 @@ private void doInvokeOnMessage(final ConsumerRecord recordArg) { if (cRecord == null) { this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + KafkaUtils.format(recordArg)); - ackCurrent(recordArg); } else { try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java index 6e9a210543..699aa0a9d1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,8 +38,11 @@ public interface RecordInterceptor extends ThreadStateProcessor { /** * Perform some action on the record or return a different one. If null is returned * the record will be skipped. Invoked before the listener. IMPORTANT; if this method - * returns a different record, the topic, partition and offset must not be changed - * to avoid undesirable side-effects. + * returns a different record, the topic, partition and offset must not be changed to + * avoid undesirable side-effects. + *

+ * IMPORTANT: If transactions are being used, and this method throws an exception, it + * cannot be used with the container's {@code interceptBeforeTx} property set to true. * @param record the record. * @param consumer the consumer. * @return the record or null.