diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorIntegrationTest.java
index 624195184..f15b29d0e 100644
--- a/src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorIntegrationTest.java
+++ b/src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorIntegrationTest.java
@@ -49,10 +49,13 @@
 import org.junit.jupiter.api.Test;
 import org.opentest4j.AssertionFailedError;
 
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.Indexes;
 import com.mongodb.client.model.Sorts;
 
 import com.mongodb.kafka.connect.avro.TweetMsg;
 import com.mongodb.kafka.connect.mongodb.MongoKafkaTestCase;
+import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
 import com.mongodb.kafka.connect.util.jmx.SinkTaskStatistics;
 
 class MongoSinkConnectorIntegrationTest extends MongoKafkaTestCase {
@@ -226,6 +229,85 @@ private void assertProducesMessages(
     assertProducesMessages(topicName, collectionName, false, partitionCount);
   }
 
+  @Test
+  @DisplayName("Ignore MongoDB duplicate key error")
+  void testSinkSavesIgnoringMongoDBDuplicateKeyError() {
+    String topicName = getTopicName();
+    KAFKA.createTopic(topicName);
+
+    Properties sinkProperties = createSinkProperties();
+    sinkProperties.put("topics", topicName);
+    sinkProperties.put(MongoSinkTopicConfig.BULK_WRITE_ORDERED_CONFIG, String.valueOf(false));
+    KAFKA.addSinkConnector(sinkProperties);
+
+    int partitionCount = 1;
+    String collectionName = getCollectionName();
+
+    Properties producerProps = new Properties();
+    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
+    producerProps.put(
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "io.confluent.kafka.serializers.KafkaAvroSerializer");
+    producerProps.put(
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "io.confluent.kafka.serializers.KafkaAvroSerializer");
+    producerProps.put(
+        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, KAFKA.schemaRegistryUrl());
+
+    MONGODB.getDatabase().createCollection(collectionName);
+    MONGODB
+        .getDatabase()
+        .getCollection(collectionName)
+        .createIndex(Indexes.ascending("text"), new IndexOptions().unique(true));
+
+    List<TweetMsg> tweets =
+        IntStream.range(0, 3)
+            .mapToObj(
+                i ->
+                    TweetMsg.newBuilder()
+                        .setId$1(i)
+                        .setText(
+                            format(
+                                "test tweet %s end2end testing apache kafka <-> mongodb sink connector is fun!",
+                                i))
+                        .setHashtags(asList(format("t%s", i), "kafka", "mongodb", "testing"))
+                        .build())
+            .collect(Collectors.toList());
+
+    try (KafkaProducer<Long, TweetMsg> producer = new KafkaProducer<>(producerProps)) {
+      producer.initTransactions();
+      producer.beginTransaction();
+      producer.send(
+          new ProducerRecord<>(
+              topicName, RANDOM.nextInt(partitionCount), tweets.get(0).getId$1(), tweets.get(0)));
+      producer.send(
+          new ProducerRecord<>(
+              topicName, RANDOM.nextInt(partitionCount), tweets.get(0).getId$1(), tweets.get(0)));
+      producer.send(
+          new ProducerRecord<>(
+              topicName, RANDOM.nextInt(partitionCount), tweets.get(1).getId$1(), tweets.get(1)));
+      producer.commitTransaction();
+      producer.flush();
+    }
+
+    assertEventuallyEquals(
+        2L, () -> getCollection(collectionName).countDocuments(), collectionName);
+
+    try (KafkaProducer<Long, TweetMsg> producer = new KafkaProducer<>(producerProps)) {
+      producer.initTransactions();
+      producer.beginTransaction();
+      producer.send(
+          new ProducerRecord<>(
+              topicName, RANDOM.nextInt(partitionCount), tweets.get(2).getId$1(), tweets.get(2)));
+      producer.commitTransaction();
+      producer.flush();
+    }
+
+    assertEventuallyEquals(
+        3L, () -> getCollection(collectionName).countDocuments(), collectionName);
+  }
+
   private void assertProducesMessages(
       final String topicName,
       final String collectionName,
diff --git a/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java b/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java
index decf5109f..5540b7c84 100644
--- a/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java
+++ b/src/main/java/com/mongodb/kafka/connect/sink/StartedMongoSinkTask.java
@@ -36,6 +36,7 @@
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.MongoNamespace;
+import com.mongodb.bulk.BulkWriteError;
 import com.mongodb.bulk.BulkWriteResult;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.model.BulkWriteOptions;
@@ -58,6 +59,9 @@
   private final SinkTaskStatistics statistics;
   private final InnerOuterTimer inTaskPutInConnectFrameworkTimer;
+  private static final int DUPLICATE_KEY_ERROR_CODE = 11000;
+  // TODO move this flag to settings
+  private final boolean ignoreDuplicateKeyError = true;
 
   StartedMongoSinkTask(
       final MongoSinkConfig sinkConfig,
@@ -218,6 +222,7 @@ private void handleTolerableWriteException(
             (MongoBulkWriteException) e,
             errorReporter,
             StartedMongoSinkTask::log);
+    List<BulkWriteError> errors = ((MongoBulkWriteException) e).getWriteErrors();
     if (logErrors) {
       LOGGER.error(
           "Failed to put into the sink some records, see log entries below for the details", e);
@@ -225,6 +230,10 @@
     }
     if (tolerateErrors) {
       analyzedBatch.report();
+    } else if (errors.size() == 1
+        && errors.get(0).getCode() == DUPLICATE_KEY_ERROR_CODE
+        && ignoreDuplicateKeyError) {
+      LOGGER.error("Failed to put into the sink some records: Duplicate Key Error");
     } else {
       throw new DataException(e);
     }
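
The TODO in StartedMongoSinkTask notes that the hardcoded ignoreDuplicateKeyError flag should eventually move to settings. Below is a minimal sketch of one way that could look using Kafka Connect's ConfigDef, together with a duplicate-key check based on the driver's ErrorCategory rather than the bare 11000 code. The property name, class name, and helper methods are hypothetical illustrations and are not part of this diff or of the connector.

import java.util.List;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import com.mongodb.ErrorCategory;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.bulk.BulkWriteError;

// Hypothetical sketch only: illustrates promoting the hardcoded flag to configuration.
public final class IgnoreDuplicateKeyErrorsSketch {

  // Hypothetical property name; the real connector would define it alongside the
  // other options in MongoSinkTopicConfig.
  public static final String IGNORE_DUPLICATE_KEY_ERRORS_CONFIG = "errors.ignore.duplicate.key";

  private IgnoreDuplicateKeyErrorsSketch() {}

  // Declares the flag as a regular Kafka Connect configuration option, defaulting to false
  // so existing deployments keep the current fail-fast behavior unless they opt in.
  public static ConfigDef addIgnoreDuplicateKeyErrors(final ConfigDef configDef) {
    return configDef.define(
        IGNORE_DUPLICATE_KEY_ERRORS_CONFIG,
        ConfigDef.Type.BOOLEAN,
        false,
        ConfigDef.Importance.MEDIUM,
        "Whether a bulk write that fails only with duplicate key errors should be ignored");
  }

  // Reads the flag from parsed configuration instead of hardcoding it in the task.
  public static boolean ignoreDuplicateKeyErrors(final AbstractConfig config) {
    return config.getBoolean(IGNORE_DUPLICATE_KEY_ERRORS_CONFIG);
  }

  // True when every write error in the bulk exception is a duplicate key error.
  // ErrorCategory.fromErrorCode maps 11000 and the related duplicate key codes, and
  // checking all errors also covers unordered bulk writes that hit several duplicates.
  public static boolean onlyDuplicateKeyErrors(final MongoBulkWriteException e) {
    List<BulkWriteError> errors = e.getWriteErrors();
    return !errors.isEmpty()
        && errors.stream()
            .allMatch(
                error ->
                    ErrorCategory.fromErrorCode(error.getCode()) == ErrorCategory.DUPLICATE_KEY);
  }
}

With something like this in place, the else-if branch in handleTolerableWriteException could call the config getter and onlyDuplicateKeyErrors(e) instead of relying on the hardcoded field and the single-error, single-code comparison.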