1+ .. _change-streams:
2+
13==============
24Change Streams
35==============
@@ -11,12 +13,19 @@ Change Streams
1113 :class: singlecol
1214
1315As of version 3.6 of the MongoDB server, a new ``$changeStream`` pipeline stage
14- is supported in the aggregation framework. The Ruby driver provides an API for
15- receiving notifications for changes to a particular collection using this
16+ is supported in the aggregation framework. Specifying this stage first in an
17+ aggregation pipeline allows users to request that notifications are sent for all
18+ changes to a particular collection. As of MongoDB 4.0, change streams are
19+ supported on databases and clusters in addition to collections.
20+
21+ The Ruby driver provides an API for
22+ receiving notifications for changes to a particular collection, database
23+ or cluster using this
1624new pipeline stage. Although you can create a change stream using the pipeline
1725operator and aggregation framework directly, it is recommended to use the
18- driver API described below as the driver resumes the change stream if there is
19- a timeout, a network error or another type of a resumable error.
26+ driver API described below as the driver resumes the change stream one time
27+ if there is a timeout, a network error, a server error indicating that a
28+ failover is taking place or another type of a resumable error.
2029
2130Change streams on the server require a ``"majority"`` read concern or no
2231read concern.
@@ -28,13 +37,16 @@ getMores to be called in a loop in the background.
2837
2938.. _here: https://github.com/jruby/jruby/issues/4212
3039
31- Watching for changes on a particular collection
32- -----------------------------------------------
40+ Watching for Changes on a Collection
41+ ------------------------------------
3342
34- A change stream is created by calling the ``#watch`` method on a collection:
43+ A collection change stream is created by calling the ``#watch`` method on a
44+ collection:
3545
3646.. code-block:: ruby
3747
48+ client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
49+ collection = client[:test]
3850 stream = collection.watch
3951 collection.insert_one(a: 1)
4052 doc = stream.to_enum.next
@@ -45,12 +57,16 @@ You can also receive the notifications as they become available:
4557
4658.. code-block:: ruby
4759
48- stream = client[:test] .watch
60+ stream = collection .watch
4961 enum = stream.to_enum
5062 while doc = enum.next
5163 process(doc)
5264 end
5365
66+ The ``next`` method blocks and polls the cluster until a change is available.
67+ If there is a non-resumable error, ``next`` will raise an exception.
68+ See Resuming a Change Stream section below for an example that reads
69+ changes from a collection indefinitely.
5470
5571The change stream can take filters in the aggregation framework pipeline
5672operator format:
@@ -65,19 +81,53 @@ operator format:
6581 process(doc)
6682 end
6783
68- Close a Change Stream
69- ---------------------
84+ Watching for Changes on a Database
85+ ----------------------------------
7086
71- You can close a change stream by calling the ``#close`` method:
87+ A database change stream notifies on changes on any collection within the
88+ database as well as database-wide events, such as the database being dropped.
89+
90+ A database change stream is created by calling the ``#watch`` method on a
91+ database object:
7292
7393.. code-block:: ruby
7494
75- stream = collection.watch
76- collection.insert_one(a: 1)
95+ client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
96+ database = client.database
97+ stream = database.watch
98+ client[:test].insert_one(a: 1)
7799 doc = stream.to_enum.next
78100 process(doc)
101+
102+
103+ Watching for Changes on a Cluster
104+ ---------------------------------
105+
106+ A cluster change stream notifies on changes on any collection, any database
107+ within the cluster as well as cluster-wide events.
108+
109+ A cluster change stream is created by calling the ``#watch`` method on a
110+ client object (not the cluster object):
111+
112+ .. code-block:: ruby
113+
114+ client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
115+ stream = client.watch
116+ client[:test].insert_one(a: 1)
117+ doc = stream.to_enum.next
118+ process(doc)
119+
120+
121+ Closing a Change Stream
122+ -----------------------
123+
124+ You can close a change stream by calling its ``#close`` method:
125+
126+ .. code-block:: ruby
127+
79128 stream.close
80129
130+
81131Resuming a Change Stream
82132------------------------
83133
@@ -97,4 +147,28 @@ read preference and does not heal quickly enough, ``next`` and ``each``
97147will raise an error. In these cases the application must track, via the
98148resume token, which documents from the change stream it has processed and
99149create a new change stream object via the ``watch`` call, passing an
100- appropriate ``:resume_after`` argument.
150+ appropriate ``:resume_after`` argument. The resume token is exposed
151+ at the key ``_id`` in each change document.
152+
153+ .. code-block:: ruby
154+
155+ token = doc['_id']
156+ stream = collection.watch([], resume_after: token)
157+
158+ To watch a collection indefinitely, retrying on all MongoDB errors:
159+
160+ .. code-block:: ruby
161+
162+ token = nil
163+ while true
164+ begin
165+ stream = collection.watch([], resume_after: token)
166+ enum = stream.to_enum
167+ while doc = enum.next
168+ process(doc)
169+ token = doc['_id']
170+ end
171+ rescue Mongo::Error
172+ sleep 1
173+ end
174+ end
0 commit comments