Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions lib/concurrent/executor/timer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def post(intended_time, *args, &task)
@queue.push(Task.new(time, args, task))
@timer_executor.post(&method(:process_tasks))
end

true
end

@condition.signal
true
end

# For a timer, #kill is like an orderly shutdown, except we need to manually
Expand Down Expand Up @@ -129,8 +129,20 @@ def process_tasks
interval = task.time - Time.now.to_f

if interval <= 0
# We need to remove the task from the queue before passing
# it to the executor, to avoid race conditions where we pass
# the peek'ed task to the executor and then pop a different
# one that's been added in the meantime.
#
# Note that there's no race condition between the peek and
# this pop - this pop could retrieve a different task from
# the peek, but that task would be due to fire now anyway
# (because @queue is a priority queue, and this thread is
# the only reader, so whatever timer is at the head of the
# queue now must have the same pop time, or a closer one, as
# when we peeked).
task = mutex.synchronize { @queue.pop }
@task_executor.post(*task.args, &task.op)
mutex.synchronize { @queue.pop }
else
mutex.synchronize do
@condition.wait(mutex, [interval, 60].min)
Expand Down
12 changes: 12 additions & 0 deletions spec/concurrent/executor/timer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ module Concurrent
expect(latch.wait(0.2)).to be_truthy
end

it 'returns true when posting a task' do
expect(subject.post(0.1) { nil }).to be true
end

it 'executes a given task when given an interval in seconds, even if longer tasks have been scheduled' do
latch = CountDownLatch.new(1)
subject.post(0.5){ nil }
sleep 0.1
subject.post(0.1){ latch.count_down }
expect(latch.wait(0.2)).to be_truthy
end

it 'passes all arguments to the task on execution' do
expected = nil
latch = CountDownLatch.new(1)
Expand Down