From 894ac23b42a4a0ae8eddefb3510c605c3b5ad97c Mon Sep 17 00:00:00 2001 From: Rob Day Date: Thu, 1 Jan 2015 11:22:29 +0000 Subject: [PATCH 1/5] Wake up the process_timers loop on receiving a new timer --- lib/concurrent/executor/timer_set.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index e93e703b5..de35f22e7 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -58,6 +58,7 @@ def post(intended_time, *args, &task) true end + @condition.signal end From 595343378cb576825241098478cad1dcd1451113 Mon Sep 17 00:00:00 2001 From: Rob Day Date: Thu, 1 Jan 2015 11:36:34 +0000 Subject: [PATCH 2/5] Add test for fixed behaviour --- spec/concurrent/executor/timer_set_spec.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spec/concurrent/executor/timer_set_spec.rb b/spec/concurrent/executor/timer_set_spec.rb index 3458f093e..2ea64578c 100644 --- a/spec/concurrent/executor/timer_set_spec.rb +++ b/spec/concurrent/executor/timer_set_spec.rb @@ -35,6 +35,14 @@ module Concurrent expect(latch.wait(0.2)).to be_truthy end + it 'executes a given task when given an interval in seconds, even if longer tasks have been scheduled' do + latch = CountDownLatch.new(1) + subject.post(0.5){ nil } + sleep 0.1 + subject.post(0.1){ latch.count_down } + expect(latch.wait(0.2)).to be_truthy + end + it 'passes all arguments to the task on execution' do expected = nil latch = CountDownLatch.new(1) From af591e09f15f3eab8ce0e7191a5e6f73661c394d Mon Sep 17 00:00:00 2001 From: Rob Day Date: Thu, 1 Jan 2015 11:37:03 +0000 Subject: [PATCH 3/5] Fix and comment possible race condition --- lib/concurrent/executor/timer_set.rb | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index de35f22e7..ddb20eb34 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -130,8 +130,20 @@ def process_tasks interval = task.time - Time.now.to_f if interval <= 0 + # We need to remove the task from the queue before passing + # it to the executor, to avoid race conditions where we pass + # the peek'ed task to the executor and then pop a different + # one that's been added in the meantime. + # + # Note that there's no race condition between the peek and + # this pop - this pop could retrieve a different task from + # the peek, but that task would be due to fire now anyway + # (because @queue is a priority queue, and this thread is + # the only reader, so whatever timer is at the head of the + # queue now must have the same pop time, or a closer one, as + # when we peeked). + task = mutex.synchronize { @queue.pop } @task_executor.post(*task.args, &task.op) - mutex.synchronize { @queue.pop } else mutex.synchronize do @condition.wait(mutex, [interval, 60].min) From 5569f63e44a40e6a20861142a7d858700e411991 Mon Sep 17 00:00:00 2001 From: Rob Day Date: Thu, 1 Jan 2015 11:46:55 +0000 Subject: [PATCH 4/5] Preserve the 'return true'-ness of TimerSet#post --- lib/concurrent/executor/timer_set.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index ddb20eb34..d59d7cbe4 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -55,11 +55,10 @@ def post(intended_time, *args, &task) @queue.push(Task.new(time, args, task)) @timer_executor.post(&method(:process_tasks)) end - - true end - @condition.signal + @condition.signal + true end # For a timer, #kill is like an orderly shutdown, except we need to manually From 960af94401b5facd42523e252d289b5480580470 Mon Sep 17 00:00:00 2001 From: Rob Day Date: Thu, 1 Jan 2015 11:55:40 +0000 Subject: [PATCH 5/5] Add test to verify that TimerSet#post returns true --- spec/concurrent/executor/timer_set_spec.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spec/concurrent/executor/timer_set_spec.rb b/spec/concurrent/executor/timer_set_spec.rb index 2ea64578c..cf6adbd9b 100644 --- a/spec/concurrent/executor/timer_set_spec.rb +++ b/spec/concurrent/executor/timer_set_spec.rb @@ -35,6 +35,10 @@ module Concurrent expect(latch.wait(0.2)).to be_truthy end + it 'returns true when posting a task' do + expect(subject.post(0.1) { nil }).to be true + end + it 'executes a given task when given an interval in seconds, even if longer tasks have been scheduled' do latch = CountDownLatch.new(1) subject.post(0.5){ nil }