Skip to content

Commit dc94b42

Browse files
garyrussellartembilan
authored andcommitted
INT-3975: AMQP Channels: Support extract-payload
JIRA: https://jira.spring.io/browse/INT-3975 Also, switch the default mapping to map all inbound headers, but don't map `x-*` on outbound (by default). This will provide apps access to important headers such as `x-death` by default, while not propagating dangerous headers outbound. Polishing - PR Comments Simple code style polishing
1 parent c677722 commit dc94b42

File tree

20 files changed

+691
-68
lines changed

20 files changed

+691
-68
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractAmqpChannel.java

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,11 @@
1717
package org.springframework.integration.amqp.channel;
1818

1919
import org.springframework.amqp.core.AmqpTemplate;
20+
import org.springframework.amqp.core.MessageDeliveryMode;
21+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
22+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
23+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
24+
import org.springframework.integration.amqp.support.MappingUtils;
2025
import org.springframework.integration.channel.AbstractMessageChannel;
2126
import org.springframework.messaging.Message;
2227
import org.springframework.util.Assert;
@@ -29,11 +34,48 @@ public abstract class AbstractAmqpChannel extends AbstractMessageChannel {
2934

3035
private final AmqpTemplate amqpTemplate;
3136

37+
private final RabbitTemplate rabbitTemplate;
38+
39+
private final AmqpHeaderMapper outboundHeaderMapper;
40+
41+
private final AmqpHeaderMapper inboundHeaderMapper;
42+
43+
private volatile boolean extractPayload;
44+
3245
private volatile boolean loggingEnabled = true;
3346

47+
private MessageDeliveryMode defaultDeliveryMode;
48+
49+
/**
50+
* Construct an instance with the supplied template and default header mappers
51+
* used if the template is a {@link RabbitTemplate} and the message is mapped.
52+
* @param amqpTemplate the template.
53+
* @see #setExtractPayload(boolean)
54+
*/
3455
AbstractAmqpChannel(AmqpTemplate amqpTemplate) {
56+
this(amqpTemplate, DefaultAmqpHeaderMapper.outboundMapper(), DefaultAmqpHeaderMapper.inboundMapper());
57+
}
58+
59+
/**
60+
* Construct an instance with the supplied template and header mappers, used
61+
* when the message is mapped.
62+
* @param amqpTemplate the template.
63+
* @param outboundMapper the outbound mapper.
64+
* @param inboundMapper the inbound mapper.
65+
* @see #setExtractPayload(boolean)
66+
* @since 4.3
67+
*/
68+
AbstractAmqpChannel(AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {
3569
Assert.notNull(amqpTemplate, "amqpTemplate must not be null");
3670
this.amqpTemplate = amqpTemplate;
71+
if (amqpTemplate instanceof RabbitTemplate) {
72+
this.rabbitTemplate = (RabbitTemplate) amqpTemplate;
73+
}
74+
else {
75+
this.rabbitTemplate = null;
76+
}
77+
this.outboundHeaderMapper = outboundMapper;
78+
this.inboundHeaderMapper = inboundMapper;
3779
}
3880

3981
@Override
@@ -46,6 +88,41 @@ public void setLoggingEnabled(boolean loggingEnabled) {
4688
this.loggingEnabled = loggingEnabled;
4789
}
4890

91+
/**
92+
* Set the delivery mode to use if the message has no
93+
* {@value org.springframework.amqp.support.AmqpHeaders#DELIVERY_MODE}
94+
* header and the message property was not set by the {@code MessagePropertiesConverter}.
95+
* @param defaultDeliveryMode the default delivery mode.
96+
* @since 4.3
97+
*/
98+
public void setDefaultDeliveryMode(MessageDeliveryMode defaultDeliveryMode) {
99+
this.defaultDeliveryMode = defaultDeliveryMode;
100+
}
101+
102+
/**
103+
* Set to true to extract the payload and map the headers; otherwise
104+
* the entire message is converted and sent. Default false.
105+
* @param extractPayload true to extract and map.
106+
* @since 4.3
107+
*/
108+
public void setExtractPayload(boolean extractPayload) {
109+
if (extractPayload) {
110+
Assert.isTrue(this.rabbitTemplate != null, "amqpTemplate must be a RabbitTemplate for 'extractPayload'");
111+
Assert.state(this.outboundHeaderMapper != null && this.inboundHeaderMapper != null,
112+
"'extractPayload' requires both inbound and outbound header mappers");
113+
}
114+
this.extractPayload = extractPayload;
115+
}
116+
117+
/**
118+
* @return the extract payload.
119+
* @see #setExtractPayload(boolean)
120+
* @since 4.3
121+
*/
122+
protected boolean isExtractPayload() {
123+
return this.extractPayload;
124+
}
125+
49126
/**
50127
* Subclasses may override this method to return an Exchange name.
51128
* By default, Messages will be sent to the no-name Direct Exchange.
@@ -66,13 +143,27 @@ protected String getRoutingKey() {
66143
return "";
67144
}
68145

69-
AmqpTemplate getAmqpTemplate() {
146+
protected AmqpHeaderMapper getInboundHeaderMapper() {
147+
return this.inboundHeaderMapper;
148+
}
149+
150+
protected AmqpTemplate getAmqpTemplate() {
70151
return this.amqpTemplate;
71152
}
72153

154+
protected RabbitTemplate getRabbitTemplate() {
155+
return this.rabbitTemplate;
156+
}
157+
73158
@Override
74159
protected boolean doSend(Message<?> message, long timeout) {
75-
this.amqpTemplate.convertAndSend(this.getExchangeName(), this.getRoutingKey(), message);
160+
if (this.extractPayload) {
161+
this.amqpTemplate.send(getExchangeName(), getRoutingKey(), MappingUtils.mapMessage(message,
162+
this.rabbitTemplate.getMessageConverter(), this.outboundHeaderMapper, this.defaultDeliveryMode));
163+
}
164+
else {
165+
this.amqpTemplate.convertAndSend(getExchangeName(), getRoutingKey(), message);
166+
}
76167
return true;
77168
}
78169

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractSubscribableAmqpChannel.java

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.amqp.channel;
1818

19+
import java.util.Map;
20+
1921
import org.apache.commons.logging.Log;
2022
import org.apache.commons.logging.LogFactory;
2123

@@ -31,9 +33,11 @@
3133
import org.springframework.beans.factory.DisposableBean;
3234
import org.springframework.context.SmartLifecycle;
3335
import org.springframework.integration.MessageDispatchingException;
36+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3437
import org.springframework.integration.context.IntegrationProperties;
3538
import org.springframework.integration.dispatcher.AbstractDispatcher;
3639
import org.springframework.integration.dispatcher.MessageDispatcher;
40+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3741
import org.springframework.integration.support.MessageBuilderFactory;
3842
import org.springframework.messaging.Message;
3943
import org.springframework.messaging.MessageDeliveryException;
@@ -65,11 +69,44 @@ abstract class AbstractSubscribableAmqpChannel extends AbstractAmqpChannel
6569

6670
private final ConnectionFactory connectionFactory;
6771

72+
/**
73+
* Construct an instance with the supplied name, container and template; default header
74+
* mappers will be used if the message is mapped.
75+
* @param channelName the channel name.
76+
* @param container the container.
77+
* @param amqpTemplate the template.
78+
* @see #setExtractPayload(boolean)
79+
*/
6880
protected AbstractSubscribableAmqpChannel(String channelName, SimpleMessageListenerContainer container,
6981
AmqpTemplate amqpTemplate) {
7082
this(channelName, container, amqpTemplate, false);
7183
}
7284

85+
/**
86+
* Construct an instance with the supplied name, container and template; default header
87+
* mappers will be used if the message is mapped.
88+
* @param channelName the channel name.
89+
* @param container the container.
90+
* @param amqpTemplate the template.
91+
* @param outboundMapper the outbound mapper.
92+
* @param inboundMapper the inbound mapper.
93+
* @see #setExtractPayload(boolean)
94+
* @since 4.3
95+
*/
96+
protected AbstractSubscribableAmqpChannel(String channelName, SimpleMessageListenerContainer container,
97+
AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {
98+
this(channelName, container, amqpTemplate, false, outboundMapper, inboundMapper);
99+
}
100+
101+
/**
102+
* Construct an instance with the supplied name, container and template; default header
103+
* mappers will be used if the message is mapped.
104+
* @param channelName the channel name.
105+
* @param container the container.
106+
* @param amqpTemplate the template.
107+
* @param isPubSub true for a pub/sub channel.
108+
* @see #setExtractPayload(boolean)
109+
*/
73110
protected AbstractSubscribableAmqpChannel(String channelName,
74111
SimpleMessageListenerContainer container,
75112
AmqpTemplate amqpTemplate, boolean isPubSub) {
@@ -83,6 +120,32 @@ protected AbstractSubscribableAmqpChannel(String channelName,
83120
this.admin = new RabbitAdmin(this.connectionFactory);
84121
}
85122

123+
/**
124+
* Construct an instance with the supplied name, container and template; default header
125+
* mappers will be used if the message is mapped.
126+
* @param channelName the channel name.
127+
* @param container the container.
128+
* @param amqpTemplate the template.
129+
* @param isPubSub true for a pub/sub channel.
130+
* @param outboundMapper the outbound mapper.
131+
* @param inboundMapper the inbound mapper.
132+
* @see #setExtractPayload(boolean)
133+
* @since 4.3
134+
*/
135+
protected AbstractSubscribableAmqpChannel(String channelName,
136+
SimpleMessageListenerContainer container,
137+
AmqpTemplate amqpTemplate, boolean isPubSub,
138+
AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {
139+
super(amqpTemplate, outboundMapper, inboundMapper);
140+
Assert.notNull(container, "container must not be null");
141+
Assert.hasText(channelName, "channel name must not be empty");
142+
this.channelName = channelName;
143+
this.container = container;
144+
this.isPubSub = isPubSub;
145+
this.connectionFactory = container.getConnectionFactory();
146+
this.admin = new RabbitAdmin(this.connectionFactory);
147+
}
148+
86149
/**
87150
* Specify the maximum number of subscribers supported by the
88151
* channel's dispatcher (if it is an {@link AbstractDispatcher}).
@@ -131,7 +194,7 @@ public void onInit() throws Exception {
131194
: new SimpleMessageConverter();
132195
MessageListener listener = new DispatchingMessageListener(converter,
133196
this.dispatcher, this, this.isPubSub,
134-
this.getMessageBuilderFactory());
197+
getMessageBuilderFactory(), getInboundHeaderMapper());
135198
this.container.setMessageListener(listener);
136199
if (!this.container.isActive()) {
137200
this.container.afterPropertiesSet();
@@ -157,16 +220,19 @@ private static class DispatchingMessageListener implements MessageListener {
157220

158221
private final MessageBuilderFactory messageBuilderFactory;
159222

223+
private final AmqpHeaderMapper inboundHeaderMapper;
224+
160225
private DispatchingMessageListener(MessageConverter converter,
161226
MessageDispatcher dispatcher, AbstractSubscribableAmqpChannel channel, boolean isPubSub,
162-
MessageBuilderFactory messageBuilderFactory) {
227+
MessageBuilderFactory messageBuilderFactory, AmqpHeaderMapper inboundHeaderMapper) {
163228
Assert.notNull(converter, "MessageConverter must not be null");
164229
Assert.notNull(dispatcher, "MessageDispatcher must not be null");
165230
this.converter = converter;
166231
this.dispatcher = dispatcher;
167232
this.channel = channel;
168233
this.isPubSub = isPubSub;
169234
this.messageBuilderFactory = messageBuilderFactory;
235+
this.inboundHeaderMapper = inboundHeaderMapper;
170236
}
171237

172238

@@ -177,7 +243,7 @@ public void onMessage(org.springframework.amqp.core.Message message) {
177243
Object converted = this.converter.fromMessage(message);
178244
if (converted != null) {
179245
messageToSend = (converted instanceof Message<?>) ? (Message<?>) converted
180-
: this.messageBuilderFactory.withPayload(converted).build();
246+
: buildMessage(message, converted);
181247
this.dispatcher.dispatch(messageToSend);
182248
}
183249
else if (this.logger.isWarnEnabled()) {
@@ -203,6 +269,18 @@ else if (this.logger.isWarnEnabled()) {
203269
"while attempting to convert and dispatch Message.", e);
204270
}
205271
}
272+
273+
protected Message<Object> buildMessage(org.springframework.amqp.core.Message message, Object converted) {
274+
AbstractIntegrationMessageBuilder<Object> messageBuilder =
275+
this.messageBuilderFactory.withPayload(converted);
276+
if (this.channel.isExtractPayload()) {
277+
Map<String, Object> headers =
278+
this.inboundHeaderMapper.toHeadersFromRequest(message.getMessageProperties());
279+
messageBuilder.copyHeaders(headers);
280+
}
281+
return messageBuilder.build();
282+
}
283+
206284
}
207285

208286

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PointToPointSubscribableAmqpChannel.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import org.springframework.amqp.core.AmqpTemplate;
2121
import org.springframework.amqp.core.Queue;
2222
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
23+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
2324
import org.springframework.integration.dispatcher.AbstractDispatcher;
2425
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2526
import org.springframework.integration.dispatcher.UnicastingDispatcher;
@@ -34,12 +35,37 @@ public class PointToPointSubscribableAmqpChannel extends AbstractSubscribableAmq
3435
private volatile String queueName;
3536

3637

38+
/**
39+
* Construct an instance with the supplied name, container and template; default header
40+
* mappers will be used if the message is mapped.
41+
* @param channelName the channel name.
42+
* @param container the container.
43+
* @param amqpTemplate the template.
44+
* @see #setExtractPayload(boolean)
45+
*/
3746
public PointToPointSubscribableAmqpChannel(String channelName, SimpleMessageListenerContainer container,
3847
AmqpTemplate amqpTemplate) {
3948
super(channelName, container, amqpTemplate);
4049
}
4150

4251

52+
/**
53+
* Construct an instance with the supplied name, container and template; default header
54+
* mappers will be used if the message is mapped.
55+
* @param channelName the channel name.
56+
* @param container the container.
57+
* @param amqpTemplate the template.
58+
* @param outboundMapper the outbound mapper.
59+
* @param inboundMapper the inbound mapper.
60+
* @see #setExtractPayload(boolean)
61+
* @since 4.3
62+
*/
63+
public PointToPointSubscribableAmqpChannel(String channelName, SimpleMessageListenerContainer container,
64+
AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {
65+
super(channelName, container, amqpTemplate, outboundMapper, inboundMapper);
66+
}
67+
68+
4369
/**
4470
* Provide a Queue name to be used. If this is not provided,
4571
* the Queue's name will be the same as the channel name.

0 commit comments

Comments
 (0)