.. _kafka-tutorial-replicate-with-cdc:

=================================================
Replicate Data with a Change Data Capture Handler
=================================================

Overview
--------

Follow this tutorial to learn how to use a
**change data capture (CDC) handler** to replicate data with the {+mkc+}.
A CDC handler is an application that translates CDC events into MongoDB
write operations. Use a CDC handler when you need to reproduce changes
that occur in one datastore in another datastore.
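
The translation a CDC handler performs can be sketched as a function from
a change event document to a MongoDB write operation. The following Python
sketch is illustrative only and is not part of the connector, which
implements this logic in Java:

.. code-block:: python

   def event_to_write_op(event):
       """Map a change stream event to a (method, filter, document) tuple."""
       op = event["operationType"]
       if op in ("insert", "replace"):
           # Upsert the full document, keyed on the event's documentKey
           return ("replace_one", event["documentKey"], event["fullDocument"])
       if op == "delete":
           return ("delete_one", event["documentKey"], None)
       raise ValueError("unsupported operation: " + op)

   event = {
       "operationType": "insert",
       "documentKey": {"_id": 1},
       "fullDocument": {"_id": 1, "proclaim": "Hello World!"},
   }
   print(event_to_write_op(event)[0])  # replace_one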

In this tutorial, you configure and run MongoDB Kafka source and sink
connectors to make two MongoDB collections contain the same documents
using CDC. The source connector writes change stream data from the
original collection to a Kafka topic, and the sink connector writes the
Kafka topic data to the target MongoDB collection.

If you want to learn more about how CDC handlers work, see the
:ref:`<sink-fundamentals-cdc-handler>` guide.

.. include:: /includes/tutorials/setup.rst

Replicate Data with a CDC Handler
---------------------------------

.. procedure::
   :style: connected

   .. step:: Start Interactive Shells

      Start two interactive shells on the Docker container in separate
      terminal windows. You use these shells throughout the tutorial to
      run and observe different tasks.

      Run the following command to start an interactive shell called
      **Shell1** in one terminal window:

      .. code-block:: bash
         :caption: This command starts an interactive shell called Shell1

         docker run --rm --name Shell1 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

      Run the following command to start an interactive shell called
      **Shell2** in the other terminal window:

      .. code-block:: bash
         :caption: This command starts an interactive shell called Shell2

         docker run --rm --name Shell2 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

      Arrange the two windows on your screen so that both are visible and
      you can see real-time updates.

      Use **Shell1** to configure your connectors and monitor your Kafka
      topic. Use **Shell2** to perform write operations in MongoDB.

   .. step:: Configure the Source Connector

      In **Shell1**, configure a source connector to read from the
      ``CDCTutorial.Source`` MongoDB namespace and write to the
      ``CDCTutorial.Source`` Kafka topic.

      Create a configuration file called ``cdc-source.json`` using the
      following command:

      .. code-block:: bash

         nano cdc-source.json

      Paste the following configuration information into the file and save
      your changes:

      .. code-block:: json

         {
           "name": "mongo-cdc-source",
           "config": {
             "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
             "connection.uri": "mongodb://mongo1",
             "database": "CDCTutorial",
             "collection": "Source"
           }
         }
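
      If you prefer to generate the configuration file programmatically
      rather than edit it in ``nano``, a minimal Python sketch using only
      the values shown above could write the same file:

      .. code-block:: python

         import json

         # Source connector settings used in this tutorial
         source_config = {
             "name": "mongo-cdc-source",
             "config": {
                 "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
                 "connection.uri": "mongodb://mongo1",
                 "database": "CDCTutorial",
                 "collection": "Source",
             },
         }

         # Write the connector configuration to cdc-source.json
         with open("cdc-source.json", "w") as f:
             json.dump(source_config, f, indent=2)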

      Run the following command in **Shell1** to start the source connector
      using the configuration file you created:

      .. code-block:: bash

         cx cdc-source.json

      .. note::

         The ``cx`` command is a custom script included in the tutorial
         development environment. This script runs the following
         equivalent request to the Kafka Connect REST API to create a new
         connector:

         .. code-block:: bash

            curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
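
      The same request can be composed with Python's standard library. This
      sketch builds, but does not send, the equivalent request; calling
      ``urllib.request.urlopen(req)`` from inside the container would
      submit it:

      .. code-block:: python

         import json
         import urllib.request

         # Same settings as cdc-source.json, embedded for a self-contained example
         config = json.dumps({
             "name": "mongo-cdc-source",
             "config": {
                 "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
                 "connection.uri": "mongodb://mongo1",
                 "database": "CDCTutorial",
                 "collection": "Source",
             },
         }).encode("utf-8")

         # POST /connectors creates a new connector via the Kafka Connect REST API
         req = urllib.request.Request(
             "http://connect:8083/connectors",
             data=config,
             headers={"Content-Type": "application/json"},
             method="POST",
         )
         # urllib.request.urlopen(req) would submit the request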

      Run the following command in the shell to check the status of the
      connectors:

      .. code-block:: bash

         status

      If your source connector started successfully, you should see the
      following output:

      .. code-block:: none
         :copyable: false

         Kafka topics:
         ...
         The status of the connectors:

         source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

         Currently configured connectors

         [
             "mongo-cdc-source"
         ]
         ...

   .. step:: Configure the Sink Connector

      In **Shell1**, configure a sink connector to copy data from the
      ``CDCTutorial.Source`` Kafka topic to the ``CDCTutorial.Destination``
      MongoDB namespace.

      Create a configuration file called ``cdc-sink.json`` using the
      following command:

      .. code-block:: bash

         nano cdc-sink.json

      Paste the following configuration information into the file and save
      your changes:

      .. code-block:: json

         {
           "name": "mongo-cdc-sink",
           "config": {
             "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
             "topics": "CDCTutorial.Source",
             "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
             "connection.uri": "mongodb://mongo1",
             "database": "CDCTutorial",
             "collection": "Destination"
           }
         }

      Run the following command in the shell to start the sink connector
      using the configuration file you created:

      .. code-block:: bash

         cx cdc-sink.json

      Run the following command in the shell to check the status of the
      connectors:

      .. code-block:: bash

         status

      If your sink connector started successfully, you should see the
      following output:

      .. code-block:: none
         :copyable: false

         Kafka topics:
         ...
         The status of the connectors:

         sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
         source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

         Currently configured connectors

         [
             "mongo-cdc-sink",
             "mongo-cdc-source"
         ]
         ...

   .. step:: Monitor the Kafka Topic

      In **Shell1**, monitor the Kafka topic for incoming events. Run the
      following command to start the ``kafkacat`` application, which
      outputs data published to the topic:

      .. code-block:: bash

         kc CDCTutorial.Source

      .. note::

         The ``kc`` command is a custom script included in the tutorial
         development environment that calls the ``kafkacat`` application
         with options to connect to Kafka and format the output of the
         specified topic.

      Once started, you should see the following output, which indicates
      that there is currently no data to read:

      .. code-block:: none
         :copyable: false

         % Reached end of topic CDCTutorial.Source [0] at offset 0

   .. step:: Write Data into the Source and Watch the Data Flow

      In **Shell2**, connect to MongoDB using ``mongosh``, the MongoDB
      shell, by running the following command:

      .. code-block:: bash

         mongosh "mongodb://mongo1"

      After you connect successfully, you should see the following
      MongoDB shell prompt:

      .. code-block:: none
         :copyable: false

         rs0 [direct: primary] test>

      At the prompt, type the following commands to insert a new document
      into the ``CDCTutorial.Source`` MongoDB namespace:

      .. code-block:: javascript

         use CDCTutorial
         db.Source.insertOne({ proclaim: "Hello World!" });

      Once MongoDB completes the insert command, you should receive an
      acknowledgment that resembles the following text:

      .. code-block:: javascript
         :copyable: false

         {
           acknowledged: true,
           insertedId: ObjectId("600b38ad...")
         }

      The source connector picks up the change and publishes it to the
      Kafka topic. You should see the following topic message in your
      **Shell1** window:

      .. code-block:: json
         :copyable: false

         {
           "schema": { "type": "string", "optional": false },
           "payload": {
             "_id": { "_data": "8260..." },
             "operationType": "insert",
             "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
             "fullDocument": {
               "_id": { "$oid": "600b38ad..." },
               "proclaim": "Hello World!"
             },
             "ns": { "db": "CDCTutorial", "coll": "Source" },
             "documentKey": { "_id": { "$oid": "600b38a..." } }
           }
         }
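
      Each topic message is a JSON envelope in which the ``payload`` field
      carries the change event itself. The following Python sketch reads
      the event fields from a shortened copy of the message above:

      .. code-block:: python

         import json

         # Shortened copy of the topic message shown above
         message = '''
         {
           "schema": { "type": "string", "optional": false },
           "payload": {
             "operationType": "insert",
             "fullDocument": { "proclaim": "Hello World!" },
             "ns": { "db": "CDCTutorial", "coll": "Source" },
             "documentKey": { "_id": { "$oid": "600b38a" } }
           }
         }
         '''

         event = json.loads(message)["payload"]
         print(event["operationType"], event["ns"]["db"], event["ns"]["coll"])
         # insert CDCTutorial Source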

      The sink connector picks up the Kafka message and sinks the data
      into MongoDB. You can retrieve the document from the
      ``CDCTutorial.Destination`` namespace by running the following
      command in the MongoDB shell you started in **Shell2**:

      .. code-block:: javascript

         db.Destination.find()

      You should see the following document returned as the result:

      .. code-block:: javascript
         :copyable: false

         [
           {
             _id: ObjectId("600b38a..."),
             proclaim: 'Hello World!'
           }
         ]

   .. step:: (Optional) Generate Additional Changes

      Try removing documents from the ``CDCTutorial.Source`` namespace
      by running the following command from the MongoDB shell:

      .. code-block:: javascript

         db.Source.deleteMany({})

      You should see the following topic message in your **Shell1**
      window:

      .. code-block:: json
         :copyable: false

         {
           "schema": { "type": "string", "optional": false },
           "payload": {
             "_id": { "_data": "8261...." },
             ...
             "operationType": "delete",
             "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
             "ns": { "db": "CDCTutorial", "coll": "Source" },
             "documentKey": { "_id": { "$oid": "6138..." } }
           }
         }

      Run the following command to retrieve the current number of
      documents in the collection:

      .. code-block:: javascript

         db.Destination.countDocuments()

      This command returns the following output, indicating that the
      collection is empty:

      .. code-block:: none
         :copyable: false

         0

      Run the following command to exit the MongoDB shell:

      .. code-block:: none

         exit

Summary
-------

In this tutorial, you set up a source connector to capture changes to a
MongoDB collection and send them to Apache Kafka. You also configured a
sink connector with a MongoDB CDC handler to move the data from Apache
Kafka to a MongoDB collection.

Learn More
----------

Read the following resources to learn more about concepts mentioned in
this tutorial:

- :ref:`<sink-fundamentals-cdc-handler>`
- :ref:`<kafka-source-change-streams>`