Skip to content

Commit f407049

Browse files
committed
feat: SqsMessageDrivenChannelAdapterParser
1 parent 5df59f8 commit f407049

File tree

4 files changed

+111
-2
lines changed

4 files changed

+111
-2
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.springframework.integration.aws.config.xml;
2+
3+
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
4+
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
5+
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
6+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
7+
import org.springframework.beans.factory.FactoryBean;
8+
import org.springframework.core.task.TaskExecutor;
9+
10+
import java.time.Duration;
11+
12+
class SqsContainerOptionsFactoryBean implements FactoryBean<SqsContainerOptions> {
13+
14+
private final SqsContainerOptionsBuilder builder = SqsContainerOptions.builder();
15+
16+
@Override
17+
public boolean isSingleton() {
18+
return false;
19+
}
20+
21+
@Override
22+
public Class<SqsContainerOptions> getObjectType() {
23+
return SqsContainerOptions.class;
24+
}
25+
26+
@Override
27+
public SqsContainerOptions getObject() {
28+
return builder.build();
29+
}
30+
31+
public void setVisibilityTimeout(long seconds) {
32+
builder.messageVisibility(Duration.ofSeconds(seconds));
33+
}
34+
35+
public void setMaxNumberOfMessages(int maxMessages) {
36+
builder.maxConcurrentMessages(maxMessages)
37+
.maxMessagesPerPoll(maxMessages);
38+
}
39+
40+
public void setTaskExecutor(TaskExecutor taskExecutor) {
41+
builder.componentsTaskExecutor(taskExecutor);
42+
}
43+
44+
public void setQueueStopTimeout(long millis) {
45+
builder.listenerShutdownTimeout(Duration.ofMillis(millis));
46+
}
47+
48+
public void setWaitTimeOut(long seconds) {
49+
builder.pollTimeout(Duration.ofSeconds(seconds));
50+
}
51+
52+
public void setMessageDeletionPolicy(String policy) {
53+
builder.acknowledgementMode("NO_REDRIVE".equals(policy)
54+
? AcknowledgementMode.ON_SUCCESS
55+
: AcknowledgementMode.valueOf(policy));
56+
}
57+
58+
public void setFailOnMissingQueue(boolean fail) {
59+
builder.queueNotFoundStrategy(fail
60+
? QueueNotFoundStrategy.FAIL
61+
: QueueNotFoundStrategy.CREATE);
62+
}
63+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.springframework.integration.aws.config.xml;
2+
3+
import org.springframework.beans.factory.support.AbstractBeanDefinition;
4+
import org.springframework.beans.factory.xml.ParserContext;
5+
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
6+
import org.springframework.integration.config.xml.AbstractChannelAdapterParser;
7+
import org.w3c.dom.Element;
8+
9+
public class SqsMessageDrivenChannelAdapterParser extends AbstractChannelAdapterParser {
10+
11+
@Override
12+
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
13+
return XmlBeanDefinitionBuilder.newInstance(element, parserContext, SqsMessageDrivenChannelAdapter.class)
14+
.unsupportedAttributeWarning("resource-id-resolver")
15+
.addConstructorArgReference("sqs")
16+
.addConstructorArgValue("queues")
17+
.setPropertyValueIfAttributeDefined("send-timeout")
18+
.setPropertyValueIfAttributeDefined("errorChannelName", "error-channel")
19+
.setPropertyReferenceIfAttributeDefined("channelResolver", "destination-resolver")
20+
.getBeanDefinitionBuilder()
21+
.addPropertyValue("outputChannelName", channelName)
22+
.addPropertyValue("sqsContainerOptions", sqsContainerOptions(element, parserContext))
23+
.getBeanDefinition();
24+
}
25+
26+
private AbstractBeanDefinition sqsContainerOptions(Element element, ParserContext parserContext) {
27+
return XmlBeanDefinitionBuilder.newInstance(element, parserContext, SqsContainerOptionsFactoryBean.class)
28+
.setPropertyValueIfAttributeDefined("max-number-of-messages")
29+
.setPropertyValueIfAttributeDefined("visibility-timeout")
30+
.setPropertyValueIfAttributeDefined("queue-stop-timeout")
31+
.setPropertyValueIfAttributeDefined("wait-time-out")
32+
.setPropertyValueIfAttributeDefined("message-deletion-policy")
33+
.setPropertyValueIfAttributeDefined("fail-on-missing-queue")
34+
.setPropertyReferenceIfAttributeDefined("task-executor")
35+
.getBeanDefinitionBuilder()
36+
.applyCustomizers(bean -> bean.setAutowireCandidate(false))
37+
.getBeanDefinition();
38+
}
39+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
sqs-outbound-channel-adapter: org.springframework.integration.aws.config.xml.SqsOutboundChannelAdapterParser
1+
sqs-outbound-channel-adapter: org.springframework.integration.aws.config.xml.SqsOutboundChannelAdapterParser
2+
sqs-message-driven-channel-adapter: org.springframework.integration.aws.config.xml.SqsMessageDrivenChannelAdapterParser

int-aws-support/src/test/java/org/springframework/integration/aws/config/xml/SpringIntegrationAwsNamespaceHandlerTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ void test() {
1515
handler.init();
1616
assertThat(handler)
1717
.extracting("parsers", InstanceOfAssertFactories.MAP)
18-
.hasEntrySatisfying("sqs-outbound-channel-adapter", new Condition<>(SqsOutboundChannelAdapterParser.class::isInstance, "instance of SqsOutboundChannelAdapterParser"));
18+
.hasEntrySatisfying("sqs-outbound-channel-adapter", isInstanceOf(SqsOutboundChannelAdapterParser.class))
19+
.hasEntrySatisfying("sqs-message-driven-channel-adapter", isInstanceOf(SqsMessageDrivenChannelAdapterParser.class))
20+
;
21+
}
22+
23+
private Condition<Object> isInstanceOf(Class<?> type) {
24+
return new Condition<>(type::isInstance, "instance of " + type);
1925
}
2026
}

0 commit comments

Comments
 (0)