|
| 1 | +package org.springframework.integration.aws.config.xml; |
| 2 | + |
| 3 | +import org.springframework.beans.factory.config.RuntimeBeanReference; |
| 4 | +import org.springframework.beans.factory.support.AbstractBeanDefinition; |
| 5 | +import org.springframework.beans.factory.support.BeanDefinitionBuilder; |
| 6 | +import org.springframework.beans.factory.xml.ParserContext; |
| 7 | +import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter; |
| 8 | +import org.springframework.integration.config.xml.AbstractChannelAdapterParser; |
| 9 | +import org.springframework.util.StringUtils; |
| 10 | +import org.w3c.dom.Element; |
| 11 | + |
| 12 | +import java.util.AbstractMap; |
| 13 | +import java.util.List; |
| 14 | +import java.util.Map.Entry; |
| 15 | + |
| 16 | +public class KclMessageDrivenChannelAdapterParser extends AbstractChannelAdapterParser { |
| 17 | + |
| 18 | + @Override |
| 19 | + protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) { |
| 20 | + return XmlBeanDefinitionBuilder.newInstance(element, parserContext, KclMessageDrivenChannelAdapter.class) |
| 21 | + .configure(builder -> addConstructorArgs(builder, element)) |
| 22 | + .addConstructorArgValue("streams") |
| 23 | + .setPropertyIfAttributeDefined("error-channel", "errorChannelName") |
| 24 | + .setPropertyIfAttributeDefined("worker-id") |
| 25 | + .setPropertyIfAttributeDefined("consumer-group") |
| 26 | + .setPropertyIfAttributeDefined("lease-table-name") |
| 27 | + .setPropertyIfAttributeDefined("consumer-backoff") |
| 28 | + .setPropertyIfAttributeDefined("empty-record-list") |
| 29 | + .setPropertyIfAttributeDefined("fan-out") |
| 30 | + .setPropertyIfAttributeDefined("bind-source-record") |
| 31 | + .setPropertyIfAttributeDefined("checkpoint-mode") |
| 32 | + .setPropertyIfAttributeDefined("checkpoints-interval") |
| 33 | + .setPropertyIfAttributeDefined("listener-mode") |
| 34 | + .setPropertyIfAttributeDefined("metrics-level") |
| 35 | + .setPropertyIfAttributeDefined("polling-idle-time") |
| 36 | + .setPropertyIfAttributeDefined("polling-max-records") |
| 37 | + .setPropertyIfAttributeDefined("graceful-shutdown-timeout") |
| 38 | + .setPropertyReferenceIfAttributeDefined("glue-schema-registry-deserializer") |
| 39 | + .setPropertyReferenceIfAttributeDefined("executor") |
| 40 | + .setPropertyReferenceIfAttributeDefined("converter") |
| 41 | + .setPropertyReferenceIfAttributeDefined("embedded-headers-mapper") |
| 42 | + .setPropertyReferenceIfAttributeDefined("stream-initial-sequence") |
| 43 | + .getBeanDefinitionBuilder() |
| 44 | + .addPropertyValue("outputChannelName", channelName) |
| 45 | + .getBeanDefinition(); |
| 46 | + } |
| 47 | + |
| 48 | + private void addConstructorArgs(BeanDefinitionBuilder builder, Element element) { |
| 49 | + var args = List.of( |
| 50 | + new AbstractMap.SimpleEntry<>("kinesis-client", "software.amazon.awssdk.services.kinesis.KinesisAsyncClient"), |
| 51 | + new AbstractMap.SimpleEntry<>("cloud-watch-client", "software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient"), |
| 52 | + new AbstractMap.SimpleEntry<>("dynamo-db-client", "software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient") |
| 53 | + ); |
| 54 | + if (args.stream().map(Entry::getKey).map(element::getAttribute).anyMatch(StringUtils::hasText)) { |
| 55 | + for (AbstractMap.SimpleEntry<String, String> e : args) { |
| 56 | + builder.addConstructorArgValue(arg(element.getAttribute(e.getKey()), e.getValue())); |
| 57 | + } |
| 58 | + } |
| 59 | + } |
| 60 | + |
| 61 | + private Object arg(String ref, String beanClass) { |
| 62 | + return StringUtils.hasText(ref) |
| 63 | + ? new RuntimeBeanReference(ref) |
| 64 | + : BeanDefinitionBuilder.genericBeanDefinition(beanClass) |
| 65 | + .setFactoryMethod("create") |
| 66 | + .applyCustomizers(def -> def.setAutowireCandidate(false)) |
| 67 | + .getBeanDefinition(); |
| 68 | + } |
| 69 | +} |
0 commit comments