Skip to content

Commit 48db930

Browse files
committed
CXX-1414: Add ChangeStreams Support
1 parent e316778 commit 48db930

File tree

11 files changed

+710
-2
lines changed

11 files changed

+710
-2
lines changed

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ install:
6161
# Install Mongo C Driver
6262
- pushd mongo-c-driver
6363

64-
# TODO: Update this to our real minimum for the C++11 driver 3.2 release, once known.
65-
- git checkout 1.9.2
64+
- git checkout master
6665

6766
- ./autogen.sh --enable-tests=no --enable-examples=no --with-libbson=bundled; make; sudo make install
6867

src/mongocxx/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ add_subdirectory(config)
5050
set(mongocxx_sources
5151
bulk_write.cpp
5252
client.cpp
53+
change_stream.cpp
5354
collection.cpp
5455
cursor.cpp
5556
database.cpp
@@ -73,6 +74,7 @@ set(mongocxx_sources
7374
model/write.cpp
7475
options/aggregate.cpp
7576
options/bulk_write.cpp
77+
options/change_stream.cpp
7678
options/client.cpp
7779
options/count.cpp
7880
options/create_collection.cpp

src/mongocxx/change_stream.cpp

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright 2018-present MongoDB Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <mongocxx/change_stream.hpp>
16+
17+
#include <cstdint>
18+
#include <memory>
19+
#include <string>
20+
#include <tuple>
21+
22+
#include <bsoncxx/private/libbson.hh>
23+
#include <bsoncxx/stdx/make_unique.hpp>
24+
#include <mongocxx/exception/private/error_category.hh>
25+
#include <mongocxx/exception/private/mongoc_error.hh>
26+
#include <mongocxx/exception/query_exception.hpp>
27+
#include <mongocxx/private/change_stream.hh>
28+
#include <mongocxx/private/libmongoc.hh>
29+
30+
#include <mongocxx/config/private/prelude.hh>
31+
32+
namespace mongocxx {
33+
MONGOCXX_INLINE_NAMESPACE_BEGIN
34+
35+
// Requirements for concept Iterator:
36+
// http://en.cppreference.com/w/cpp/concept/Iterator
37+
static_assert(std::is_copy_constructible<change_stream::iterator>::value, "");
38+
static_assert(std::is_copy_assignable<change_stream::iterator>::value, "");
39+
static_assert(std::is_destructible<change_stream::iterator>::value, "");
40+
41+
// Below basically assert that we have the traits on change_stream::iterator
42+
// so they can't be accidentally removed.
43+
static_assert(std::is_integral<change_stream::iterator::difference_type>::value, "");
44+
static_assert(std::is_class<change_stream::iterator::value_type>::value, "");
45+
static_assert(std::is_pointer<change_stream::iterator::pointer>::value, "");
46+
static_assert(std::is_reference<change_stream::iterator::reference>::value, "");
47+
48+
change_stream::change_stream(change_stream&&) noexcept = default;
49+
50+
change_stream& change_stream::operator=(change_stream&&) noexcept = default;
51+
52+
change_stream::~change_stream() = default;
53+
54+
change_stream::iterator change_stream::begin() {
55+
if (_impl->is_dead()) {
56+
return end();
57+
}
58+
return iterator{this};
59+
}
60+
61+
change_stream::iterator change_stream::end() {
62+
return iterator{};
63+
}
64+
65+
// void* since we don't leak C driver defs into C++ driver
66+
change_stream::change_stream(void* change_stream_ptr)
67+
: _impl(stdx::make_unique<impl>(*static_cast<mongoc_change_stream_t*>(change_stream_ptr))) {}
68+
69+
change_stream::iterator::iterator() : change_stream::iterator::iterator{nullptr} {}
70+
71+
const bsoncxx::document::view& change_stream::iterator::operator*() const noexcept {
72+
return _change_stream->_impl->doc();
73+
}
74+
75+
const bsoncxx::document::view* change_stream::iterator::operator->() const noexcept {
76+
return &_change_stream->_impl->doc();
77+
}
78+
79+
change_stream::iterator& change_stream::iterator::operator++() {
80+
_change_stream->_impl->advance_iterator();
81+
return *this;
82+
}
83+
84+
void change_stream::iterator::operator++(int) {
85+
operator++();
86+
}
87+
88+
change_stream::iterator::iterator(change_stream* change_stream) : _change_stream(change_stream) {
89+
if (!_change_stream || _change_stream->_impl->has_started()) {
90+
return;
91+
}
92+
93+
_change_stream->_impl->mark_started();
94+
// Advance to first event on begin() to keep operator*() state-machine-free.
95+
operator++();
96+
}
97+
98+
// Don't worry about the case of two iterators being created from
99+
// different change_streams
100+
bool MONGOCXX_CALL operator==(const change_stream::iterator& lhs,
101+
const change_stream::iterator& rhs) noexcept {
102+
return rhs.is_exhausted() && lhs.is_exhausted();
103+
}
104+
105+
bool MONGOCXX_CALL operator!=(const change_stream::iterator& lhs,
106+
const change_stream::iterator& rhs) noexcept {
107+
return !(lhs == rhs);
108+
}
109+
110+
bool change_stream::iterator::is_exhausted() const {
111+
// An iterator is exhausted if it is the end-iterator (_change_stream == nullptr)
112+
// or if the underlying _change_stream is marked exhausted.
113+
return !_change_stream || _change_stream->_impl->is_exhausted();
114+
}
115+
116+
MONGOCXX_INLINE_NAMESPACE_END
117+
} // namespace mongocxx

src/mongocxx/change_stream.hpp

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2018-present MongoDB Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <memory>
18+
19+
#include <bsoncxx/document/view.hpp>
20+
#include <bsoncxx/stdx/optional.hpp>
21+
22+
#include <mongocxx/config/prelude.hpp>
23+
24+
namespace mongocxx {
25+
MONGOCXX_INLINE_NAMESPACE_BEGIN
26+
27+
class collection;
28+
29+
class MONGOCXX_API change_stream {
30+
public:
31+
class MONGOCXX_API iterator;
32+
33+
///
34+
/// Move constructs a change_stream.
35+
///
36+
change_stream(change_stream&& other) noexcept;
37+
38+
///
39+
/// Move assigns a change_stream.
40+
///
41+
change_stream& operator=(change_stream&& other) noexcept;
42+
43+
///
44+
/// Destroys a change_stream.
45+
///
46+
~change_stream();
47+
48+
///
49+
/// A change_stream::iterator points to the beginning of any
50+
/// available notifications. Each call to begin() advances to the next
51+
/// available notification. The state of all iterators is tracked by the
52+
/// change_stream itself, so advancing one iterator advances all iterators.
53+
///
54+
/// change_stream::begin() and the increment operators are blocking operations.
55+
/// They will not return until a notification is available, the max_await_time (from
56+
/// the options::change_stream) milliseconds have elapsed, or a server
57+
/// error is encountered.
58+
///
59+
/// When change_stream.begin() == change_stream.end(), no notifications
60+
/// are available. Each call to change_stream.begin() checks again for
61+
/// newly-available notifications.
62+
///
63+
/// @return
64+
/// The change_stream::iterator
65+
/// @exception
66+
/// Throws mongocxx::query_exception if the query failed.
67+
///
68+
iterator begin();
69+
70+
///
71+
/// A change_stream::iterator indicating stream exhaustion, meaning that
72+
/// no notifications are available from the stream.
73+
///
74+
/// @return
75+
/// The change_stream::iterator indicating exhaustion
76+
///
77+
iterator end();
78+
79+
private:
80+
friend class collection;
81+
friend class change_stream::iterator;
82+
83+
MONGOCXX_PRIVATE change_stream(void* change_stream_ptr);
84+
85+
class MONGOCXX_PRIVATE impl;
86+
std::unique_ptr<impl> _impl;
87+
};
88+
89+
class MONGOCXX_API change_stream::iterator {
90+
public:
91+
// Support input-iterator (caveat of post-increment returning void)
92+
using difference_type = long;
93+
using value_type = const bsoncxx::document::view;
94+
using pointer = std::add_pointer<value_type>::type;
95+
using reference = std::add_lvalue_reference<value_type>::type;
96+
using iterator_category = std::input_iterator_tag;
97+
98+
///
99+
/// Default-construct an iterator.
100+
/// This is equivalent to change_stream::end()
101+
///
102+
iterator();
103+
104+
///
105+
/// Dereferences the view for the document currently being pointed to.
106+
///
107+
const bsoncxx::document::view& operator*() const noexcept;
108+
109+
///
110+
/// Accesses a member of the dereferenced document currently being pointed to.
111+
///
112+
const bsoncxx::document::view* operator->() const noexcept;
113+
114+
///
115+
/// Pre-increments the iterator to move to the next document.
116+
///
117+
/// change_stream::begin() and increment operators are blocking operations.
118+
/// They will not return until a notification is available, the max_await_time (from
119+
/// the options::change_stream) miliseconds have elapsed, or a server
120+
/// error is encountered.
121+
///
122+
/// @throws mongocxx::query_exception if the query failed
123+
///
124+
iterator& operator++();
125+
126+
///
127+
/// Post-increments the iterator to move to the next document.
128+
///
129+
/// change_stream::begin() and increment operators are blocking operations.
130+
/// They will not return until a notification is available, the max_await_time (from
131+
/// the options::change_stream) miliseconds have elapsed, or a server
132+
/// error is encountered.
133+
///
134+
/// @throws mongocxx::query_exception if the query failed
135+
///
136+
void operator++(int);
137+
138+
private:
139+
friend class change_stream;
140+
141+
MONGOCXX_PRIVATE explicit iterator(change_stream* change_stream);
142+
143+
///
144+
/// @{
145+
///
146+
/// Compare two iterators for (in)-equality. Iterators compare equal if
147+
/// they point to the same underlying change_stream or if both are exhausted.
148+
///
149+
/// @relates iterator
150+
///
151+
friend MONGOCXX_API bool MONGOCXX_CALL operator==(const change_stream::iterator&,
152+
const change_stream::iterator&) noexcept;
153+
154+
friend MONGOCXX_API bool MONGOCXX_CALL operator!=(const change_stream::iterator&,
155+
const change_stream::iterator&) noexcept;
156+
///
157+
/// @}
158+
///
159+
160+
MONGOCXX_PRIVATE bool is_exhausted() const;
161+
162+
change_stream* _change_stream;
163+
};
164+
165+
MONGOCXX_INLINE_NAMESPACE_END
166+
} // namespace mongocxx
167+
168+
#include <mongocxx/config/postlude.hpp>

src/mongocxx/collection.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,35 @@ guard<T> make_guard(T&& t) {
138138
return guard<T>{std::forward<T>(t)};
139139
}
140140

141+
// TODO: Consider extending the builders to directly accept optional values.
142+
template <typename T>
143+
inline void append_if(bsoncxx::builder::basic::document& doc,
144+
const std::string& key,
145+
const mongocxx::stdx::optional<T>& opt) {
146+
if (opt) {
147+
doc.append(bsoncxx::builder::basic::kvp(key, opt.value()));
148+
}
149+
}
150+
151+
bsoncxx::document::value as_bson(const mongocxx::options::change_stream& cs) {
152+
// Construct new bson rep each time since values may change after this is called.
153+
bsoncxx::builder::basic::document out{};
154+
155+
append_if(out, "fullDocument", cs.full_document());
156+
append_if(out, "resumeAfter", cs.resume_after());
157+
append_if(out, "batchSize", cs.batch_size());
158+
159+
if (cs.max_await_time()) {
160+
auto count = cs.max_await_time().value().count();
161+
if ((count < 0) || (count >= std::numeric_limits<std::uint32_t>::max())) {
162+
throw mongocxx::logic_error{mongocxx::error_code::k_invalid_parameter};
163+
}
164+
out.append(bsoncxx::builder::basic::kvp("maxAwaitTimeMS", count));
165+
}
166+
167+
return out.extract();
168+
}
169+
141170
} // namespace
142171

143172
namespace mongocxx {
@@ -963,6 +992,21 @@ class write_concern collection::write_concern() const {
963992
return wc;
964993
}
965994

995+
class change_stream collection::watch(const options::change_stream& options) {
996+
return watch(pipeline{}, options);
997+
}
998+
999+
class change_stream collection::watch(const pipeline& pipe, const options::change_stream& options) {
1000+
scoped_bson_t pipeline_bson{bsoncxx::document::view(pipe._impl->view_array())};
1001+
1002+
scoped_bson_t options_bson;
1003+
options_bson.init_from_static(as_bson(options));
1004+
1005+
// NOTE: collection_watch copies what it needs so we're safe to destroy our copies.
1006+
return change_stream{libmongoc::collection_watch(
1007+
_get_impl().collection_t, pipeline_bson.bson(), options_bson.bson())};
1008+
}
1009+
9661010
class index_view collection::indexes() {
9671011
return index_view{_get_impl().collection_t};
9681012
}

0 commit comments

Comments
 (0)