Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.internal;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Static helpers for filtering arbitrary configuration maps down to the
 * properties recognized by the Kafka admin, consumer, and producer clients.
 *
 * <p>Kafka clients log noisy warnings when handed unknown configuration keys;
 * these filters keep only the keys each client actually declares.
 */
public final class KafkaUtils {

    /** Utility class — not instantiable. */
    private KafkaUtils() {
    }

    /**
     * Filters the given configuration map, keeping only the properties
     * declared by {@link AdminClientConfig}.
     *
     * @param configs the config key-value map; must not be {@code null}.
     * @return a new mutable map containing only AdminClient-relevant entries.
     */
    public static Map<String, Object> getAdminClientConfigs(final Map<String, ?> configs) {
        return getConfigsForKeys(configs, AdminClientConfig.configNames());
    }

    /**
     * Filters the given configuration map, keeping only the properties
     * declared by {@link ConsumerConfig}.
     *
     * @param configs the config key-value map; must not be {@code null}.
     * @return a new mutable map containing only KafkaConsumer-relevant entries.
     */
    public static Map<String, Object> getConsumerConfigs(final Map<String, ?> configs) {
        return getConfigsForKeys(configs, ConsumerConfig.configNames());
    }

    /**
     * Filters the given configuration map, keeping only the properties
     * declared by {@link ProducerConfig}.
     *
     * @param configs the config key-value map; must not be {@code null}.
     * @return a new mutable map containing only KafkaProducer-relevant entries.
     */
    public static Map<String, Object> getProducerConfigs(final Map<String, ?> configs) {
        return getConfigsForKeys(configs, ProducerConfig.configNames());
    }

    /**
     * Copies into a new map every entry of {@code configs} whose key is
     * contained in {@code keys}.
     *
     * @param configs the source config key-value map.
     * @param keys    the set of config names to retain.
     * @return a new mutable map restricted to the given keys.
     */
    private static Map<String, Object> getConfigsForKeys(final Map<String, ?> configs,
                                                         final Set<String> keys) {
        final Map<String, Object> parsed = new HashMap<>();
        for (final String configName : keys) {
            if (configs.containsKey(configName)) {
                parsed.put(configName, configs.get(configName));
            }
        }
        return parsed;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package io.streamthoughts.kafka.connect.filepulse.storage;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import io.streamthoughts.kafka.connect.filepulse.internal.KafkaUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand All @@ -30,42 +30,54 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;

import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

/**
*/
class KafkaBasedLogFactory {

private final Map<String, ?> configs;
private final Map<String, ?> producerConfigs;
private final Map<String, ?> consumerConfigs;

/**
* Creates a new {@link KafkaBasedLogFactory} instance.
*
* @param configs the kafka configuration.
* @param producerConfigs configuration options to use when creating the internal producer.
* @param consumerConfigs configuration options to use when creating the internal consumer.
*/
KafkaBasedLogFactory(final Map<String, ?> configs) {
this.configs = Collections.unmodifiableMap(configs);
KafkaBasedLogFactory(final Map<String, ?> producerConfigs,
final Map<String, ?> consumerConfigs) {
this.producerConfigs = Collections.unmodifiableMap(producerConfigs);
this.consumerConfigs = Collections.unmodifiableMap(consumerConfigs);
}

KafkaBasedLog<String, byte[]> make(final String topic,
final Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
return new KafkaBasedLog<>(topic,
newProducerConfigs(), newConsumerConfigs(), consumedCallback, Time.SYSTEM, null);
return new KafkaBasedLog<>(
topic,
newProducerConfigs(),
newConsumerConfigs(),
consumedCallback,
Time.SYSTEM,
null
);
}

private Map<String, Object> newConsumerConfigs() {
final Map<String, Object> consumerProps = new HashMap<>(configs);
consumerProps
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
return consumerProps;
Map<String, Object> clientProps = KafkaUtils.getConsumerConfigs(consumerConfigs);
clientProps.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
clientProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
return clientProps;
}

private Map<String, Object> newProducerConfigs() {
final Map<String, Object> producerProps = new HashMap<>(configs);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
return producerProps;
Map<String, Object> clientProps = KafkaUtils.getProducerConfigs(producerConfigs);
clientProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
clientProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
clientProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
return clientProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
* @param topic the topic back store.
* @param keyPrefix the key-prefix.
* @param groupId the group attached to the backing topic (i.e., the connector-name).
* @param configs the kafka configuration.
* @param producerProps the kafka producer properties.
* @param consumerProps the kafka consumer properties.
* @param serde the state serdes.
*/
public KafkaStateBackingStore(final String topic,
final String keyPrefix,
final String groupId,
final Map<String, ?> configs,
final Map<String, ?> producerProps,
final Map<String, ?> consumerProps,
final StateSerde<T> serde,
final boolean consumerEnabled) {
KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs);
KafkaBasedLogFactory factory = new KafkaBasedLogFactory(producerProps, consumerProps);
this.kafkaLog = factory.make(topic, new ConsumeCallback());
this.groupId = sanitizeGroupId(groupId);
this.serde = serde;
Expand All @@ -81,6 +83,7 @@ public KafkaStateBackingStore(final String topic,
private static String sanitizeGroupId(final String groupId) {
return groupId.replaceAll("\\.", "-");
}

Status getState() {
return this.status;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.internal;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Map;

/**
 * Unit tests for {@link KafkaUtils}: verifies that each filter retains only
 * the configuration keys declared by the corresponding Kafka client, and
 * drops unrecognized ones.
 */
class KafkaUtilsTest {

    @Test
    void should_return_producer_client_props() {
        // Two unknown keys plus two valid producer keys: only the latter survive.
        Map<String, String> configs = Map.of(
            "foo", "???",
            "bar", "???",
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "???",
            ProducerConfig.ACKS_CONFIG, "???"
        );
        Map<String, Object> producerConfigs = KafkaUtils.getProducerConfigs(configs);
        Assertions.assertEquals(2, producerConfigs.size());
        Assertions.assertNotNull(producerConfigs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        Assertions.assertNotNull(producerConfigs.get(ProducerConfig.ACKS_CONFIG));
    }

    @Test
    void should_return_consumer_client_props() {
        // Two unknown keys plus two valid consumer keys: only the latter survive.
        Map<String, String> configs = Map.of(
            "foo", "???",
            "bar", "???",
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "???",
            ConsumerConfig.GROUP_ID_CONFIG, "???"
        );
        Map<String, Object> consumerConfigs = KafkaUtils.getConsumerConfigs(configs);
        Assertions.assertEquals(2, consumerConfigs.size());
        Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.GROUP_ID_CONFIG));
    }

    @Test
    void should_return_admin_client_props() {
        // Two unknown keys plus one valid admin key: only the latter survives.
        Map<String, String> configs = Map.of(
            "foo", "???",
            "bar", "???",
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "???"
        );
        Map<String, Object> clientConfigs = KafkaUtils.getAdminClientConfigs(configs);
        Assertions.assertEquals(1, clientConfigs.size());
        // Also assert WHICH key survived — a size check alone would pass even
        // if the filter retained the wrong entry.
        Assertions.assertNotNull(clientConfigs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
    }
}
28 changes: 25 additions & 3 deletions connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,27 @@

<properties>
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
<aws-java-sdk.version>1.12.428</aws-java-sdk.version>
<aws.version>1.12.428</aws.version>
<aws.msk.version>1.1.6</aws.msk.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>${aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.streamthoughts</groupId>
Expand All @@ -53,12 +71,16 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<!-- Add AWS MSK IAM support -->
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>${aws.msk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ public void configure(final Map<String, ?> props) {
config.getTaskStorageTopic(),
KEY_PREFIX,
config.getTaskStorageName(),
config.getTaskStorageConfigs(),
config.getProducerTaskStorageConfigs(),
config.getConsumerTaskStorageConfigs(),
new FileObjectSerde(),
config.getTaskStorageConsumerEnabled()
);

try (AdminClient client = AdminClient.create(config.getTaskStorageConfigs())) {
try (AdminClient client = AdminClient.create(config.getAdminClientTaskStorageConfigs())) {
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
final NewTopic newTopic = new NewTopic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.streamthoughts.kafka.connect.filepulse.state;

import io.streamthoughts.kafka.connect.filepulse.internal.KafkaUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
Expand Down Expand Up @@ -70,24 +71,47 @@ public String getTaskStorageName() {
return this.getString(TASKS_FILE_STATUS_STORAGE_NAME_CONFIG);
}

public Map<String, Object> getTaskStorageConfigs() {
public Map<String, Object> getConsumerTaskStorageConfigs() {
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
configs.putAll(getInternalKafkaConsumerConfigs());
return configs;
}

public Map<String, Object> getProducerTaskStorageConfigs() {
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
configs.putAll(getInternalKafkaProducerConfigs());
return configs;
}

public Map<String, Object> getAdminClientTaskStorageConfigs() {
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
configs.putAll(getInternalKafkaAdminClientConfigs());
return configs;
}

private String getInternalBootstrapServers() {
return this.getString(TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG);
}

private Map<String, Object> getInternalKafkaAdminClientConfigs() {
Map<String, Object> consumerConfigs = KafkaUtils.getAdminClientConfigs(originals());
consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.admin."));
return consumerConfigs;
}

private Map<String, Object> getInternalKafkaConsumerConfigs() {
return this.originalsWithPrefix("tasks.file.status.storage.consumer.");
Map<String, Object> consumerConfigs = KafkaUtils.getConsumerConfigs(originals());
consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.consumer."));
return consumerConfigs;
}

private Map<String, Object> getInternalKafkaProducerConfigs() {
return this.originalsWithPrefix("tasks.file.status.storage.producer.");
Map<String, Object> producerConfigs = KafkaUtils.getProducerConfigs(originals());
producerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.producer."));
return producerConfigs;
}

Optional<Integer> getTopicPartitions() {
Expand Down