Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;

/**
* The base {@link MessageProducerSpec} implementation for a {@link AmqpInboundChannelAdapter}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.integration.dsl.MessagingGatewaySpec;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;

/**
* A base {@link MessagingGatewaySpec} implementation for {@link AmqpInboundGateway} endpoint options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import com.rabbitmq.client.Channel;
Expand All @@ -36,30 +35,31 @@
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryOperations;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
* Adapter that receives Messages from an AMQP Queue, converts them into
* Spring Integration Messages, and sends the results to a Message Channel.
* Spring Integration messages and sends the results to a Message Channel.
*
* @author Mark Fisher
* @author Gary Russell
Expand Down Expand Up @@ -274,24 +274,24 @@ private void setupRecoveryCallbackIfAny() {
"The 'messageRecoverer' must be an instance of MessageBatchRecoverer " +
"when consumer configured for batch mode");
this.recoveryCallback =
context -> {
(context, cause) -> {
@SuppressWarnings("unchecked")
List<Message> messagesToRecover =
(List<Message>) context.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
if (messagesToRecover != null) {
((MessageBatchRecoverer) messageRecovererToUse)
.recover(messagesToRecover, context.getLastThrowable());
.recover(messagesToRecover, cause);
}
return null;
};
}
else {
this.recoveryCallback =
context -> {
(context, cause) -> {
Message messageToRecover =
(Message) context.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
if (messageToRecover != null) {
messageRecovererToUse.recover(messageToRecover, context.getLastThrowable());
messageRecovererToUse.recover(messageToRecover, cause);
}
return null;
};
Expand Down Expand Up @@ -321,9 +321,9 @@ public int afterShutdown() {
}

/**
* If there's a retry template, it will set the attributes holder via the listener. If
* there's no retry template, but there's an error channel, we create a new attributes
* holder here. If an attributes holder exists (by either method), we set the
* If there's a retry template, it will set the attribute holder via the listener. If
* there's no retry template, but there's an error channel, we create a new attribute
* holder here. If an attribute holder exists (by either method), we set the
* attributes for use by the
* {@link org.springframework.integration.support.ErrorMessageStrategy}.
* @param amqpMessage the AMQP message to use.
Expand All @@ -333,19 +333,15 @@ public int afterShutdown() {
private void setAttributesIfNecessary(Object amqpMessage,
org.springframework.messaging.@Nullable Message<?> message) {

boolean needHolder = getErrorChannel() != null && this.retryTemplate == null;
boolean needAttributes = needHolder || this.retryTemplate != null;
boolean needHolder = getErrorChannel() != null || this.retryTemplate != null;
if (needHolder) {
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
}
if (needAttributes) {
AttributeAccessor attributes = this.retryTemplate != null
? RetrySynchronizationManager.getContext()
: ATTRIBUTES_HOLDER.get();
if (attributes != null) {
attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage);
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
if (attributes == null) {
attributes = ErrorMessageUtils.getAttributeAccessor(null, null);
ATTRIBUTES_HOLDER.set(attributes);
}
attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage);
}
}

Expand Down Expand Up @@ -385,16 +381,27 @@ public void onMessage(final Message message, @Nullable Channel channel) {
else {
final org.springframework.messaging.Message<Object> toSend =
createMessageFromAmqp(message, channel);
this.retryOps.execute(
context -> {
AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(toSend);
if (deliveryAttempt != null) {
deliveryAttempt.incrementAndGet();
}
setAttributesIfNecessary(message, toSend);
sendMessage(toSend);
return null;
}, this.recoverer);
try {
this.retryOps.execute(
() -> {
AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(toSend);
if (deliveryAttempt != null) {
deliveryAttempt.incrementAndGet();
}
setAttributesIfNecessary(message, toSend);
sendMessage(toSend);
return null;
});
}
catch (RetryException ex) {
if (this.recoverer != null) {
this.recoverer.recover(getErrorMessageAttributes(toSend), ex.getCause());
}
else {
throw new ListenerExecutionFailedException(
"Failed handling message after '" + ex.getRetryCount() + "' retries", ex, message);
}
}
}
}
catch (MessageConversionException e) {
Expand All @@ -404,17 +411,14 @@ public void onMessage(final Message message, @Nullable Channel channel) {
getMessagingTemplate()
.send(errorChannel,
buildErrorMessage(null,

EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, e)));
}
else {
throw e;
}
}
finally {
if (this.retryOps == null) {
ATTRIBUTES_HOLDER.remove();
}
ATTRIBUTES_HOLDER.remove();
}
}

Expand Down Expand Up @@ -453,7 +457,7 @@ protected Object convertPayload(Message message) {

protected org.springframework.messaging.Message<Object> createMessageFromPayload(Object payload,
@Nullable Channel channel, Map<String, @Nullable Object> headers, long deliveryTag,
@Nullable List<Map<String, Object>> listHeaders) {
@Nullable List<Map<String, @Nullable Object>> listHeaders) {

if (this.manualAcks) {
headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag);
Expand All @@ -480,14 +484,14 @@ protected class BatchListener extends Listener implements ChannelAwareBatchMessa
@Override
public void onMessageBatch(List<Message> messages, @Nullable Channel channel) {
List<?> converted;
List<Map<String, Object>> headers = null;
List<Map<String, @Nullable Object>> headers = null;
if (this.batchModeMessages) {
converted = convertMessages(messages, channel);
}
else {
converted = convertPayloads(messages, channel);
if (BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS.equals(AmqpInboundChannelAdapter.this.batchMode)) {
List<Map<String, Object>> listHeaders = new ArrayList<>();
List<Map<String, @Nullable Object>> listHeaders = new ArrayList<>();
messages.forEach(msg -> listHeaders.add(AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(msg.getMessageProperties())));
headers = listHeaders;
Expand All @@ -503,31 +507,45 @@ public void onMessageBatch(List<Message> messages, @Nullable Channel channel) {
sendMessage(message);
}
else {
this.retryOps.execute(
context -> {
AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(message);
if (deliveryAttempt != null) {
deliveryAttempt.incrementAndGet();
}
if (this.batchModeMessages) {
@SuppressWarnings("unchecked")
List<org.springframework.messaging.Message<?>> payloads =
(List<org.springframework.messaging.Message<?>>) message.getPayload();
payloads.forEach(payload ->
Objects.requireNonNull(
StaticMessageHeaderAccessor.getDeliveryAttempt(payload))
.incrementAndGet());
}
setAttributesIfNecessary(messages, message);
sendMessage(message);
return null;
}, this.recoverer);
try {
this.retryOps.execute(
() -> {
AtomicInteger deliveryAttempt =
StaticMessageHeaderAccessor.getDeliveryAttempt(message);
if (deliveryAttempt != null) {
deliveryAttempt.incrementAndGet();
}
if (this.batchModeMessages) {
@SuppressWarnings("unchecked")
List<org.springframework.messaging.Message<?>> payloads =
(List<org.springframework.messaging.Message<?>>) message.getPayload();
payloads.forEach(payload -> {
AtomicInteger batchItemDeliveryAttempt =
StaticMessageHeaderAccessor.getDeliveryAttempt(payload);
if (batchItemDeliveryAttempt != null) {
batchItemDeliveryAttempt.incrementAndGet();
}
});
}
setAttributesIfNecessary(messages, message);
sendMessage(message);
return null;
});
}
catch (RetryException ex) {
if (this.recoverer != null) {
this.recoverer.recover(getErrorMessageAttributes(null), ex.getCause());
}
else {
throw new ListenerExecutionFailedException(
"Failed handling messages after '" + ex.getRetryCount() + "' retries", ex,
messages.toArray(new Message[0]));
}
}
}
}
finally {
if (this.retryOps == null) {
ATTRIBUTES_HOLDER.remove();
}
ATTRIBUTES_HOLDER.remove();
}
}
}
Expand Down
Loading