Skip to content

Commit 7d3b649

Browse files
committed
Add support for MIME-based message conversion
The MessageConverter interface in spring-messaging is now explicitly designed to support conversion of the payload of a Message<?> to and from serialized form based on MIME type message header. By default, the MessageHeaders.CONTENT_HEADER header is used but a custom ContentTypeResolver can be configured to customize that. Currently available are Jackson 2, String, and byte array converters. A CompositeMessageConverter can be used to configure several message converters in various places such as a messaging template.
1 parent ee8f1aa commit 7d3b649

File tree

34 files changed

+1522
-176
lines changed

34 files changed

+1522
-176
lines changed

spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,22 @@
1515
*/
1616
package org.springframework.messaging.core;
1717

18+
import java.util.Map;
19+
1820
import org.springframework.messaging.Message;
1921
import org.springframework.util.Assert;
2022

2123

2224
/**
25+
* Base class for a messaging template that can resolve String-based destinations.
26+
*
2327
* @author Mark Fisher
28+
* @author Rossen Stoyanchev
2429
* @since 4.0
2530
*/
26-
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends AbstractMessagingTemplate<D>
27-
implements DestinationResolvingMessageSendingOperations<D>,
31+
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends
32+
AbstractMessagingTemplate<D> implements
33+
DestinationResolvingMessageSendingOperations<D>,
2834
DestinationResolvingMessageReceivingOperations<D>,
2935
DestinationResolvingMessageRequestReplyOperations<D> {
3036

@@ -48,14 +54,29 @@ protected final D resolveDestination(String destinationName) {
4854
}
4955

5056
@Override
51-
public <T> void convertAndSend(String destinationName, T message) {
52-
this.convertAndSend(destinationName, message, null);
57+
public <T> void convertAndSend(String destinationName, T payload) {
58+
Map<String, Object> headers = null;
59+
this.convertAndSend(destinationName, payload, headers);
60+
}
61+
62+
@Override
63+
public <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers) {
64+
MessagePostProcessor postProcessor = null;
65+
this.convertAndSend(destinationName, payload, headers, postProcessor);
5366
}
5467

5568
@Override
56-
public <T> void convertAndSend(String destinationName, T message, MessagePostProcessor postProcessor) {
69+
public <T> void convertAndSend(String destinationName, T payload, MessagePostProcessor postProcessor) {
70+
Map<String, Object> headers = null;
71+
this.convertAndSend(destinationName, payload, headers, postProcessor);
72+
}
73+
74+
@Override
75+
public <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers,
76+
MessagePostProcessor postProcessor) {
77+
5778
D destination = resolveDestination(destinationName);
58-
super.convertAndSend(destination, message, postProcessor);
79+
super.convertAndSend(destination, payload, headers, postProcessor);
5980
}
6081

6182
@Override
@@ -65,9 +86,9 @@ public <P> Message<P> receive(String destinationName) {
6586
}
6687

6788
@Override
68-
public Object receiveAndConvert(String destinationName) {
89+
public <T> T receiveAndConvert(String destinationName, Class<T> targetClass) {
6990
D destination = resolveDestination(destinationName);
70-
return super.receiveAndConvert(destination);
91+
return super.receiveAndConvert(destination, targetClass);
7192
}
7293

7394
@Override
@@ -77,15 +98,33 @@ public Message<?> sendAndReceive(String destinationName, Message<?> requestMessa
7798
}
7899

79100
@Override
80-
public Object convertSendAndReceive(String destinationName, Object request) {
101+
public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass) {
102+
D destination = resolveDestination(destinationName);
103+
return super.convertSendAndReceive(destination, request, targetClass);
104+
}
105+
106+
@Override
107+
public <T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
108+
Class<T> targetClass) {
109+
110+
D destination = resolveDestination(destinationName);
111+
return super.convertSendAndReceive(destination, request, headers, targetClass);
112+
}
113+
114+
@Override
115+
public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass,
116+
MessagePostProcessor postProcessor) {
117+
81118
D destination = resolveDestination(destinationName);
82-
return super.convertSendAndReceive(destination, request);
119+
return super.convertSendAndReceive(destination, request, targetClass, postProcessor);
83120
}
84121

85122
@Override
86-
public Object convertSendAndReceive(String destinationName, Object request, MessagePostProcessor postProcessor) {
123+
public <T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
124+
Class<T> targetClass, MessagePostProcessor postProcessor) {
125+
87126
D destination = resolveDestination(destinationName);
88-
return super.convertSendAndReceive(destination, request, postProcessor);
127+
return super.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
89128
}
90129

91130
}

spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,27 @@
1515
*/
1616
package org.springframework.messaging.core;
1717

18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
import java.util.Map;
21+
1822
import org.apache.commons.logging.Log;
1923
import org.apache.commons.logging.LogFactory;
2024
import org.springframework.messaging.Message;
25+
import org.springframework.messaging.MessageHeaders;
2126
import org.springframework.messaging.MessagingException;
27+
import org.springframework.messaging.support.converter.ByteArrayMessageConverter;
28+
import org.springframework.messaging.support.converter.CompositeMessageConverter;
2229
import org.springframework.messaging.support.converter.MessageConverter;
23-
import org.springframework.messaging.support.converter.SimplePayloadMessageConverter;
30+
import org.springframework.messaging.support.converter.StringMessageConverter;
2431
import org.springframework.util.Assert;
2532

2633

2734
/**
35+
* Base class for templates that support sending messages.
36+
*
2837
* @author Mark Fisher
38+
* @author Rossen Stoyanchev
2939
* @since 4.0
3040
*/
3141
public abstract class AbstractMessageSendingTemplate<D> implements MessageSendingOperations<D> {
@@ -34,9 +44,16 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
3444

3545
private volatile D defaultDestination;
3646

37-
private volatile MessageConverter converter = new SimplePayloadMessageConverter();
47+
private volatile MessageConverter converter;
3848

3949

50+
public AbstractMessageSendingTemplate() {
51+
Collection<MessageConverter> converters = new ArrayList<MessageConverter>();
52+
converters.add(new StringMessageConverter());
53+
converters.add(new ByteArrayMessageConverter());
54+
this.converter = new CompositeMessageConverter(converters);
55+
}
56+
4057
public void setDefaultDestination(D defaultDestination) {
4158
this.defaultDestination = defaultDestination;
4259
}
@@ -58,19 +75,13 @@ public void setMessageConverter(MessageConverter messageConverter) {
5875
/**
5976
* @return the configured {@link MessageConverter}
6077
*/
61-
public MessageConverter getConverter() {
78+
public MessageConverter getMessageConverter() {
6279
return this.converter;
6380
}
6481

65-
/**
66-
* @param converter the converter to set
67-
*/
68-
public void setConverter(MessageConverter converter) {
69-
this.converter = converter;
70-
}
7182

7283
@Override
73-
public <P> void send(Message<P> message) {
84+
public void send(Message<?> message) {
7485
this.send(getRequiredDefaultDestination(), message);
7586
}
7687

@@ -82,34 +93,48 @@ protected final D getRequiredDefaultDestination() {
8293
}
8394

8495
@Override
85-
public <P> void send(D destination, Message<P> message) {
96+
public void send(D destination, Message<?> message) {
8697
this.doSend(destination, message);
8798
}
8899

89100
protected abstract void doSend(D destination, Message<?> message);
90101

91102

92103
@Override
93-
public <T> void convertAndSend(T message) {
104+
public void convertAndSend(Object message) throws MessagingException {
94105
this.convertAndSend(getRequiredDefaultDestination(), message);
95106
}
96107

97108
@Override
98-
public <T> void convertAndSend(D destination, T object) {
99-
this.convertAndSend(destination, object, null);
109+
public void convertAndSend(D destination, Object payload) throws MessagingException {
110+
this.convertAndSend(destination, payload, (Map<String, Object>) null);
100111
}
101112

102113
@Override
103-
public <T> void convertAndSend(T object, MessagePostProcessor postProcessor) {
104-
this.convertAndSend(getRequiredDefaultDestination(), object, postProcessor);
114+
public void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException {
115+
MessagePostProcessor postProcessor = null;
116+
this.convertAndSend(destination, payload, headers, postProcessor);
105117
}
106118

107119
@Override
108-
public <T> void convertAndSend(D destination, T object, MessagePostProcessor postProcessor)
120+
public void convertAndSend(Object payload, MessagePostProcessor postProcessor) throws MessagingException {
121+
this.convertAndSend(getRequiredDefaultDestination(), payload, postProcessor);
122+
}
123+
124+
@Override
125+
public void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor)
109126
throws MessagingException {
110127

111-
@SuppressWarnings("unchecked")
112-
Message<?> message = this.converter.toMessage(object);
128+
Map<String, Object> headers = null;
129+
this.convertAndSend(destination, payload, headers, postProcessor);
130+
}
131+
132+
@Override
133+
public void convertAndSend(D destination, Object payload, Map<String, Object> headers,
134+
MessagePostProcessor postProcessor) throws MessagingException {
135+
136+
MessageHeaders messageHeaders = (headers != null) ? new MessageHeaders(headers) : null;
137+
Message<?> message = this.converter.toMessage(payload, messageHeaders);
113138
if (postProcessor != null) {
114139
message = postProcessor.postProcessMessage(message);
115140
}

spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,17 @@
1515
*/
1616
package org.springframework.messaging.core;
1717

18+
import java.util.Map;
19+
1820
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.MessageHeaders;
1922

2023

2124
/**
25+
* Base class for a messaging template that send and receive messages.
26+
*
2227
* @author Mark Fisher
28+
* @author Rossen Stoyanchev
2329
* @since 4.0
2430
*/
2531
public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendingTemplate<D>
@@ -40,18 +46,22 @@ public <P> Message<P> receive(D destination) {
4046

4147

4248
@Override
43-
public Object receiveAndConvert() {
44-
return this.receiveAndConvert(getRequiredDefaultDestination());
49+
public <T> T receiveAndConvert(Class<T> targetClass) {
50+
return this.receiveAndConvert(getRequiredDefaultDestination(), targetClass);
4551
}
4652

4753
@SuppressWarnings("unchecked")
4854
@Override
49-
public Object receiveAndConvert(D destination) {
55+
public <T> T receiveAndConvert(D destination, Class<T> targetClass) {
5056
Message<?> message = this.doReceive(destination);
51-
return (message != null) ? getConverter().fromMessage(message, null) : null;
57+
if (message != null) {
58+
return (T) getMessageConverter().fromMessage(message, targetClass);
59+
}
60+
else {
61+
return null;
62+
}
5263
}
5364

54-
5565
@Override
5666
public Message<?> sendAndReceive(Message<?> requestMessage) {
5767
return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
@@ -66,29 +76,48 @@ public Message<?> sendAndReceive(D destination, Message<?> requestMessage) {
6676

6777

6878
@Override
69-
public Object convertSendAndReceive(Object request) {
70-
return this.convertSendAndReceive(getRequiredDefaultDestination(), request);
79+
public <T> T convertSendAndReceive(Object request, Class<T> targetClass) {
80+
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass);
81+
}
82+
83+
@Override
84+
public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass) {
85+
Map<String, Object> headers = null;
86+
return this.convertSendAndReceive(destination, request, headers, targetClass);
7187
}
7288

7389
@Override
74-
public Object convertSendAndReceive(D destination, Object request) {
90+
public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers,
91+
Class<T> targetClass) {
92+
7593
return this.convertSendAndReceive(destination, request, null);
7694
}
7795

7896
@Override
79-
public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) {
80-
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor);
97+
public <T> T convertSendAndReceive(Object request, Class<T> targetClass, MessagePostProcessor postProcessor) {
98+
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor);
99+
}
100+
101+
@Override
102+
public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass,
103+
MessagePostProcessor postProcessor) {
104+
105+
Map<String, Object> headers = null;
106+
return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
81107
}
82108

83109
@SuppressWarnings("unchecked")
84110
@Override
85-
public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) {
86-
Message<?> requestMessage = getConverter().toMessage(request);
111+
public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers,
112+
Class<T> targetClass, MessagePostProcessor postProcessor) {
113+
114+
MessageHeaders messageHeaders = (headers != null) ? new MessageHeaders(headers) : null;
115+
Message<?> requestMessage = getMessageConverter().toMessage(request, messageHeaders);
87116
if (postProcessor != null) {
88117
requestMessage = postProcessor.postProcessMessage(requestMessage);
89118
}
90119
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
91-
return getConverter().fromMessage(replyMessage, null);
120+
return (T) getMessageConverter().fromMessage(replyMessage, targetClass);
92121
}
93122

94123
}

spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageReceivingOperations.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020

2121

2222
/**
23+
* A {@link MessageReceivingOperations} that can resolve a String-based destinations.
24+
*
2325
* @author Mark Fisher
26+
* @author Rossen Stoyanchev
2427
* @since 4.0
2528
*/
2629
public interface DestinationResolvingMessageReceivingOperations<D> extends MessageReceivingOperations<D> {
2730

2831
<P> Message<P> receive(String destinationName) throws MessagingException;
2932

30-
Object receiveAndConvert(String destinationName) throws MessagingException;
33+
<T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException;
3134

3235
}

spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageRequestReplyOperations.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,33 @@
1515
*/
1616
package org.springframework.messaging.core;
1717

18+
import java.util.Map;
19+
1820
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.MessagingException;
1922

2023

2124
/**
25+
* A {@link MessageRequestReplyOperations} that can resolve a String-based destinations.
26+
*
2227
* @author Mark Fisher
28+
* @author Rossen Stoyanchev
2329
* @since 4.0
2430
*/
2531
public interface DestinationResolvingMessageRequestReplyOperations<D> extends MessageRequestReplyOperations<D> {
2632

27-
Message<?> sendAndReceive(String destinationName, Message<?> requestMessage);
33+
Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) throws MessagingException;
34+
35+
<T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass)
36+
throws MessagingException;
37+
38+
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
39+
Class<T> targetClass) throws MessagingException;
2840

29-
Object convertSendAndReceive(String destinationName, Object request);
41+
<T> T convertSendAndReceive(String destinationName, Object request,
42+
Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException;
3043

31-
Object convertSendAndReceive(String destinationName, Object request, MessagePostProcessor requestPostProcessor);
44+
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
45+
Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException;
3246

3347
}

0 commit comments

Comments
 (0)