diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java index da922d16b4..c36e0868e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,8 @@ public interface BatchInterceptor extends ThreadStateProcessor { /** * Perform some action on the records or return a different one. If null is returned * the records will be skipped. Invoked before the listener. + * IMPORTANT: If transactions are being used, and this method throws an exception, it + * cannot be used with the container's {@code interceptBeforeTx} property set to true. * @param records the records. * @param consumer the consumer. * @return the records or null. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6272330601..95f90548d2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -714,24 +714,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Duration syncCommitTimeout; private final RecordInterceptor recordInterceptor = - !isInterceptBeforeTx() && this.transactionManager != null + !isInterceptBeforeTx() || this.transactionManager == null ? getRecordInterceptor() : null; private final RecordInterceptor earlyRecordInterceptor = - isInterceptBeforeTx() || this.transactionManager == null + isInterceptBeforeTx() && this.transactionManager != null ? getRecordInterceptor() : null; private final RecordInterceptor commonRecordInterceptor = getRecordInterceptor(); private final BatchInterceptor batchInterceptor = - !isInterceptBeforeTx() && this.transactionManager != null + !isInterceptBeforeTx() || this.transactionManager == null ? getBatchInterceptor() : null; private final BatchInterceptor earlyBatchInterceptor = - isInterceptBeforeTx() || this.transactionManager == null + isInterceptBeforeTx() && this.transactionManager != null ? getBatchInterceptor() : null; @@ -2915,7 +2915,6 @@ private void doInvokeOnMessage(final ConsumerRecord recordArg) { if (cRecord == null) { this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + KafkaUtils.format(recordArg)); - ackCurrent(recordArg); } else { try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java index 6e9a210543..699aa0a9d1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,8 +38,11 @@ public interface RecordInterceptor extends ThreadStateProcessor { /** * Perform some action on the record or return a different one. If null is returned * the record will be skipped. Invoked before the listener. IMPORTANT; if this method - * returns a different record, the topic, partition and offset must not be changed - * to avoid undesirable side-effects. + * returns a different record, the topic, partition and offset must not be changed to + * avoid undesirable side-effects. + *

+ * IMPORTANT: If transactions are being used, and this method throws an exception, it + * cannot be used with the container's {@code interceptBeforeTx} property set to true. * @param record the record. * @param consumer the consumer. * @return the record or null.