diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/KafkaUtils.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/KafkaUtils.java new file mode 100644 index 000000000..5da8050e6 --- /dev/null +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/KafkaUtils.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.internal; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class KafkaUtils { + + /** + * Return only the key-value pairs relevant for creating a new AdminClient from the given map. + * + * @param configs the config key-value map. + * @return the map with only the config properties relevant for AdminClient. 
+ */ + public static Map getAdminClientConfigs(final Map configs) { + return getConfigsForKeys(configs, AdminClientConfig.configNames()); + } + /** + * Return only the key-value pairs relevant for creating a new KafkaConsumer from the given map. + * + * @param configs the config key-value map. + * @return the map with only the config properties relevant for KafkaConsumer. + */ + public static Map getConsumerConfigs(final Map configs) { + return getConfigsForKeys(configs, ConsumerConfig.configNames()); + } + /** + * Return only the key-value pairs relevant for creating a new KafkaProducer from the given map. + * + * @param configs the config key-value map. + * @return the map with only the config properties relevant for KafkaProducer. + */ + public static Map getProducerConfigs(final Map configs) { + return getConfigsForKeys(configs, ProducerConfig.configNames()); + } + + private static Map getConfigsForKeys(final Map configs, + final Set keys) { + final Map parsed = new HashMap<>(); + for (final String configName : keys) { + if (configs.containsKey(configName)) { + parsed.put(configName, configs.get(configName)); + } + } + return parsed; + } +} diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLogFactory.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLogFactory.java index 0823479a1..371c8c948 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLogFactory.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLogFactory.java @@ -19,9 +19,9 @@ package io.streamthoughts.kafka.connect.filepulse.storage; import java.util.Collections; -import java.util.HashMap; import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; + +import io.streamthoughts.kafka.connect.filepulse.internal.KafkaUtils; import 
org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -30,42 +30,54 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + /** */ class KafkaBasedLogFactory { - private final Map configs; + private final Map producerConfigs; + private final Map consumerConfigs; /** * Creates a new {@link KafkaBasedLogFactory} instance. * - * @param configs the kafka configuration. + * @param producerConfigs configuration options to use when creating the internal producer. + * @param consumerConfigs configuration options to use when creating the internal consumer. 
*/ - KafkaBasedLogFactory(final Map configs) { - this.configs = Collections.unmodifiableMap(configs); + KafkaBasedLogFactory(final Map producerConfigs, + final Map consumerConfigs) { + this.producerConfigs = Collections.unmodifiableMap(producerConfigs); + this.consumerConfigs = Collections.unmodifiableMap(consumerConfigs); } KafkaBasedLog make(final String topic, final Callback> consumedCallback) { - return new KafkaBasedLog<>(topic, - newProducerConfigs(), newConsumerConfigs(), consumedCallback, Time.SYSTEM, null); + return new KafkaBasedLog<>( + topic, + newProducerConfigs(), + newConsumerConfigs(), + consumedCallback, + Time.SYSTEM, + null + ); } private Map newConsumerConfigs() { - final Map consumerProps = new HashMap<>(configs); - consumerProps - .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps - .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - return consumerProps; + Map clientProps = KafkaUtils.getConsumerConfigs(consumerConfigs); + clientProps.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + clientProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + return clientProps; } private Map newProducerConfigs() { - final Map producerProps = new HashMap<>(configs); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps - .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); - return producerProps; + Map clientProps = KafkaUtils.getProducerConfigs(producerConfigs); + clientProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + clientProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + clientProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + return clientProps; } } diff --git 
a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java index 68b810b67..9af774ee6 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java @@ -61,16 +61,18 @@ public class KafkaStateBackingStore implements StateBackingStore { * @param topic the topic back store. * @param keyPrefix the key-prefix. * @param groupId the group attached to the backing topic (i.e., the connector-name). - * @param configs the kafka configuration. + * @param producerProps the kafka producer properties. + * @param consumerProps the kafka consumer properties. * @param serde the state serdes. */ public KafkaStateBackingStore(final String topic, final String keyPrefix, final String groupId, - final Map configs, + final Map producerProps, + final Map consumerProps, final StateSerde serde, final boolean consumerEnabled) { - KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs); + KafkaBasedLogFactory factory = new KafkaBasedLogFactory(producerProps, consumerProps); this.kafkaLog = factory.make(topic, new ConsumeCallback()); this.groupId = sanitizeGroupId(groupId); this.serde = serde; @@ -81,6 +83,7 @@ public KafkaStateBackingStore(final String topic, private static String sanitizeGroupId(final String groupId) { return groupId.replaceAll("\\.", "-"); } + Status getState() { return this.status; } diff --git a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/internal/KafkaUtilsTest.java b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/internal/KafkaUtilsTest.java new file mode 100644 index 000000000..6b58cd179 --- /dev/null +++ 
b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/internal/KafkaUtilsTest.java @@ -0,0 +1,70 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.internal; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +class KafkaUtilsTest { + + + @Test + void should_return_producer_client_props() { + Map configs = Map.of( + "foo", "???", + "bar", "???", + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "???", + ProducerConfig.ACKS_CONFIG, "???" 
+ ); + Map producerConfigs = KafkaUtils.getProducerConfigs(configs); + Assertions.assertEquals(2, producerConfigs.size()); + Assertions.assertNotNull(producerConfigs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Assertions.assertNotNull(producerConfigs.get(ProducerConfig.ACKS_CONFIG)); + } + + @Test + void should_return_consumer_client_props() { + Map configs = Map.of( + "foo", "???", + "bar", "???", + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "???", + ConsumerConfig.GROUP_ID_CONFIG, "???" + ); + Map consumerConfigs = KafkaUtils.getConsumerConfigs(configs); + Assertions.assertEquals(2, consumerConfigs.size()); + Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.GROUP_ID_CONFIG)); + } + + @Test + void should_return_admin_client_props() { + Map configs = Map.of( + "foo", "???", + "bar", "???", + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "???" + ); + Map clientConfigs = KafkaUtils.getAdminClientConfigs(configs); + Assertions.assertEquals(1, clientConfigs.size()); + } +} \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml index dd6355034..4fd57f6dc 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml @@ -33,9 +33,27 @@ ${project.parent.basedir}/.. 
- 1.12.428 + 1.12.428 + 1.1.6 + + + + com.amazonaws + aws-java-sdk-bom + ${aws.version} + pom + import + + + com.amazonaws + aws-java-sdk-sts + ${aws.version} + + + + io.streamthoughts @@ -53,12 +71,16 @@ com.amazonaws aws-java-sdk-s3 - ${aws-java-sdk.version} com.amazonaws aws-java-sdk-sts - ${aws-java-sdk.version} + + + + software.amazon.msk + aws-msk-iam-auth + ${aws.msk.version} org.apache.avro diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java index 01f8aa342..06dd4e073 100644 --- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java +++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java @@ -57,12 +57,13 @@ public void configure(final Map props) { config.getTaskStorageTopic(), KEY_PREFIX, config.getTaskStorageName(), - config.getTaskStorageConfigs(), + config.getProducerTaskStorageConfigs(), + config.getConsumerTaskStorageConfigs(), new FileObjectSerde(), config.getTaskStorageConsumerEnabled() ); - try (AdminClient client = AdminClient.create(config.getTaskStorageConfigs())) { + try (AdminClient client = AdminClient.create(config.getAdminClientTaskStorageConfigs())) { Map topicConfig = new HashMap<>(); topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); final NewTopic newTopic = new NewTopic( diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java index 1f7d20896..2b39eef5f 100644 --- 
a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java +++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java @@ -18,6 +18,7 @@ */ package io.streamthoughts.kafka.connect.filepulse.state; +import io.streamthoughts.kafka.connect.filepulse.internal.KafkaUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.AbstractConfig; @@ -70,24 +71,47 @@ public String getTaskStorageName() { return this.getString(TASKS_FILE_STATUS_STORAGE_NAME_CONFIG); } - public Map getTaskStorageConfigs() { + public Map getConsumerTaskStorageConfigs() { final Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers()); configs.putAll(getInternalKafkaConsumerConfigs()); + return configs; + } + + public Map getProducerTaskStorageConfigs() { + final Map configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers()); configs.putAll(getInternalKafkaProducerConfigs()); return configs; } + public Map getAdminClientTaskStorageConfigs() { + final Map configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers()); + configs.putAll(getInternalKafkaAdminClientConfigs()); + return configs; + } + private String getInternalBootstrapServers() { return this.getString(TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG); } + private Map getInternalKafkaAdminClientConfigs() { + Map consumerConfigs = KafkaUtils.getAdminClientConfigs(originals()); + consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.admin.")); + return consumerConfigs; + } + private Map getInternalKafkaConsumerConfigs() { - return this.originalsWithPrefix("tasks.file.status.storage.consumer."); + Map consumerConfigs = 
KafkaUtils.getConsumerConfigs(originals()); + consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.consumer.")); + return consumerConfigs; } private Map getInternalKafkaProducerConfigs() { - return this.originalsWithPrefix("tasks.file.status.storage.producer."); + Map producerConfigs = KafkaUtils.getProducerConfigs(originals()); + producerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.producer.")); + return producerConfigs; } Optional getTopicPartitions() {