Skip to content

Commit c687994

Browse files
authored
RUBY-1547 Inline cluster rescans must be concurrent/done by monitor (#1142)
1 parent dad0e5a commit c687994

File tree

9 files changed

+103
-40
lines changed

9 files changed

+103
-40
lines changed

lib/mongo/cluster.rb

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -426,17 +426,38 @@ def reconnect!
426426

427427
# Force a scan of all known servers in the cluster.
428428
#
429+
# If the sync parameter is true, which is the default, the scan is
430+
# performed synchronously in the thread which called this method.
431+
# Each server in the cluster is checked sequentially. If there are
432+
# many servers in the cluster or they are slow to respond, this
433+
# can be a long running operation.
434+
#
435+
# If the sync parameter is false, this method instructs all server
436+
# monitor threads to perform an immediate scan and returns without
437+
# waiting for scan results.
438+
#
439+
# @note In both synchronous and asynchronous scans, each monitor
440+
# thread maintains a minimum interval between scans, meaning
441+
# calling this method may not initiate a scan on a particular server
442+
# the very next instant.
443+
#
429444
# @example Force a full cluster scan.
430445
# cluster.scan!
431446
#
432-
# @note This operation is done synchronously. If servers in the cluster are
433-
# down or slow to respond this can potentially be a slow operation.
434-
#
435447
# @return [ true ] Always true.
436448
#
437449
# @since 2.0.0
438-
def scan!
439-
servers_list.each{ |server| server.scan! } and true
450+
def scan!(sync=true)
451+
if sync
452+
servers_list.each do |server|
453+
server.scan!
454+
end
455+
else
456+
servers_list.each do |server|
457+
server.monitor.scan_semaphore.signal
458+
end
459+
end
460+
true
440461
end
441462

442463
# Determine if this cluster of servers is equal to another object. Checks the

lib/mongo/retryable.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def read_with_retry(session = nil)
4444
rescue Error::SocketError, Error::SocketTimeoutError => e
4545
raise(e) if attempt > cluster.max_read_retries || (session && session.in_transaction?)
4646
log_retry(e)
47-
cluster.scan!
47+
cluster.scan!(false)
4848
retry
4949
rescue Error::OperationFailure => e
5050
if cluster.sharded? && e.retryable? && !(session && session.in_transaction?)
@@ -133,17 +133,17 @@ def retry_write_allowed?(session, write_concern)
133133
end
134134

135135
def retry_write(original_error, txn_num, &block)
136-
cluster.scan!
136+
cluster.scan!(false)
137137
server = cluster.next_primary
138138
raise original_error unless (server.retry_writes? && txn_num)
139139
log_retry(original_error)
140140
yield(server, txn_num)
141141
rescue Error::SocketError, Error::SocketTimeoutError => e
142-
cluster.scan!
142+
cluster.scan!(false)
143143
raise e
144144
rescue Error::OperationFailure => e
145145
raise original_error unless e.write_retryable?
146-
cluster.scan!
146+
cluster.scan!(false)
147147
raise e
148148
rescue
149149
raise original_error
@@ -162,7 +162,7 @@ def legacy_write_with_retry(server = nil, session = nil)
162162
raise(e) if attempt > Cluster::MAX_WRITE_RETRIES
163163
if e.write_retryable? && !(session && session.in_transaction?)
164164
log_retry(e)
165-
cluster.scan!
165+
cluster.scan!(false)
166166
retry
167167
else
168168
raise(e)

lib/mongo/semaphore.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,11 @@ def broadcast
3636
@cv.broadcast
3737
end
3838
end
39+
40+
def signal
41+
@lock.synchronize do
42+
@cv.signal
43+
end
44+
end
3945
end
4046
end

lib/mongo/server/monitor.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def initialize(address, event_listeners, monitoring, options = {})
6666
@monitoring = monitoring
6767
@options = options.freeze
6868
@round_trip_time_averager = RoundTripTimeAverager.new
69+
@scan_semaphore = Semaphore.new
6970
# This is a Mongo::Server::Monitor::Connection
7071
@connection = Connection.new(address, options)
7172
@last_scan = nil
@@ -107,6 +108,9 @@ def heartbeat_frequency
107108
@heartbeat_frequency ||= options[:heartbeat_frequency] || HEARTBEAT_FREQUENCY
108109
end
109110

111+
# @api private
112+
attr_reader :scan_semaphore
113+
110114
# Runs the server monitor. Refreshing happens on a separate thread per
111115
# server.
112116
#
@@ -120,7 +124,7 @@ def run!
120124
@thread = Thread.new(heartbeat_frequency) do |i|
121125
loop do
122126
scan!
123-
sleep(i)
127+
@scan_semaphore.wait(i)
124128
end
125129
end
126130
end

lib/mongo/server_selector/selectable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def select_server(cluster, ping = nil)
132132

133133
return server
134134
end
135-
cluster.scan!
135+
cluster.scan!(false)
136136
end
137137
raise Error::NoServerAvailable.new(self, cluster)
138138
end

spec/integration/retryable_writes_spec.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@
164164
end
165165

166166
before do
167-
expect(client.cluster).to receive(:scan!).twice.and_call_original
167+
# server selector can call scan! until it finds a server,
168+
# hence more than two scan! calls may be issued
169+
expect(client.cluster).to receive(:scan!).at_least(:twice).and_call_original
168170
end
169171

170172
it 'does not retry writes and raises the second error' do
@@ -178,7 +180,9 @@
178180
context 'when the second error is a SocketTimeoutError' do
179181

180182
before do
181-
expect(client.cluster).to receive(:scan!).twice.and_call_original
183+
# server selector can call scan! until it finds a server,
184+
# hence more than two scan! calls may be issued
185+
expect(client.cluster).to receive(:scan!).at_least(:twice).and_call_original
182186
end
183187

184188
let(:second_error) do
@@ -196,7 +200,9 @@
196200
context 'when the second error is a retryable OperationFailure' do
197201

198202
before do
199-
expect(client.cluster).to receive(:scan!).twice.and_call_original
203+
# server selector can call scan! until it finds a server,
204+
# hence more than two scan! calls may be issued
205+
expect(client.cluster).to receive(:scan!).at_least(:twice).and_call_original
200206
end
201207

202208
let(:second_error) do

spec/mongo/operation/delete/op_msg_spec.rb

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,21 @@
143143
context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do
144144

145145
let(:expected_global_args) do
146-
global_args.delete(:lsid)
147-
global_args
146+
global_args.dup.tap do |args|
147+
args.delete(:lsid)
148+
end
148149
end
149150

150151
before do
151152
session.instance_variable_set(:@options, { implicit: true })
153+
# Topology is standalone, hence there is exactly one server
154+
authorized_client.cluster.servers.first.monitor.stop!(true)
152155
allow(authorized_primary.features).to receive(:sessions_enabled?).and_return(false)
153156
end
154157

155158
it 'creates the correct OP_MSG message' do
156159
authorized_client.command(ping:1)
160+
expect(expected_global_args[:session]).to be nil
157161
expect(Mongo::Protocol::Msg).to receive(:new).with([:none], {}, expected_global_args, expected_payload_1)
158162
op.send(:message, authorized_primary)
159163
end
@@ -176,8 +180,10 @@
176180
context 'when the topology is replica set or sharded', if: test_sessions? do
177181

178182
let(:expected_global_args) do
179-
global_args.delete(:lsid)
180-
global_args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
183+
global_args.dup.tap do |args|
184+
args.delete(:lsid)
185+
args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
186+
end
181187
end
182188

183189
it 'does not send a session id in the command' do
@@ -190,8 +196,9 @@
190196
context 'when the topology is standalone', if: standalone? && sessions_enabled? do
191197

192198
let(:expected_global_args) do
193-
global_args.delete(:lsid)
194-
global_args
199+
global_args.dup.tap do |args|
200+
args.delete(:lsid)
201+
end
195202
end
196203

197204
it 'creates the correct OP_MSG message' do
@@ -209,8 +216,10 @@
209216
end
210217

211218
let(:expected_global_args) do
212-
global_args.delete(:lsid)
213-
global_args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
219+
global_args.dup.tap do |args|
220+
args.delete(:lsid)
221+
args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
222+
end
214223
end
215224

216225
it 'does not send a session id in the command' do

spec/mongo/operation/insert/op_msg_spec.rb

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,21 @@
152152
context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do
153153

154154
let(:expected_global_args) do
155-
global_args.delete(:lsid)
156-
global_args
155+
global_args.dup.tap do |args|
156+
args.delete(:lsid)
157+
end
157158
end
158159

159160
before do
160161
session.instance_variable_set(:@options, { implicit: true })
161-
allow(authorized_primary.features).to receive(:sessions_enabled?).and_return(false)
162+
# Topology is standalone, hence there is exactly one server
163+
authorized_client.cluster.servers.first.monitor.stop!(true)
164+
expect(authorized_primary.features).to receive(:sessions_enabled?).at_least(:once).and_return(false)
162165
end
163166

164167
it 'creates the correct OP_MSG message' do
165168
authorized_client.command(ping:1)
169+
expect(expected_global_args).not_to have_key(:lsid)
166170
expect(Mongo::Protocol::Msg).to receive(:new).with([:none],
167171
{ validating_keys: true },
168172
expected_global_args,
@@ -188,8 +192,10 @@
188192
context 'when the topology is replica set or sharded', if: test_sessions? do
189193

190194
let(:expected_global_args) do
191-
global_args.delete(:lsid)
192-
global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
195+
global_args.dup.tap do |args|
196+
args.delete(:lsid)
197+
args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
198+
end
193199
end
194200

195201
it 'does not send a session id in the command' do
@@ -205,8 +211,9 @@
205211
context 'when the topology is standalone', if: standalone? && sessions_enabled? do
206212

207213
let(:expected_global_args) do
208-
global_args.delete(:lsid)
209-
global_args
214+
global_args.dup.tap do |args|
215+
args.delete(:lsid)
216+
end
210217
end
211218

212219
it 'creates the correct OP_MSG message' do
@@ -227,8 +234,10 @@
227234
end
228235

229236
let(:expected_global_args) do
230-
global_args.delete(:lsid)
231-
global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
237+
global_args.dup.tap do |args|
238+
args.delete(:lsid)
239+
args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
240+
end
232241
end
233242

234243
it 'does not send a session id in the command' do

spec/mongo/operation/update/op_msg_spec.rb

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,15 @@
148148
context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do
149149

150150
let(:expected_global_args) do
151-
global_args.delete(:lsid)
152-
global_args
151+
global_args.dup.tap do |args|
152+
args.delete(:lsid)
153+
end
153154
end
154155

155156
before do
156157
session.instance_variable_set(:@options, { implicit: true })
158+
# Topology is standalone, hence there is exactly one server
159+
authorized_client.cluster.servers.first.monitor.stop!(true)
157160
allow(authorized_primary.features).to receive(:sessions_enabled?).and_return(false)
158161
end
159162

@@ -181,8 +184,10 @@
181184
context 'when the topology is replica set or sharded', if: test_sessions? do
182185

183186
let(:expected_global_args) do
184-
global_args.delete(:lsid)
185-
global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
187+
global_args.dup.tap do |args|
188+
args.delete(:lsid)
189+
args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
190+
end
186191
end
187192

188193
it 'does not send a session id in the command' do
@@ -195,8 +200,9 @@
195200
context 'when the topology is standalone', if: standalone? && sessions_enabled? do
196201

197202
let(:expected_global_args) do
198-
global_args.delete(:lsid)
199-
global_args
203+
global_args.dup.tap do |args|
204+
args.delete(:lsid)
205+
end
200206
end
201207

202208
it 'creates the correct OP_MSG message' do
@@ -214,8 +220,10 @@
214220
end
215221

216222
let(:expected_global_args) do
217-
global_args.delete(:lsid)
218-
global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
223+
global_args.dup.tap do |args|
224+
args.delete(:lsid)
225+
args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time)
226+
end
219227
end
220228

221229
it 'does not send a session id in the command' do

0 commit comments

Comments
 (0)