Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

## Changelog

## 1.13.0

### Improvements
- [KAFKA-404](https://jira.mongodb.org/browse/KAFKA-404) Support for extending MongoClient to allow for users to add custom auth such as AWS IAM / Assume Role.

## 1.12.0

### Improvements
- [KAFKA-374](https://jira.mongodb.org/browse/KAFKA-374) Implement an error handler to address specific scenarios.


## 1.11.2

### Bug Fixes
Expand Down
137 changes: 137 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,140 @@ A couple of manual configuration steps are required to run the code in IntelliJ:
- Run the `compileBuildConfig` task: eg: `./gradlew compileBuildConfig` or via Gradle > mongo-kafka > Tasks > other > compileBuildConfig
- Set `compileBuildConfig` to execute Before Build. via Gradle > Tasks > other > right click compileBuildConfig - click on "Execute Before Build"
- Delegate all build actions to Gradle: Settings > Build, Execution, Deployment > Build Tools > Gradle > Runner - tick "Delegate IDE build/run actions to gradle"

## Custom Auth Provider Interface

The `com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider` interface can be implemented to provide an object of type `com.mongodb.MongoCredential` which gets wrapped in the MongoClient that is constructed for the sink and source connector.
The following properties need to be set -

```
mongo.custom.auth.mechanism.enable - set to true.
mongo.custom.auth.mechanism.providerClass - qualified class name of the implementation class
```
Additional properties and can be set as required within the implementation class.
The init and validate methods of the implementation class get called when the connector initializes.

### Example
When using MONGODB-AWS authentication mechanism for atlas, one can specify the following configuration -

```
"connection.uri": "mongodb+srv://<sever>/?authMechanism=MONGODB-AWS"
"mongo.custom.auth.mechanism.enable": true,
"mongo.custom.auth.mechanism.providerClass": "sample.AwsAssumeRoleCredentialProvider"
"mongodbaws.auth.mechanism.roleArn": "arn:aws:iam::<ACCOUNTID>:role/<ROLENAME>"
```
Here the `sample.AwsAssumeRoleCredentialProvider` must be available on the classpath. `mongodbaws.auth.mechanism.roleArn` is an example of custom properties that can be read by `sample.AwsAssumeRoleCredentialProvider`.

### Sample code for implementing Custom role provider
Here is sample code that can work.

```java
public class AwsAssumeRoleCredentialProvider implements CustomCredentialProvider {

public AwsAssumeRoleCredentialProvider() {}
@Override
public MongoCredential getCustomCredential(Map<?, ?> map) {
AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
.withCredentials(provider)
.withRegion("us-east-1")
.build();
AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
.withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn"))
.withRoleSessionName("Test_Session");
AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
Credentials creds = assumeRoleResult.getCredentials();
// Add your code to fetch new credentials
return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());
};
return MongoCredential.createAwsCredential(null, null)
.withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
}

@Override
public void validate(Map<?, ?> map) {
String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
if (StringUtils.isNullOrEmpty(roleArn)) {
throw new RuntimeException("Invalid value set for customProperty");
}
}

@Override
public void init(Map<?, ?> map) {

}
}
```
### pom file to build the sample CustomRoleProvider into a jar
Here is the pom.xml that can build the complete jar containing the AwsAssumeRoleCredentialProvider

```java
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sample</groupId>
<artifactId>AwsAssumeRoleCredentialProvider</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.3</version>
<configuration>
<!-- put your configurations here -->
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<!-- Java MongoDB Driver dependency -->
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>5.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.12.723</version>
</dependency>

<!-- slf4j logging dependency, required for logging output from the MongoDB Java Driver -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.28</version>
</dependency>

<dependency>
<groupId>kafka-connect</groupId>
<artifactId>kafka-connect</artifactId>
<scope>system</scope>
<version>1.12.1-SNAPSHOT</version>
<systemPath>/Users/jagadish.nallapaneni/mongo-kafka/build/libs/mongo-kafka-connect-1.12.1-SNAPSHOT-confluent.jar</systemPath>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

</project>
```
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ plugins {
}

group = "org.mongodb.kafka"
version = "1.12.1-SNAPSHOT"
version = "1.13.0"
description = "The official MongoDB Apache Kafka Connect Connector."

repositories {
Expand All @@ -49,7 +49,7 @@ repositories {
}

extra.apply {
set("mongodbDriverVersion", "[4.7,4.7.99)")
set("mongodbDriverVersion", "[4.7,4.7.99]")
set("kafkaVersion", "2.6.0")
set("avroVersion", "1.9.2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static com.mongodb.kafka.connect.util.ServerApiConfig.addServerApiConfig;
import static com.mongodb.kafka.connect.util.SslConfigs.addSslConfigDef;
import static com.mongodb.kafka.connect.util.Validators.errorCheckingPasswordValueValidator;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderGenericInitializer.initializeCustomProvider;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
Expand All @@ -50,6 +52,7 @@

import com.mongodb.kafka.connect.MongoSinkConnector;
import com.mongodb.kafka.connect.util.Validators;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

public class MongoSinkConfig extends AbstractConfig {
private static final String EMPTY_STRING = "";
Expand Down Expand Up @@ -100,6 +103,7 @@ public class MongoSinkConfig extends AbstractConfig {
private final Optional<Pattern> topicsRegex;
private Map<String, MongoSinkTopicConfig> topicSinkConnectorConfigMap;
private ConnectionString connectionString;
private CustomCredentialProvider customCredentialProvider;

public MongoSinkConfig(final Map<String, String> originals) {
super(CONFIG, originals, false);
Expand Down Expand Up @@ -146,6 +150,10 @@ public MongoSinkConfig(final Map<String, String> originals) {
}
});
}
// Initialize CustomCredentialProvider if mongo.custom.auth.mechanism.enable is set to true
if (Boolean.parseBoolean(originals.get(CUSTOM_AUTH_ENABLE_CONFIG))) {
customCredentialProvider = initializeCustomProvider(originals);
}
}

public static final ConfigDef CONFIG = createConfigDef();
Expand All @@ -157,6 +165,10 @@ static String createOverrideKey(final String topic, final String config) {
return format(TOPIC_OVERRIDE_CONFIG, topic, config);
}

public CustomCredentialProvider getCustomCredentialProvider() {
return customCredentialProvider;
}

public ConnectionString getConnectionString() {
return connectionString;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ private static MongoClient createMongoClient(final MongoSinkConfig sinkConfig) {
MongoClientSettings.builder()
.applyConnectionString(sinkConfig.getConnectionString())
.applyToSslSettings(sslBuilder -> setupSsl(sslBuilder, sinkConfig));
if (sinkConfig.getCustomCredentialProvider() != null) {
builder.credential(
sinkConfig.getCustomCredentialProvider().getCustomCredential(sinkConfig.getOriginals()));
}
setServerApi(builder, sinkConfig);

return MongoClients.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import static com.mongodb.kafka.connect.util.Validators.errorCheckingPasswordValueValidator;
import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
import static com.mongodb.kafka.connect.util.VisibleForTesting.AccessModifier.PACKAGE;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderConstants.CUSTOM_AUTH_ENABLE_CONFIG;
import static com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderGenericInitializer.initializeCustomProvider;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -78,6 +80,7 @@
import com.mongodb.kafka.connect.util.Validators;
import com.mongodb.kafka.connect.util.VisibleForTesting;
import com.mongodb.kafka.connect.util.config.BsonTimestampParser;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

public class MongoSourceConfig extends AbstractConfig {

Expand Down Expand Up @@ -583,6 +586,11 @@ public class MongoSourceConfig extends AbstractConfig {
+ "connection details will be used.";

static final String PROVIDER_CONFIG = "provider";
private CustomCredentialProvider customCredentialProvider;

public CustomCredentialProvider getCustomCredentialProvider() {
return customCredentialProvider;
}

public static final ConfigDef CONFIG = createConfigDef();
private static final List<Consumer<MongoSourceConfig>> INITIALIZERS =
Expand Down Expand Up @@ -745,6 +753,9 @@ public String value() {

public MongoSourceConfig(final Map<?, ?> originals) {
this(originals, true);
if (Boolean.parseBoolean((String) originals.get(CUSTOM_AUTH_ENABLE_CONFIG))) {
customCredentialProvider = initializeCustomProvider(originals);
}
}

private MongoSourceConfig(final Map<?, ?> originals, final boolean validateAll) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public void commandFailed(final CommandFailedEvent event) {
.applyConnectionString(sourceConfig.getConnectionString())
.addCommandListener(statisticsCommandListener)
.applyToSslSettings(sslBuilder -> setupSsl(sslBuilder, sourceConfig));

if (sourceConfig.getCustomCredentialProvider() != null) {
builder.credential(
sourceConfig
.getCustomCredentialProvider()
.getCustomCredential(sourceConfig.originals()));
}

setServerApi(builder, sourceConfig);

mongoClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;

import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

public final class ConnectionValidator {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionValidator.class);
Expand Down Expand Up @@ -77,6 +81,18 @@ public static Optional<MongoClient> validateCanConnect(
new ConnectionString(((Password) configValue.value()).value());
MongoClientSettings.Builder mongoClientSettingsBuilder =
MongoClientSettings.builder().applyConnectionString(connectionString);
CustomCredentialProvider customCredentialProvider = null;
if (connectorProperties instanceof MongoSinkConfig) {
customCredentialProvider =
((MongoSinkConfig) connectorProperties).getCustomCredentialProvider();
} else if (connectorProperties instanceof MongoSourceConfig) {
customCredentialProvider =
((MongoSourceConfig) connectorProperties).getCustomCredentialProvider();
}
if (customCredentialProvider != null) {
mongoClientSettingsBuilder.credential(
customCredentialProvider.getCustomCredential(connectorProperties.originals()));
}
setServerApi(mongoClientSettingsBuilder, config);

MongoClientSettings mongoClientSettings =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.kafka.connect.util.custom.credentials;

import java.util.Map;

import com.mongodb.MongoCredential;

public interface CustomCredentialProvider {
MongoCredential getCustomCredential(Map<?, ?> configs);

void validate(Map<?, ?> configs);

void init(Map<?, ?> configs);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.kafka.connect.util.custom.credentials;

public final class CustomCredentialProviderConstants {
private CustomCredentialProviderConstants() {}

public static final String CUSTOM_AUTH_ENABLE_CONFIG = "mongo.custom.auth.mechanism.enable";

public static final String CUSTOM_AUTH_PROVIDER_CLASS =
"mongo.custom.auth.mechanism.providerClass";
}
Loading