createArgumentResolvers(MessageConverter messageConverter) {
+ return Arrays.asList(
+ new AcknowledgmentHandlerMethodArgumentResolver(),
+ new BatchAcknowledgmentArgumentResolver(),
+ new HeaderMethodArgumentResolver(new DefaultConversionService(), getConfigurableBeanFactory()),
+ new HeadersMethodArgumentResolver(),
+ new BatchPayloadMethodArgumentResolver(messageConverter),
+ new MessageMethodArgumentResolver(messageConverter),
+ new PayloadMethodArgumentResolver(messageConverter));
+ }
+ // @formatter:on
+
+ protected MappingJackson2MessageConverter createDefaultMappingJackson2MessageConverter(ObjectMapper objectMapper) {
+ MappingJackson2MessageConverter jacksonMessageConverter = new MappingJackson2MessageConverter();
+ jacksonMessageConverter.setSerializedPayloadClass(String.class);
+ jacksonMessageConverter.setStrictContentTypeMatch(false);
+ if (objectMapper != null) {
+ jacksonMessageConverter.setObjectMapper(objectMapper);
+ }
+ return jacksonMessageConverter;
+ }
+
+ protected EndpointRegistrar createEndpointRegistrar() {
+ return new EndpointRegistrar();
+ }
+
+ @Override
+ public void setEmbeddedValueResolver(StringValueResolver resolver) {
+ this.resolver = resolver;
+ }
+
+ private static class DelegatingMessageHandlerMethodFactory implements MessageHandlerMethodFactory {
+
+ private MessageHandlerMethodFactory delegate;
+
+ @Override
+ public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
+ Assert.notNull(this.delegate, "No delegate MessageHandlerMethodFactory set.");
+ return this.delegate.createInvocableHandlerMethod(bean, method);
+ }
+
+ public void setDelegate(MessageHandlerMethodFactory delegate) {
+ this.delegate = delegate;
+ }
+ }
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java
new file mode 100644
index 000000000..b9edac3e1
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.annotation;
+
+import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
+import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.springframework.core.annotation.AliasFor;
+
+/**
+ * Methods with this annotation will be wrapped by a {@link io.awspring.cloud.sqs.listener.MessageListener} or
+ * {@link io.awspring.cloud.sqs.listener.AsyncMessageListener} and set to a
+ * {@link io.awspring.cloud.sqs.listener.MessageListenerContainer}.
+ *
+ * Each method will be handled by a different container instance, created by the specified {@link #factory()} property.
+ * If not specified, a default factory will be looked up in the context.
+ *
+ * When used in conjunction with Spring Boot and auto configuration, the framework supplies a default
+ * {@link io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory} and a
+ * {@link software.amazon.awssdk.services.sqs.SqsAsyncClient}, unless such beans are already found in the
+ * {@link org.springframework.context.ApplicationContext}.
+ *
+ * For more complex configurations, {@link io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory} instances
+ * can be created and configured. See {@link SqsMessageListenerContainerFactory#builder()} for more information on
+ * creating and configuring a factory.
+ *
+ * Further configuration for containers created using this annotation can be achieved by declaring
+ * {@link SqsListenerConfigurer} beans.
+ *
+ * Methods with this annotation can have flexible signatures, including arguments of the following types:
+ *
+ * - {@link org.springframework.messaging.handler.annotation.Header}
+ * - {@link org.springframework.messaging.handler.annotation.Headers}
+ * - {@link org.springframework.messaging.Message}
+ * - {@link io.awspring.cloud.sqs.listener.Visibility}
+ * - {@link io.awspring.cloud.sqs.listener.QueueAttributes}
+ * - {@link software.amazon.awssdk.services.sqs.model.Message}
+ * - {@link io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement}
+ * - {@link io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgement}
+ *
+ * Method signatures also accept {@link java.util.List}<Pojo> and
+ * {@link java.util.List}{@link org.springframework.messaging.Message}<Pojo> arguments . Such arguments will
+ * configure the container to batch mode. When using List arguments, no other arguments can be provided. Metadata can be
+ * retrieved by inspecting the {@link org.springframework.messaging.Message} instances'
+ * {@link org.springframework.messaging.MessageHeaders}.
+ *
+ * To support {@link io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement} and
+ * {@link io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgement} arguments, the factory used to create the
+ * containers must be set to {@link io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode#MANUAL}.
+ *
+ * Properties in this annotation support property placeholders ("${...}") and SpEL ("#{...}").
+ *
+ * @see SqsMessageListenerContainerFactory
+ * @see io.awspring.cloud.sqs.listener.SqsMessageListenerContainer
+ * @see SqsListenerConfigurer
+ *
+ * @author Alain Sahli
+ * @author Matej Nedic
+ * @author Tomaz Fernandes
+ * @since 1.1
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface SqsListener {
+
+ /**
+ * Array of queue names or urls. Queues declared in the same annotation will be handled by the same
+ * {@link io.awspring.cloud.sqs.listener.MessageListenerContainer}.
+ * @return list of queue names or urls.
+ */
+ String[] value() default {};
+
+ /**
+ * Alias for {@link #value()}
+ * @return list of queue names or urls.
+ */
+ @AliasFor("value")
+ String[] queueNames() default {};
+
+ /**
+ * The {@link io.awspring.cloud.sqs.config.MessageListenerContainerFactory} bean name to be used to process this
+ * endpoint.
+ * @return the factory bean name.
+ */
+ String factory() default "";
+
+ /**
+ * An id for the {@link io.awspring.cloud.sqs.listener.MessageListenerContainer} that will be created to handle this
+ * endpoint. If none provided a default ID will be created.
+ * @return the container id.
+ */
+ String id() default "";
+
+ /**
+ * The maximum number of inflight messages that should be processed simultaneously for each queue declared in this
+ * annotation.
+ * @return the maximum number of inflight messages.
+ */
+ String maxInflightMessagesPerQueue() default "";
+
+ /**
+ * The maximum number of seconds to wait for messages in a poll to SQS.
+ * @return the poll timeout.
+ */
+ String pollTimeoutSeconds() default "";
+
+ /**
+ * The maximum amount of time to wait for a poll to SQS. If a value greater than 10 is provided, the result of
+ * multiple polls will be combined, which can be useful for
+ * {@link io.awspring.cloud.sqs.listener.ListenerMode#BATCH}
+ * @return the maximum messages per poll.
+ */
+ String maxMessagesPerPoll() default "";
+
+ /**
+ * The message visibility to be applied to messages received from the provided queues. For Standard SQS queues and
+ * batch listeners, visibility will be applied at polling. For single message FIFO queues, visibility is changed
+ * before each remaining message from the same message group is processed.
+ */
+ String messageVisibilitySeconds() default "";
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java
new file mode 100644
index 000000000..badfd646b
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.annotation;
+
+import io.awspring.cloud.sqs.config.Endpoint;
+import io.awspring.cloud.sqs.config.EndpointRegistrar;
+import io.awspring.cloud.sqs.config.SqsEndpoint;
+import io.awspring.cloud.sqs.listener.SqsHeaders;
+import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
+import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
+import io.awspring.cloud.sqs.support.resolver.VisibilityHandlerMethodArgumentResolver;
+import java.util.Arrays;
+import java.util.Collection;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
+
+/**
+ * {@link BeanPostProcessor} implementation that scans beans for a {@link SqsListener @SqsListener} annotation, extracts
+ * information to a {@link SqsEndpoint}, and registers it in the {@link EndpointRegistrar}.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public class SqsListenerAnnotationBeanPostProcessor extends AbstractListenerAnnotationBeanPostProcessor {
+
+ private static final String GENERATED_ID_PREFIX = "io.awspring.cloud.sqs.sqsListenerEndpointContainer#";
+
+ @Override
+ protected Class getAnnotationClass() {
+ return SqsListener.class;
+ }
+
+ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
+ return SqsEndpoint.builder().queueNames(resolveStringArray(sqsListenerAnnotation.value(), "queueNames"))
+ .factoryBeanName(resolveAsString(sqsListenerAnnotation.factory(), "factory"))
+ .id(getEndpointId(sqsListenerAnnotation.id()))
+ .pollTimeoutSeconds(resolveAsInteger(sqsListenerAnnotation.pollTimeoutSeconds(), "pollTimeoutSeconds"))
+ .maxMessagesPerPoll(resolveAsInteger(sqsListenerAnnotation.maxMessagesPerPoll(), "maxMessagesPerPoll"))
+ .maxInflightMessagesPerQueue(resolveAsInteger(sqsListenerAnnotation.maxInflightMessagesPerQueue(),
+ "maxInflightMessagesPerQueue"))
+ .messageVisibility(
+ resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
+ .build();
+ }
+
+ @Override
+ protected String getGeneratedIdPrefix() {
+ return GENERATED_ID_PREFIX;
+ }
+
+ @Override
+ protected Collection createAdditionalArgumentResolvers() {
+ return Arrays.asList(new VisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_HEADER),
+ new SqsMessageMethodArgumentResolver(), new QueueAttributesMethodArgumentResolver());
+ }
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/package-info.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/package-info.java
new file mode 100644
index 000000000..aabc0e17e
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Auto-configuration for Amazon SQS (Simple Queue Service) integrations.
+ */
+@org.springframework.lang.NonNullApi
+@org.springframework.lang.NonNullFields
+package io.awspring.cloud.sqs.annotation;
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java
new file mode 100644
index 000000000..dbb273c75
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.listener.AsyncMessageListener;
+import io.awspring.cloud.sqs.listener.ListenerMode;
+import io.awspring.cloud.sqs.listener.MessageListener;
+import io.awspring.cloud.sqs.listener.MessageListenerContainer;
+import io.awspring.cloud.sqs.listener.acknowledgement.BatchAcknowledgement;
+import io.awspring.cloud.sqs.listener.adapter.AsyncMessagingMessageListenerAdapter;
+import io.awspring.cloud.sqs.listener.adapter.MessagingMessageListenerAdapter;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.springframework.core.BridgeMethodResolver;
+import org.springframework.core.MethodParameter;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
+import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
+import org.springframework.util.Assert;
+
+/**
+ * Base class for implementing a {@link HandlerMethodEndpoint}.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public abstract class AbstractEndpoint implements HandlerMethodEndpoint {
+
+ private final Collection logicalNames;
+
+ private final String listenerContainerFactoryName;
+
+ private final String id;
+
+ private Object bean;
+
+ private Method method;
+
+ private MessageHandlerMethodFactory handlerMethodFactory;
+
+ protected AbstractEndpoint(Collection queueNames, @Nullable String listenerContainerFactoryName,
+ String id) {
+ Assert.notEmpty(queueNames, "queueNames cannot be empty.");
+ this.id = id;
+ this.logicalNames = queueNames;
+ this.listenerContainerFactoryName = listenerContainerFactoryName;
+ }
+
+ @Override
+ public Collection getLogicalNames() {
+ return this.logicalNames;
+ }
+
+ @Override
+ public String getListenerContainerFactoryName() {
+ return this.listenerContainerFactoryName;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ /**
+ * Set the bean instance to be used when handling a message for this endpoint.
+ * @param bean the bean instance.
+ */
+ @Override
+ public void setBean(Object bean) {
+ this.bean = bean;
+ }
+
+ /**
+ * Set the method to be used when handling a message for this endpoint.
+ * @param method the method.
+ */
+ @Override
+ public void setMethod(Method method) {
+ this.method = method;
+ }
+
+ /**
+ * Set the {@link MessageHandlerMethodFactory} to be used for handling messages in this endpoint.
+ * @param handlerMethodFactory the factory.
+ */
+ @Override
+ public void setHandlerMethodFactory(MessageHandlerMethodFactory handlerMethodFactory) {
+ Assert.notNull(handlerMethodFactory, "handlerMethodFactory cannot be null");
+ this.handlerMethodFactory = handlerMethodFactory;
+ }
+
+ @Override
+ public void configureListenerMode(Consumer consumer) {
+ List parameters = getMethodParameters();
+ boolean batch = hasParameterOfType(parameters, List.class);
+ boolean batchAckParameter = hasParameterOfType(parameters, BatchAcknowledgement.class);
+ Assert.isTrue(hasValidParameters(batch, batchAckParameter, parameters.size()), getInvalidParametersMessage());
+ consumer.accept(batch ? ListenerMode.BATCH : ListenerMode.SINGLE_MESSAGE);
+ }
+
+ private boolean hasValidParameters(boolean batch, boolean batchAckParameter, int size) {
+ return hasValidSingleMessageParameters(batch, batchAckParameter)
+ || hasValidBatchParameters(batchAckParameter, size);
+ }
+
+ private boolean hasValidSingleMessageParameters(boolean batch, boolean batchAckParameter) {
+ return !batch && !batchAckParameter;
+ }
+
+ private boolean hasValidBatchParameters(boolean batchAckParameter, int size) {
+ return size == 1 || (size == 2 && batchAckParameter);
+ }
+
+ private String getInvalidParametersMessage() {
+ return String.format(
+ "Method %s from class %s in endpoint %s has invalid parameters for batch processing. "
+ + "Batch methods must have a single List parameter, either of Message or T types,"
+ + "and optionally a BatchAcknowledgement or AsyncAcknowledgement parameter.",
+ this.method.getName(), this.method.getDeclaringClass(), this.id);
+ }
+
+ private boolean hasParameterOfType(List parameters, Class> clazz) {
+ return parameters.stream().anyMatch(param -> clazz.isAssignableFrom(param.getParameterType()));
+ }
+
+ private List getMethodParameters() {
+ return IntStream.range(0, BridgeMethodResolver.findBridgedMethod(this.method).getParameterCount())
+ .mapToObj(index -> new MethodParameter(this.method, index)).collect(Collectors.toList());
+ }
+
+ /**
+ * Configure the provided container for this endpoint.
+ * @param container the container to be configured.
+ */
+ public void setupContainer(MessageListenerContainer container) {
+ Assert.notNull(this.handlerMethodFactory, "No handlerMethodFactory has been set");
+ InvocableHandlerMethod handlerMethod = this.handlerMethodFactory.createInvocableHandlerMethod(this.bean,
+ this.method);
+ if (CompletionStage.class.isAssignableFrom(handlerMethod.getReturnType().getParameterType())) {
+ container.setAsyncMessageListener(createAsyncMessageListenerInstance(handlerMethod));
+ }
+ else {
+ container.setMessageListener(createMessageListenerInstance(handlerMethod));
+ }
+ }
+
+ protected MessageListener createMessageListenerInstance(InvocableHandlerMethod handlerMethod) {
+ return new MessagingMessageListenerAdapter<>(handlerMethod);
+ }
+
+ protected AsyncMessageListener createAsyncMessageListenerInstance(InvocableHandlerMethod handlerMethod) {
+ return new AsyncMessagingMessageListenerAdapter<>(handlerMethod);
+ }
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
new file mode 100644
index 000000000..383d1019a
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.ConfigUtils;
+import io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer;
+import io.awspring.cloud.sqs.listener.AsyncComponentAdapters;
+import io.awspring.cloud.sqs.listener.AsyncMessageListener;
+import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
+import io.awspring.cloud.sqs.listener.ContainerOptions;
+import io.awspring.cloud.sqs.listener.MessageListener;
+import io.awspring.cloud.sqs.listener.MessageListenerContainer;
+import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
+import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
+import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
+import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
+import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
+import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
+
+/**
+ * Base implementation for a {@link MessageListenerContainerFactory}. Contains the components and
+ * {@link ContainerOptions} that will be used as a template for {@link MessageListenerContainer} instances created by
+ * this factory.
+ *
+ * @param the {@link Message}'s payload type to be consumed by the {@link AbstractMessageListenerContainer}.
+ * @param the type of {@link AbstractMessageListenerContainer} instances that will be created by this container.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public abstract class AbstractMessageListenerContainerFactory>
+ implements MessageListenerContainerFactory {
+
+ private final ContainerOptions.Builder containerOptionsBuilder;
+
+ private final Collection> asyncMessageInterceptors = new ArrayList<>();
+
+ private final Collection> messageInterceptors = new ArrayList<>();
+
+ private AsyncErrorHandler asyncErrorHandler;
+
+ private ErrorHandler errorHandler;
+
+ private AsyncMessageListener asyncMessageListener;
+
+ private MessageListener messageListener;
+
+ private AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback;
+
+ private AcknowledgementResultCallback acknowledgementResultCallback;
+
+ private Collection> containerComponentFactories;
+
+ protected AbstractMessageListenerContainerFactory() {
+ this.containerOptionsBuilder = ContainerOptions.builder();
+ }
+
+ /**
+ * Set the {@link ErrorHandler} instance to be used by containers created with this factory. The component will be
+ * adapted to an {@link AsyncErrorHandler}.
+ * @param errorHandler the error handler instance.
+ * @see AsyncComponentAdapters
+ */
+ public void setErrorHandler(ErrorHandler errorHandler) {
+ Assert.notNull(errorHandler, "errorHandler cannot be null");
+ this.errorHandler = errorHandler;
+ }
+
+ /**
+ * Set the {@link AsyncErrorHandler} instance to be used by containers created with this factory.
+ * @param errorHandler the error handler instance.
+ */
+ public void setErrorHandler(AsyncErrorHandler errorHandler) {
+ Assert.notNull(errorHandler, "errorHandler cannot be null");
+ this.asyncErrorHandler = errorHandler;
+ }
+
+ /**
+ * Add a {@link MessageInterceptor} to be used by containers created with this factory. Interceptors will be applied
+ * just before method invocation. The component will be adapted to an {@link AsyncMessageInterceptor}.
+ * @param messageInterceptor the message interceptor instance.
+ * @see AsyncComponentAdapters
+ */
+ public void addMessageInterceptor(MessageInterceptor messageInterceptor) {
+ Assert.notNull(messageInterceptor, "messageInterceptor cannot be null");
+ this.messageInterceptors.add(messageInterceptor);
+ }
+
+ /**
+ * Add a {@link AsyncMessageInterceptor} to be used by containers created with this factory. Interceptors will be
+ * applied just before method invocation.
+ * @param messageInterceptor the message interceptor instance.
+ */
+ public void addMessageInterceptor(AsyncMessageInterceptor messageInterceptor) {
+ Assert.notNull(messageInterceptor, "messageInterceptor cannot be null");
+ this.asyncMessageInterceptors.add(messageInterceptor);
+ }
+
+ /**
+ * Set the {@link MessageListener} instance to be used by containers created with this factory. If none is provided,
+ * a default one will be created according to the endpoint's configuration. The component will be adapted to an
+ * {@link AsyncMessageListener}.
+ * @param messageListener the message listener instance.
+ * @see AsyncComponentAdapters
+ */
+ public void setMessageListener(MessageListener messageListener) {
+ Assert.notNull(messageListener, "messageListener cannot be null");
+ this.messageListener = messageListener;
+ }
+
+ /**
+ * Set the {@link AsyncMessageListener} instance to be used by containers created with this factory. If none is
+ * provided, a default one will be created according to the endpoint's configuration.
+ * @param messageListener the message listener instance.
+ */
+ public void setAsyncMessageListener(AsyncMessageListener messageListener) {
+ Assert.notNull(messageListener, "messageListener cannot be null");
+ this.asyncMessageListener = messageListener;
+ }
+
+ /**
+ * Set the {@link AsyncAcknowledgementResultCallback} instance to be used by containers created by this factory.
+ * @param acknowledgementResultCallback the instance.
+ */
+ public void setAcknowledgementResultCallback(AsyncAcknowledgementResultCallback acknowledgementResultCallback) {
+ Assert.notNull(acknowledgementResultCallback, "acknowledgementResultCallback cannot be null");
+ this.asyncAcknowledgementResultCallback = acknowledgementResultCallback;
+ }
+
+ /**
+ * Set the {@link AcknowledgementResultCallback} instance to be used by containers created by this factory.
+ * @param acknowledgementResultCallback the instance.
+ */
+ public void setAcknowledgementResultCallback(AcknowledgementResultCallback acknowledgementResultCallback) {
+ Assert.notNull(acknowledgementResultCallback, "acknowledgementResultCallback cannot be null");
+ this.acknowledgementResultCallback = acknowledgementResultCallback;
+ }
+
+ /**
+ * Set the {@link ContainerComponentFactory} instances that will be used to create components for listener
+ * containers created by this factory.
+ * @param containerComponentFactories the factory instances.
+ */
+ public void setContainerComponentFactories(Collection> containerComponentFactories) {
+ Assert.notEmpty(containerComponentFactories, "containerComponentFactories cannot be null or empty");
+ this.containerComponentFactories = containerComponentFactories;
+ }
+
+ /**
+ * Allows configuring this factories' {@link ContainerOptions.Builder}.
+ */
+ public void configure(Consumer options) {
+ options.accept(this.containerOptionsBuilder);
+ }
+
+ @Override
+ public C createContainer(Endpoint endpoint) {
+ Assert.notNull(endpoint, "endpoint cannot be null");
+ ContainerOptions.Builder options = this.containerOptionsBuilder.createCopy();
+ configure(endpoint, options);
+ C container = createContainerInstance(endpoint, options.build());
+ endpoint.setupContainer(container);
+ configureContainer(container, endpoint);
+ return container;
+ }
+
+ private void configure(Endpoint endpoint, ContainerOptions.Builder options) {
+ ConfigUtils.INSTANCE.acceptIfInstance(endpoint, HandlerMethodEndpoint.class,
+ abstractEndpoint -> abstractEndpoint.configureListenerMode(options::listenerMode));
+ configureContainerOptions(endpoint, options);
+ }
+
+ protected void configureContainerOptions(Endpoint endpoint, ContainerOptions.Builder containerOptions) {
+ }
+
+ @Override
+ public C createContainer(String... logicalEndpointNames) {
+ Assert.notEmpty(logicalEndpointNames, "endpointNames cannot be empty");
+ return createContainer(new EndpointAdapter(Arrays.asList(logicalEndpointNames)));
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void configureContainer(C container, Endpoint endpoint) {
+ ConfigUtils.INSTANCE.acceptIfInstance(container, AbstractMessageListenerContainer.class,
+ abstractContainer -> configureAbstractContainer(abstractContainer, endpoint));
+ }
+
+ protected void configureAbstractContainer(AbstractMessageListenerContainer container, Endpoint endpoint) {
+ container.setQueueNames(endpoint.getLogicalNames());
+ ConfigUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), container::setId)
+ .acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories)
+ .acceptIfNotNull(this.messageListener, container::setMessageListener)
+ .acceptIfNotNull(this.asyncMessageListener, container::setAsyncMessageListener)
+ .acceptIfNotNull(this.errorHandler, container::setErrorHandler)
+ .acceptIfNotNull(this.asyncErrorHandler, container::setErrorHandler)
+ .acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback)
+ .acceptIfNotNull(this.acknowledgementResultCallback, container::setAcknowledgementResultCallback)
+ .acceptIfNotEmpty(this.messageInterceptors,
+ interceptors -> interceptors.forEach(container::addMessageInterceptor))
+ .acceptIfNotEmpty(this.asyncMessageInterceptors,
+ interceptors -> interceptors.forEach(container::addMessageInterceptor));
+ }
+
+ protected abstract C createContainerInstance(Endpoint endpoint, ContainerOptions containerOptions);
+
+ private static class EndpointAdapter implements Endpoint {
+
+ private final Collection endpointNames;
+
+ public EndpointAdapter(Collection endpointNames) {
+ this.endpointNames = endpointNames;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void setupContainer(MessageListenerContainer container) {
+ // No ops - container should be setup manually.
+ }
+
+ @Override
+ public Collection getLogicalNames() {
+ return this.endpointNames;
+ }
+
+ @Override
+ public String getListenerContainerFactoryName() {
+ // we're already in the factory
+ return null;
+ }
+
+ @Override
+ public String getId() {
+ // Container will setup its own id
+ return null;
+ }
+ }
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/Endpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/Endpoint.java
new file mode 100644
index 000000000..fd3fcef45
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/Endpoint.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.listener.MessageListenerContainer;
+import java.util.Collection;
+import org.springframework.lang.Nullable;
+
+/**
+ * Represents a messaging endpoint from which messages can be consumed by a {@link MessageListenerContainer}.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public interface Endpoint {
+
+ /**
+ * The logical names for this endpoint.
+ * @return the logical names.
+ */
+ Collection getLogicalNames();
+
+ /**
+ * The name of the factory bean that will process this endpoint.
+ * @return the factory bean name.
+ */
+ @Nullable
+ String getListenerContainerFactoryName();
+
+ /**
+ * An optional id for this endpoint.
+ * @return the endpoint id.
+ */
+ @Nullable
+ String getId();
+
+ /**
+ * Set up the necessary attributes for the container to process this endpoint.
+ * @param container the container to be configured.
+ */
+ void setupContainer(MessageListenerContainer container);
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/EndpointRegistrar.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/EndpointRegistrar.java
new file mode 100644
index 000000000..abeba6b6d
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/EndpointRegistrar.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.awspring.cloud.sqs.listener.MessageListenerContainer;
+import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
+import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
+import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+/**
+ * Processes the registered {@link Endpoint} instances using the appropriate {@link MessageListenerContainerFactory}.
+ * Contains configurations that will be applied to all {@link io.awspring.cloud.sqs.annotation.SqsListener @SqsListener}
+ * containers. Such configurations can be set by declaring {@link SqsListenerConfigurer} beans.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ * @see SqsListenerConfigurer
+ */
+public class EndpointRegistrar implements BeanFactoryAware, SmartInitializingSingleton {
+
+ private static final Logger logger = LoggerFactory.getLogger(EndpointRegistrar.class);
+
+ public static final String DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "defaultSqsListenerContainerFactory";
+
+ private BeanFactory beanFactory;
+
+ private MessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
+
+ private MessageListenerContainerRegistry listenerContainerRegistry;
+
+ private String messageListenerContainerRegistryBeanName = SqsBeanNames.ENDPOINT_REGISTRY_BEAN_NAME;
+
+ private String defaultListenerContainerFactoryBeanName = DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
+
+ private final Collection endpoints = new ArrayList<>();
+
+ private Consumer> messageConvertersConsumer = converters -> {
+ };
+
+ private Consumer> methodArgumentResolversConsumer = resolvers -> {
+ };
+
+ private ObjectMapper objectMapper;
+
+ /**
+ * Set a custom {@link MessageHandlerMethodFactory} implementation.
+ * @param messageHandlerMethodFactory the instance.
+ */
+ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
+ Assert.notNull(messageHandlerMethodFactory, "messageHandlerMethodFactory cannot be null");
+ this.messageHandlerMethodFactory = messageHandlerMethodFactory;
+ }
+
+ /**
+ * Set a custom {@link MessageListenerContainerRegistry}.
+ * @param listenerContainerRegistry the instance.
+ */
+ public void setListenerContainerRegistry(MessageListenerContainerRegistry listenerContainerRegistry) {
+ Assert.notNull(listenerContainerRegistry, "listenerContainerRegistry cannot be null");
+ this.listenerContainerRegistry = listenerContainerRegistry;
+ }
+
+ /**
+ * Set the bean name for the default {@link MessageListenerContainerFactory}.
+ * @param defaultListenerContainerFactoryBeanName the bean name.
+ */
+ public void setDefaultListenerContainerFactoryBeanName(String defaultListenerContainerFactoryBeanName) {
+ Assert.isTrue(StringUtils.hasText(defaultListenerContainerFactoryBeanName),
+ "defaultListenerContainerFactoryBeanName must have text");
+ this.defaultListenerContainerFactoryBeanName = defaultListenerContainerFactoryBeanName;
+ }
+
+ /**
+ * Set the bean name for the {@link MessageListenerContainerRegistry}.
+ * @param messageListenerContainerRegistryBeanName the bean name.
+ */
+ public void setMessageListenerContainerRegistryBeanName(String messageListenerContainerRegistryBeanName) {
+ Assert.isTrue(StringUtils.hasText(messageListenerContainerRegistryBeanName),
+ "messageListenerContainerRegistryBeanName must have text");
+ this.messageListenerContainerRegistryBeanName = messageListenerContainerRegistryBeanName;
+ }
+
+ /**
+ * Set the object mapper to be used to deserialize payloads fot SqsListener endpoints.
+ * @param objectMapper the object mapper instance.
+ */
+ public void setObjectMapper(ObjectMapper objectMapper) {
+ Assert.notNull(objectMapper, "objectMapper cannot be null.");
+ this.objectMapper = objectMapper;
+ }
+
+ /**
+ * Manage the list of {@link MessageConverter} instances to be used to convert payloads.
+ * @param convertersConsumer a consumer for the converters list.
+ */
+ public void manageMessageConverters(Consumer> convertersConsumer) {
+ Assert.notNull(convertersConsumer, "convertersConsumer cannot be null");
+ this.messageConvertersConsumer = convertersConsumer;
+ }
+
+ /**
+ * Manage the list of {@link HandlerMethodArgumentResolver} instances to be used for resolving method arguments.
+ * @param resolversConsumer a consumer for the resolvers list.
+ */
+ public void manageMethodArgumentResolvers(Consumer> resolversConsumer) {
+ Assert.notNull(resolversConsumer, "resolversConsumer cannot be null");
+ this.methodArgumentResolversConsumer = resolversConsumer;
+ }
+
+ /**
+ * Get the message converters list consumer.
+ * @return the consumer.
+ */
+ public Consumer> getMessageConverterConsumer() {
+ return this.messageConvertersConsumer;
+ }
+
+ /**
+ * Get the method argument resolvers list consumer.
+ * @return the consumer.
+ */
+ public Consumer> getMethodArgumentResolversConsumer() {
+ return this.methodArgumentResolversConsumer;
+ }
+
+ /**
+ * Get the object mapper used to deserialize payloads.
+ * @return the object mapper instance.
+ */
+ public ObjectMapper getObjectMapper() {
+ return this.objectMapper;
+ }
+
+ /**
+ * Return the {@link MessageHandlerMethodFactory} to be used to create {@link MessageHandler} instances for the
+ * {@link Endpoint}s.
+ * @return the factory instance.
+ */
+ public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
+ return this.messageHandlerMethodFactory;
+ }
+
+ @Override
+ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
+ this.beanFactory = beanFactory;
+ }
+
+ /**
+ * Register an {@link Endpoint} within this registrar for later processing.
+ * @param endpoint the endpoint.
+ */
+ public void registerEndpoint(Endpoint endpoint) {
+ this.endpoints.add(endpoint);
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+ if (this.listenerContainerRegistry == null) {
+ this.listenerContainerRegistry = beanFactory.getBean(this.messageListenerContainerRegistryBeanName,
+ MessageListenerContainerRegistry.class);
+ }
+ this.endpoints.forEach(this::process);
+ }
+
+ private void process(Endpoint endpoint) {
+ logger.debug("Processing endpoint {}", endpoint.getId());
+ this.listenerContainerRegistry.registerListenerContainer(createContainerFor(endpoint));
+ }
+
+ private MessageListenerContainer> createContainerFor(Endpoint endpoint) {
+ String factoryBeanName = getListenerContainerFactoryName(endpoint);
+ Assert.isTrue(this.beanFactory.containsBean(factoryBeanName),
+ () -> "No MessageListenerContainerFactory bean with name " + factoryBeanName
+ + " found for endpoint names " + endpoint.getLogicalNames());
+ return this.beanFactory.getBean(factoryBeanName, MessageListenerContainerFactory.class)
+ .createContainer(endpoint);
+ }
+
+ private String getListenerContainerFactoryName(Endpoint endpoint) {
+ return StringUtils.hasText(endpoint.getListenerContainerFactoryName())
+ ? endpoint.getListenerContainerFactoryName()
+ : this.defaultListenerContainerFactoryBeanName;
+ }
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/HandlerMethodEndpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/HandlerMethodEndpoint.java
new file mode 100644
index 000000000..2aeeb320d
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/HandlerMethodEndpoint.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.listener.ListenerMode;
+import java.lang.reflect.Method;
+import java.util.function.Consumer;
+import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
+
+/**
+ * {@link Endpoint} specialization that indicates that {@link org.springframework.messaging.Message} instances coming
+ * from this endpoint will be handled by a {@link org.springframework.messaging.handler.HandlerMethod}.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public interface HandlerMethodEndpoint extends Endpoint {
+
+ /**
+ * Set the bean containing the method to be invoked with the incoming messages.
+ * @param bean the bean.
+ */
+ void setBean(Object bean);
+
+ /**
+ * Set the method to be used when handling messages for this endpoint.
+ * @param method the method.
+ */
+ void setMethod(Method method);
+
+ /**
+ * Set the {@link MessageHandlerMethodFactory} to be used for creating the
+ * {@link org.springframework.messaging.handler.HandlerMethod}.
+ * @param handlerMethodFactory the factory.
+ */
+ void setHandlerMethodFactory(MessageHandlerMethodFactory handlerMethodFactory);
+
+ /**
+ * Allows configuring the {@link ListenerMode} for this endpoint.
+ * @param consumer a consumer for the strategy used by this endpoint.
+ */
+ void configureListenerMode(Consumer consumer);
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/MessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/MessageListenerContainerFactory.java
new file mode 100644
index 000000000..88679e617
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/MessageListenerContainerFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.listener.MessageListenerContainer;
+
+/**
+ * Creates {@link MessageListenerContainer} instances for given {@link Endpoint} instances or endpoint names.
+ *
+ * @param the {@link MessageListenerContainer} type.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+@FunctionalInterface
+public interface MessageListenerContainerFactory> {
+
+ /**
+ * Create a container instance for the given endpoint names.
+ * @param logicalEndpointNames the names.
+ * @return the container instance.
+ */
+ C createContainer(String... logicalEndpointNames);
+
+ /**
+ * Create a container instance for the given {@link Endpoint}.
+ * @param endpoint the endpoint.
+ * @return the container instance.
+ */
+ default C createContainer(Endpoint endpoint) {
+ throw new UnsupportedOperationException("This factory is not capable of processing Endpoint instances.");
+ }
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsBeanNames.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsBeanNames.java
new file mode 100644
index 000000000..239f8a0e8
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsBeanNames.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+/***
+ * Utility class containing the bean names used for the framework's bean registration.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public class SqsBeanNames {
+
+ private SqsBeanNames() {
+ }
+
+ /**
+ * The bean name of the {@link io.awspring.cloud.sqs.listener.DefaultListenerContainerRegistry} registered by
+ * {@link SqsBootstrapConfiguration}.
+ */
+ public static final String ENDPOINT_REGISTRY_BEAN_NAME = "io.awspring.cloud.messaging.internalEndpointRegistryBeanName";
+
+ /**
+ * The bean name of the {@link io.awspring.cloud.sqs.annotation.SqsListenerAnnotationBeanPostProcessor} registered
+ * by {@link SqsBootstrapConfiguration}.
+ */
+ public static final String SQS_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_BEAN_NAME = "io.awspring.cloud.messaging.internalSqsListenerAnnotationBeanPostProcessor";
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsBootstrapConfiguration.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsBootstrapConfiguration.java
new file mode 100644
index 000000000..b6adb689d
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsBootstrapConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.annotation.SqsListenerAnnotationBeanPostProcessor;
+import io.awspring.cloud.sqs.listener.DefaultListenerContainerRegistry;
+import org.springframework.beans.factory.support.BeanDefinitionRegistry;
+import org.springframework.beans.factory.support.RootBeanDefinition;
+import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
+import org.springframework.core.type.AnnotationMetadata;
+
+/**
+ * Registers the {@link DefaultListenerContainerRegistry} and {@link EndpointRegistrar} that will be used to bootstrap
+ * the AWS SQS integration.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public class SqsBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
+
+ @Override
+ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
+ if (!registry.containsBeanDefinition(SqsBeanNames.SQS_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_BEAN_NAME)) {
+ registry.registerBeanDefinition(SqsBeanNames.SQS_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_BEAN_NAME,
+ new RootBeanDefinition(SqsListenerAnnotationBeanPostProcessor.class));
+ }
+
+ if (!registry.containsBeanDefinition(SqsBeanNames.ENDPOINT_REGISTRY_BEAN_NAME)) {
+ registry.registerBeanDefinition(SqsBeanNames.ENDPOINT_REGISTRY_BEAN_NAME,
+ new RootBeanDefinition(DefaultListenerContainerRegistry.class));
+ }
+ }
+
+}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java
new file mode 100644
index 000000000..7aa8b1f93
--- /dev/null
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.sqs.config;
+
+import io.awspring.cloud.sqs.annotation.SqsListener;
+import java.time.Duration;
+import java.util.Collection;
+import org.springframework.lang.Nullable;
+
+/**
+ * {@link Endpoint} implementation for SQS endpoints.
+ *
+ * Contains properties that should be mapped from {@link SqsListener @SqsListener} annotations.
+ *
+ * @author Tomaz Fernandes
+ * @since 3.0
+ */
+public class SqsEndpoint extends AbstractEndpoint {
+
+ private final Integer maxInflightMessagesPerQueue;
+
+ private final Integer pollTimeoutSeconds;
+
+ private final Integer messageVisibility;
+
+ private final Integer maxMessagesPerPoll;
+
+ protected SqsEndpoint(SqsEndpointBuilder builder) {
+ super(builder.queueNames, builder.factoryName, builder.id);
+ this.maxInflightMessagesPerQueue = builder.maxInflightMessagesPerQueue;
+ this.pollTimeoutSeconds = builder.pollTimeoutSeconds;
+ this.messageVisibility = builder.messageVisibility;
+ this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
+ }
+
+ /**
+ * Return a {@link SqsEndpointBuilder} instance with the provided queue names.
+ * @return the builder instance.
+ */
+ public static SqsEndpointBuilder builder() {
+ return new SqsEndpointBuilder();
+ }
+
+ /**
+ * The maximum number of inflight messages each queue in this endpoint can process simultaneously.
+ * @return the maximum number of inflight messages.
+ */
+ @Nullable
+ public Integer getMaxInflightMessagesPerQueue() {
+ return this.maxInflightMessagesPerQueue;
+ }
+
+ /**
+ * The maximum duration to wait for messages in a given poll.
+ * @return the poll timeout.
+ */
+ @Nullable
+ public Duration getPollTimeout() {
+ return this.pollTimeoutSeconds != null ? Duration.ofSeconds(this.pollTimeoutSeconds) : null;
+ }
+
+ /**
+ * Return the maximum amount of messages that should be returned in a poll.
+ * @return the maximum amount of messages.
+ */
+ @Nullable
+ public Integer getMaxMessagesPerPoll() {
+ return this.maxMessagesPerPoll;
+ }
+
+ /**
+ * Return the message visibility for this endpoint.
+ * @return the message visibility.
+ */
+ @Nullable
+ public Duration getMessageVisibility() {
+ return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null;
+ }
+
+ public static class SqsEndpointBuilder {
+
+ private Collection