Skip to content

Commit 6013ed4

Browse files
authored
DOCSP-38212: change streams (#17)
1 parent 365bd0c commit 6013ed4

File tree

2 files changed

+142
-2
lines changed

2 files changed

+142
-2
lines changed

source/tutorials.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ Tutorials
1313
/tutorials/encrypt/
1414
/tutorials/write-ops/
1515
/tutorials/aggregation/
16+
/tutorials/change-stream/
1617

1718
..
18-
/tutorials/change-stream/
1919
/tutorials/text-search/
2020
/tutorials/geo/
2121
/tutorials/gridfs/
@@ -28,9 +28,9 @@ Tutorials
2828
- :ref:`scala-encrypt`
2929
- :ref:`scala-write-ops`
3030
- :ref:`scala-aggregation`
31+
- :ref:`scala-changestream`
3132

3233
..
33-
- :ref:`scala-changestream`
3434
- :ref:`scala-text-search`
3535
- :ref:`scala-geo`
3636
- :ref:`scala-gridfs`

source/tutorials/change-stream.txt

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
.. _scala-changestream:
2+
3+
==============
4+
Change Streams
5+
==============
6+
7+
.. facet::
8+
:name: genre
9+
:values: reference
10+
11+
.. meta::
12+
:keywords: code example, watch for changes
13+
14+
.. contents:: On this page
15+
:local:
16+
:backlinks: none
17+
:depth: 2
18+
:class: singlecol
19+
20+
MongoDB Server version 3.6 introduces the ``$changeStream`` aggregation pipeline
21+
operator.
22+
23+
Change streams provide a way to watch changes to documents in a
24+
collection. To improve the usability of this new stage, the
25+
``MongoCollection`` type includes the ``watch()`` method. The
26+
``ChangeStreamObservable`` instance sets up the change stream and automatically
27+
attempts to resume if it encounters a potentially recoverable error.
28+
29+
Prerequisites
30+
-------------
31+
32+
.. include:: /includes/prereq-restaurants.rst
33+
34+
.. code-block:: scala
35+
36+
import java.util.concurrent.CountDownLatch
37+
38+
import org.mongodb.scala._
39+
import org.mongodb.scala.model.Aggregates._
40+
import org.mongodb.scala.model.Filters._
41+
import org.mongodb.scala.model.changestream._
42+
43+
.. include:: /includes/obs-note.rst
44+
45+
Connect to a MongoDB Deployment
46+
-------------------------------
47+
48+
.. include:: /includes/connect-section.rst
49+
50+
Watch for Changes in a Collection
51+
---------------------------------
52+
53+
To create a change stream use one of the ``MongoCollection.watch()``
54+
methods.
55+
56+
In the following example, the change stream prints out all changes it
57+
observes:
58+
59+
.. code-block:: scala
60+
61+
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
62+
val latch = new CountDownLatch(1)
63+
64+
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
65+
66+
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
67+
68+
override def onError(throwable: Throwable): Unit = {
69+
println(s"Error: '$throwable")
70+
latch.countDown()
71+
}
72+
73+
override def onComplete(): Unit = latch.countDown()
74+
75+
def await(): Unit = latch.await()
76+
}
77+
78+
val observer = LatchedObserver()
79+
collection.watch().subscribe(observer)
80+
observer.await() // Block waiting for the latch
81+
82+
Watch for Changes on a Database
83+
-------------------------------
84+
85+
Applications can open a single change stream to watch all non-system
86+
collections of a database. To create such a change stream, use one of the
87+
``MongoDatabase.watch()`` methods.
88+
89+
In the following example, the change stream prints out all the changes
90+
it observes on the given database:
91+
92+
.. code-block:: scala
93+
94+
val observer = LatchedObserver()
95+
database.watch().subscribe(observer)
96+
observer.await() // Block waiting for the latch
97+
98+
Watch for Changes on All Databases
99+
----------------------------------
100+
101+
Applications can open a single change stream to watch all non-system
102+
collections of all databases in a MongoDB deployment. To create such a
103+
change stream, use one of the ``MongoClient.watch()`` methods.
104+
105+
In the following example, the change stream prints out all the changes
106+
it observes in the deployment to which the ``MongoClient`` is connected:
107+
108+
.. code-block:: scala
109+
110+
val observer = LatchedObserver()
111+
client.watch().subscribe(observer)
112+
observer.await() // Block waiting for the latch
113+
114+
Filtering Content
115+
-----------------
116+
117+
You can pass a list of aggregation stages to the ``watch()`` method to
118+
modify the data returned by the ``$changeStream`` operator.
119+
120+
.. note::
121+
122+
Not all aggregation operators are supported. See
123+
:manual:`Change Streams </changeStreams/>` in the Server manual to learn more.
124+
125+
In the following example, the change stream prints out all changes it
126+
observes corresponding to ``insert``, ``update``, ``replace`` and
127+
``delete`` operations.
128+
129+
First, the pipeline includes a ``$match`` stage to filter for documents
130+
where the ``operationType`` is either an ``insert``, ``update``, ``replace`` or
131+
``delete``. Then, it sets the ``fullDocument`` to
132+
``FullDocument.UPDATE_LOOKUP``, so that the document after the update is
133+
included in the results:
134+
135+
.. code-block:: scala
136+
137+
val observer = LatchedObserver()
138+
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))
139+
.fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
140+
observer.await() // Block waiting for the latch

0 commit comments

Comments
 (0)