diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index ebd5bcda38..48001e82ab 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -200,8 +200,9 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord record, Except Map map, TopicPartition topicPartition) { Exception realException = exception; - if (realException instanceof ListenerExecutionFailedException - && realException.getCause() instanceof Exception) { + while ((realException instanceof ListenerExecutionFailedException + || realException instanceof TimestampedException) + && realException.getCause() instanceof Exception) { realException = (Exception) realException.getCause(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java index ec477b2f71..1e49f60310 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; @@ -186,4 +188,32 @@ void exceptionChanges(boolean reset) { } } + @Test + void exceptionChangesWithTimestampedException() throws InterruptedException { + FixedBackOff bo1 = new FixedBackOff(0L, 5L); + FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> { }, bo1, mock(LogAccessor.class)); + AtomicReference captured = new AtomicReference<>(); + tracker.setBackOffFunction((record, ex) -> { + captured.set(ex); + if (ex instanceof IllegalStateException) { + return bo1; + } + else { + return new FixedBackOff(0L, 0L); + } + }); + IllegalStateException ise = new IllegalStateException(); + Exception ex = new ListenerExecutionFailedException("", new TimestampedException( + new ListenerExecutionFailedException("", ise))); + ConsumerRecord record = mock(ConsumerRecord.class); + Consumer consumer = mock(Consumer.class); + assertThat(tracker.recovered(record, ex, mock(MessageListenerContainer.class), consumer)).isFalse(); + assertThat(captured.get()).isSameAs(ise); + IllegalArgumentException iae = new IllegalArgumentException(); + ex = new ListenerExecutionFailedException("", new TimestampedException( + new ListenerExecutionFailedException("", iae))); + assertThat(tracker.recovered(record, ex, mock(MessageListenerContainer.class), consumer)).isTrue(); + assertThat(captured.get()).isSameAs(iae); + } + }