Skip to content

Commit 55b7046

Browse files
committed
feat: Add AWS MSK IAM support
This commit adds the aws-msk-iam-auth to enable MSK authentication. In addition, it improve KafkaStateBackingStore configuration. GH #403
1 parent 54654ea commit 55b7046

File tree

7 files changed

+232
-31
lines changed

7 files changed

+232
-31
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.internal;
20+
21+
import org.apache.kafka.clients.admin.AdminClientConfig;
22+
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
import org.apache.kafka.clients.producer.ProducerConfig;
24+
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Set;
28+
29+
public class KafkaUtils {
30+
31+
/**
32+
* Return only relevant key-value relevant for creating new AdminClient from the given map.
33+
*
34+
* @param configs the config key-value map.
35+
* @return the map with only configs properties relevant for AdminClient.
36+
*/
37+
public static Map<String, Object> getAdminClientConfigs(final Map<String, ?> configs) {
38+
return getConfigsForKeys(configs, AdminClientConfig.configNames());
39+
}
40+
/**
41+
* Return only relevant key-value relevant for creating new KafkaConsumer from the given map.
42+
*
43+
* @param configs the config key-value map.
44+
* @return the map with only configs properties relevant for KafkaConsumer.
45+
*/
46+
public static Map<String, Object> getConsumerConfigs(final Map<String, ?> configs) {
47+
return getConfigsForKeys(configs, ConsumerConfig.configNames());
48+
}
49+
/**
50+
* Return only relevant key-value relevant for creating new KafkaProducer from the given map.
51+
*
52+
* @param configs the config key-value map.
53+
* @return the map with only configs properties relevant for KafkaProducer.
54+
*/
55+
public static Map<String, Object> getProducerConfigs(final Map<String, ?> configs) {
56+
return getConfigsForKeys(configs, ProducerConfig.configNames());
57+
}
58+
59+
private static Map<String, Object> getConfigsForKeys(final Map<String, ?> configs,
60+
final Set<String> keys) {
61+
final Map<String, Object> parsed = new HashMap<>();
62+
for (final String configName : keys) {
63+
if (configs.containsKey(configName)) {
64+
parsed.put(configName, configs.get(configName));
65+
}
66+
}
67+
return parsed;
68+
}
69+
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLogFactory.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package io.streamthoughts.kafka.connect.filepulse.storage;
2020

2121
import java.util.Collections;
22-
import java.util.HashMap;
2322
import java.util.Map;
24-
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
24+
import io.streamthoughts.kafka.connect.filepulse.internal.KafkaUtils;
2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
2626
import org.apache.kafka.clients.producer.ProducerConfig;
2727
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -30,42 +30,54 @@
3030
import org.apache.kafka.common.serialization.StringSerializer;
3131
import org.apache.kafka.common.utils.Time;
3232

33+
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
34+
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
35+
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
36+
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
37+
3338
/**
3439
*/
3540
class KafkaBasedLogFactory {
3641

37-
private final Map<String, ?> configs;
42+
private final Map<String, ?> producerConfigs;
43+
private final Map<String, ?> consumerConfigs;
3844

3945
/**
4046
* Creates a new {@link KafkaBasedLogFactory} instance.
4147
*
42-
* @param configs the kafka configuration.
48+
* @param producerConfigs configuration options to use when creating the internal producer.
49+
* @param consumerConfigs configuration options to use when creating the internal consumer.
4350
*/
44-
KafkaBasedLogFactory(final Map<String, ?> configs) {
45-
this.configs = Collections.unmodifiableMap(configs);
51+
KafkaBasedLogFactory(final Map<String, ?> producerConfigs,
52+
final Map<String, ?> consumerConfigs) {
53+
this.producerConfigs = Collections.unmodifiableMap(producerConfigs);
54+
this.consumerConfigs = Collections.unmodifiableMap(consumerConfigs);
4655
}
4756

4857
KafkaBasedLog<String, byte[]> make(final String topic,
4958
final Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
50-
return new KafkaBasedLog<>(topic,
51-
newProducerConfigs(), newConsumerConfigs(), consumedCallback, Time.SYSTEM, null);
59+
return new KafkaBasedLog<>(
60+
topic,
61+
newProducerConfigs(),
62+
newConsumerConfigs(),
63+
consumedCallback,
64+
Time.SYSTEM,
65+
null
66+
);
5267
}
5368

5469
private Map<String, Object> newConsumerConfigs() {
55-
final Map<String, Object> consumerProps = new HashMap<>(configs);
56-
consumerProps
57-
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
58-
consumerProps
59-
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
60-
return consumerProps;
70+
Map<String, Object> clientProps = KafkaUtils.getConsumerConfigs(consumerConfigs);
71+
clientProps.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
72+
clientProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
73+
return clientProps;
6174
}
6275

6376
private Map<String, Object> newProducerConfigs() {
64-
final Map<String, Object> producerProps = new HashMap<>(configs);
65-
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
66-
producerProps
67-
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
68-
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
69-
return producerProps;
77+
Map<String, Object> clientProps = KafkaUtils.getProducerConfigs(producerConfigs);
78+
clientProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
79+
clientProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
80+
clientProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
81+
return clientProps;
7082
}
7183
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,18 @@ public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
6161
* @param topic the topic back store.
6262
* @param keyPrefix the key-prefix.
6363
* @param groupId the group attached to the backing topic (i.e., the connector-name).
64-
* @param configs the kafka configuration.
64+
* @param producerProps the kafka producer properties.
65+
* @param consumerProps the kafka consumer properties.
6566
* @param serde the state serdes.
6667
*/
6768
public KafkaStateBackingStore(final String topic,
6869
final String keyPrefix,
6970
final String groupId,
70-
final Map<String, ?> configs,
71+
final Map<String, ?> producerProps,
72+
final Map<String, ?> consumerProps,
7173
final StateSerde<T> serde,
7274
final boolean consumerEnabled) {
73-
KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs);
75+
KafkaBasedLogFactory factory = new KafkaBasedLogFactory(producerProps, consumerProps);
7476
this.kafkaLog = factory.make(topic, new ConsumeCallback());
7577
this.groupId = sanitizeGroupId(groupId);
7678
this.serde = serde;
@@ -81,6 +83,7 @@ public KafkaStateBackingStore(final String topic,
8183
private static String sanitizeGroupId(final String groupId) {
8284
return groupId.replaceAll("\\.", "-");
8385
}
86+
8487
Status getState() {
8588
return this.status;
8689
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.internal;
20+
21+
import org.apache.kafka.clients.admin.AdminClientConfig;
22+
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
import org.apache.kafka.clients.producer.ProducerConfig;
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.Map;
28+
29+
class KafkaUtilsTest {
30+
31+
32+
@Test
33+
void should_return_producer_client_props() {
34+
Map<String, String> configs = Map.of(
35+
"foo", "???",
36+
"bar", "???",
37+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "???",
38+
ProducerConfig.ACKS_CONFIG, "???"
39+
);
40+
Map<String, Object> producerConfigs = KafkaUtils.getProducerConfigs(configs);
41+
Assertions.assertEquals(2, producerConfigs.size());
42+
Assertions.assertNotNull(producerConfigs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
43+
Assertions.assertNotNull(producerConfigs.get(ProducerConfig.ACKS_CONFIG));
44+
}
45+
46+
@Test
47+
void should_return_consumer_client_props() {
48+
Map<String, String> configs = Map.of(
49+
"foo", "???",
50+
"bar", "???",
51+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "???",
52+
ConsumerConfig.GROUP_ID_CONFIG, "???"
53+
);
54+
Map<String, Object> consumerConfigs = KafkaUtils.getConsumerConfigs(configs);
55+
Assertions.assertEquals(2, consumerConfigs.size());
56+
Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
57+
Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.GROUP_ID_CONFIG));
58+
}
59+
60+
@Test
61+
void should_return_admin_client_props() {
62+
Map<String, String> configs = Map.of(
63+
"foo", "???",
64+
"bar", "???",
65+
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "???"
66+
);
67+
Map<String, Object> clientConfigs = KafkaUtils.getAdminClientConfigs(configs);
68+
Assertions.assertEquals(1, clientConfigs.size());
69+
}
70+
}

connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,27 @@
3333

3434
<properties>
3535
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
36-
<aws-java-sdk.version>1.12.428</aws-java-sdk.version>
36+
<aws.version>1.12.428</aws.version>
37+
<aws.msk.version>1.1.6</aws.msk.version>
3738
</properties>
3839

40+
<dependencyManagement>
41+
<dependencies>
42+
<dependency>
43+
<groupId>com.amazonaws</groupId>
44+
<artifactId>aws-java-sdk-bom</artifactId>
45+
<version>${aws.version}</version>
46+
<type>pom</type>
47+
<scope>import</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>com.amazonaws</groupId>
51+
<artifactId>aws-java-sdk-sts</artifactId>
52+
<version>${aws.version}</version>
53+
</dependency>
54+
</dependencies>
55+
</dependencyManagement>
56+
3957
<dependencies>
4058
<dependency>
4159
<groupId>io.streamthoughts</groupId>
@@ -53,12 +71,16 @@
5371
<dependency>
5472
<groupId>com.amazonaws</groupId>
5573
<artifactId>aws-java-sdk-s3</artifactId>
56-
<version>${aws-java-sdk.version}</version>
5774
</dependency>
5875
<dependency>
5976
<groupId>com.amazonaws</groupId>
6077
<artifactId>aws-java-sdk-sts</artifactId>
61-
<version>${aws-java-sdk.version}</version>
78+
</dependency>
79+
<!-- Add AWS MSK IAM support -->
80+
<dependency>
81+
<groupId>software.amazon.msk</groupId>
82+
<artifactId>aws-msk-iam-auth</artifactId>
83+
<version>${aws.msk.version}</version>
6284
</dependency>
6385
<dependency>
6486
<groupId>org.apache.avro</groupId>

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ public void configure(final Map<String, ?> props) {
5757
config.getTaskStorageTopic(),
5858
KEY_PREFIX,
5959
config.getTaskStorageName(),
60-
config.getTaskStorageConfigs(),
60+
config.getProducerTaskStorageConfigs(),
61+
config.getConsumerTaskStorageConfigs(),
6162
new FileObjectSerde(),
6263
config.getTaskStorageConsumerEnabled()
6364
);
6465

65-
try (AdminClient client = AdminClient.create(config.getTaskStorageConfigs())) {
66+
try (AdminClient client = AdminClient.create(config.getAdminClientTaskStorageConfigs())) {
6667
Map<String, String> topicConfig = new HashMap<>();
6768
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
6869
final NewTopic newTopic = new NewTopic(

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.state;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.internal.KafkaUtils;
2122
import org.apache.kafka.clients.CommonClientConfigs;
2223
import org.apache.kafka.clients.producer.ProducerConfig;
2324
import org.apache.kafka.common.config.AbstractConfig;
@@ -70,24 +71,47 @@ public String getTaskStorageName() {
7071
return this.getString(TASKS_FILE_STATUS_STORAGE_NAME_CONFIG);
7172
}
7273

73-
public Map<String, Object> getTaskStorageConfigs() {
74+
public Map<String, Object> getConsumerTaskStorageConfigs() {
7475
final Map<String, Object> configs = new HashMap<>();
7576
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
7677
configs.putAll(getInternalKafkaConsumerConfigs());
78+
return configs;
79+
}
80+
81+
public Map<String, Object> getProducerTaskStorageConfigs() {
82+
final Map<String, Object> configs = new HashMap<>();
83+
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
7784
configs.putAll(getInternalKafkaProducerConfigs());
7885
return configs;
7986
}
8087

88+
public Map<String, Object> getAdminClientTaskStorageConfigs() {
89+
final Map<String, Object> configs = new HashMap<>();
90+
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
91+
configs.putAll(getInternalKafkaAdminClientConfigs());
92+
return configs;
93+
}
94+
8195
private String getInternalBootstrapServers() {
8296
return this.getString(TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG);
8397
}
8498

99+
private Map<String, Object> getInternalKafkaAdminClientConfigs() {
100+
Map<String, Object> consumerConfigs = KafkaUtils.getAdminClientConfigs(originals());
101+
consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.admin."));
102+
return consumerConfigs;
103+
}
104+
85105
private Map<String, Object> getInternalKafkaConsumerConfigs() {
86-
return this.originalsWithPrefix("tasks.file.status.storage.consumer.");
106+
Map<String, Object> consumerConfigs = KafkaUtils.getConsumerConfigs(originals());
107+
consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.consumer."));
108+
return consumerConfigs;
87109
}
88110

89111
private Map<String, Object> getInternalKafkaProducerConfigs() {
90-
return this.originalsWithPrefix("tasks.file.status.storage.producer.");
112+
Map<String, Object> producerConfigs = KafkaUtils.getProducerConfigs(originals());
113+
producerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.producer."));
114+
return producerConfigs;
91115
}
92116

93117
Optional<Integer> getTopicPartitions() {

0 commit comments

Comments
 (0)