Skip to content

Commit f079ed3

Browse files
authored
GH-2722: Only Consider interceptBeforeTx with TX
Resolves #2722 Prior to 2.8.x, `interceptBeforeTx` was default `false`; it is now `true`. However, it should only be considered if there is a `TransactionManager` present. This allows an interceptor (when no transactions) to throw an exception, causing error handling to be invoked. Also tested with user's reproducer. **cherry-pick to 2.9.x**
1 parent 2a55f3f commit f079ed3

File tree

3 files changed

+13
-9
lines changed

3 files changed

+13
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,8 @@ public interface BatchInterceptor<K, V> extends ThreadStateProcessor {
3737
/**
3838
* Perform some action on the records or return a different one. If null is returned
3939
* the records will be skipped. Invoked before the listener.
40+
* IMPORTANT: If transactions are being used, and this method throws an exception, it
41+
* cannot be used with the container's {@code interceptBeforeTx} property set to true.
4042
* @param records the records.
4143
* @param consumer the consumer.
4244
* @return the records or null.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -714,24 +714,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
714714
private final Duration syncCommitTimeout;
715715

716716
private final RecordInterceptor<K, V> recordInterceptor =
717-
!isInterceptBeforeTx() && this.transactionManager != null
717+
!isInterceptBeforeTx() || this.transactionManager == null
718718
? getRecordInterceptor()
719719
: null;
720720

721721
private final RecordInterceptor<K, V> earlyRecordInterceptor =
722-
isInterceptBeforeTx() || this.transactionManager == null
722+
isInterceptBeforeTx() && this.transactionManager != null
723723
? getRecordInterceptor()
724724
: null;
725725

726726
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
727727

728728
private final BatchInterceptor<K, V> batchInterceptor =
729-
!isInterceptBeforeTx() && this.transactionManager != null
729+
!isInterceptBeforeTx() || this.transactionManager == null
730730
? getBatchInterceptor()
731731
: null;
732732

733733
private final BatchInterceptor<K, V> earlyBatchInterceptor =
734-
isInterceptBeforeTx() || this.transactionManager == null
734+
isInterceptBeforeTx() && this.transactionManager != null
735735
? getBatchInterceptor()
736736
: null;
737737

@@ -2915,7 +2915,6 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
29152915
if (cRecord == null) {
29162916
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
29172917
+ KafkaUtils.format(recordArg));
2918-
ackCurrent(recordArg);
29192918
}
29202919
else {
29212920
try {

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,8 +38,11 @@ public interface RecordInterceptor<K, V> extends ThreadStateProcessor {
3838
/**
3939
* Perform some action on the record or return a different one. If null is returned
4040
* the record will be skipped. Invoked before the listener. IMPORTANT; if this method
41-
* returns a different record, the topic, partition and offset must not be changed
42-
* to avoid undesirable side-effects.
41+
* returns a different record, the topic, partition and offset must not be changed to
42+
* avoid undesirable side-effects.
43+
* <p>
44+
* IMPORTANT: If transactions are being used, and this method throws an exception, it
45+
* cannot be used with the container's {@code interceptBeforeTx} property set to true.
4346
* @param record the record.
4447
* @param consumer the consumer.
4548
* @return the record or null.

0 commit comments

Comments
 (0)