Skip to content

Commit 65fda3d

Browse files
biniona-mongodbschmalliso
authored andcommitted
(DOSCP-15812) Copy Existing (#166)
1 parent b69ce54 commit 65fda3d

File tree

7 files changed

+147
-3
lines changed

7 files changed

+147
-3
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"_id": { "_id": 1, "copyingData": true },
3+
"operationType": "insert",
4+
"documentKey": { "_id": 1 },
5+
"fullDocument": {
6+
"_id": 1,
7+
"country": "Mexico",
8+
"purchases": 2,
9+
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
10+
},
11+
"ns": { "db": "shopping", "coll": "customers" }
12+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
2+
connection.uri=<your development MongoDB connection uri>
3+
database=dev_shopping
4+
collection=dev_customers
5+
topics=shopping.customers
6+
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
2+
connection.uri=<your production MongoDB connection uri>
3+
database=shopping
4+
collection=customers
5+
copy.existing=true
6+
copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

source/sink-connector/fundamentals/change-data-capture.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
.. _sink-fundamentals-cdc-handler:
2+
13
============================
24
Change Data Capture Handlers
35
============================

source/source-connector/fundamentals/change-streams.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ more of the following ways:
5151
- Trim the amount of data generated by the change stream
5252

5353
For a list of aggregation operators you can use with a change stream, see
54-
the guide on :manual:`Modify Change Stream Output <changeStreams/#modify-change-stream-output>`
54+
the guide on :manual:`Modify Change Stream Output </changeStreams/#modify-change-stream-output>`
5555
in the MongoDB manual.
5656

5757
.. _source-connector-fundamentals-change-event:
Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,123 @@
1+
.. _source-usage-example-copy-existing-data:
2+
13
==================
24
Copy Existing Data
35
==================
46

5-
TODO:
7+
.. default-domain:: mongodb
8+
9+
This usage example demonstrates how to copy data from a MongoDB collection to an
10+
{+ak+} topic using the {+mkc+}.
11+
12+
Example
13+
-------
14+
15+
Suppose you need to copy a MongoDB collection to {+ak+} and filter some of the data.
16+
17+
Your requirements and your solutions are as follows:
18+
19+
.. list-table::
20+
:header-rows: 1
21+
:widths: 50 50
22+
23+
* - Requirement
24+
- Solution
25+
26+
* - Copy the ``customers`` collection of the ``shopping`` database in your
27+
MongoDB deployment onto an {+ak+} topic.
28+
- | See the :ref:`<source-usage-example-copy-existing-data-copy-data>` section of this guide.
29+
30+
* - Only copy documents that have the value "Mexico" in the ``country`` field.
31+
- | See the :ref:`<source-usage-example-copy-existing-data-mask-data>` section of this guide.
32+
33+
The ``customers`` collection contains the following documents:
34+
35+
.. _usage-example-copy-sample-document:
36+
37+
.. code-block:: json
38+
:copyable: false
39+
40+
{
41+
"_id": 1,
42+
"country": "Mexico",
43+
"purchases": 2,
44+
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
45+
}
46+
{
47+
"_id": 2,
48+
"country": "Iceland",
49+
"purchases": 8,
50+
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
51+
}
52+
53+
.. _source-usage-example-copy-existing-data-copy-data:
54+
55+
Copy Data
56+
~~~~~~~~~
57+
58+
Copy the contents of the ``customers`` collection of the ``shopping`` database by
59+
specifying the following configuration options in your source connector:
60+
61+
.. code-block:: properties
62+
63+
database=shopping
64+
collection=customers
65+
copy.existing=true
66+
67+
Your source connector copies your collection by creating change event documents
68+
that describe inserting each document into your collection.
69+
70+
To learn more about change event documents, see the
71+
:ref:`Change Streams <source-connector-fundamentals-change-event>` guide.
72+
73+
To learn more about the ``copy.existing`` option, see
74+
:ref:`<source-configuration-copy-existing>` in the {+mkc+}.
75+
76+
.. _source-usage-example-copy-existing-data-mask-data:
77+
78+
Filter Data
79+
~~~~~~~~~~~
80+
81+
You can filter data by specifying an aggregation pipeline in the
82+
``copy.existing.pipeline`` option of your source connector configuration. The
83+
following configuration specifies an aggregation pipeline that matches all
84+
documents with "Mexico" in the ``country`` field:
85+
86+
.. code-block:: properties
87+
88+
copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
89+
90+
To learn more about the ``copy.existing.pipeline`` option, see
91+
:ref:`<source-configuration-copy-existing>` in the {+mkc+}.
92+
93+
To learn more about aggregation pipelines, see the following resources:
94+
95+
- :ref:`<source-usage-example-custom-pipeline>` Usage Example
96+
- :manual:`Aggregation </aggregation>` in the MongoDB manual.
97+
98+
99+
Copy Data Configuration
100+
~~~~~~~~~~~~~~~~~~~~~~~
101+
102+
Your source connector configuration to copy the ``customers`` collection should
103+
look like this:
104+
105+
.. literalinclude:: /includes/usage-examples/copy/source.properties
106+
:language: properties
107+
:emphasize-lines: 5,6
108+
109+
Once your connector copies your data, you see the following change event
110+
document corresponding to the
111+
:ref:`preceding sample collection <usage-example-copy-sample-document>`
112+
in the ``shopping.customers`` {+ak+} topic:
113+
114+
.. literalinclude:: /includes/usage-examples/copy/payload.json
115+
:language: json
116+
:copyable: false
117+
:emphasize-lines: 6,7
118+
119+
.. note:: Write the Data in your Topic into a Collection
120+
121+
Use a change data capture handler to convert change event documents in an
122+
{+ak+} topic into MongoDB write operations. To learn more, see the
123+
:ref:`Change Data Capture Handlers <sink-fundamentals-cdc-handler>` guide.

source/source-connector/usage-examples/custom-pipeline.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ For more information, see the MongoDB Server manual entry on
2121
Example
2222
-------
2323

24-
Suppose you're an event coordinator needs to collect names and arrival times
24+
Suppose you're an event coordinator who needs to collect names and arrival times
2525
of each guest at a specific event. Whenever a guest checks into the event,
2626
an application inserts a new document that contains the following details:
2727

0 commit comments

Comments
 (0)