
Commit af245ca

Chris Cho authored and schmalliso committed
DOCSP-17834 sink timeseries cdc override (#128)
* DOCSP-17834: Sink Connector properties
1 parent 09df146 commit af245ca

File tree

7 files changed: +284 −6 lines changed


source/sink-connector/configuration-properties.txt

Lines changed: 13 additions & 0 deletions

@@ -56,6 +56,16 @@ See the following categories for a list of related configuration properties:

    * - :doc:`Connector Post-processor Properties </sink-connector/configuration-properties/post-processors>`
      - Specify transformations of Kafka topic data.
 
+   * - :doc:`Topic Override Properties </sink-connector/configuration-properties/topic-override>`
+     - Override how the connector processes data on specific Kafka topics.
+
+   * - :doc:`Change Data Capture Properties </sink-connector/configuration-properties/cdc>`
+     - Specify how the connector captures CDC events from a Kafka topic.
+
+   * - :doc:`Time Series Properties </sink-connector/configuration-properties/time-series>`
+     - Configure the connector to sink data to a MongoDB time series
+       collection.
 
 See the `Confluent Sink Connector documentation <https://docs.confluent.io/current/installation/configuration/connect/sink-connect-configs.html>`__
 for more information on these settings.

@@ -69,4 +79,7 @@ for more information on these settings.

    Connector Error Handling </sink-connector/configuration-properties/error-handling>
    Id Strategy </sink-connector/configuration-properties/id-strategy>
    Post-processors <sink-connector/configuration-properties/post-processors>
+   Topic Override <sink-connector/configuration-properties/topic-override>
+   Change Data Capture <sink-connector/configuration-properties/cdc>
+   Time Series <sink-connector/configuration-properties/time-series>

source/sink-connector/configuration-properties/cdc.txt

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@

==============================
Change Data Capture Properties
==============================

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 2
   :class: singlecol

Overview
--------

Use the following configuration settings to specify a class the sink
connector uses to process change data capture (CDC) events.

See the guide on :doc:`Sink Connector Change Data Capture </sink-connector/fundamentals/change-data-capture>`
for examples using the built-in ``ChangeStreamHandler`` and Debezium event
producers.

.. include:: /includes/sink-config-link.rst

Settings
--------

.. list-table::
   :header-rows: 1
   :widths: 25 75

   * - Name
     - Description

   * - | **change.data.capture.handler**
     - | **Type:** string
       |
       | **Description:**
       | The class name of the CDC handler to use for converting changes
         into event streams.
       |
       | **Default**: ``""``
       | **Accepted Values**: An empty string or a fully qualified Java
         class name

source/sink-connector/configuration-properties/connector-message.txt

Lines changed: 1 addition & 2 deletions

@@ -20,8 +20,7 @@ the sink connector including the following:

 - Rate limits
 - Number of parallel tasks
 
-For a list of categories of sink connector configuration settings, see our
-guide on :doc:`Sink Connector Configuration Properties </sink-connector/configuration-properties>`.
+.. include:: /includes/sink-config-link.rst
 
 Settings
 --------

source/sink-connector/configuration-properties/error-handling.txt

Lines changed: 1 addition & 2 deletions

@@ -16,8 +16,7 @@ Overview

 Use the following configuration settings to specify how the sink connector
 handles errors and to configure the dead letter queue.
 
-For a list of categories of sink connector configuration settings, see the
-guide on :doc:`Sink Connector Configuration Properties </sink-connector/configuration-properties>`.
+.. include:: /includes/sink-config-link.rst
 
 Settings
 --------

source/sink-connector/configuration-properties/kafka-topic.txt

Lines changed: 1 addition & 2 deletions

@@ -16,8 +16,7 @@ Overview

 Use the following configuration settings to specify which Kafka topics the
 sink connector should watch for data.
 
-For a list of categories of sink connector configuration settings, see the
-section on :doc:`Sink Connector Configuration Properties </sink-connector/configuration-properties>`.
+.. include:: /includes/sink-config-link.rst
 
 Settings
 --------
source/sink-connector/configuration-properties/time-series.txt

Lines changed: 140 additions & 0 deletions

@@ -0,0 +1,140 @@

============================
Kafka Time Series Properties
============================

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 2
   :class: singlecol

Overview
--------

Use the following configuration settings to specify how the connector
should sink data to a MongoDB time series collection.

.. include:: /includes/sink-config-link.rst

Settings
--------

.. list-table::
   :header-rows: 1
   :widths: 45 55

   * - Name
     - Description

   * - | **timeseries.timefield**
     - | **Type:** string
       |
       | **Description:**
       | The name of the top-level field in the source data that contains time
         information that you want to associate with the new document in the
         time series collection.
       |
       | **Default**: ``""``
       | **Accepted Values**: An empty string or the name of a field
         that contains a BSON ``DateTime`` value

   * - | **timeseries.timefield.auto.convert.date.format**
     - | **Type:** string
       |
       | **Description:**
       | The date format pattern the connector should use to convert the
         source data contained in the field specified by the
         ``timeseries.timefield`` setting.
         The connector passes the date format pattern to the Java
         `DateTimeFormatter.ofPattern(pattern, locale) <https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ofPattern-java.lang.String-java.util.Locale->`__
         method to perform date and time conversions on the time field.
       | If the date value from the source data only contains date information,
         the connector sets the time information to the start of the specified
         day. If the date value does not contain a timezone offset, the
         connector sets the offset to UTC.
       |
       | **Default**:

       .. code-block:: none

          yyyy-MM-dd[['T'][ ]][HH:mm:ss[[.][SSSSSS][SSS]][ ]VV[ ]'['VV']'][HH:mm:ss[[.][SSSSSS][SSS]][ ]X][HH:mm:ss[[.][SSSSSS][SSS]]]

       | **Accepted Values**: A valid ``DateTimeFormatter`` format

   * - | **timeseries.timefield.auto.convert**
     - | **Type:** boolean
       |
       | **Description:**
       | Whether to convert the data in the field into the BSON ``Date``
         format.
       | When set to ``true``, the connector uses the milliseconds
         after epoch and discards fractional parts if the value is
         a number. If the value is a string, the connector uses the
         setting in the ``timeseries.timefield.auto.convert.date.format``
         configuration to parse the date.
       | If the connector fails to convert the value, it sends the
         original value to the time series collection.
       |
       | **Default**: ``false``
       | **Accepted Values**: ``true`` or ``false``

   * - | **timeseries.timefield.auto.convert.locale.language.tag**
     - | **Type:** string
       |
       | **Description:**
       | Which ``DateTimeFormatter`` locale language tag to use with the date
         format pattern (e.g. ``"en-US"``). For more information on
         locales, see the Java SE documentation of `Locale <https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html>`__.
       |
       | **Default**: ``ROOT``
       | **Accepted Values**: A valid ``Locale`` language tag format

   * - | **timeseries.metafield**
     - | **Type:** string
       |
       | **Description:**
       | Which top-level field to read from the source data to describe
         a group of related time series documents.

       .. important::

          This field must not be the ``_id`` field nor the field you specified
          in the ``timeseries.timefield`` setting.

       | **Default**: ``""``
       | **Accepted Values**: An empty string or the name of a field
         that contains any BSON type except ``BsonArray``.

   * - | **timeseries.expire.after.seconds**
     - | **Type:** int
       |
       | **Description:**
       | The number of seconds MongoDB should wait before automatically
         removing the time series collection data. The connector disables
         timed expiry when the setting value is less than ``1``.
         For more information on this collection setting, see the MongoDB
         Server Manual page on :manual:`Automatic Removal for Time Series Collections </core/timeseries/timeseries-automatic-removal/>`.
       |
       | **Default**: ``0``
       | **Accepted Values**: An integer

   * - | **timeseries.granularity**
     - | **Type:** string
       |
       | **Description:**
       | The expected interval between subsequent measurements of your
         source data. For more information on this setting, see the
         MongoDB Server Manual page on :manual:`Granularity for Time
         Series Data </core/timeseries/timeseries-granularity/>`.
       |
       | *Optional*
       | **Default**: ``""``
       | **Accepted Values**: ``""``, ``"seconds"``, ``"minutes"``, ``"hours"``

For an example of how to convert an existing collection to a time series
collection, see the (TODO: link to Time Series Collection Example) page.
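As a usage sketch, the time series settings above could be combined in a sink connector properties file like the following. This fragment is not part of the commit; the topic, namespace, and field names are all hypothetical.

.. code-block:: properties

   # Hypothetical sink settings for a time series collection; all names illustrative
   topics=sensor-events
   connection.uri=mongodb://localhost:27017
   database=telemetry
   collection=sensorData

   # Build each document's time field from the "ts" string field, parsed
   # with the given DateTimeFormatter pattern
   timeseries.timefield=ts
   timeseries.timefield.auto.convert=true
   timeseries.timefield.auto.convert.date.format=yyyy-MM-dd'T'HH:mm:ss'Z'

   # Group measurements by device and expire data after 30 days (2592000 s)
   timeseries.metafield=deviceId
   timeseries.expire.after.seconds=2592000
   timeseries.granularity=minutes

Because ``timeseries.metafield`` must differ from both ``_id`` and the ``timeseries.timefield`` value, the sketch uses a separate ``deviceId`` field to group related measurements.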
source/sink-connector/configuration-properties/topic-override.txt

Lines changed: 83 additions & 0 deletions

@@ -0,0 +1,83 @@

=========================
Topic Override Properties
=========================

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 2
   :class: singlecol

Overview
--------

Use the following sink connector configuration settings to override global or
default property settings for specific topics.

.. include:: /includes/sink-config-link.rst

Settings
--------

.. list-table::
   :header-rows: 1
   :widths: 25 75

   * - Name
     - Description

   * - | **topic.override.<topicName>.<propertyName>**
     - | **Type:** string
       |
       | **Description:**
       | Specify a topic and property name to override the corresponding
         global or default property setting.

       .. example::

          The ``topic.override.foo.collection=bar`` setting instructs the
          sink connector to store data from the ``foo`` topic in the ``bar``
          collection.

       .. note::

          You can specify any valid configuration setting in the
          ``<propertyName>`` segment on a per-topic basis except
          ``connection.uri`` and ``topics``.

       | **Default**: ``""``
       | **Accepted Values**: Accepted values specific to the overridden property

Example
-------

You can override how the sink connector processes data from specific topics.
The following example configuration shows how you can define configuration
settings for a topic named ``topicA``:

.. code-block:: properties

   topic.override.topicA.collection=collectionA
   topic.override.topicA.max.batch.size=100
   topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
   topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
   topic.override.topicA.value.projection.type=BlockList
   topic.override.topicA.value.projection.list=k2,k4

After applying these configuration settings, the sink connector performs
the following for data consumed from ``topicA``:

- Writes documents to the MongoDB collection ``collectionA`` in batches of
  up to 100.
- Generates a UUID value for each new document and writes it to the ``_id``
  field.
- Omits the fields ``k2`` and ``k4`` from the value projection using the
  ``BlockList`` projection type.

For an example of how to configure the Block List Projector, see the
`Sink Post-Processors guide <https://docs.mongodb.com/kafka-connector/current/kafka-sink-postprocessors/#block-list---allow-list-projector>`__.
