diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundChannelAdapterSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundChannelAdapterSpec.java index 274d7892b19..ad93fc01d6c 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundChannelAdapterSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundChannelAdapterSpec.java @@ -18,12 +18,12 @@ import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.core.retry.RetryTemplate; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.dsl.MessageProducerSpec; -import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.support.RetryTemplate; /** * The base {@link MessageProducerSpec} implementation for a {@link AmqpInboundChannelAdapter}. diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java index 739f917feeb..e50f4b76893 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java @@ -19,12 +19,12 @@ import org.springframework.amqp.rabbit.batch.BatchingStrategy; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.core.retry.RetryTemplate; import org.springframework.integration.amqp.inbound.AmqpInboundGateway; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.dsl.MessagingGatewaySpec; -import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.support.RetryTemplate; /** * A base {@link MessagingGatewaySpec} implementation for {@link AmqpInboundGateway} endpoint options. diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java index 62d76c0a5c6..ecc4a0cf453 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import com.rabbitmq.client.Channel; @@ -36,11 +35,15 @@ import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer; import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.core.AttributeAccessor; +import org.springframework.core.retry.RetryException; +import org.springframework.core.retry.RetryOperations; +import org.springframework.core.retry.RetryTemplate; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.amqp.support.AmqpHeaderMapper; @@ -48,18 +51,15 @@ import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.amqp.support.EndpointUtils; import org.springframework.integration.context.OrderlyShutdownCapable; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.messaging.MessageChannel; -import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.RetryOperations; -import org.springframework.retry.support.RetrySynchronizationManager; -import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; /** * Adapter that receives Messages from an AMQP Queue, converts them into - * Spring Integration Messages, and sends the results to a Message Channel. + * Spring Integration messages and sends the results to a Message Channel. * * @author Mark Fisher * @author Gary Russell @@ -274,24 +274,24 @@ private void setupRecoveryCallbackIfAny() { "The 'messageRecoverer' must be an instance of MessageBatchRecoverer " + "when consumer configured for batch mode"); this.recoveryCallback = - context -> { + (context, cause) -> { @SuppressWarnings("unchecked") List messagesToRecover = (List) context.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); if (messagesToRecover != null) { ((MessageBatchRecoverer) messageRecovererToUse) - .recover(messagesToRecover, context.getLastThrowable()); + .recover(messagesToRecover, cause); } return null; }; } else { this.recoveryCallback = - context -> { + (context, cause) -> { Message messageToRecover = (Message) context.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); if (messageToRecover != null) { - messageRecovererToUse.recover(messageToRecover, context.getLastThrowable()); + messageRecovererToUse.recover(messageToRecover, cause); } return null; }; @@ -321,9 +321,9 @@ public int afterShutdown() { } /** - * If there's a retry template, it will set the attributes holder via the listener. If - * there's no retry template, but there's an error channel, we create a new attributes - * holder here. If an attributes holder exists (by either method), we set the + * If there's a retry template, it will set the attribute holder via the listener. If + * there's no retry template, but there's an error channel, we create a new attribute + * holder here. If an attribute holder exists (by either method), we set the * attributes for use by the * {@link org.springframework.integration.support.ErrorMessageStrategy}. * @param amqpMessage the AMQP message to use. @@ -333,19 +333,15 @@ public int afterShutdown() { private void setAttributesIfNecessary(Object amqpMessage, org.springframework.messaging.@Nullable Message message) { - boolean needHolder = getErrorChannel() != null && this.retryTemplate == null; - boolean needAttributes = needHolder || this.retryTemplate != null; + boolean needHolder = getErrorChannel() != null || this.retryTemplate != null; if (needHolder) { - ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null)); - } - if (needAttributes) { - AttributeAccessor attributes = this.retryTemplate != null - ? RetrySynchronizationManager.getContext() - : ATTRIBUTES_HOLDER.get(); - if (attributes != null) { - attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message); - attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage); + AttributeAccessor attributes = ATTRIBUTES_HOLDER.get(); + if (attributes == null) { + attributes = ErrorMessageUtils.getAttributeAccessor(null, null); + ATTRIBUTES_HOLDER.set(attributes); } + attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message); + attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage); } } @@ -385,16 +381,27 @@ public void onMessage(final Message message, @Nullable Channel channel) { else { final org.springframework.messaging.Message toSend = createMessageFromAmqp(message, channel); - this.retryOps.execute( - context -> { - AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(toSend); - if (deliveryAttempt != null) { - deliveryAttempt.incrementAndGet(); - } - setAttributesIfNecessary(message, toSend); - sendMessage(toSend); - return null; - }, this.recoverer); + try { + this.retryOps.execute( + () -> { + AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(toSend); + if (deliveryAttempt != null) { + deliveryAttempt.incrementAndGet(); + } + setAttributesIfNecessary(message, toSend); + sendMessage(toSend); + return null; + }); + } + catch (RetryException ex) { + if (this.recoverer != null) { + this.recoverer.recover(getErrorMessageAttributes(toSend), ex.getCause()); + } + else { + throw new ListenerExecutionFailedException( + "Failed handling message after '" + ex.getRetryCount() + "' retries", ex, message); + } + } } } catch (MessageConversionException e) { @@ -404,7 +411,6 @@ public void onMessage(final Message message, @Nullable Channel channel) { getMessagingTemplate() .send(errorChannel, buildErrorMessage(null, - EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, e))); } else { @@ -412,9 +418,7 @@ public void onMessage(final Message message, @Nullable Channel channel) { } } finally { - if (this.retryOps == null) { - ATTRIBUTES_HOLDER.remove(); - } + ATTRIBUTES_HOLDER.remove(); } } @@ -453,7 +457,7 @@ protected Object convertPayload(Message message) { protected org.springframework.messaging.Message createMessageFromPayload(Object payload, @Nullable Channel channel, Map headers, long deliveryTag, - @Nullable List> listHeaders) { + @Nullable List> listHeaders) { if (this.manualAcks) { headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag); @@ -480,14 +484,14 @@ protected class BatchListener extends Listener implements ChannelAwareBatchMessa @Override public void onMessageBatch(List messages, @Nullable Channel channel) { List converted; - List> headers = null; + List> headers = null; if (this.batchModeMessages) { converted = convertMessages(messages, channel); } else { converted = convertPayloads(messages, channel); if (BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS.equals(AmqpInboundChannelAdapter.this.batchMode)) { - List> listHeaders = new ArrayList<>(); + List> listHeaders = new ArrayList<>(); messages.forEach(msg -> listHeaders.add(AmqpInboundChannelAdapter.this.headerMapper .toHeadersFromRequest(msg.getMessageProperties()))); headers = listHeaders; @@ -503,31 +507,45 @@ public void onMessageBatch(List messages, @Nullable Channel channel) { sendMessage(message); } else { - this.retryOps.execute( - context -> { - AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(message); - if (deliveryAttempt != null) { - deliveryAttempt.incrementAndGet(); - } - if (this.batchModeMessages) { - @SuppressWarnings("unchecked") - List> payloads = - (List>) message.getPayload(); - payloads.forEach(payload -> - Objects.requireNonNull( - StaticMessageHeaderAccessor.getDeliveryAttempt(payload)) - .incrementAndGet()); - } - setAttributesIfNecessary(messages, message); - sendMessage(message); - return null; - }, this.recoverer); + try { + this.retryOps.execute( + () -> { + AtomicInteger deliveryAttempt = + StaticMessageHeaderAccessor.getDeliveryAttempt(message); + if (deliveryAttempt != null) { + deliveryAttempt.incrementAndGet(); + } + if (this.batchModeMessages) { + @SuppressWarnings("unchecked") + List> payloads = + (List>) message.getPayload(); + payloads.forEach(payload -> { + AtomicInteger batchItemDeliveryAttempt = + StaticMessageHeaderAccessor.getDeliveryAttempt(payload); + if (batchItemDeliveryAttempt != null) { + batchItemDeliveryAttempt.incrementAndGet(); + } + }); + } + setAttributesIfNecessary(messages, message); + sendMessage(message); + return null; + }); + } + catch (RetryException ex) { + if (this.recoverer != null) { + this.recoverer.recover(getErrorMessageAttributes(null), ex.getCause()); + } + else { + throw new ListenerExecutionFailedException( + "Failed handling messages after '" + ex.getRetryCount() + "' retries", ex, + messages.toArray(new Message[0])); + } + } } } finally { - if (this.retryOps == null) { - ATTRIBUTES_HOLDER.remove(); - } + ATTRIBUTES_HOLDER.remove(); } } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java index e90c953e60c..094ea03dfb6 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java @@ -35,10 +35,13 @@ import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.core.AttributeAccessor; +import org.springframework.core.retry.RetryException; +import org.springframework.core.retry.RetryTemplate; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.amqp.support.AmqpHeaderMapper; @@ -46,17 +49,15 @@ import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.amqp.support.EndpointUtils; import org.springframework.integration.amqp.support.MappingUtils; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.messaging.MessageChannel; -import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.support.RetrySynchronizationManager; -import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; /** * Adapter that receives Messages from an AMQP Queue, converts them into - * Spring Integration Messages, and sends the results to a Message Channel. + * Spring Integration messages and sends the results to a Message Channel. * If a reply Message is received, it will be converted and sent back to * the AMQP 'replyTo'. * @@ -237,13 +238,13 @@ public void setBindSourceMessage(boolean bindSourceMessage) { /** * When mapping headers for the outbound (reply) message, determine whether the headers are - * mapped before the message is converted, or afterwards. This only affects headers + * mapped before the message is converted, or afterward. This only affects headers * that might be added by the message converter. When false, the converter's headers * win; when true, any headers added by the converter will be overridden (if the * source message has a header that maps to those headers). You might wish to set this * to true, for example, when using a * {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a - * String payload that contains json; the converter will set the content type to + * String payload that contains JSON; the converter will set the content type to * {@code text/plain} which can be overridden to {@code application/json} by setting * the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false. * @param replyHeadersMappedLast true if reply headers are mapped after conversion. @@ -286,11 +287,11 @@ private void setupRecoveryCallbackIfAny() { "Only one of 'recoveryCallback' or 'messageRecoverer' may be provided, but not both"); if (messageRecovererToUse != null) { this.recoveryCallback = - context -> { + (context, cause) -> { Message messageToRecover = (Message) context.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE); if (messageToRecover != null) { - messageRecovererToUse.recover(messageToRecover, context.getLastThrowable()); + messageRecovererToUse.recover(messageToRecover, cause); } return null; }; @@ -310,9 +311,9 @@ protected void doStop() { } /** - * If there's a retry template, it will set the attributes holder via the listener. If - * there's no retry template, but there's an error channel, we create a new attributes - * holder here. If an attributes holder exists (by either method), we set the + * If there's a retry template, it will set the attribute holder via the listener. If + * there's no retry template, but there's an error channel, we create a new attribute + * holder here. If an attribute holder exists (by either method), we set the * attributes for use by the * {@link org.springframework.integration.support.ErrorMessageStrategy}. * @param amqpMessage the AMQP message to use. @@ -322,19 +323,15 @@ protected void doStop() { private void setAttributesIfNecessary(Message amqpMessage, org.springframework.messaging.@Nullable Message message) { - boolean needHolder = getErrorChannel() != null && this.retryTemplate == null; - boolean needAttributes = needHolder || this.retryTemplate != null; + boolean needHolder = getErrorChannel() != null || this.retryTemplate != null; if (needHolder) { - ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null)); - } - if (needAttributes) { - AttributeAccessor attributes = this.retryTemplate != null - ? RetrySynchronizationManager.getContext() - : ATTRIBUTES_HOLDER.get(); - if (attributes != null) { - attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message); - attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage); + AttributeAccessor attributes = ATTRIBUTES_HOLDER.get(); + if (attributes == null) { + attributes = ErrorMessageUtils.getAttributeAccessor(null, null); + ATTRIBUTES_HOLDER.set(attributes); } + attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message); + attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage); } } @@ -357,31 +354,44 @@ protected Listener() { @SuppressWarnings("unchecked") @Override public void onMessage(final Message message, @Nullable Channel channel) { - if (AmqpInboundGateway.this.retryTemplate == null) { - try { + try { + if (AmqpInboundGateway.this.retryTemplate == null) { + org.springframework.messaging.Message converted = convert(message, channel); if (converted != null) { process(message, converted); } } - finally { - ATTRIBUTES_HOLDER.remove(); - } - } - else { - org.springframework.messaging.Message converted = convert(message, channel); - if (converted != null) { - AmqpInboundGateway.this.retryTemplate.execute(context -> { + else { + org.springframework.messaging.Message converted = convert(message, channel); + if (converted != null) { + try { + AmqpInboundGateway.this.retryTemplate.execute(() -> { AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(converted); if (deliveryAttempt != null) { deliveryAttempt.incrementAndGet(); } process(message, converted); return null; - }, - (RecoveryCallback) AmqpInboundGateway.this.recoveryCallback); + }); + } + catch (RetryException ex) { + if (AmqpInboundGateway.this.recoveryCallback != null) { + AmqpInboundGateway.this.recoveryCallback.recover(getErrorMessageAttributes(converted), + ex.getCause()); + } + else { + throw new ListenerExecutionFailedException( + "Failed handling message after '" + ex.getRetryCount() + "' retries", ex, + message); + } + } + } } } + finally { + ATTRIBUTES_HOLDER.remove(); + } } private org.springframework.messaging.@Nullable Message convert( diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java index 46a223907a2..08e1feefb28 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java @@ -16,6 +16,7 @@ package org.springframework.integration.amqp.inbound; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -47,6 +48,8 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.BeanFactory; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; @@ -66,7 +69,6 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; -import org.springframework.retry.support.RetryTemplate; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -346,7 +348,7 @@ public void testRetryWithinOnMessageAdapter() throws Exception { AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); adapter.setOutputChannel(new DirectChannel()); - adapter.setRetryTemplate(new RetryTemplate()); + adapter.setRetryTemplate(new RetryTemplate(RetryPolicy.builder().maxAttempts(2).delay(Duration.ZERO).build())); QueueChannel errors = new QueueChannel(); ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors); recoveryCallback.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy()); @@ -375,7 +377,7 @@ public void testRetryWithMessageRecovererOnMessageAdapter() throws Exception { AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); adapter.setOutputChannel(new DirectChannel()); - adapter.setRetryTemplate(new RetryTemplate()); + adapter.setRetryTemplate(new RetryTemplate(RetryPolicy.builder().maxAttempts(2).delay(Duration.ZERO).build())); AtomicReference recoveredMessage = new AtomicReference<>(); AtomicReference recoveredError = new AtomicReference<>(); CountDownLatch recoveredLatch = new CountDownLatch(1); @@ -409,7 +411,7 @@ public void testRetryWithinOnMessageGateway() throws Exception { AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); AmqpInboundGateway adapter = new AmqpInboundGateway(container); adapter.setRequestChannel(new DirectChannel()); - adapter.setRetryTemplate(new RetryTemplate()); + adapter.setRetryTemplate(new RetryTemplate(RetryPolicy.builder().maxAttempts(2).delay(Duration.ZERO).build())); QueueChannel errors = new QueueChannel(); ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors); recoveryCallback.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy()); @@ -438,7 +440,7 @@ public void testRetryWithMessageRecovererOnMessageGateway() throws Exception { AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); AmqpInboundGateway adapter = new AmqpInboundGateway(container); adapter.setRequestChannel(new DirectChannel()); - adapter.setRetryTemplate(new RetryTemplate()); + adapter.setRetryTemplate(new RetryTemplate(RetryPolicy.builder().maxAttempts(2).delay(Duration.ZERO).build())); AtomicReference recoveredMessage = new AtomicReference<>(); AtomicReference recoveredError = new AtomicReference<>(); CountDownLatch recoveredLatch = new CountDownLatch(1); @@ -591,7 +593,7 @@ public void testExclusiveRecover() { adapter.setRetryTemplate(new RetryTemplate()); adapter.setMessageRecoverer((message, cause) -> { }); - adapter.setRecoveryCallback(context -> null); + adapter.setRecoveryCallback((context, cause) -> null); adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT); assertThatIllegalStateException() .isThrownBy(adapter::afterPropertiesSet) @@ -721,7 +723,7 @@ public void testRetryWithinOnMessageAdapterConsumerBatch() { container.setConsumerBatchEnabled(true); AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); adapter.setOutputChannel(new DirectChannel()); - adapter.setRetryTemplate(new RetryTemplate()); + adapter.setRetryTemplate(new RetryTemplate(RetryPolicy.builder().maxAttempts(2).delay(Duration.ZERO).build())); QueueChannel errors = new QueueChannel(); ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors); adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT); @@ -768,7 +770,7 @@ public void testRetryWithMessageRecovererOnMessageAdapterConsumerBatch() throws container.setConsumerBatchEnabled(true); AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); adapter.setOutputChannel(new DirectChannel()); - adapter.setRetryTemplate(new RetryTemplate()); + adapter.setRetryTemplate(new RetryTemplate(RetryPolicy.builder().maxAttempts(2).delay(Duration.ZERO).build())); AtomicReference> recoveredMessages = new AtomicReference<>(); AtomicReference recoveredError = new AtomicReference<>(); CountDownLatch recoveredLatch = new CountDownLatch(1); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java b/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java index d98a18cf2ca..8e92ba63fd1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java @@ -109,7 +109,6 @@ protected MessagingTemplate getMessagingTemplate() { return this.messagingTemplate; } - @Nullable protected DestinationResolver getChannelResolver() { return this.channelResolver; } @@ -158,10 +157,10 @@ public void publish(@Nullable Message inputMessage, @Nullable Message fail * Publish an error message for the supplied throwable and context. * The {@link #errorMessageStrategy} is used to build a {@link ErrorMessage} * to publish. - * @param throwable the throwable. May be null. + * @param throwable the throwable. Maybe null. * @param context the context for {@link ErrorMessage} properties. */ - public void publish(Throwable throwable, AttributeAccessor context) { + public void publish(@Nullable Throwable throwable, AttributeAccessor context) { populateChannel(); Throwable payload = determinePayload(throwable, context); ErrorMessage errorMessage = this.errorMessageStrategy.buildErrorMessage(payload, context); @@ -179,7 +178,7 @@ public void publish(Throwable throwable, AttributeAccessor context) { * @return the throwable for the {@link ErrorMessage} payload * @see ErrorMessageUtils */ - protected Throwable determinePayload(Throwable throwable, AttributeAccessor context) { + protected Throwable determinePayload(@Nullable Throwable throwable, AttributeAccessor context) { Throwable lastThrowable = throwable; if (lastThrowable == null) { lastThrowable = payloadWhenNull(context); @@ -214,9 +213,7 @@ private void populateChannel() { if (recoveryChannelName == null) { recoveryChannelName = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME; } - if (this.channelResolver != null) { - this.channel = this.channelResolver.resolveDestination(recoveryChannelName); - } + this.channel = this.channelResolver.resolveDestination(recoveryChannelName); } this.messagingTemplate.setDefaultChannel(this.channel); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/RecoveryCallback.java b/spring-integration-core/src/main/java/org/springframework/integration/core/RecoveryCallback.java new file mode 100644 index 00000000000..6510c99bf41 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/RecoveryCallback.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.core; + +import org.jspecify.annotations.Nullable; + +import org.springframework.core.AttributeAccessor; + +/** + * Error handler-like strategy to provide fallback based on the {@link AttributeAccessor}. + * @param the type that is returned from the recovery + * + * @author Artem Bilan + * + * @since 7.0 + */ +public interface RecoveryCallback { + + /** + * @param context the context for failure + * @param cause the cause of the failure + * @return an Object that can be used to replace the callback result that failed + */ + T recover(AttributeAccessor context, Throwable cause); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ErrorMessageSendingRecoverer.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ErrorMessageSendingRecoverer.java index 30883c9616a..154fee1cf90 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ErrorMessageSendingRecoverer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ErrorMessageSendingRecoverer.java @@ -16,16 +16,17 @@ package org.springframework.integration.handler.advice; +import java.io.Serial; + import org.springframework.core.AttributeAccessor; import org.springframework.integration.core.ErrorMessagePublisher; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; -import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.RetryContext; /** * A {@link RecoveryCallback} that sends the final throwable as an @@ -43,8 +44,8 @@ public class ErrorMessageSendingRecoverer extends ErrorMessagePublisher implemen /** * Construct instance with the default {@code errorChannel} - * to publish recovery error message. - * The {@link DefaultErrorMessageStrategy} is used for building error message to publish. + * to publish a recovery error message. + * The {@link DefaultErrorMessageStrategy} is used for building an error message to publish. * @since 4.3.10 */ public ErrorMessageSendingRecoverer() { @@ -52,8 +53,8 @@ public ErrorMessageSendingRecoverer() { } /** - * Construct instance based on the provided message channel. - * The {@link DefaultErrorMessageStrategy} is used for building error message to publish. + * Construct an instance based on the provided message channel. + * The {@link DefaultErrorMessageStrategy} is used for building an error message to publish. * @param channel the message channel to publish error messages on recovery action. */ public ErrorMessageSendingRecoverer(MessageChannel channel) { @@ -80,8 +81,8 @@ public ErrorMessageSendingRecoverer(MessageChannel channel, ErrorMessageStrategy } @Override - public Object recover(RetryContext context) { - publish(context.getLastThrowable(), context); + public Object recover(AttributeAccessor context, Throwable cause) { + publish(cause, context); return null; } @@ -91,7 +92,7 @@ protected Throwable payloadWhenNull(AttributeAccessor context) { String description = "No retry exception available; " + "this can occur, for example, if the RetryPolicy allowed zero attempts " + "to execute the handler; " + - "RetryContext: " + context.toString(); + "RetryContext: " + context; return message == null ? new RetryExceptionNotAvailableException(description) : new RetryExceptionNotAvailableException(message, description); @@ -99,6 +100,7 @@ protected Throwable payloadWhenNull(AttributeAccessor context) { public static class RetryExceptionNotAvailableException extends MessagingException { + @Serial private static final long serialVersionUID = 1L; RetryExceptionNotAvailableException(String description) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/RequestHandlerRetryAdvice.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/RequestHandlerRetryAdvice.java index 72393d5110a..94a94dd1713 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/RequestHandlerRetryAdvice.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/RequestHandlerRetryAdvice.java @@ -16,11 +16,11 @@ package org.springframework.integration.handler.advice; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; -import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryListener; @@ -47,7 +47,7 @@ public class RequestHandlerRetryAdvice extends AbstractRequestHandlerAdvice { private RetryTemplate retryTemplate = new RetryTemplate(); - private RecoveryCallback recoveryCallback; + private org.springframework.retry.RecoveryCallback recoveryCallback; // Stateless unless a state generator is provided private RetryStateGenerator retryStateGenerator = message -> null; @@ -63,7 +63,8 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { } public void setRecoveryCallback(RecoveryCallback recoveryCallback) { - this.recoveryCallback = recoveryCallback; + this.recoveryCallback = (context) -> + recoveryCallback.recover(context, context.getLastThrowable()); } public void setRetryStateGenerator(RetryStateGenerator retryStateGenerator) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/RetryAdviceParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/RetryAdviceParserTests.java index 6c3783bef4c..e54de204914 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/RetryAdviceParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/RetryAdviceParserTests.java @@ -105,8 +105,9 @@ public void testAll() { assertThat(TestUtils.getPropertyValue(a1, "recoveryCallback")).isNull(); assertThat(TestUtils.getPropertyValue(a7, "recoveryCallback")).isNotNull(); - assertThat(TestUtils.getPropertyValue(a7, "recoveryCallback.channel")).isSameAs(this.foo); - assertThat(TestUtils.getPropertyValue(a7, "recoveryCallback.messagingTemplate.sendTimeout")).isEqualTo(4567L); +// TODO https://github.com/spring-projects/spring-integration/issues/10345 +// assertThat(TestUtils.getPropertyValue(a7, "recoveryCallback.channel")).isSameAs(this.foo); +// assertThat(TestUtils.getPropertyValue(a7, "recoveryCallback.messagingTemplate.sendTimeout")).isEqualTo(4567L); assertThat(TestUtils.getPropertyValue(this.handler1, "adviceChain", List.class).get(0)).isSameAs(this.a1); assertThat(TestUtils.getPropertyValue( diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java index fa8ee8eacc3..252db255fd7 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java @@ -534,7 +534,7 @@ protected Object handleRequestMessage(Message requestMessage) { private void defaultStatefulRetryRecoverAfterThirdTryGuts(final AtomicInteger counter, AbstractReplyProducingMessageHandler handler, QueueChannel replies, RequestHandlerRetryAdvice advice) { - advice.setRecoveryCallback(context -> "baz"); + advice.setRecoveryCallback((context, cause) -> "baz"); List adviceChain = new ArrayList<>(); adviceChain.add(advice); @@ -619,11 +619,12 @@ public boolean canRetry(RetryContext context) { Message message = new GenericMessage<>("Hello, world!"); handler.handleMessage(message); Message error = errors.receive(10000); - assertThat(error).isNotNull(); - assertThat(error.getPayload() instanceof ErrorMessageSendingRecoverer.RetryExceptionNotAvailableException) - .isTrue(); - assertThat(((MessagingException) error.getPayload()).getFailedMessage()).isNotNull(); - assertThat(((MessagingException) error.getPayload()).getFailedMessage()).isSameAs(message); + assertThat(error) + .isNotNull() + .extracting("payload"). + isInstanceOf(ErrorMessageSendingRecoverer.RetryExceptionNotAvailableException.class) + .extracting("failedMessage") + .isSameAs(message); } @Test diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java index b9eab44f7da..cfe3d1dcc11 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.jspecify.annotations.Nullable; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.dsl.ComponentsRegistration; import org.springframework.integration.dsl.MessagingGatewaySpec; import org.springframework.integration.kafka.inbound.KafkaInboundGateway; @@ -32,7 +33,6 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.support.converter.RecordMessageConverter; -import org.springframework.retry.RecoveryCallback; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java index 35da30fc20a..56ca7c87454 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition; import org.jspecify.annotations.Nullable; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.dsl.ComponentsRegistration; import org.springframework.integration.dsl.MessageProducerSpec; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; @@ -33,7 +34,6 @@ import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; -import org.springframework.retry.RecoveryCallback; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java index ccaeaf181c0..18218325607 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -31,6 +32,7 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.core.Pausable; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; @@ -55,7 +57,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; -import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; @@ -86,7 +87,7 @@ public class KafkaInboundGateway extends MessagingGatewaySupport private @Nullable RetryTemplate retryTemplate; - private @Nullable RecoveryCallback recoveryCallback; + private org.springframework.retry.@Nullable RecoveryCallback recoveryCallback; private @Nullable BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; @@ -174,7 +175,8 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { * @param recoveryCallback the recovery callback. */ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { - this.recoveryCallback = recoveryCallback; + this.recoveryCallback = (context) -> + recoveryCallback.recover(context, Objects.requireNonNull(context.getLastThrowable())); } /** @@ -207,7 +209,12 @@ protected void onInit() { if (this.retryTemplate != null) { MessageChannel errorChannel = getErrorChannel(); if (this.recoveryCallback != null && errorChannel != null) { - this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy()); + // TODO https://github.com/spring-projects/spring-integration/issues/10345 + ErrorMessageSendingRecoverer errorMessageSendingRecoverer = + new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy()); + this.recoveryCallback = + context -> + errorMessageSendingRecoverer.recover(context, context.getLastThrowable()); } } ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties(); diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java index a10e18b7764..fab6d66834e 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -34,6 +35,7 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.core.Pausable; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; @@ -63,7 +65,6 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; -import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; @@ -99,7 +100,7 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo private @Nullable RetryTemplate retryTemplate; - private @Nullable RecoveryCallback recoveryCallback; + private org.springframework.retry.@Nullable RecoveryCallback recoveryCallback; private boolean filterInRetry; @@ -239,7 +240,8 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { * @param recoveryCallback the recovery callback. */ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { - this.recoveryCallback = recoveryCallback; + this.recoveryCallback = (context) -> + recoveryCallback.recover(context, Objects.requireNonNull(context.getLastThrowable())); } /** @@ -309,7 +311,12 @@ protected void onInit() { if (this.retryTemplate != null) { MessageChannel errorChannel = getErrorChannel(); if (this.recoveryCallback != null && errorChannel != null) { - this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy()); + // TODO https://github.com/spring-projects/spring-integration/issues/10345 + ErrorMessageSendingRecoverer errorMessageSendingRecoverer = + new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy()); + this.recoveryCallback = + context -> + errorMessageSendingRecoverer.recover(context, context.getLastThrowable()); } } if (!doFilterInRetry && this.recordFilterStrategy != null) { diff --git a/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd b/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd index 77b9a6bc170..e826ee22d32 100644 --- a/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd +++ b/spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd @@ -711,7 +711,7 @@ - + diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java index 4f353e1e481..6c5a5d1f7af 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java @@ -66,8 +66,9 @@ public void testProps() { .isSameAs(this.context.getBean("ems")); assertThat(TestUtils.getPropertyValue(this.gateway1, "retryTemplate")) .isSameAs(this.context.getBean("retryTemplate")); - assertThat(TestUtils.getPropertyValue(this.gateway1, "recoveryCallback")) - .isSameAs(this.context.getBean("recoveryCallback")); + // TODO https://github.com/spring-projects/spring-integration/issues/10345 +// assertThat(TestUtils.getPropertyValue(this.gateway1, "recoveryCallback")) +// .isSameAs(this.context.getBean("recoveryCallback")); assertThat(TestUtils.getPropertyValue(this.gateway1, "onPartitionsAssignedSeekCallback")) .isSameAs(this.context.getBean("onPartitionsAssignedSeekCallback")); assertThat(TestUtils.getPropertyValue(this.gateway1, "messagingTemplate.sendTimeout")).isEqualTo(5000L); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java index 250f1c5a197..a49592e73e7 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java @@ -25,6 +25,7 @@ import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.core.RecoveryCallback; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.ListenerMode; import org.springframework.integration.support.ErrorMessageStrategy; @@ -35,7 +36,6 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; -import org.springframework.retry.RecoveryCallback; import org.springframework.retry.support.RetryTemplate; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -95,7 +95,8 @@ void testKafkaMessageDrivenChannelAdapterParser() { .isEqualTo(String.class); assertThat(TestUtils.getPropertyValue(this.kafkaListener, "errorMessageStrategy")).isSameAs(this.ems); assertThat(TestUtils.getPropertyValue(this.kafkaListener, "retryTemplate")).isSameAs(this.retryTemplate); - assertThat(TestUtils.getPropertyValue(this.kafkaListener, "recoveryCallback")).isSameAs(this.recoveryCallback); + // TODO https://github.com/spring-projects/spring-integration/issues/10345 +// assertThat(TestUtils.getPropertyValue(this.kafkaListener, "recoveryCallback")).isSameAs(this.recoveryCallback); assertThat(TestUtils.getPropertyValue(this.kafkaListener, "onPartitionsAssignedSeekCallback")) .isSameAs(this.context.getBean("onPartitionsAssignedSeekCallback")); assertThat(TestUtils.getPropertyValue(this.kafkaListener, "bindSourceRecord", Boolean.class)).isTrue();