Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 55 additions & 30 deletions spec/concurrent/executor/thread_pool_executor_shared.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

after(:each) do
subject.kill
sleep(0.1)
subject.wait_for_termination(0.1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change actually brings most of the benefit.

end

context '#initialize defaults' do
Expand Down Expand Up @@ -92,14 +92,14 @@
end

it 'returns the set value when running' do
5.times{ subject.post{ sleep(0.1) } }
sleep(0.1)
trigger = Concurrent::Event.new
5.times{ subject.post{ trigger.wait } }
expect(subject.max_queue).to eq expected_max
trigger.set
end

it 'returns the set value after stopping' do
5.times{ subject.post{ sleep(0.1) } }
sleep(0.1)
5.times{ subject.post{ nil } }
subject.shutdown
subject.wait_for_termination(1)
expect(subject.max_queue).to eq expected_max
Expand All @@ -123,29 +123,33 @@
end

it 'returns zero when there are no enqueued tasks' do
5.times{ subject.post{ nil } }
sleep(0.1)
latch = Concurrent::CountDownLatch.new(5)
5.times{ subject.post{ latch.count_down } }
latch.wait(0.1)
expect(subject.queue_length).to eq 0
end

it 'returns the size of the queue when tasks are enqueued' do
100.times{ subject.post{ sleep(0.5) } }
sleep(0.1)
trigger = Concurrent::Event.new
20.times{ subject.post{ trigger.wait } }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've cut the "100.times" to "20.times" in a lot of places, but I don't think it actually makes a difference. On the other hand, 100 times is probably overkill now that these are more deterministic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

expect(subject.queue_length).to be > 0
trigger.set
end

it 'returns zero when stopped' do
100.times{ subject.post{ sleep(0.5) } }
sleep(0.1)
trigger = Concurrent::Event.new
20.times{ subject.post{ trigger.wait } }
subject.shutdown
trigger.set
subject.wait_for_termination(1)
expect(subject.queue_length).to eq 0
end

it 'can never be greater than :max_queue' do
100.times{ subject.post{ sleep(0.5) } }
sleep(0.1)
trigger = Concurrent::Event.new
20.times{ subject.post{ trigger.wait } }
expect(subject.queue_length).to be <= expected_max
trigger.set
end
end

Expand All @@ -165,7 +169,6 @@

it 'returns :max_length when stopped' do
100.times{ subject.post{ nil } }
sleep(0.1)
subject.shutdown
subject.wait_for_termination(1)
expect(subject.remaining_capacity).to eq expected_max
Expand All @@ -192,27 +195,35 @@
end

specify '#post raises an error when the queue is at capacity' do
trigger = Concurrent::Event.new
expect {
100.times{ subject.post{ sleep(1) } }
20.times{ subject.post{ trigger.wait } }
}.to raise_error(Concurrent::RejectedExecutionError)
trigger.set
end

specify '#<< raises an error when the queue is at capacity' do
trigger = Concurrent::Event.new
expect {
100.times{ subject << proc { sleep(1) } }
20.times{ subject << proc { trigger.wait } }
}.to raise_error(Concurrent::RejectedExecutionError)
trigger.set
end

specify '#post raises an error when the executor is shutting down' do
trigger = Concurrent::Event.new
expect {
subject.shutdown; subject.post{ sleep(1) }
subject.shutdown; subject.post{ trigger.wait }
}.to raise_error(Concurrent::RejectedExecutionError)
trigger.set
end

specify '#<< raises an error when the executor is shutting down' do
trigger = Concurrent::Event.new
expect {
subject.shutdown; subject << proc { sleep(1) }
subject.shutdown; subject << proc { trigger.wait }
}.to raise_error(Concurrent::RejectedExecutionError)
trigger.set
end

specify 'a #post task is never executed when the queue is at capacity' do
Expand All @@ -239,8 +250,8 @@
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should throw an exception
100.times do
# Inject 20 more tasks, which should throw an exception
20.times do
expect {
subject.post { subsequent_executed.increment; }
}.to raise_error(Concurrent::RejectedExecutionError)
Expand Down Expand Up @@ -286,8 +297,8 @@
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should throw an exeption
100.times do
# Inject 20 more tasks, which should throw an exeption
20.times do
expect {
subject << proc { subsequent_executed.increment; }
}.to raise_error(Concurrent::RejectedExecutionError)
Expand Down Expand Up @@ -346,8 +357,8 @@
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should be dropped without an exception
100.times do
# Inject 20 more tasks, which should be dropped without an exception
20.times do
subject.post{ subsequent_executed.increment; }
end

Expand Down Expand Up @@ -391,8 +402,8 @@
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
end

# Inject 100 more tasks, which should be dropped without an exception
100.times do
# Inject 20 more tasks, which should be dropped without an exception
20.times do
subject << proc { subsequent_executed.increment; }
end

Expand Down Expand Up @@ -456,23 +467,37 @@
end

specify '#post does not create any new threads when the queue is at capacity' do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote this test a bit to get by without the sleep, but I'm not really convinced by it. I wonder if this might actually be a good place to use mocks - directly verifying how many times Thread.new is called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm comfortable with us starting to use more test doubles now. In the beginning there is no way I would have felt comfortable with this library had I used a lot of mocks. Given how much we've grown, I'm a lot more comfortable with our ability to mock appropriately and still validate the real concurrent behavior.

trigger = Concurrent::Event.new
initial = Thread.list.length
5.times{ subject.post{ sleep(0.1) } }
expect(Thread.list.length).to be < initial + 5

# Post several tasks to the executor. Has to be a new thread,
# because it will start blocking once the queue fills up.
Thread.new do
5.times{ subject.post{ trigger.wait } }
end

expect(Thread.list.length).to be < initial + 1 + 5

# Let the executor tasks complete.
trigger.set
end

specify '#<< executes the task on the current thread when the queue is at capacity' do
trigger = Concurrent::Event.new
latch = Concurrent::CountDownLatch.new(5)
subject.post{ sleep(1) }
subject.post{ trigger.wait }
5.times{|i| subject << proc { latch.count_down } }
latch.wait(0.1)
trigger.set
end

specify '#post executes the task on the current thread when the queue is at capacity' do
trigger = Concurrent::Event.new
latch = Concurrent::CountDownLatch.new(5)
subject.post{ sleep(1) }
subject.post{ trigger.wait }
5.times{|i| subject.post{ latch.count_down } }
latch.wait(0.1)
trigger.set
end

specify '#post executes the task on the current thread when the executor is shutting down' do
Expand Down