Skip to content

Commit 51797ad

Browse files
committed
INT-3978 Fix MessageHistory for Message types
JIRA: https://jira.spring.io/browse/INT-3978 Previously the `MessageHistory.write()` always used `MessageBuilder` to populated the `history` header. Since the `MessageBuilder` doesn't care about the `source` message type, we might lose some important information, like `AdviceMessage.inputMessage` * Add conditional logic into the `MessageHistory` for all known `Message<?>` implementations * Provide appropriate tests in the `MessageHistoryTests` for known `Message<?>` implementations * Make `AdviceMessage` generic and fix all affected code on the matter * Since `MutableMessage` is public already, fix `MongoDbMessageStore.DBObjectToMutableMessageConverter` to use `MutableMessage` type directly.
1 parent 020aa01 commit 51797ad

File tree

7 files changed

+133
-51
lines changed

7 files changed

+133
-51
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ExpressionEvaluatingRequestHandlerAdvice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private void evaluateSuccessExpression(Message<?> message) throws Exception {
164164
evaluationFailed = true;
165165
}
166166
if (evalResult != null && this.successChannel != null) {
167-
AdviceMessage resultMessage = new AdviceMessage(evalResult, message);
167+
AdviceMessage<?> resultMessage = new AdviceMessage<Object>(evalResult, message);
168168
this.messagingTemplate.send(this.successChannel, resultMessage);
169169
}
170170
if (evaluationFailed && this.propagateOnSuccessEvaluationFailures) {

spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,20 @@
2525
import java.util.ListIterator;
2626
import java.util.Properties;
2727

28+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
29+
import org.springframework.integration.message.AdviceMessage;
2830
import org.springframework.integration.support.DefaultMessageBuilderFactory;
2931
import org.springframework.integration.support.MessageBuilderFactory;
32+
import org.springframework.integration.support.MutableMessage;
3033
import org.springframework.integration.support.context.NamedComponent;
3134
import org.springframework.messaging.Message;
35+
import org.springframework.messaging.support.ErrorMessage;
3236
import org.springframework.util.Assert;
3337
import org.springframework.util.StringUtils;
3438

3539
/**
3640
* @author Mark Fisher
41+
* @author Artem Bilan
3742
* @since 2.0
3843
*/
3944
@SuppressWarnings("serial")
@@ -47,21 +52,21 @@ public final class MessageHistory implements List<Properties>, Serializable {
4752

4853
public static final String TIMESTAMP_PROPERTY = "timestamp";
4954

50-
private static final MessageBuilderFactory mesageBuilderFactory = new DefaultMessageBuilderFactory();
55+
private static final MessageBuilderFactory MESSAGE_BUILDER_FACTORY = new DefaultMessageBuilderFactory();
5156

5257

5358
private final List<Properties> components;
5459

5560

5661
public static MessageHistory read(Message<?> message) {
57-
return (message != null) ?
58-
message.getHeaders().get(HEADER_NAME, MessageHistory.class) : null;
62+
return message != null ? message.getHeaders().get(HEADER_NAME, MessageHistory.class) : null;
5963
}
6064

6165
public static <T> Message<T> write(Message<T> message, NamedComponent component) {
62-
return write(message, component, mesageBuilderFactory);
66+
return write(message, component, MESSAGE_BUILDER_FACTORY);
6367
}
6468

69+
@SuppressWarnings("unchecked")
6570
public static <T> Message<T> write(Message<T> message, NamedComponent component,
6671
MessageBuilderFactory messageBuilderFactory) {
6772
Assert.notNull(message, "Message must not be null");
@@ -73,7 +78,26 @@ public static <T> Message<T> write(Message<T> message, NamedComponent component,
7378
new ArrayList<Properties>(previousHistory) : new ArrayList<Properties>();
7479
components.add(metadata);
7580
MessageHistory history = new MessageHistory(components);
76-
message = messageBuilderFactory.fromMessage(message).setHeader(HEADER_NAME, history).build();
81+
82+
if (message instanceof MutableMessage) {
83+
message.getHeaders().put(HEADER_NAME, history);
84+
}
85+
else if (message instanceof ErrorMessage) {
86+
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
87+
headerAccessor.setHeader(HEADER_NAME, history);
88+
Throwable payload = ((ErrorMessage) message).getPayload();
89+
ErrorMessage errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders());
90+
message = (Message<T>) errorMessage;
91+
}
92+
else if (message instanceof AdviceMessage) {
93+
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
94+
headerAccessor.setHeader(HEADER_NAME, history);
95+
message = new AdviceMessage<T>(message.getPayload(), headerAccessor.toMessageHeaders(),
96+
((AdviceMessage) message).getInputMessage());
97+
}
98+
else {
99+
message = messageBuilderFactory.fromMessage(message).setHeader(HEADER_NAME, history).build();
100+
}
77101
}
78102
return message;
79103
}
@@ -263,6 +287,7 @@ public String getTimestamp() {
263287
private void setTimestamp(String timestamp) {
264288
this.setProperty(TIMESTAMP_PROPERTY, timestamp);
265289
}
290+
266291
}
267292

268293
}

spring-integration-core/src/main/java/org/springframework/integration/message/AdviceMessage.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,22 @@
2929
* handler.
3030
* .
3131
* @author Gary Russell
32+
* @author Artem Bilan
33+
*
3234
* @since 2.2
3335
*/
34-
public class AdviceMessage extends GenericMessage<Object> {
36+
public class AdviceMessage<T> extends GenericMessage<T> {
3537

3638
private static final long serialVersionUID = 1L;
3739

3840
private final Message<?> inputMessage;
3941

40-
public AdviceMessage(Object payload, Message<?> inputMessage) {
42+
public AdviceMessage(T payload, Message<?> inputMessage) {
4143
super(payload);
4244
this.inputMessage = inputMessage;
4345
}
4446

45-
public AdviceMessage(Object payload, Map<String, Object> headers, Message<?> inputMessage) {
47+
public AdviceMessage(T payload, Map<String, Object> headers, Message<?> inputMessage) {
4648
super(payload, headers);
4749
this.inputMessage = inputMessage;
4850
}

spring-integration-core/src/test/java/org/springframework/integration/core/MessageHistoryTests.java

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2010 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,22 +16,30 @@
1616

1717
package org.springframework.integration.core;
1818

19+
import static org.hamcrest.Matchers.instanceOf;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertNotSame;
2123
import static org.junit.Assert.assertNull;
24+
import static org.junit.Assert.assertSame;
25+
import static org.junit.Assert.assertThat;
2226

2327
import java.util.Properties;
2428

2529
import org.junit.Test;
2630

2731
import org.springframework.integration.history.MessageHistory;
28-
import org.springframework.messaging.support.GenericMessage;
32+
import org.springframework.integration.message.AdviceMessage;
2933
import org.springframework.integration.support.MessageBuilder;
34+
import org.springframework.integration.support.MutableMessage;
3035
import org.springframework.integration.support.context.NamedComponent;
3136
import org.springframework.messaging.Message;
37+
import org.springframework.messaging.support.ErrorMessage;
38+
import org.springframework.messaging.support.GenericMessage;
3239

3340
/**
3441
* @author Mark Fisher
42+
* @author Artem Bilan
3543
* @since 2.0
3644
*/
3745
public class MessageHistoryTests {
@@ -57,6 +65,69 @@ public void verifyImmutability() {
5765
history.add(new Properties());
5866
}
5967

68+
@Test
69+
public void testCorrectMutableMessageAfterWrite() {
70+
MutableMessage<String> original = new MutableMessage<>("foo");
71+
assertNull(MessageHistory.read(original));
72+
Message<String> result1 = MessageHistory.write(original, new TestComponent(1));
73+
assertThat(result1, instanceOf(MutableMessage.class));
74+
assertSame(original, result1);
75+
MessageHistory history1 = MessageHistory.read(result1);
76+
assertNotNull(history1);
77+
assertEquals("testComponent-1", history1.toString());
78+
Message<String> result2 = MessageHistory.write(result1, new TestComponent(2));
79+
assertSame(original, result2);
80+
MessageHistory history2 = MessageHistory.read(result2);
81+
assertNotNull(history2);
82+
assertEquals("testComponent-1,testComponent-2", history2.toString());
83+
}
84+
85+
@Test
86+
public void testCorrectErrorMessageAfterWrite() {
87+
RuntimeException payload = new RuntimeException();
88+
ErrorMessage original = new ErrorMessage(payload);
89+
assertNull(MessageHistory.read(original));
90+
Message<Throwable> result1 = MessageHistory.write(original, new TestComponent(1));
91+
assertThat(result1, instanceOf(ErrorMessage.class));
92+
assertNotSame(original, result1);
93+
assertSame(original.getPayload(), result1.getPayload());
94+
MessageHistory history1 = MessageHistory.read(result1);
95+
assertNotNull(history1);
96+
assertEquals("testComponent-1", history1.toString());
97+
Message<Throwable> result2 = MessageHistory.write(result1, new TestComponent(2));
98+
assertThat(result2, instanceOf(ErrorMessage.class));
99+
assertNotSame(original, result2);
100+
assertNotSame(result1, result2);
101+
assertSame(original.getPayload(), result2.getPayload());
102+
MessageHistory history2 = MessageHistory.read(result2);
103+
assertNotNull(history2);
104+
assertEquals("testComponent-1,testComponent-2", history2.toString());
105+
}
106+
107+
@Test
108+
public void testCorrectAdviceMessageAfterWrite() {
109+
Message<?> inputMessage = new GenericMessage<>("input");
110+
AdviceMessage<String> original = new AdviceMessage<>("foo", inputMessage);
111+
assertNull(MessageHistory.read(original));
112+
Message<String> result1 = MessageHistory.write(original, new TestComponent(1));
113+
assertThat(result1, instanceOf(AdviceMessage.class));
114+
assertNotSame(original, result1);
115+
assertSame(original.getPayload(), result1.getPayload());
116+
assertSame(original.getInputMessage(), ((AdviceMessage) result1).getInputMessage());
117+
MessageHistory history1 = MessageHistory.read(result1);
118+
assertNotNull(history1);
119+
assertEquals("testComponent-1", history1.toString());
120+
Message<String> result2 = MessageHistory.write(result1, new TestComponent(2));
121+
assertThat(result2, instanceOf(AdviceMessage.class));
122+
assertNotSame(original, result2);
123+
assertSame(original.getPayload(), result2.getPayload());
124+
assertSame(original.getInputMessage(), ((AdviceMessage) result2).getInputMessage());
125+
assertNotSame(result1, result2);
126+
MessageHistory history2 = MessageHistory.read(result2);
127+
assertNotNull(history2);
128+
assertEquals("testComponent-1,testComponent-2", history2.toString());
129+
}
130+
60131

61132
private static class TestComponent implements NamedComponent {
62133

spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
164164

165165
Message<?> success = successChannel.receive(1000);
166166
assertNotNull(success);
167-
assertEquals("Hello, world!", ((AdviceMessage) success).getInputMessage().getPayload());
167+
assertEquals("Hello, world!", ((AdviceMessage<?>) success).getInputMessage().getPayload());
168168
assertEquals("foo", success.getPayload());
169169

170170
// advice with failure, not trapped
@@ -244,7 +244,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
244244

245245
Message<?> success = successChannel.receive(1000);
246246
assertNotNull(success);
247-
assertEquals("Hello, world!", ((AdviceMessage) success).getInputMessage().getPayload());
247+
assertEquals("Hello, world!", ((AdviceMessage<?>) success).getInputMessage().getPayload());
248248
assertEquals(ArithmeticException.class, success.getPayload().getClass());
249249
assertEquals("/ by zero", ((Exception) success.getPayload()).getMessage());
250250

@@ -262,7 +262,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
262262

263263
success = successChannel.receive(1000);
264264
assertNotNull(success);
265-
assertEquals("Hello, world!", ((AdviceMessage) success).getInputMessage().getPayload());
265+
assertEquals("Hello, world!", ((AdviceMessage<?>) success).getInputMessage().getPayload());
266266
assertEquals(ArithmeticException.class, success.getPayload().getClass());
267267
assertEquals("/ by zero", ((Exception) success.getPayload()).getMessage());
268268

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@
1919
import java.util.ArrayList;
2020
import java.util.Collection;
2121
import java.util.HashMap;
22-
import java.util.HashSet;
2322
import java.util.Iterator;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Map.Entry;
2726
import java.util.Properties;
28-
import java.util.Set;
2927
import java.util.UUID;
3028

3129
import org.springframework.beans.BeansException;
@@ -34,9 +32,7 @@
3432
import org.springframework.beans.factory.InitializingBean;
3533
import org.springframework.context.ApplicationContext;
3634
import org.springframework.context.ApplicationContextAware;
37-
import org.springframework.core.convert.TypeDescriptor;
3835
import org.springframework.core.convert.converter.Converter;
39-
import org.springframework.core.convert.converter.GenericConverter;
4036
import org.springframework.core.serializer.support.DeserializingConverter;
4137
import org.springframework.core.serializer.support.SerializingConverter;
4238
import org.springframework.data.annotation.Id;
@@ -64,6 +60,7 @@
6460
import org.springframework.integration.store.MessageGroup;
6561
import org.springframework.integration.store.MessageGroupStore;
6662
import org.springframework.integration.store.MessageStore;
63+
import org.springframework.integration.support.MutableMessage;
6764
import org.springframework.integration.support.MutableMessageBuilder;
6865
import org.springframework.jmx.export.annotation.ManagedAttribute;
6966
import org.springframework.messaging.Message;
@@ -650,44 +647,27 @@ public GenericMessage<?> convert(DBObject source) {
650647

651648
}
652649

653-
private final class DBObjectToMutableMessageConverter implements GenericConverter {
650+
private final class DBObjectToMutableMessageConverter implements Converter<DBObject, MutableMessage<?>> {
654651

655-
private final Class<?> mutableMessageClass;
656-
657-
private DBObjectToMutableMessageConverter() {
658-
try {
659-
this.mutableMessageClass = ClassUtils.forName("org.springframework.integration.support.MutableMessage",
660-
MongoDbMessageStore.this.classLoader);
661-
}
662-
catch (ClassNotFoundException e) {
663-
throw new IllegalStateException(e);
664-
}
665-
}
666652

667653
@Override
668-
public Set<ConvertiblePair> getConvertibleTypes() {
669-
Set<ConvertiblePair> convertiblePairs = new HashSet<ConvertiblePair>();
670-
convertiblePairs.add(new ConvertiblePair(DBObject.class, this.mutableMessageClass));
671-
return convertiblePairs;
672-
}
673-
674-
@Override
675-
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
676-
DBObject dbObject = (DBObject) source;
654+
public MutableMessage<?> convert(DBObject source) {
677655
@SuppressWarnings("unchecked")
678656
Map<String, Object> headers =
679-
MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) dbObject.get("headers"));
657+
MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) source.get("headers"));
680658

681-
return MutableMessageBuilder.withPayload(MongoDbMessageStore.this.converter.extractPayload(dbObject))
659+
Object payload = MongoDbMessageStore.this.converter.extractPayload(source);
660+
return (MutableMessage<?>) MutableMessageBuilder.withPayload(payload)
682661
.copyHeaders(headers)
683662
.build();
684663
}
664+
685665
}
686666

687-
private class DBObjectToAdviceMessageConverter implements Converter<DBObject, AdviceMessage> {
667+
private class DBObjectToAdviceMessageConverter implements Converter<DBObject, AdviceMessage<?>> {
688668

689669
@Override
690-
public AdviceMessage convert(DBObject source) {
670+
public AdviceMessage<?> convert(DBObject source) {
691671
@SuppressWarnings("unchecked")
692672
Map<String, Object> headers =
693673
MongoDbMessageStore.this.converter.normalizeHeaders((Map<String, Object>) source.get("headers"));
@@ -698,16 +678,18 @@ public AdviceMessage convert(DBObject source) {
698678
DBObject inputMessageObject = (DBObject) source.get("inputMessage");
699679
Object inputMessageType = inputMessageObject.get("_class");
700680
try {
701-
Class<?> messageClass = ClassUtils.forName(inputMessageType.toString(), MongoDbMessageStore.this.classLoader);
702-
inputMessage = (Message<?>) MongoDbMessageStore.this.converter.read(messageClass, inputMessageObject);
681+
Class<?> messageClass = ClassUtils.forName(inputMessageType.toString(),
682+
MongoDbMessageStore.this.classLoader);
683+
inputMessage = (Message<?>) MongoDbMessageStore.this.converter.read(messageClass,
684+
inputMessageObject);
703685
}
704686
catch (Exception e) {
705687
throw new IllegalStateException("failed to load class: " + inputMessageType, e);
706688
}
707689
}
708690

709-
AdviceMessage message =
710-
new AdviceMessage(MongoDbMessageStore.this.converter.extractPayload(source), headers, inputMessage);
691+
AdviceMessage<?> message = new AdviceMessage<Object>(
692+
MongoDbMessageStore.this.converter.extractPayload(source), headers, inputMessage);
711693
enhanceHeaders(message.getHeaders(), headers);
712694

713695
return message;

0 commit comments

Comments
 (0)