Skip to content

Commit 4d7b950

Browse files
committed
* Fix AmqpClientMessageProducer for throwing a ListenerExecutionFailedException
when exception is bubbled from the downstream flow * Add test for error handling * Add test for batch manual ack * Remove redundant (and dangerous) `@ComponentScan` from the `ManualAckTests`: it does not do anything for the test suite, but is able to see all the `@Configuration` classes in other tests of this package. That may lead to unexpected behavior and failures
1 parent 6deab2a commit 4d7b950

File tree

3 files changed

+123
-13
lines changed

3 files changed

+123
-13
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducer.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.springframework.amqp.core.AmqpAcknowledgment;
3232
import org.springframework.amqp.core.MessagePostProcessor;
3333
import org.springframework.amqp.core.MessageProperties;
34+
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
3435
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
3536
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
3637
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
@@ -189,14 +190,19 @@ public boolean isPaused() {
189190
return this.paused;
190191
}
191192

192-
private class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener {
193+
private final class IntegrationRabbitAmqpMessageListener implements RabbitAmqpMessageListener {
193194

194195
@Override
195196
public void onAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage, Consumer.@Nullable Context context) {
196197
org.springframework.amqp.core.Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, context);
197198
Message<?> messageToSend = toSpringMessage(message);
198199

199-
sendMessage(messageToSend);
200+
try {
201+
sendMessage(messageToSend);
202+
}
203+
catch (Exception ex) {
204+
throw new ListenerExecutionFailedException(getComponentName() + ".onAmqpMessage() failed", ex, message);
205+
}
200206
}
201207

202208
@Override
@@ -216,7 +222,13 @@ public void onMessageBatch(List<org.springframework.amqp.core.Message> messages)
216222
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback)
217223
.build();
218224

219-
sendMessage(messageToSend);
225+
try {
226+
sendMessage(messageToSend);
227+
}
228+
catch (Exception ex) {
229+
throw new ListenerExecutionFailedException(getComponentName() + ".onMessageBatch() failed", ex,
230+
messages.toArray(org.springframework.amqp.core.Message[]::new));
231+
}
220232
}
221233

222234
private Message<?> toSpringMessage(org.springframework.amqp.core.Message message) {

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpClientMessageProducerTests.java

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,40 @@
1616

1717
package org.springframework.integration.amqp.inbound;
1818

19-
import java.util.List;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.stream.Stream;
2022

2123
import com.rabbitmq.client.amqp.Environment;
2224
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
2325
import org.assertj.core.api.InstanceOfAssertFactories;
2426
import org.junit.jupiter.api.Test;
2527

28+
import org.springframework.amqp.core.Binding;
29+
import org.springframework.amqp.core.BindingBuilder;
30+
import org.springframework.amqp.core.Declarables;
2631
import org.springframework.amqp.core.Queue;
32+
import org.springframework.amqp.core.QueueBuilder;
33+
import org.springframework.amqp.core.TopicExchange;
34+
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
35+
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
2736
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
2837
import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin;
2938
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
3039
import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory;
40+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
3141
import org.springframework.beans.factory.annotation.Autowired;
3242
import org.springframework.context.annotation.Bean;
3343
import org.springframework.context.annotation.Configuration;
3444
import org.springframework.integration.StaticMessageHeaderAccessor;
3545
import org.springframework.integration.acks.SimpleAcknowledgment;
3646
import org.springframework.integration.amqp.support.RabbitTestContainer;
47+
import org.springframework.integration.channel.FixedSubscriberChannel;
3748
import org.springframework.integration.channel.QueueChannel;
3849
import org.springframework.integration.config.EnableIntegration;
50+
import org.springframework.integration.test.util.TestUtils;
3951
import org.springframework.messaging.Message;
52+
import org.springframework.messaging.converter.MessageConversionException;
4053
import org.springframework.test.annotation.DirtiesContext;
4154
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4255

@@ -94,10 +107,60 @@ void receiveBatch() {
94107
.extracting(Message::getPayload)
95108
.asInstanceOf(InstanceOfAssertFactories.list(Message.class))
96109
.hasSize(2)
97-
.extracting(Message::getPayload)
110+
.extracting(Message<String>::getPayload)
98111
.contains("test data #3", "test data #4");
99112
}
100113

114+
@Test
115+
void receiveBatchAndAck() {
116+
this.rabbitTemplate.convertAndSend("q4", "test data #5");
117+
this.rabbitTemplate.convertAndSend("q4", "test data #6");
118+
this.rabbitTemplate.convertAndSend("q4", "test data #7");
119+
120+
Message<?> receive = this.inputChannel.receive(10_000);
121+
122+
assertThat(receive)
123+
.extracting(Message::getPayload)
124+
.asInstanceOf(InstanceOfAssertFactories.list(Message.class))
125+
.hasSize(3)
126+
.extracting(Message<String>::getPayload)
127+
.contains("test data #5", "test data #6", "test data #7");
128+
129+
SimpleAcknowledgment acknowledgment = StaticMessageHeaderAccessor.getAcknowledgment(receive);
130+
assertThat(acknowledgment).isNotNull();
131+
acknowledgment.acknowledge();
132+
}
133+
134+
@Autowired
135+
AmqpClientMessageProducer failureAmqpClientMessageProducer;
136+
137+
@Test
138+
void failureAfterReceiving() {
139+
RabbitAmqpListenerContainer listenerContainer =
140+
TestUtils.getPropertyValue(this.failureAmqpClientMessageProducer, "listenerContainer",
141+
RabbitAmqpListenerContainer.class);
142+
143+
AtomicReference<Throwable> listenerError = new AtomicReference<>();
144+
145+
listenerContainer.setErrorHandler(new ConditionalRejectingErrorHandler() {
146+
147+
@Override
148+
protected void log(Throwable t) {
149+
listenerError.set(t);
150+
}
151+
152+
});
153+
154+
this.rabbitTemplate.convertAndSend("queueForError", "discard");
155+
156+
assertThat(this.rabbitTemplate.receive("dlq1")).succeedsWithin(20, TimeUnit.SECONDS);
157+
158+
assertThat(listenerError.get())
159+
.asInstanceOf(InstanceOfAssertFactories.throwable(ListenerExecutionFailedException.class))
160+
.hasCauseInstanceOf(MessageConversionException.class)
161+
.hasStackTraceContaining("Intentional conversion failure");
162+
}
163+
101164
@Configuration(proxyBeanMethods = false)
102165
@EnableIntegration
103166
public static class ContextConfiguration {
@@ -122,18 +185,28 @@ RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
122185
}
123186

124187
@Bean
125-
Queue q1() {
126-
return new Queue("q1");
188+
Declarables declarables() {
189+
return new Declarables(Stream.of("q1", "q2", "q3", "q4").map(Queue::new).toArray(Queue[]::new));
190+
}
191+
192+
@Bean
193+
Queue queueForError() {
194+
return QueueBuilder.durable("queueForError").deadLetterExchange("dlx1").build();
195+
}
196+
197+
@Bean
198+
TopicExchange dlx1() {
199+
return new TopicExchange("dlx1");
127200
}
128201

129202
@Bean
130-
Queue q2() {
131-
return new Queue("q2");
203+
Queue dlq1() {
204+
return new Queue("dlq1");
132205
}
133206

134207
@Bean
135-
Queue q3() {
136-
return new Queue("q3");
208+
Binding dlq1Binding(Queue dlq1, TopicExchange dlx1) {
209+
return BindingBuilder.bind(dlq1).to(dlx1).with("#");
137210
}
138211

139212
@Bean
@@ -175,6 +248,33 @@ AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory c
175248
return amqpClientMessageProducer;
176249
}
177250

251+
@Bean
252+
AmqpClientMessageProducer batchManualAckAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
253+
QueueChannel inputChannel) {
254+
255+
AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q4");
256+
amqpClientMessageProducer.setOutputChannel(inputChannel);
257+
amqpClientMessageProducer.setBatchSize(3);
258+
amqpClientMessageProducer.setAutoSettle(false);
259+
return amqpClientMessageProducer;
260+
}
261+
262+
@Bean
263+
AmqpClientMessageProducer failureAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
264+
FixedSubscriberChannel conversionChannel) {
265+
266+
var amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "queueForError");
267+
amqpClientMessageProducer.setOutputChannel(conversionChannel);
268+
return amqpClientMessageProducer;
269+
}
270+
271+
@Bean
272+
FixedSubscriberChannel conversionChannel() {
273+
return new FixedSubscriberChannel(message -> {
274+
throw new MessageConversionException(message, "Intentional conversion failure");
275+
});
276+
}
277+
178278
}
179279

180280
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/ManualAckTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.springframework.beans.factory.BeanFactory;
3131
import org.springframework.beans.factory.annotation.Autowired;
3232
import org.springframework.context.annotation.Bean;
33-
import org.springframework.context.annotation.ComponentScan;
3433
import org.springframework.context.annotation.Configuration;
3534
import org.springframework.integration.amqp.support.RabbitTestContainer;
3635
import org.springframework.integration.annotation.MessageEndpoint;
@@ -93,7 +92,6 @@ public void testManual() {
9392

9493
@Configuration
9594
@EnableIntegration
96-
@ComponentScan
9795
@MessageEndpoint
9896
public static class ManualAckConfig {
9997

0 commit comments

Comments
 (0)