Only process FIFO queue messages with same MessageGroupId if previous ones were acknowledged #124

@sebastianistoblame

Type: Feature

Is your feature request related to a problem? Please describe.
When FIFO queue messages are received in batches of more than one, a newer message with the same MessageGroupId can be processed before older ones have been acknowledged.

Let's consider this example: we have two messages, A and B.
A is first in the FIFO queue, B is second, and both share the same MessageGroupId.

Let's set our Deletion Policy to NEVER so that we acknowledge messages manually. Now put both messages A and B into the queue and receive them with the maximum number of messages set to 10. Currently, if message A is not acknowledged, message B is processed anyway.

Describe the solution you'd like
If message A is not acknowledged, I would like message B not to be processed. Setting the maximum number of messages to receive to 1 does work, but I want to avoid a value of 1 in order to reduce the number of calls made to AWS while still retaining correct ordering.

I'd like to have a configuration flag for this. When it is turned on, Spring Cloud AWS Messaging would not process a message if an earlier message with the same MessageGroupId in the same batch was not acknowledged.
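
A minimal sketch of the requested behaviour, in plain Java without any AWS or Spring dependency. The names `FifoBatchSketch`, `BatchMessage` and `processBatch` are hypothetical and are not part of Spring Cloud AWS; the handler's boolean return value stands in for "the listener acknowledged the message".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration only; none of these types exist in Spring Cloud AWS.
class FifoBatchSketch {

    /** Minimal stand-in for a received SQS FIFO message. */
    static final class BatchMessage {
        final String body;
        final String messageGroupId;

        BatchMessage(String body, String messageGroupId) {
            this.body = body;
            this.messageGroupId = messageGroupId;
        }
    }

    /**
     * Walks the batch in queue order. The handler returns true when it
     * acknowledged the message. As soon as a message is left unacknowledged,
     * all later messages of the same MessageGroupId in this batch are skipped.
     *
     * @return the bodies of the messages that were handed to the handler
     */
    static List<String> processBatch(List<BatchMessage> batch,
                                     Predicate<BatchMessage> handler) {
        Set<String> blockedGroups = new HashSet<>();
        List<String> handled = new ArrayList<>();
        for (BatchMessage message : batch) {
            if (blockedGroups.contains(message.messageGroupId)) {
                // An earlier message of this group is still unacknowledged.
                continue;
            }
            handled.add(message.body);
            boolean acknowledged = handler.test(message);
            if (!acknowledged) {
                blockedGroups.add(message.messageGroupId);
            }
        }
        return handled;
    }
}
```

With a batch of A and B in one group plus a message C in another group, and a handler that never acknowledges A, only A and C reach the handler. B is left alone and would become visible again after the visibility timeout, just like A.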

Describe alternatives you've considered
The downside of my proposed solution is that it moves more FIFO logic into the consumer, so I can understand if you choose not to implement it. In that case, I'd like Spring Cloud AWS Messaging to log a warning when it is configured with maxNumberOfMessages > 1 in combination with a FIFO queue.
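
The alternative could be as small as the following sketch. It relies on the fact that SQS FIFO queue names end with the `.fifo` suffix; the class `FifoBatchSizeCheck`, the method `warningFor` and the message text are hypothetical and not existing Spring Cloud AWS API.

```java
import java.util.Optional;

// Hypothetical illustration only; not part of Spring Cloud AWS.
class FifoBatchSizeCheck {

    /**
     * Returns a warning text when a FIFO queue is combined with a batch size
     * above 1, and an empty Optional otherwise. Works for a queue name, URL
     * or ARN, since all of them end with the queue name.
     */
    static Optional<String> warningFor(String queue, int maxNumberOfMessages) {
        boolean fifo = queue.endsWith(".fifo");
        if (fifo && maxNumberOfMessages > 1) {
            return Optional.of("Queue '" + queue + "' is a FIFO queue but maxNumberOfMessages is "
                    + maxNumberOfMessages + "; a message may be processed before an earlier message"
                    + " with the same MessageGroupId has been acknowledged.");
        }
        return Optional.empty();
    }
}
```

A container could run such a check once at startup for each configured queue and log the returned text at WARN level.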

Additional context
Here is my AmazonSQSAsync config:

  @Bean
  @Primary
  public AmazonSQSAsync amazonSQSClient() {
    return AmazonSQSAsyncClientBuilder.standard().build();
  }

Here is my SimpleMessageListenerContainerFactory config:

  @Bean
  public SimpleMessageListenerContainerFactory messageListenerContainerFactory(AmazonSQSAsync amazonSQSClient) {
    var factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQSClient);
    factory.setMaxNumberOfMessages(10);
    factory.setWaitTimeOut(10);
    factory.setBackOffTime(1000L);
    return factory;
  }

Labels: component: sqs, type: bug, type: enhancement