From 844420a649d8aa1293bab69c6b8feeb5c1349b81 Mon Sep 17 00:00:00 2001 From: Fernando Moraes Date: Fri, 8 Oct 2021 14:10:56 -0300 Subject: [PATCH 1/2] Fixing sqs fifo message order consuming --- .../SimpleMessageListenerContainer.java | 1 + .../SimpleMessageListenerContainerTest.java | 68 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java index df6542dee..c61531c69 100644 --- a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java @@ -428,6 +428,7 @@ public void run() { } catch (MessagingException messagingException) { applyDeletionPolicyOnError(receiptHandle); + break; } } } diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index aaca92428..46a7c0311 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -334,6 +334,74 @@ public void handleMessage(org.springframework.messaging.Message message) thro container.stop(); } + @Test + void testReceiveMessagesFromFifoQueueWithError() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); + container.setAmazonSqs(sqs); + + CountDownLatch countDownLatch = new CountDownLatch(3); + List actualHandledMessages = Collections.synchronizedList(new ArrayList<>()); + QueueMessageHandler messageHandler = new QueueMessageHandler() { + + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + actualHandledMessages.add((String) message.getPayload()); + countDownLatch.countDown(); + throw new MessagingException(message); + } + }; + container.setMessageHandler(messageHandler); + StaticApplicationContext applicationContext = new StaticApplicationContext(); + applicationContext.registerSingleton("fifoTestMessageListener", FifoTestMessageListener.class); + messageHandler.setApplicationContext(applicationContext); + container.setBeanName("testContainerName"); + messageHandler.afterPropertiesSet(); + + mockGetQueueUrl(sqs, "testQueue.fifo", "http://testSimpleReceiveMessage.amazonaws.com"); + mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com"); + + container.afterPropertiesSet(); + + final Message group1Msg1 = fifoMessage("1", "group1Msg1"); + final Message group1Msg2 = fifoMessage("1", "group1Msg2"); + final Message group1Msg3 = fifoMessage("1", "group1Msg3"); + final Message group1Msg4 = fifoMessage("1", "group1Msg4"); + final Message group1Msg5 = fifoMessage("1", "group1Msg5"); + final Message group1Msg6 = fifoMessage("1", "group1Msg6"); + final Message group1Msg7 = fifoMessage("1", "group1Msg7"); + final Message group2Msg1 = fifoMessage("2", "group2Msg1"); + final Message group2Msg2 = fifoMessage("2", "group2Msg2"); + final Message group3Msg1 = fifoMessage("3", "group3Msg1"); + + when(sqs.receiveMessage( + new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All") + .withMessageAttributeNames("All").withMaxNumberOfMessages(10).withWaitTimeSeconds(20))) + .thenReturn(new ReceiveMessageResult().withMessages(group1Msg1, group1Msg2, group1Msg3, + group1Msg4, group1Msg5, group1Msg6, group1Msg7, group2Msg1, group2Msg2, + group3Msg1)) + .thenReturn(new ReceiveMessageResult()); + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); + + container.start(); + + assertThat(countDownLatch.await(3, TimeUnit.SECONDS)).isTrue(); + + final List actualGroup1Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group1")) + .collect(Collectors.toList()); + final List actualGroup2Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group2")) + .collect(Collectors.toList()); + final List actualGroup3Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group3")) + .collect(Collectors.toList()); + + assertThat(actualGroup1Messages).containsExactly("group1Msg1"); + assertThat(actualGroup2Messages).containsExactly("group2Msg1"); + assertThat(actualGroup3Messages).containsExactly("group3Msg1"); + + container.stop(); + } + @Test void testContainerDoesNotProcessMessageAfterBeingStopped() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); From 7b268095a84a2b18a894defabc7fe56316e139af Mon Sep 17 00:00:00 2001 From: Fernando Moraes Date: Fri, 8 Oct 2021 15:06:59 -0300 Subject: [PATCH 2/2] Fixing sqs fifo message order consuming --- .../listener/SimpleMessageListenerContainer.java | 9 ++++++--- .../listener/SimpleMessageListenerContainerTest.java | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java index c61531c69..da0c96c66 100644 --- a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java @@ -427,8 +427,9 @@ public void run() { applyDeletionPolicyOnSuccess(receiptHandle); } catch (MessagingException messagingException) { - applyDeletionPolicyOnError(receiptHandle); - break; + if (!applyDeletionPolicyOnError(receiptHandle)) { + break; + } } } } @@ -441,11 +442,13 @@ private void applyDeletionPolicyOnSuccess(String receiptHandle) { } } - private void applyDeletionPolicyOnError(String receiptHandle) { + private boolean applyDeletionPolicyOnError(String receiptHandle) { if (this.deletionPolicy == SqsMessageDeletionPolicy.ALWAYS || (this.deletionPolicy == SqsMessageDeletionPolicy.NO_REDRIVE && !this.hasRedrivePolicy)) { deleteMessage(receiptHandle); + return true; } + return false; } private void deleteMessage(String receiptHandle) { diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index 46a7c0311..ff444901e 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -343,7 +343,8 @@ void testReceiveMessagesFromFifoQueueWithError() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(3); List actualHandledMessages = Collections.synchronizedList(new ArrayList<>()); - QueueMessageHandler messageHandler = new QueueMessageHandler() { + QueueMessageHandler messageHandler = new QueueMessageHandler(Collections.emptyList(), + SqsMessageDeletionPolicy.ON_SUCCESS) { @Override public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { @@ -351,6 +352,7 @@ public void handleMessage(org.springframework.messaging.Message message) thro countDownLatch.countDown(); throw new MessagingException(message); } + }; container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); @@ -382,6 +384,7 @@ public void handleMessage(org.springframework.messaging.Message message) thro group1Msg4, group1Msg5, group1Msg6, group1Msg7, group2Msg1, group2Msg2, group3Msg1)) .thenReturn(new ReceiveMessageResult()); + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); container.start();