Skip to content

Commit 464b1a6

Browse files
committed
Merge pull request #5 from mighe/master
Event potential concurrency issues
2 parents fffd747 + 892bf83 commit 464b1a6

File tree

5 files changed

+72
-31
lines changed

5 files changed

+72
-31
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
require 'concurrent/cached_thread_pool'
2323
require 'concurrent/fixed_thread_pool'
24+
require 'concurrent/immediate_executor'
2425

2526
require 'concurrent/event_machine_defer_proxy' if defined?(EventMachine)
2627

lib/concurrent/event.rb

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,42 @@ class Event
2020
def initialize
2121
@set = false
2222
@mutex = Mutex.new
23-
@waiters = []
23+
@condition = ConditionVariable.new
2424
end
2525

2626
# Is the object in the set state?
2727
#
2828
# @return [Boolean] indicating whether or not the `Event` has been set
2929
def set?
30-
return @set == true
30+
@mutex.synchronize do
31+
@set
32+
end
3133
end
3234

3335
# Trigger the event, setting the state to `set` and releasing all threads
3436
# waiting on the event. Has no effect if the `Event` has already been set.
3537
#
3638
# @return [Boolean] should always return `true`
3739
def set
38-
return true if set?
3940
@mutex.synchronize do
41+
return true if @set
4042
@set = true
41-
@waiters.each {|waiter| waiter.run if waiter.status == 'sleep'}
43+
@condition.broadcast
4244
end
43-
return true
45+
46+
true
4447
end
4548

4649
# Reset a previously set event back to the `unset` state.
4750
# Has no effect if the `Event` has not yet been set.
4851
#
4952
# @return [Boolean] should always return `true`
5053
def reset
51-
return true unless set?
5254
@mutex.synchronize do
5355
@set = false
54-
@waiters.clear # just in case there's garbage
5556
end
56-
return true
57+
58+
true
5759
end
5860

5961
# Wait a given number of seconds for the `Event` to be set by another
@@ -62,21 +64,11 @@ def reset
6264
#
6365
# @return [Boolean] true if the `Event` was set before timeout else false
6466
def wait(timeout = nil)
65-
return true if set?
66-
67-
@mutex.synchronize { @waiters << Thread.current }
68-
return true if set? # if event was set while waiting for mutex
69-
70-
if timeout.nil?
71-
slept = sleep
72-
else
73-
slept = sleep(timeout)
67+
@mutex.synchronize do
68+
return true if @set
69+
@condition.wait(@mutex, timeout)
70+
@set
7471
end
75-
rescue
76-
# let it fail
77-
ensure
78-
@mutex.synchronize { @waiters.delete(Thread.current) }
79-
return set?
8072
end
8173
end
8274
end

lib/concurrent/immediate_executor.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
module Concurrent
2+
class ImmediateExecutor
3+
4+
def post(*args, &block)
5+
block.call(*args)
6+
end
7+
8+
def <<(block)
9+
post(&block)
10+
self
11+
end
12+
13+
end
14+
end

spec/concurrent/future_spec.rb

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,36 +58,37 @@ module Concurrent
5858

5959
context 'fulfillment' do
6060

61+
before(:each) do
62+
Future.thread_pool = ImmediateExecutor.new
63+
end
64+
6165
it 'passes all arguments to handler' do
62-
@a = @b = @c = nil
63-
f = Future.new(1, 2, 3) do |a, b, c|
64-
@a, @b, @c = a, b, c
66+
result = nil
67+
68+
Future.new(1, 2, 3) do |a, b, c|
69+
result = [a, b, c]
6570
end
66-
sleep(0.1)
67-
[@a, @b, @c].should eq [1, 2, 3]
71+
72+
result.should eq [1, 2, 3]
6873
end
6974

7075
it 'sets the value to the result of the handler' do
7176
f = Future.new(10){|a| a * 2 }
72-
sleep(0.1)
7377
f.value.should eq 20
7478
end
7579

7680
it 'sets the state to :fulfilled when the block completes' do
7781
f = Future.new(10){|a| a * 2 }
78-
sleep(0.1)
7982
f.should be_fulfilled
8083
end
8184

8285
it 'sets the value to nil when the handler raises an exception' do
8386
f = Future.new{ raise StandardError }
84-
sleep(0.1)
8587
f.value.should be_nil
8688
end
8789

8890
it 'sets the state to :rejected when the handler raises an exception' do
8991
f = Future.new{ raise StandardError }
90-
sleep(0.1)
9192
f.should be_rejected
9293
end
9394

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe ImmediateExecutor do
6+
7+
let(:executor) { ImmediateExecutor.new }
8+
9+
context "#post" do
10+
it 'executes the block using the arguments as parameters' do
11+
result = executor.post(1, 2, 3, 4) { |a, b, c, d| [a, b, c, d] }
12+
result.should eq [1, 2, 3, 4]
13+
end
14+
end
15+
16+
context "#<<" do
17+
18+
it "returns true" do
19+
result = executor << proc { false }
20+
result.should be_true
21+
end
22+
23+
it "executes the passed callable" do
24+
x = 0
25+
26+
executor << proc { x = 5 }
27+
28+
x.should eq 5
29+
end
30+
31+
end
32+
end
33+
end

0 commit comments

Comments
 (0)