diff --git a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java index e9d01970d..9ebbd79ab 100644 --- a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java @@ -429,7 +429,9 @@ public void run() { catch (MessagingException messagingException) { getLogger().warn("An exception occurred while handling message with id: {}", message.getMessageId(), messagingException); - applyDeletionPolicyOnError(receiptHandle); + if (!applyDeletionPolicyOnError(receiptHandle)) { + break; + } } } } @@ -442,11 +444,13 @@ private void applyDeletionPolicyOnSuccess(String receiptHandle) { } } - private void applyDeletionPolicyOnError(String receiptHandle) { + private boolean applyDeletionPolicyOnError(String receiptHandle) { if (this.deletionPolicy == SqsMessageDeletionPolicy.ALWAYS || (this.deletionPolicy == SqsMessageDeletionPolicy.NO_REDRIVE && !this.hasRedrivePolicy)) { deleteMessage(receiptHandle); + return true; } + return false; } private void deleteMessage(String receiptHandle) { diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index aaca92428..ff444901e 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -334,6 +334,77 @@ public void handleMessage(org.springframework.messaging.Message message) thro container.stop(); } + @Test + void testReceiveMessagesFromFifoQueueWithError() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); + container.setAmazonSqs(sqs); + + CountDownLatch countDownLatch = new CountDownLatch(3); + List actualHandledMessages = Collections.synchronizedList(new ArrayList<>()); + QueueMessageHandler messageHandler = new QueueMessageHandler(Collections.emptyList(), + SqsMessageDeletionPolicy.ON_SUCCESS) { + + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + actualHandledMessages.add((String) message.getPayload()); + countDownLatch.countDown(); + throw new MessagingException(message); + } + + }; + container.setMessageHandler(messageHandler); + StaticApplicationContext applicationContext = new StaticApplicationContext(); + applicationContext.registerSingleton("fifoTestMessageListener", FifoTestMessageListener.class); + messageHandler.setApplicationContext(applicationContext); + container.setBeanName("testContainerName"); + messageHandler.afterPropertiesSet(); + + mockGetQueueUrl(sqs, "testQueue.fifo", "http://testSimpleReceiveMessage.amazonaws.com"); + mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com"); + + container.afterPropertiesSet(); + + final Message group1Msg1 = fifoMessage("1", "group1Msg1"); + final Message group1Msg2 = fifoMessage("1", "group1Msg2"); + final Message group1Msg3 = fifoMessage("1", "group1Msg3"); + final Message group1Msg4 = fifoMessage("1", "group1Msg4"); + final Message group1Msg5 = fifoMessage("1", "group1Msg5"); + final Message group1Msg6 = fifoMessage("1", "group1Msg6"); + final Message group1Msg7 = fifoMessage("1", "group1Msg7"); + final Message group2Msg1 = fifoMessage("2", "group2Msg1"); + final Message group2Msg2 = fifoMessage("2", "group2Msg2"); + final Message group3Msg1 = fifoMessage("3", "group3Msg1"); + + when(sqs.receiveMessage( + new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All") + .withMessageAttributeNames("All").withMaxNumberOfMessages(10).withWaitTimeSeconds(20))) + .thenReturn(new ReceiveMessageResult().withMessages(group1Msg1, group1Msg2, group1Msg3, + group1Msg4, group1Msg5, group1Msg6, group1Msg7, group2Msg1, group2Msg2, + group3Msg1)) + .thenReturn(new ReceiveMessageResult()); + + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); + + container.start(); + + assertThat(countDownLatch.await(3, TimeUnit.SECONDS)).isTrue(); + + final List actualGroup1Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group1")) + .collect(Collectors.toList()); + final List actualGroup2Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group2")) + .collect(Collectors.toList()); + final List actualGroup3Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group3")) + .collect(Collectors.toList()); + + assertThat(actualGroup1Messages).containsExactly("group1Msg1"); + assertThat(actualGroup2Messages).containsExactly("group2Msg1"); + assertThat(actualGroup3Messages).containsExactly("group3Msg1"); + + container.stop(); + } + @Test void testContainerDoesNotProcessMessageAfterBeingStopped() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();