|
| 1 | +.. _tutorial-migrate-time-series: |
| 2 | + |
| 3 | +========================================================== |
| 4 | +Migrate an Existing Collection to a Time Series Collection |
| 5 | +========================================================== |
| 6 | + |
| 7 | +.. default-domain:: mongodb |
| 8 | + |
| 9 | +In this tutorial, you can learn how to convert an existing MongoDB |
| 10 | +collection to a **time series collection** using the {+mkc+}. |
| 11 | + |
| 12 | +Time series collections efficiently store sequences of measurements |
| 13 | +over a period of time. Time series data consists of measurement data collected |
| 14 | +over time, metadata that describes the measurement, and the time of the |
| 15 | +measurement. |
| 16 | + |
| 17 | +You can configure the source connector to read your existing MongoDB |
| 18 | +collection and the sink connector to write Kafka topic data into a MongoDB |
| 19 | +time series collection. |
| 20 | + |
| 21 | +To learn more about MongoDB time series collections, see the MongoDB |
| 22 | +manual page on :manual:`Time Series Collections </core/timeseries-collections/>`. |
| 23 | + |
| 24 | +Scenario |
| 25 | +-------- |
| 26 | + |
| 27 | +Suppose you accumulated stock price data in a MongoDB collection and have |
| 28 | +the following needs: |
| 29 | + |
| 30 | +- Store the price data more efficiently |
| 31 | +- Maintain the ability to analyze stock performance over time using |
| 32 | + aggregation operators |
| 33 | + |
| 34 | +After reading about MongoDB time series collections, you decide to migrate |
| 35 | +your existing collection to a time series collection. The following |
| 36 | +sections show how to perform this migration. |
| 37 | + |
| 38 | +Steps to Migrate to a Time Series Collection |
| 39 | +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 40 | + |
| 41 | +To migrate an existing MongoDB collection to a time series collection, you |
| 42 | +need to perform the following tasks: |
| 43 | + |
| 44 | +1) :ref:`Identify the time field <time-series-identify-field>` in the existing |
| 45 | + stock price data document. |
| 46 | +2) :ref:`Configure a source connector <time-series-source-config>` to copy |
| 47 | + the existing collection data to a Kafka topic. |
| 48 | +3) :ref:`Configure a sink connector <time-series-sink-config>` to copy the |
| 49 | + Kafka topic data to the time series collection. |
| 50 | +4) :ref:`Verify the connector migrated the data <time-series-verify-collection>` to the time |
| 51 | + series collection. |
| 52 | + |
| 53 | +.. _time-series-identify-field: |
| 54 | + |
| 55 | +Identify the Time Field |
| 56 | +~~~~~~~~~~~~~~~~~~~~~~~ |
| 57 | + |
| 58 | +Before you create a time series collection, you need to identify the |
| 59 | +**time field**. The time field is the document field that MongoDB uses to |
| 60 | +determine the time of the measurement. The value of this field can be |
| 61 | +a string, integer, or ISO date. If the value is not already a date, set the |
| 62 | +``timeseries.timefield.auto.convert`` setting to ``true`` to instruct the |
| 63 | +connector to automatically convert the value to a date. |
| 64 | + |
| 65 | +The following document shows the format of stock price data documents in |
| 66 | +the existing MongoDB collection: |
| 67 | + |
| 68 | +.. code-block:: javascript |
| 69 | + :copyable: false |
| 70 | + |
| 71 | + { |
| 72 | + tx_time: 2021-07-12T05:20:35Z, |
| 73 | + symbol: 'WSV', |
| 74 | + company_name: 'WORRIED SNAKEBITE VENTURES', |
| 75 | + price: 21.22, |
| 76 | + _id: ObjectId("...") |
| 77 | + } |
| 78 | + |
| 79 | +For this scenario, assume you stored these documents in a collection named |
| 80 | +``PriceData`` in the ``Stocks`` database. |
| 81 | + |
| 82 | +You identify that the ``tx_time`` field distinguishes the time of the |
| 83 | +measurements, and specify it as your time field in your sink connector |
| 84 | +configuration. |
| 85 | + |
| 86 | +Learn how to set your time field and field conversion in the |
| 87 | +:ref:`time series configuration guide <time-series-sink-config>`. |
| 88 | + |
| 89 | +.. _time-series-source-config: |
| 90 | + |
| 91 | +Configure the Source Connector |
| 92 | +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 93 | + |
| 94 | +To copy data from the ``PriceData`` MongoDB collection and publish it |
| 95 | +to the ``marketdata.Stocks.PriceData`` Kafka topic, create a source |
| 96 | +connector with the following configuration: |
| 97 | + |
| 98 | +.. code-block:: json |
| 99 | + |
| 100 | + { |
| 101 | + "name": "mongo-source-marketdata", |
| 102 | + "config": { |
| 103 | + "tasks.max":"1", |
| 104 | + "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", |
| 105 | + "key.converter":"org.apache.kafka.connect.storage.StringConverter", |
| 106 | + "value.converter":"org.apache.kafka.connect.json.JsonConverter", |
| 107 | + "publish.full.document.only":"true", |
| 108 | + "connection.uri":"<your connection uri>", |
| 109 | + "topic.prefix":"marketdata", |
| 110 | + "database":"Stocks", |
| 111 | + "collection":"PriceData", |
| 112 | + "copy.existing":"true" |
| 113 | + }} |
| 114 | + |
| 115 | +.. note:: |
| 116 | + |
| 117 | + If you insert documents into the collection during the copying process, |
| 118 | + the connector publishes them after the copying process is complete. |
| 119 | + |
| 120 | +After you start your source connector with the preceding configuration, |
| 121 | +the connector starts the copying process. Once the process is complete, |
| 122 | +you should see the following message in the log: |
| 123 | + |
| 124 | +.. code-block:: none |
| 125 | + :copyable: false |
| 126 | + |
| 127 | + Finished copying existing data from the collection(s). |
| 128 | + |
| 129 | +Your data from the ``PriceData`` MongoDB collection is now available in |
| 130 | +the ``marketdata.Stocks.PriceData`` Kafka topic. |
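
As a minimal sketch, the topic name in this section can be read as the ``topic.prefix``, ``database``, and ``collection`` values from the source connector configuration joined with periods. The ``topicName`` helper below is hypothetical and not part of the connector; it only illustrates how the name is composed so you can work out which topic your sink connector should subscribe to.

```javascript
// Hypothetical helper (not a connector API): joins the non-empty parts
// with periods, matching the <topic.prefix>.<database>.<collection>
// pattern used in this tutorial.
function topicName(prefix, database, collection) {
  return [prefix, database, collection]
    .filter((part) => typeof part === "string" && part.length > 0)
    .join(".");
}

// Values taken from the source connector configuration in this section.
console.log(topicName("marketdata", "Stocks", "PriceData"));
// prints "marketdata.Stocks.PriceData"
```

Use the resulting name as the ``topics`` value when you configure the sink connector.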
| 131 | + |
| 132 | +.. _time-series-sink-config: |
| 133 | + |
| 134 | +Configure the Sink Connector |
| 135 | +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 136 | + |
| 137 | +To consume data from the ``marketdata.Stocks.PriceData`` Kafka topic and write |
| 138 | +it to a time series collection named ``StockDataMigrate`` in a database |
| 139 | +named ``Stocks``, you can create the following sink connector configuration: |
| 140 | + |
| 141 | +.. code-block:: json |
| 142 | + :emphasize-lines: 12-14 |
| 143 | + |
| 144 | + { |
| 145 | + "name": "mongo-sink-marketdata", |
| 146 | + "config": { |
| 147 | + "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", |
| 148 | + "tasks.max":"1", |
| 149 | + "topics":"marketdata.Stocks.PriceData", |
| 150 | + "connection.uri":"<your connection uri>", |
| 151 | + "database":"Stocks", |
| 152 | + "collection":"StockDataMigrate", |
| 153 | + "key.converter":"org.apache.kafka.connect.storage.StringConverter", |
| 154 | + "value.converter":"org.apache.kafka.connect.json.JsonConverter", |
| 155 | + "timeseries.timefield":"tx_time", |
| 156 | + "timeseries.timefield.auto.convert":"true", |
| 157 | + "timeseries.timefield.auto.convert.date.format":"yyyy-MM-dd'T'HH:mm:ss'Z'" |
| 158 | + }} |
| 159 | + |
| 160 | +.. tip:: |
| 161 | + |
| 162 | + The preceding sink connector configuration uses the time field date |
| 163 | + format converter. Alternatively, you can use the ``TimestampConverter`` |
| 164 | + Single Message Transform (SMT) to convert the ``tx_time`` field from a |
| 165 | + ``String`` to an ``ISODate``. When using the ``TimestampConverter`` SMT, |
| 166 | + you must define a schema for the data in the Kafka topic. |
| 167 | + |
| 168 | + For information on how to use the ``TimestampConverter`` SMT, see the |
| 169 | + `TimestampConverter <https://docs.confluent.io/platform/current/connect/transforms/timestampconverter.html#timestampconverter>`__ |
| 170 | + Confluent documentation. |
| 171 | + |
| 172 | +After your sink connector finishes processing the topic data, the documents |
| 173 | +in the ``StockDataMigrate`` time series collection contain the ``tx_time`` |
| 174 | +field with an ``ISODate`` type value. |
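
To make the conversion concrete, the following sketch shows roughly what turning a ``tx_time`` string into a date involves. It is an illustration only, not the connector's implementation: ``parseTxTime`` is a hypothetical helper, and it assumes the timestamp is in UTC, as the trailing ``Z`` suggests.

```javascript
// Hypothetical helper: parses a string shaped like yyyy-MM-dd'T'HH:mm:ss'Z'
// into a JavaScript Date. Assumption: the timestamp is expressed in UTC.
function parseTxTime(value) {
  const match = /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})Z$/.exec(value);
  if (match === null) {
    throw new Error(`Unexpected tx_time format: ${value}`);
  }
  const [year, month, day, hour, minute, second] = match.slice(1).map(Number);
  // Date.UTC expects a zero-based month.
  return new Date(Date.UTC(year, month - 1, day, hour, minute, second));
}

// Sample value taken from the PriceData document shown earlier.
console.log(parseTxTime("2021-07-12T05:20:35Z").toISOString());
// prints "2021-07-12T05:20:35.000Z"
```

A string that does not match the expected pattern raises an error in this sketch, which is a reminder to check that the configured date format matches your stored values before you start the migration.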
| 175 | + |
| 176 | +.. _time-series-verify-collection: |
| 177 | + |
| 178 | +Verify the Collection Data |
| 179 | +~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 180 | + |
| 181 | +At this point, your time series collection should contain all the market data |
| 182 | +from your ``PriceData`` collection. The following shows the format of the |
| 183 | +documents in the ``StockDataMigrate`` time series collection: |
| 184 | + |
| 185 | +.. code-block:: javascript |
| 186 | + |
| 187 | + { |
| 188 | + tx_time: 2021-07-12T05:20:35.000Z, |
| 189 | + symbol: 'WSV', |
| 190 | + company_name: 'WORRIED SNAKEBITE VENTURES', |
| 191 | + price: 21.22, |
| 192 | + _id: ObjectId("...") |
| 193 | + } |
| 194 | + |
| 195 | +To learn how to verify a collection is of type **timeseries**, see the |
| 196 | +instructions on how to :manual:`Check if a Collection is of Type Time Series </core/timeseries-collections/#check-if-a-collection-is-of-type-time-series>` |
| 197 | +in the MongoDB manual. |
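
As a rough sketch of that check, the helper below inspects collection metadata of the kind that ``db.getCollectionInfos()`` returns in ``mongosh``, where each entry carries a ``type`` field. The ``isTimeSeries`` function and the sample array are hypothetical and use illustrative values; in practice you would pass in the array returned by your own deployment.

```javascript
// Hypothetical helper: returns true when the named collection is reported
// with type "timeseries" in the given collection info array.
function isTimeSeries(infos, name) {
  return infos.some((info) => info.name === name && info.type === "timeseries");
}

// Illustrative sample shaped like a getCollectionInfos() result.
const sampleInfos = [
  {
    name: "StockDataMigrate",
    type: "timeseries",
    options: { timeseries: { timeField: "tx_time" } },
  },
  { name: "PriceData", type: "collection", options: {} },
];

console.log(isTimeSeries(sampleInfos, "StockDataMigrate")); // prints true
console.log(isTimeSeries(sampleInfos, "PriceData")); // prints false
```

In ``mongosh``, you could call the helper as ``isTimeSeries(db.getCollectionInfos(), "StockDataMigrate")`` while connected to the ``Stocks`` database.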
| 198 | + |