Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ public void run() {
}
catch (MessagingException messagingException) {
applyDeletionPolicyOnError(receiptHandle);
break;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this break should be executed only when this.deletionPolicy == ON_SUCCESS ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @sebastiandusza

I missed your pull request, and opened another with the same implementation.
Take a look what I implemented about this question.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fernandomoraes, cool :-)

I posted a small comment. Please consider if it makes sense.

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,73 @@ void doStop_containerNotRunning_shouldNotThrowAnException() throws Exception {
container.stop();
}

@Test
void receiveMessagesFromFifoQueue_stopsProcessingMessageGroup_whenExceptionIsThrown() throws Exception {
// Arrange
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
container.setAmazonSqs(sqs);

CountDownLatch countDownLatch = new CountDownLatch(4);
List<String> actualHandledMessages = Collections.synchronizedList(new ArrayList<>());
QueueMessageHandler messageHandler = new QueueMessageHandler() {
@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
super.handleMessage(message);
countDownLatch.countDown();
actualHandledMessages.add((String) message.getPayload());
}
};
container.setMessageHandler(messageHandler);
StaticApplicationContext applicationContext = new StaticApplicationContext();
applicationContext.registerSingleton("fifoTestMessageListener",
FifoTestMessageListenerThrowingExceptionConditionally.class);
messageHandler.setApplicationContext(applicationContext);
container.setBeanName("testContainerName");
messageHandler.afterPropertiesSet();

mockGetQueueUrl(sqs, "testQueue.fifo", "http://testSimpleReceiveMessage.amazonaws.com");
mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com");

container.afterPropertiesSet();

Message group1Msg1 = fifoMessage("1", "group1Msg1");
Message group1Msg2 = fifoMessage("1", "group1Msg2-fail");
Message group1Msg3 = fifoMessage("1", "group1Msg3-shouldWaitForMsg2");
Message group1Msg4 = fifoMessage("1", "group1Msg4-shouldWaitForMsg2");
Message group2Msg1 = fifoMessage("2", "group2Msg1");
Message group2Msg2 = fifoMessage("2", "group2Msg2");
Message group3Msg1 = fifoMessage("3", "group3Msg1");

when(sqs.receiveMessage(
new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All")
.withMessageAttributeNames("All").withMaxNumberOfMessages(10).withWaitTimeSeconds(20)))
.thenReturn(new ReceiveMessageResult().withMessages(group1Msg1, group1Msg2, group1Msg3,
group1Msg4, group2Msg1, group2Msg2, group3Msg1))
.thenReturn(new ReceiveMessageResult());
when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult());

// Act
container.start();

// Assert
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();

List<String> actualGroup1Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group1"))
.collect(Collectors.toList());
List<String> actualGroup2Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group2"))
.collect(Collectors.toList());
List<String> actualGroup3Messages = actualHandledMessages.stream().filter(msg -> msg.startsWith("group3"))
.collect(Collectors.toList());

assertThat(actualGroup1Messages).containsExactly("group1Msg1");
assertThat(actualGroup2Messages).containsExactly("group2Msg1", "group2Msg2");
assertThat(actualGroup3Messages).containsExactly("group3Msg1");

container.stop();
}

@Test
void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception {
// Arrange
Expand Down Expand Up @@ -1421,6 +1488,21 @@ String getMessage() {

}

private static class FifoTestMessageListenerThrowingExceptionConditionally {

private String message;

@RuntimeUse
@SqsListener("testQueue.fifo")
private void handleMessage(String message) {
this.message = message;
if (message.contains("-fail")) {
throw new RuntimeException("Message processing failed.");
}
}

}

private static class AnotherTestMessageListener {

private String message;
Expand Down