Skip to content

Commit c9e7edc

Browse files
authored
GH-3690: Fix observation leak in the KafkaMessageListenerContainer (#3694)
Fixes: #3690 Issue: #3690 When `this.listener` is an instance of `RecordMessagingMessageListenerAdapter`, we rely on its logic to call `invoke()` from super class to handle observation lifecycle this or other way. However, Spring Integration's `KafkaMessageDrivenChannelAdapter` use its own `IntegrationRecordMessageListener` extension of the `RecordMessagingMessageListenerAdapter` without calling super `invoke()`. The problem apparent from Spring Cloud Stream Kafka Binder, where an observation is enabled. * Fix `KafkaMessageListenerContainer` to check for exact type of `this.listener` before making decision to close an observation here, or propagate it down to the `RecordMessagingMessageListenerAdapter`
1 parent be612c5 commit c9e7edc

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -966,8 +966,8 @@ else if (listener instanceof MessageListener) {
966966
this.wasIdlePartition = new HashMap<>();
967967
this.kafkaAdmin = obtainAdmin();
968968

969-
if (this.listener instanceof RecordMessagingMessageListenerAdapter<?, ?> rmmla) {
970-
rmmla.setObservationRegistry(observationRegistry);
969+
if (isListenerAdapterObservationAware()) {
970+
((RecordMessagingMessageListenerAdapter<?, ?>) this.listener).setObservationRegistry(observationRegistry);
971971
}
972972
}
973973

@@ -1228,6 +1228,10 @@ else if (timeout instanceof String str) {
12281228
}
12291229
}
12301230

1231+
private boolean isListenerAdapterObservationAware() {
1232+
return this.listener != null && RecordMessagingMessageListenerAdapter.class.equals(this.listener.getClass());
1233+
}
1234+
12311235
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
12321236
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
12331237
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
@@ -2772,7 +2776,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27722776
catch (RuntimeException e) {
27732777
failureTimer(sample, cRecord);
27742778
recordInterceptAfter(cRecord, e);
2775-
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
2779+
if (!isListenerAdapterObservationAware()) {
27762780
observation.error(e);
27772781
}
27782782
if (this.commonErrorHandler == null) {
@@ -2800,7 +2804,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
28002804
}
28012805
}
28022806
finally {
2803-
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
2807+
if (!isListenerAdapterObservationAware()) {
28042808
observation.stop();
28052809
}
28062810
observationScope.close();

0 commit comments

Comments
 (0)