Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/fundamentals/crud/read-operations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Read Operations

/fundamentals/crud/read-operations/retrieve
/fundamentals/crud/read-operations/count
/fundamentals/crud/read-operations/change-streams

- :ref:`csharp-retrieve`
- :ref:`csharp-count-documents`
- :ref:`csharp-change-streams`
316 changes: 316 additions & 0 deletions source/fundamentals/crud/read-operations/change-streams.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
.. _csharp-change-streams:

====================
Monitor Data Changes
====================

.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol

.. facet::
:name: genre
:values: reference

.. meta::
:keywords: watch, code example

Overview
--------

In this guide, you can learn how to use a **change stream** to monitor real-time
changes to your data. A change stream is a {+mdb-server+} feature that
allows your application to subscribe to data changes on a collection, database,
or deployment.

Sample Data
~~~~~~~~~~~

The examples in this guide use the ``sample_restaurants.restaurants`` collection
from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
free MongoDB Atlas cluster and load the sample datasets, see the :ref:`<csharp-quickstart>`.

The examples on this page use the following ``Restaurant``, ``Address``, and ``GradeEntry``
classes as models:

.. literalinclude:: /includes/code-examples/Restaurant.cs
:language: csharp
:copyable:
:dedent:

.. literalinclude:: /includes/code-examples/Address.cs
:language: csharp
:copyable:
:dedent:

.. literalinclude:: /includes/code-examples/GradeEntry.cs
:language: csharp
:copyable:
:dedent:

.. include:: /includes/convention-pack-note.rst

Open a Change Stream
--------------------

To open a change stream, call the ``Watch()`` or ``WatchAsync()`` method. The instance on which you
call the method determines the scope of events that the change
stream listens for. You can call the ``Watch()`` or ``WatchAsync()`` method on the following
classes:

- ``MongoClient``: To monitor all changes in the MongoDB deployment
- ``Database``: To monitor changes in all collections in the database
- ``Collection``: To monitor changes in the collection

The following example opens a change stream on the ``restaurants`` collection
and outputs the changes as they occur. Select the
:guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the corresponding
code.

.. tabs::

.. tab:: Asynchronous
:tabid: change-stream-async

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-open-change-stream-async
:end-before: end-open-change-stream-async
:language: csharp

.. tab:: Synchronous
:tabid: change-stream-sync

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-open-change-stream
:end-before: end-open-change-stream
:language: csharp

To begin watching for changes, run the application. Then, in a separate
application or shell, modify the ``restaurants`` collection. Updating a document
that has a ``"name"`` value of ``"Blarney Castle"`` results in the following
change stream output:

.. code-block:: sh
:copyable: false

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

Modify the Change Stream Output
-------------------------------

You can pass the ``pipeline`` parameter to the ``Watch()`` and ``WatchAsync()``
methods to modify the change stream output. This parameter allows you to watch
for only specified change events. Create the pipeline by using the
``EmptyPipelineDefinition`` class and appending the relevant aggregation stage methods.

You can specify the following aggregation stages in the ``pipeline`` parameter:

- ``$addFields``
- ``$match``
- ``$project``
- ``$replaceRoot``
- ``$replaceWith``
- ``$redact``
- ``$set``
- ``$unset``

To learn how to build an aggregation pipeline by using the
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
the Operations with Builders guide.

The following example uses the ``pipeline`` parameter to open a change stream
that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the
corresponding code.

.. tabs::

.. tab:: Asynchronous
:tabid: change-stream-async

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-pipeline-async
:end-before: end-change-stream-pipeline-async
:language: csharp

.. tab:: Synchronous
:tabid: change-stream-sync

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-pipeline
:end-before: end-change-stream-pipeline
:language: csharp

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.

Modify ``Watch()`` Behavior
---------------------------

The ``Watch()`` and ``WatchAsync()`` methods accept optional parameters, which represent
options you can use to configure the operation. If you don't specify any
options, the driver does not customize the operation.

The following table describes the options you can set to customize the behavior
of ``Watch()`` and ``WatchAsync()``:

.. list-table::
:widths: 30 70
:header-rows: 1

* - Option
- Description

* - ``FullDocument``
- | Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see :ref:`csharp-change-stream-pre-post-image`.

* - ``FullDocumentBeforeChange``
- | Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see :ref:`csharp-change-stream-pre-post-image`.

* - ``ResumeAfter``
- | Directs ``Watch()`` or ``WatchAsync()`` to resume returning changes after the
operation specified in the resume token.
| Each change stream event document includes a resume token as the ``_id``
field. Pass the entire ``_id`` field of the change event document that
represents the operation you want to resume after.
| ``ResumeAfter`` is mutually exclusive with ``StartAfter`` and ``StartAtOperationTime``.

* - ``StartAfter``
- | Directs ``Watch()`` or ``WatchAsync()`` to start a new change stream after the
operation specified in the resume token. Allows notifications to
resume after an invalidate event.
| Each change stream event document includes a resume token as the ``_id``
field. Pass the entire ``_id`` field of the change event document that
represents the operation you want to resume after.
| ``StartAfter`` is mutually exclusive with ``ResumeAfter`` and ``StartAtOperationTime``.

* - ``StartAtOperationTime``
- | Directs ``Watch()`` or ``WatchAsync()`` to return only events that occur after the
specified timestamp.
| ``StartAtOperationTime`` is mutually exclusive with ``ResumeAfter`` and ``StartAfter``.

* - ``MaxAwaitTime``
- | Specifies the maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds.

* - ``ShowExpandedEvents``
- | Starting in {+mdb-server+} v6.0, change streams support change notifications
for Data Definition Language (DDL) events, such as the ``createIndexes`` and ``dropIndexes`` events. To
include expanded events in a change stream, create the change stream
cursor and set this parameter to ``True``.

* - ``BatchSize``
- | Specifies the maximum number of change events to return in each batch of the
response from the MongoDB cluster.

* - ``Collation``
- | Specifies the collation to use for the change stream cursor.

* - ``Comment``
- | Attaches a comment to the operation.

.. _csharp-change-stream-pre-post-image:

Include Pre-Images and Post-Images
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. important::

You can enable pre-images and post-images on collections only if your
deployment uses MongoDB v6.0 or later.

By default, when you perform an operation on a collection, the
corresponding change event includes only the delta of the fields
modified by that operation. To see the full document before or after a
change, create a ``ChangeStreamOptions`` object and specify the
``FullDocumentBeforeChange`` or the ``FullDocument`` options. Then, pass the
``ChangeStreamOptions`` object to the ``Watch()`` or ``WatchAsync()`` method.

The **pre-image** is the full version of a document *before* a change. To include the
pre-image in the change stream event, set the ``FullDocumentBeforeChange``
option to one of the following values:

- ``ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable``: The change event
includes a pre-image of the modified document for change events only if the
pre-image is available.
- ``ChangeStreamFullDocumentBeforeChangeOption.Required``:
The change event includes a pre-image of the modified document for change
events. If the pre-image is not available, the driver raises an error.

The **post-image** is the full version of a document *after* a change. To include the
post-image in the change stream event, set the ``FullDocument`` option to
one of the following values:

- ``ChangeStreamFullDocumentOption.UpdateLookup``: The change event includes a
copy of the entire changed document from some time after the change.
- ``ChangeStreamFullDocumentOption.WhenAvailable``: The change event includes a
post-image of the modified document for change events only if the post-image
is available.
- ``ChangeStreamFullDocumentOption.Required``: The change event includes a
post-image of the modified document for change events. If the post-image is
not available, the driver raises an error.

The following example opens a change stream on a collection and includes the post-image
of updated documents by specifying the ``FullDocument`` option. Select the
:guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the corresponding
code.

.. tabs::

.. tab:: Asynchronous
:tabid: change-stream-async

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-post-image-async
:end-before: end-change-stream-post-image-async
:language: csharp

.. tab:: Synchronous
:tabid: change-stream-sync

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-post-image
:end-before: end-change-stream-post-image
:language: csharp

Running the preceding code example and updating a document that has a ``"name"``
value of ``"Blarney Castle"`` results in the following change stream output:

.. code-block:: sh
:copyable: false

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

To learn more about pre-images and post-images, see
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
in the {+mdb-server+} manual.

Additional Information
----------------------

To learn more about change streams, see :manual:`Change Streams
</changeStreams>` in the {+mdb-server+} manual.

API Documentation
~~~~~~~~~~~~~~~~~

To learn more about any of the methods or types discussed in this
guide, see the following API documentation:

- `Watch() <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.IMongoClient.Watch.html>`__
- `WatchAsync() <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.IMongoClient.WatchAsync.html>`__
- `ChangeStreamOptions <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.ChangeStreamOptions.html>`__
- `UpdateOne() <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.IMongoCollectionExtensions.UpdateOne.html>`__
Loading