@@ -17,11 +17,15 @@ def populate(key, event_count)
1717end
1818
1919def process ( conf , event_count )
20- events = input ( conf ) do |pipeline , queue |
21- event_count . times . map { queue . pop }
20+ events = input ( conf ) do |_ , queue |
21+ sleep 0.1 until queue . size >= event_count
22+ queue . size . times . map { queue . pop }
2223 end
23-
24- expect ( events . map { |evt | evt . get ( "sequence" ) } ) . to eq ( ( 0 ..event_count . pred ) . to_a )
24+ # due multiple workers we get events out-of-order in the output
25+ events . sort! { |a , b | a . get ( 'sequence' ) <=> b . get ( 'sequence' ) }
26+ expect ( events [ 0 ] . get ( 'sequence' ) ) . to eq ( 0 )
27+ expect ( events [ 100 ] . get ( 'sequence' ) ) . to eq ( 100 )
28+ expect ( events [ 1000 ] . get ( 'sequence' ) ) . to eq ( 1000 )
2529end
2630
2731# integration tests ---------------------
@@ -31,7 +35,6 @@ def process(conf, event_count)
3135 it "should read events from a list" do
3236 key = SecureRandom . hex
3337 event_count = 1000 + rand ( 50 )
34- # event_count = 100
3538 conf = <<-CONFIG
3639 input {
3740 redis {
@@ -249,6 +252,9 @@ def process(conf, event_count)
249252 end
250253
251254 context 'for the subscribe data_types' do
255+
256+ before { subject . register }
257+
252258 def run_it_thread ( inst )
253259 Thread . new ( inst ) do |subj |
254260 subj . run ( queue )
@@ -289,6 +295,8 @@ def close_thread(inst, rt)
289295 let ( :data_type ) { 'channel' }
290296 let ( :quit_calls ) { [ :unsubscribe , :connection ] }
291297
298+ before { subject . register }
299+
292300 context 'mocked redis' do
293301 it 'multiple stop calls, calls to redis once' , type : :mocked do
294302 subject . do_stop
0 commit comments