From 1ee861b28a9dd96e27ba1b705b280b69da9765df Mon Sep 17 00:00:00 2001 From: sdusza1 Date: Thu, 2 Sep 2021 14:09:01 +0200 Subject: [PATCH 1/2] Stop processing messages from fifo queue in MessageGroupExecutor when exception is thrown --- .../SimpleMessageListenerContainer.java | 1 + .../SimpleMessageListenerContainerTest.java | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java index df6542dee..c61531c69 100644 --- a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java @@ -428,6 +428,7 @@ public void run() { } catch (MessagingException messagingException) { applyDeletionPolicyOnError(receiptHandle); + break; } } } diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index aaca92428..034c04202 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -793,6 +793,73 @@ void doStop_containerNotRunning_shouldNotThrowAnException() throws Exception { container.stop(); } + @Test + void receiveMessagesFromFifoQueue_stoppsProcessingMessageGroup_whenExceptionIsThrown() throws Exception { + // Arrange + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); + container.setAmazonSqs(sqs); + + CountDownLatch countDownLatch = new CountDownLatch(4); + List actualHandledMessages = Collections.synchronizedList(new ArrayList<>()); + QueueMessageHandler messageHandler = new QueueMessageHandler() { + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + super.handleMessage(message); + countDownLatch.countDown(); + actualHandledMessages.add((String) message.getPayload()); + } + }; + container.setMessageHandler(messageHandler); + StaticApplicationContext applicationContext = new StaticApplicationContext(); + applicationContext.registerSingleton("fifoTestMessageListener", + FifoTestMessageListenerThrowingExceptionConditionally.class); + messageHandler.setApplicationContext(applicationContext); + container.setBeanName("testContainerName"); + messageHandler.afterPropertiesSet(); + + mockGetQueueUrl(sqs, "testQueue.fifo", "http://testSimpleReceiveMessage.amazonaws.com"); + mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com"); + + container.afterPropertiesSet(); + + Message group1Msg1 = fifoMessage("1", "group1Msg1"); + Message group1Msg2 = fifoMessage("1", "group1Msg2-fail"); + Message group1Msg3 = fifoMessage("1", "group1Msg3-shouldWaitForMsg2"); + Message group1Msg4 = fifoMessage("1", "group1Msg4-shouldWaitForMsg2"); + Message group2Msg1 = fifoMessage("2", "group2Msg1"); + Message group2Msg2 = fifoMessage("2", "group2Msg2"); + Message group3Msg1 = fifoMessage("3", "group3Msg1"); + + when(sqs.receiveMessage( + new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All") + .withMessageAttributeNames("All").withMaxNumberOfMessages(10).withWaitTimeSeconds(20))) + .thenReturn(new ReceiveMessageResult().withMessages(group1Msg1, group1Msg2, group1Msg3, + group1Msg4, group2Msg1, group2Msg2, group3Msg1)) + .thenReturn(new ReceiveMessageResult()); + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); + + // Act + container.start(); + + // Assert + assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + + List actualGroup1Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group1")) + .collect(Collectors.toList()); + List actualGroup2Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group2")) + .collect(Collectors.toList()); + List actualGroup3Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group3")) + .collect(Collectors.toList()); + + assertThat(actualGroup1Messages).containsExactly("group1Msg1"); + assertThat(actualGroup2Messages).containsExactly("group2Msg1", "group2Msg2"); + assertThat(actualGroup3Messages).containsExactly("group3Msg1"); + + container.stop(); + } + @Test void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception { // Arrange @@ -1421,6 +1488,21 @@ String getMessage() { } + private static class FifoTestMessageListenerThrowingExceptionConditionally { + + private String message; + + @RuntimeUse + @SqsListener("testQueue.fifo") + private void handleMessage(String message) { + this.message = message; + if (message.contains("-fail")) { + throw new RuntimeException("Message processing failed."); + } + } + + } + private static class AnotherTestMessageListener { private String message; From 1297cfff06eb93f8012568f78b2adb15406a36d6 Mon Sep 17 00:00:00 2001 From: sdusza1 Date: Fri, 3 Sep 2021 08:47:00 +0200 Subject: [PATCH 2/2] Fixed typo --- .../messaging/listener/SimpleMessageListenerContainerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index 034c04202..6e6c7a969 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -794,7 +794,7 @@ void doStop_containerNotRunning_shouldNotThrowAnException() throws Exception { } @Test - void receiveMessagesFromFifoQueue_stoppsProcessingMessageGroup_whenExceptionIsThrown() throws Exception { + void receiveMessagesFromFifoQueue_stopsProcessingMessageGroup_whenExceptionIsThrown() throws Exception { // Arrange SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();