diff --git a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java index df6542dee..c61531c69 100644 --- a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java @@ -428,6 +428,7 @@ public void run() { } catch (MessagingException messagingException) { applyDeletionPolicyOnError(receiptHandle); + break; } } } diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index aaca92428..6e6c7a969 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -793,6 +793,73 @@ void doStop_containerNotRunning_shouldNotThrowAnException() throws Exception { container.stop(); } + @Test + void receiveMessagesFromFifoQueue_stopsProcessingMessageGroup_whenExceptionIsThrown() throws Exception { + // Arrange + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); + container.setAmazonSqs(sqs); + + CountDownLatch countDownLatch = new CountDownLatch(4); + List actualHandledMessages = Collections.synchronizedList(new ArrayList<>()); + QueueMessageHandler messageHandler = new QueueMessageHandler() { + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + super.handleMessage(message); + countDownLatch.countDown(); + actualHandledMessages.add((String) message.getPayload()); + } + }; + container.setMessageHandler(messageHandler); + StaticApplicationContext applicationContext = new StaticApplicationContext(); + applicationContext.registerSingleton("fifoTestMessageListener", + FifoTestMessageListenerThrowingExceptionConditionally.class); + messageHandler.setApplicationContext(applicationContext); + container.setBeanName("testContainerName"); + messageHandler.afterPropertiesSet(); + + mockGetQueueUrl(sqs, "testQueue.fifo", "http://testSimpleReceiveMessage.amazonaws.com"); + mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com"); + + container.afterPropertiesSet(); + + Message group1Msg1 = fifoMessage("1", "group1Msg1"); + Message group1Msg2 = fifoMessage("1", "group1Msg2-fail"); + Message group1Msg3 = fifoMessage("1", "group1Msg3-shouldWaitForMsg2"); + Message group1Msg4 = fifoMessage("1", "group1Msg4-shouldWaitForMsg2"); + Message group2Msg1 = fifoMessage("2", "group2Msg1"); + Message group2Msg2 = fifoMessage("2", "group2Msg2"); + Message group3Msg1 = fifoMessage("3", "group3Msg1"); + + when(sqs.receiveMessage( + new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All") + .withMessageAttributeNames("All").withMaxNumberOfMessages(10).withWaitTimeSeconds(20))) + .thenReturn(new ReceiveMessageResult().withMessages(group1Msg1, group1Msg2, group1Msg3, + group1Msg4, group2Msg1, group2Msg2, group3Msg1)) + .thenReturn(new ReceiveMessageResult()); + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); + + // Act + container.start(); + + // Assert + assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + + List actualGroup1Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group1")) + .collect(Collectors.toList()); + List actualGroup2Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group2")) + .collect(Collectors.toList()); + List actualGroup3Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group3")) + .collect(Collectors.toList()); + + assertThat(actualGroup1Messages).containsExactly("group1Msg1"); + assertThat(actualGroup2Messages).containsExactly("group2Msg1", "group2Msg2"); + assertThat(actualGroup3Messages).containsExactly("group3Msg1"); + + container.stop(); + } + @Test void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception { // Arrange @@ -1421,6 +1488,21 @@ String getMessage() { } + private static class FifoTestMessageListenerThrowingExceptionConditionally { + + private String message; + + @RuntimeUse + @SqsListener("testQueue.fifo") + private void handleMessage(String message) { + this.message = message; + if (message.contains("-fail")) { + throw new RuntimeException("Message processing failed."); + } + } + + } + private static class AnotherTestMessageListener { private String message;