Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions lib/concurrent/atomic/semaphore.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
require 'concurrent/atomic/condition'

module Concurrent
class MutexSemaphore
# @!macro [attach] semaphore_method_initialize
#
# Create a new `Semaphore` with the initial `count`.
#
# @param [Fixnum] count the initial count
#
# @raise [ArgumentError] if `count` is not an integer or is less than zero
def initialize(count)
unless count.is_a?(Fixnum) && count >= 0
fail ArgumentError, 'count must be an non-negative integer'
end
@mutex = Mutex.new
@condition = Condition.new
@free = count
end

# @!macro [attach] semaphore_method_acquire
#
# Acquires the given number of permits from this semaphore,
# blocking until all are available.
#
# @param [Fixnum] permits Number of permits to acquire
#
# @raise [ArgumentError] if `permits` is not an integer or is less than
# one
#
# @return [Nil]
def acquire(permits = 1)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@mutex.synchronize do
try_acquire_timed(permits, nil)
nil
end
end

# @!macro [attach] semaphore_method_available_permits
#
# Returns the current number of permits available in this semaphore.
#
# @return [Integer]
def available_permits
@mutex.synchronize { @free }
end

# @!macro [attach] semaphore_method_drain_permits
#
# Acquires and returns all permits that are immediately available.
#
# @return [Integer]
def drain_permits
@mutex.synchronize do
@free.tap { |_| @free = 0 }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this line be simply @free = 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make it consistent with the Java api and it should return the number of drained permits.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that @free = 0 will return 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible minor optimization removing the block:

@mutex.synchronize { old, @free = @free, 0 }
return old

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitr-ch I believe that in your example old needs to be declared outside the block else its scope will only be within the block:

def drain_permits
   old = 0
   @mutex.synchronize { old, @free = @free, 0 }
   old
end

@adamruzicka If you have a free moment over the next few days, can you please test these variations? I know it may seem somewhat pedantic, but we've learned that these minor variations can often hide troubling concurrency bugs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdantonio yep, you are right, I've overlooked it. Functional alternative:

def drain_permits
   old, _ = @mutex.synchronize { old, @free = @free, 0 }
   old
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some testing of the variations[1], the tap one seems to perform slightly better than the others. Is there anything wrong with using tap?

[1] http://pastie.org/9779681
edit: I never did performance testing before so please let me know if this doesn't actually prove anything

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adamruzicka So long as everything happens within the mutex, tap is fine. I'm surprised that tap is the fastest variation, but the numbers don't lie. Thank you for humoring us and looking into this!

Timing with a simple clock the way you did is perfectly fine, but you may want to take a look at Ruby's Benchmark class. It provide's more information. You can see how it's used in the couple of benchmark scripts in the examples folder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some more testing, see https://gist.github.com/pitr-ch/43fd7c52f01e0cc56cea The tap is slower in the end but only a little. Big surprise for me, thanks @adamruzicka for bringing it up.

end
end

# @!macro [attach] semaphore_method_try_acquire
#
# Acquires the given number of permits from this semaphore,
# only if all are available at the time of invocation or within
# `timeout` interval
#
# @param [Fixnum] permits the number of permits to acquire
#
# @param [Fixnum] timeout the number of seconds to wait for the counter
# or `nil` to return immediately
#
# @raise [ArgumentError] if `permits` is not an integer or is less than
# one
#
# @return [Boolean] `false` if no permits are available, `true` when
# acquired a permit
def try_acquire(permits = 1, timeout = nil)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@mutex.synchronize do
if timeout.nil?
try_acquire_now(permits)
else
try_acquire_timed(permits, timeout)
end
end
end

# @!macro [attach] semaphore_method_release
#
# Releases the given number of permits, returning them to the semaphore.
#
# @param [Fixnum] permits Number of permits to return to the semaphore.
#
# @raise [ArgumentError] if `permits` is not a number or is less than one
#
# @return [Nil]
def release(permits = 1)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@mutex.synchronize do
@free += permits
permits.times { @condition.signal }
end
nil
end

# @!macro [attach] semaphore_method_reduce_permits
#
# @api private
#
# Shrinks the number of available permits by the indicated reduction.
#
# @param [Fixnum] reduction Number of permits to remove.
#
# @raise [ArgumentError] if `reduction` is not an integer or is negative
#
# @raise [ArgumentError] if `@free` - `@reduction` is less than zero
#
# @return [Nil]
def reduce_permits(reduction)
unless reduction.is_a?(Fixnum) && reduction >= 0
fail ArgumentError, 'reduction must be an non-negative integer'
end
@mutex.synchronize { @free -= reduction }
nil
end

private

def try_acquire_now(permits)
if @free >= permits
@free -= permits
true
else
false
end
end

def try_acquire_timed(permits, timeout)
remaining = Condition::Result.new(timeout)
while !try_acquire_now(permits) && remaining.can_wait?
@condition.signal
remaining = @condition.wait(@mutex, remaining.remaining_time)
end
remaining.can_wait? ? true : false
end
end

if RUBY_PLATFORM == 'java'

# @!macro semaphore
class JavaSemaphore
# @!macro semaphore_method_initialize
def initialize(count)
unless count.is_a?(Fixnum) && count >= 0
fail(ArgumentError,
'count must be in integer greater than or equal zero')
end
@semaphore = java.util.concurrent.Semaphore.new(count)
end

# @!macro semaphore_method_acquire
def acquire(permits = 1)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@semaphore.acquire(permits)
end

# @!macro semaphore_method_available_permits
def available_permits
@semaphore.availablePermits
end

# @!macro semaphore_method_drain_permits
def drain_permits
@semaphore.drainPermits
end

# @!macro semaphore_method_try_acquire
def try_acquire(permits = 1, timeout = nil)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
if timeout.nil?
@semaphore.tryAcquire(permits)
else
@semaphore.tryAcquire(permits,
timeout,
java.util.concurrent.TimeUnit::SECONDS)
end
end

# @!macro semaphore_method_release
def release(permits = 1)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@semaphore.release(permits)
true
end

# @!macro semaphore_method_reduce_permits
def reduce_permits(reduction)
unless reduction.is_a?(Fixnum) && reduction >= 0
fail ArgumentError, 'reduction must be an non-negative integer'
end
@semaphore.reducePermits(reduction)
end
end

# @!macro semaphore
class Semaphore < JavaSemaphore
end

else

# @!macro semaphore
class Semaphore < MutexSemaphore
end
end
end
1 change: 1 addition & 0 deletions lib/concurrent/atomics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
require 'concurrent/atomic/count_down_latch'
require 'concurrent/atomic/event'
require 'concurrent/atomic/synchronization'
require 'concurrent/atomic/semaphore'
166 changes: 166 additions & 0 deletions spec/concurrent/atomic/semaphore_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
require 'spec_helper'

shared_examples :semaphore do
let(:semaphore) { described_class.new(3) }

context '#initialize' do
it 'raises an exception if the initial count is not an integer' do
expect {
described_class.new('foo')
}.to raise_error(ArgumentError)
end
end

describe '#acquire' do
context 'permits available' do
it 'should return true immediately' do
result = semaphore.acquire
expect(result).to be_nil
end
end

context 'not enough permits available' do
it 'should block thread until permits are available' do
semaphore.drain_permits
Thread.new { sleep(0.2) && semaphore.release }

result = semaphore.acquire
expect(result).to be_nil
expect(semaphore.available_permits).to eq 0
end
end
end

describe '#drain_permits' do
it 'drains all available permits' do
drained = semaphore.drain_permits
expect(drained).to eq 3
expect(semaphore.available_permits).to eq 0
end

it 'drains nothing in no permits are available' do
semaphore.reduce_permits 3
drained = semaphore.drain_permits
expect(drained).to eq 0
end
end

describe '#try_acquire' do
context 'without timeout' do
it 'acquires immediately if permits are available' do
result = semaphore.try_acquire(1)
expect(result).to be_truthy
end

it 'returns false immediately in no permits are available' do
result = semaphore.try_acquire(20)
expect(result).to be_falsey
end
end

context 'with timeout' do
it 'acquires immediately if permits are available' do
result = semaphore.try_acquire(1, 5)
expect(result).to be_truthy
end

it 'acquires when permits are available within timeout' do
semaphore.drain_permits
Thread.new { sleep 0.1 && semaphore.release }
result = semaphore.try_acquire(1, 1)
expect(result).to be_truthy
end

it 'returns false on timeout' do
semaphore.drain_permits
result = semaphore.try_acquire(1, 0.1)
expect(result).to be_falsey
end
end
end

describe '#reduce_permits' do
it 'raises ArgumentError if reducing by negative number' do
expect {
semaphore.reduce_permits(-1)
}.to raise_error(ArgumentError)
end

it 'reduces permits below zero' do
semaphore.reduce_permits 1003
expect(semaphore.available_permits).to eq -1000
end

it 'reduces permits' do
semaphore.reduce_permits 1
expect(semaphore.available_permits).to eq 2
semaphore.reduce_permits 2
expect(semaphore.available_permits).to eq 0
end
end
end

module Concurrent
describe MutexSemaphore do
it_should_behave_like :semaphore

context 'spurious wake ups' do
subject { described_class.new(1) }

before(:each) do
def subject.simulate_spurious_wake_up
@mutex.synchronize do
@condition.signal
@condition.broadcast
end
end
subject.drain_permits
end

it 'should resist to spurious wake ups without timeout' do
@expected = true
# would set @expected to false
Thread.new { @expected = subject.acquire }

sleep(0.1)
subject.simulate_spurious_wake_up

sleep(0.1)
expect(@expected).to be_truthy
end

it 'should resist to spurious wake ups with timeout' do
@expected = true
# sets @expected to false in another thread
t = Thread.new { @expected = subject.try_acquire(1, 0.3) }

sleep(0.1)
subject.simulate_spurious_wake_up

sleep(0.1)
expect(@expected).to be_truthy

t.join
expect(@expected).to be_falsey
end
end
end

if TestHelpers.jruby?
describe JavaSemaphore do
it_should_behave_like :semaphore
end
end

describe Semaphore do
if jruby?
it 'inherits from JavaSemaphore' do
expect(Semaphore.ancestors).to include(JavaSemaphore)
end
else
it 'inherits from MutexSemaphore' do
expect(Semaphore.ancestors).to include(MutexSemaphore)
end
end
end
end