Skip to content

Commit 0899d2b

Browse files
steverenschmalliso
authored andcommitted
[Kafka] DOCSP-10640: [Kafka] Users can implement their own WriteModelStrategy
1 parent 44b02a2 commit 0899d2b

File tree

1 file changed

+41
-0
lines changed

1 file changed

+41
-0
lines changed

source/kafka-sink-postprocessors.txt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,47 @@ create your own. A custom write strategy model is a Java class that implements
700700
``mongodb.writemodel.strategy`` :ref:`configuration setting
701701
<kafka-sink-properties>`.
702702

703+
To create a custom ``WriteModelStrategy``:
704+
705+
1. Create a class that implements the ``WriteModelStrategy`` interface
706+
and overrides the ``createWriteModel(SinkDocument)`` method.
707+
708+
#. Compile the class to a ``.class`` file or JAR.
709+
710+
#. Add the class to the Class Path / Plugin Path for Kafka workers.
711+
For more information about plugin paths, see the `Confluent documentation
712+
<https://docs.confluent.io/current/connect/managing/community.html>`__.
713+
714+
The following is an example of a custom write strategy. It extracts the
715+
value of ``fullDocument`` from the value document and returns a
716+
``ReplaceOne`` operation.
717+
718+
.. code-block:: java
719+
720+
/**
721+
* A custom write model strategy
722+
*
723+
* This example takes the 'fullDocument' field from a change stream and creates a
724+
* ReplaceOne operation.
725+
*/
726+
727+
public class CustomWriteModelStrategy implements WriteModelStrategy {
728+
729+
private static String ID = "_id";
730+
@Override
731+
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
732+
BsonDocument changeStreamDocument = document.getValueDoc()
733+
.orElseThrow(() -> new DataException("Missing value document"));
734+
735+
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
736+
if (fullDocument.isEmpty()) {
737+
return null; // Return null to indicate no op.
738+
}
739+
740+
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
741+
}
742+
}
743+
703744
.. _writemodel-strategy-business-key:
704745

705746
WriteModel Strategy: Business Keys

0 commit comments

Comments
 (0)