Skip to content

Commit c87f87f

Browse files
p-mongop
andauthored
RUBY-2352 Add try_next example to resuming change stream user guide (#2079)
Co-authored-by: Oleg Pudeyev <[email protected]>
1 parent 5c7be27 commit c87f87f

File tree

1 file changed

+53
-18
lines changed

1 file changed

+53
-18
lines changed

source/tutorials/ruby-driver-change-streams.txt

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,15 @@ You can close a change stream by calling its ``#close`` method:
134134
Resuming a Change Stream
135135
========================
136136

137-
The driver will automatically retry getMore operations on a change stream
138-
once. Initial aggregation is never retried. In practical terms this means
139-
that, for example:
137+
A change stream consists of two types of operations: the initial aggregation
138+
and ``getMore`` requests to receive the next batch of changes.
139+
140+
The driver will automatically retry each ``getMore`` operation once on
141+
network errors and when the server returns an error indicating it changed
142+
state (for example, it is no longer the primary). The driver does not retry
143+
the initial aggregation.
144+
145+
In practical terms this means that, for example:
140146

141147
- Calling ``collection.watch`` will fail if the cluster does not have
142148
enough available nodes to satisfy the ``"majority"`` read preference.
@@ -145,34 +151,63 @@ that, for example:
145151
change stream reads via ``next`` or ``each`` methods will continue
146152
transparently to the application.
147153

148-
If the cluster loses enough nodes to not be able to satisfy the ``"majority"``
149-
read preference and does not heal quickly enough, ``next`` and ``each``
150-
will raise an error. In these cases the application must track, via the
151-
resume token, which documents from the change stream it has processed and
152-
create a new change stream object via the ``watch`` call, passing an
153-
appropriate ``:resume_after`` argument. The ``_id`` key in each change
154-
document can be used as a resume token. However, to get the most up-to-date
155-
resume token, use the ``resume_token`` method.
154+
To indefinitely and reliably watch for changes without losing any changes or
155+
processing a change more than once, the application must track the resume
156+
token for the change stream and restart the change stream when it experiences
157+
extended error conditions that cause the driver's automatic resume to also
158+
fail. The following code snippet shows an example of iterating a change stream
159+
indefinitely, retrieving the resume token using the ``resume_token`` change
160+
stream method and restarting the change stream using the ``:resume_after``
161+
option on all MongoDB or network errors:
156162

157163
.. code-block:: ruby
158164

159-
token = doc['_id']
160-
stream = collection.watch([], resume_after: token)
165+
token = nil
166+
loop do
167+
begin
168+
stream = collection.watch([], resume_after: token)
169+
enum = stream.to_enum
170+
while doc = enum.next
171+
process(doc)
172+
token = stream.resume_token
173+
end
174+
rescue Mongo::Error
175+
sleep 1
176+
end
177+
end
161178

162-
To watch a collection indefinitely, retrying on all MongoDB errors, using ``next``:
179+
The above iteration is blocking at the ``enum.next`` call, and does not
180+
permit resuming processing in the event the Ruby process running this code
181+
is terminated. The driver also provides the ``try_next`` method which returns
182+
``nil`` (after a small waiting period) instead of blocking indefinitely when
183+
there are no changes in the change stream. Using the ``try_next`` method,
184+
the resume token may be persisted after each ``getMore`` request, even when
185+
a particular request does not return any changes, such that the resume token
186+
remains at the top of the oplog and the application has an opportunity to
187+
persist it should the process handling changes terminates:
163188

164189
.. code-block:: ruby
165190

166191
token = nil
167-
while true
192+
loop do
168193
begin
169194
stream = collection.watch([], resume_after: token)
170195
enum = stream.to_enum
171-
while doc = enum.next
196+
doc = enum.try_next
197+
if doc
172198
process(doc)
173-
token = stream.resume_token
174199
end
200+
token = stream.resume_token
201+
# Persist +token+ to support resuming processing upon process restart
175202
rescue Mongo::Error
176203
sleep 1
177204
end
178-
end
205+
end
206+
207+
Note that the resume token should be retrieved from the change stream after
208+
every ``try_next`` call, even if the call returned no document.
209+
210+
The resume token is also provided in the ``_id`` field of each change stream
211+
document. Reading the ``_id`` field is not recommended because it may be
212+
projected out by the application, and because using only the ``_id`` field
213+
would not advance the resume token when a ``getMore`` returns no documents.

0 commit comments

Comments
 (0)