@@ -55,10 +55,10 @@ def post(intended_time, *args, &task)
55
55
@queue . push ( Task . new ( time , args , task ) )
56
56
@timer_executor . post ( &method ( :process_tasks ) )
57
57
end
58
-
59
- true
60
58
end
61
59
60
+ @condition . signal
61
+ true
62
62
end
63
63
64
64
# For a timer, #kill is like an orderly shutdown, except we need to manually
@@ -129,8 +129,20 @@ def process_tasks
129
129
interval = task . time - Time . now . to_f
130
130
131
131
if interval <= 0
132
+ # We need to remove the task from the queue before passing
133
+ # it to the executor, to avoid race conditions where we pass
134
+ # the peek'ed task to the executor and then pop a different
135
+ # one that's been added in the meantime.
136
+ #
137
+ # Note that there's no race condition between the peek and
138
+ # this pop - this pop could retrieve a different task from
139
+ # the peek, but that task would be due to fire now anyway
140
+ # (because @queue is a priority queue, and this thread is
141
+ # the only reader, so whatever timer is at the head of the
142
+ # queue now must have the same pop time, or a closer one, as
143
+ # when we peeked).
144
+ task = mutex . synchronize { @queue . pop }
132
145
@task_executor . post ( *task . args , &task . op )
133
- mutex . synchronize { @queue . pop }
134
146
else
135
147
mutex . synchronize do
136
148
@condition . wait ( mutex , [ interval , 60 ] . min )
0 commit comments