@@ -254,9 +254,9 @@ def process(conf, event_count)
254254
255255 before { subject . register }
256256
257- def run_it_thread ( inst )
258- Thread . new ( inst ) do |subj |
259- subj . run ( queue )
257+ def run_it_thread ( plugin )
258+ Thread . new ( plugin ) do |subject |
259+ subject . run ( queue )
260260 end
261261 end
262262
@@ -269,16 +269,29 @@ def publish_thread(new_redis, prefix)
269269 end
270270 end
271271
272- def close_thread ( inst , rt )
273- Thread . new ( inst , rt ) do |subj , runner |
272+ def close_thread ( plugin , runner_thread )
273+ Thread . new ( plugin , runner_thread ) do |subject , runner |
274274 # block for the messages
275275 e1 = queue . pop
276276 e2 = queue . pop
277277 # put em back for the tests
278278 queue . push ( e1 )
279279 queue . push ( e2 )
280280 runner . raise ( LogStash ::ShutdownSignal )
281- subj . close
281+ subject . close
282+ end
283+ end
284+
285+ def stub_plugin_timeout ( timeout )
286+ value = LogStash ::Inputs ::Redis ::TIMEOUT
287+ begin
288+ LogStash ::Inputs ::Redis . send :remove_const , :TIMEOUT
289+ LogStash ::Inputs ::Redis . const_set :TIMEOUT , timeout
290+
291+ yield
292+ ensure
293+ LogStash ::Inputs ::Redis . send :remove_const , :TIMEOUT rescue nil
294+ LogStash ::Inputs ::Redis . const_set :TIMEOUT , value
282295 end
283296 end
284297
@@ -315,6 +328,23 @@ def close_thread(inst, rt)
315328
316329 expect ( queue . size ) . to eq ( 2 )
317330 end
331+
332+ it 'calling the run method, adds events to the queue (after timeout)' do
333+ stub_plugin_timeout ( 0.5 ) do
334+ #simulate the input thread
335+ rt = run_it_thread ( subject )
336+ [ :warn , :error ] . each { |level | expect ( subject . logger ) . not_to receive ( level ) }
337+ #make sure the Redis call times out and gets retried
338+ sleep ( LogStash ::Inputs ::Redis ::TIMEOUT * 4 )
339+ #simulate the other system thread
340+ publish_thread ( subject . send ( :new_redis_instance ) , 'c' ) . join
341+ #simulate the pipeline thread
342+ close_thread ( subject , rt ) . join
343+
344+ expect ( queue . size ) . to eq ( 2 )
345+ end
346+ end
347+
318348 it 'events had redis_channel' do
319349 #simulate the input thread
320350 rt = run_it_thread ( subject )
@@ -354,6 +384,22 @@ def close_thread(inst, rt)
354384 expect ( queue . size ) . to eq ( 2 )
355385 end
356386
387+ it 'calling the run method, adds events to the queue (after timeout)' do
388+ stub_plugin_timeout ( 0.5 ) do
389+ #simulate the input thread
390+ rt = run_it_thread ( subject )
391+ [ :warn , :error ] . each { |level | expect ( subject . logger ) . not_to receive ( level ) }
392+ #make sure the Redis call times out and gets retried
393+ sleep ( LogStash ::Inputs ::Redis ::TIMEOUT * 4 )
394+ #simulate the other system thread
395+ publish_thread ( subject . send ( :new_redis_instance ) , 'c' ) . join
396+ #simulate the pipeline thread
397+ close_thread ( subject , rt ) . join
398+
399+ expect ( queue . size ) . to eq ( 2 )
400+ end
401+ end
402+
357403 it 'events had redis_channel' do
358404 #simulate the input thread
359405 rt = run_it_thread ( subject )
0 commit comments