Skip to content

Commit 381e844

Browse files
committed
Implementation of Concurrent::JavaSemaphore in pure Java.
1 parent 3964a7b commit 381e844

File tree

3 files changed

+151
-64
lines changed

3 files changed

+151
-64
lines changed

ext/ConcurrentRubyExtService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException {
88
new com.concurrent_ruby.ext.AtomicReferenceLibrary().load(runtime, false);
99
new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false);
1010
new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false);
11+
new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false);
1112
return true;
1213
}
1314
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package com.concurrent_ruby.ext;
2+
3+
4+
import java.io.IOException;
5+
import java.util.concurrent.Semaphore;
6+
import org.jruby.Ruby;
7+
import org.jruby.RubyBoolean;
8+
import org.jruby.RubyClass;
9+
import org.jruby.RubyFixnum;
10+
import org.jruby.RubyModule;
11+
import org.jruby.RubyNumeric;
12+
import org.jruby.RubyObject;
13+
import org.jruby.anno.JRubyClass;
14+
import org.jruby.anno.JRubyMethod;
15+
import org.jruby.runtime.ObjectAllocator;
16+
import org.jruby.runtime.ThreadContext;
17+
import org.jruby.runtime.builtin.IRubyObject;
18+
19+
public class JavaSemaphoreLibrary {
20+
21+
public void load(Ruby runtime, boolean wrap) throws IOException {
22+
RubyModule concurrentMod = runtime.defineModule("Concurrent");
23+
RubyClass atomicCls = concurrentMod.defineClassUnder("JavaSemaphore", runtime.getObject(), JRUBYREFERENCE_ALLOCATOR);
24+
25+
atomicCls.defineAnnotatedMethods(JavaSemaphore.class);
26+
27+
}
28+
29+
private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
30+
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
31+
return new JavaSemaphore(runtime, klazz);
32+
}
33+
};
34+
35+
@JRubyClass(name = "JavaSemaphore", parent = "Object")
36+
public static class JavaSemaphore extends RubyObject {
37+
38+
private JRubySemaphore semaphore;
39+
private ThreadContext context;
40+
41+
public JavaSemaphore(Ruby runtime, RubyClass metaClass) {
42+
super(runtime, metaClass);
43+
}
44+
45+
@JRubyMethod
46+
public IRubyObject initialize(ThreadContext context, IRubyObject value) {
47+
this.semaphore = new JRubySemaphore(rubyFixnumToInt(value, "count"));
48+
this.context = context;
49+
return context.nil;
50+
}
51+
52+
@JRubyMethod
53+
public IRubyObject acquire(ThreadContext context, IRubyObject value) throws InterruptedException {
54+
this.semaphore.acquire(rubyFixnumToInt(value, "permits"));
55+
return context.nil;
56+
}
57+
58+
@JRubyMethod(name = "available_permits")
59+
public IRubyObject availablePermits(ThreadContext context) {
60+
return new RubyFixnum(getRuntime(), this.semaphore.availablePermits());
61+
}
62+
63+
@JRubyMethod(name = "drain_permits")
64+
public IRubyObject drainPermits(ThreadContext context) {
65+
return new RubyFixnum(getRuntime(), this.semaphore.drainPermits());
66+
}
67+
68+
@JRubyMethod
69+
public IRubyObject acquire(ThreadContext context) throws InterruptedException {
70+
this.semaphore.acquire(1);
71+
return context.nil;
72+
}
73+
74+
@JRubyMethod(name = "try_acquire")
75+
public IRubyObject tryAcquire(ThreadContext context) throws InterruptedException {
76+
return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(1));
77+
}
78+
79+
@JRubyMethod(name = "try_acquire")
80+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits) throws InterruptedException {
81+
return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(rubyFixnumToInt(permits, "permits")));
82+
}
83+
84+
@JRubyMethod(name = "try_acquire")
85+
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout) throws InterruptedException {
86+
return RubyBoolean.newBoolean(getRuntime(),
87+
semaphore.tryAcquire(
88+
rubyFixnumToInt(permits, "permits"),
89+
rubyNumericToLong(timeout, "timeout"),
90+
java.util.concurrent.TimeUnit.SECONDS)
91+
);
92+
}
93+
94+
@JRubyMethod
95+
public IRubyObject release(ThreadContext context) {
96+
this.semaphore.release(1);
97+
return RubyBoolean.newBoolean(getRuntime(), true);
98+
}
99+
100+
@JRubyMethod
101+
public IRubyObject release(ThreadContext context, IRubyObject value) {
102+
this.semaphore.release(rubyFixnumToInt(value, "permits"));
103+
return RubyBoolean.newBoolean(getRuntime(), true);
104+
}
105+
106+
@JRubyMethod(name = "reduce_permits")
107+
public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) throws InterruptedException {
108+
this.semaphore.publicReducePermits(rubyFixnumToInt(reduction, "reduction"));
109+
return context.nil;
110+
}
111+
112+
private int rubyFixnumToInt(IRubyObject value, String paramName) {
113+
if (value instanceof RubyFixnum && ((RubyFixnum) value).getLongValue() > 0) {
114+
RubyFixnum fixNum = (RubyFixnum) value;
115+
return (int) fixNum.getLongValue();
116+
} else {
117+
throw getRuntime().newArgumentError(paramName + " must be in integer greater than zero");
118+
}
119+
}
120+
121+
private long rubyNumericToLong(IRubyObject value, String paramName) {
122+
if (value instanceof RubyNumeric && ((RubyNumeric) value).getDoubleValue() > 0) {
123+
RubyNumeric fixNum = (RubyNumeric) value;
124+
return fixNum.getLongValue();
125+
} else {
126+
throw getRuntime().newArgumentError(paramName + " must be in float greater than zero");
127+
}
128+
}
129+
130+
class JRubySemaphore extends Semaphore {
131+
132+
public JRubySemaphore(int permits) {
133+
super(permits);
134+
}
135+
136+
public JRubySemaphore(int permits, boolean value) {
137+
super(permits, value);
138+
}
139+
140+
public void publicReducePermits(int i) {
141+
reducePermits(i);
142+
}
143+
144+
}
145+
}
146+
}
147+

lib/concurrent/atomic/semaphore.rb

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def release(permits = 1)
109109
end
110110

111111
# @!macro [attach] semaphore_method_reduce_permits
112-
#
112+
#
113113
# @api private
114114
#
115115
# Shrinks the number of available permits by the indicated reduction.
@@ -126,7 +126,7 @@ def reduce_permits(reduction)
126126
fail ArgumentError, 'reduction must be an non-negative integer'
127127
end
128128
@mutex.synchronize { @free -= reduction }
129-
nil
129+
nil
130130
end
131131

132132
private
@@ -153,73 +153,12 @@ def try_acquire_timed(permits, timeout)
153153
if RUBY_PLATFORM == 'java'
154154

155155
# @!macro semaphore
156-
#
156+
#
157157
# A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {#acquire} blocks if necessary
158158
# until a permit is available, and then takes it. Each {#release} adds a permit,
159159
# potentially releasing a blocking acquirer.
160160
# However, no actual permit objects are used; the Semaphore just keeps a count of the number available and
161161
# acts accordingly.
162-
class JavaSemaphore
163-
# @!macro semaphore_method_initialize
164-
def initialize(count)
165-
unless count.is_a?(Fixnum) && count >= 0
166-
fail(ArgumentError,
167-
'count must be in integer greater than or equal zero')
168-
end
169-
@semaphore = java.util.concurrent.Semaphore.new(count)
170-
end
171-
172-
# @!macro semaphore_method_acquire
173-
def acquire(permits = 1)
174-
unless permits.is_a?(Fixnum) && permits > 0
175-
fail ArgumentError, 'permits must be an integer greater than zero'
176-
end
177-
@semaphore.acquire(permits)
178-
end
179-
180-
# @!macro semaphore_method_available_permits
181-
def available_permits
182-
@semaphore.availablePermits
183-
end
184-
185-
# @!macro semaphore_method_drain_permits
186-
def drain_permits
187-
@semaphore.drainPermits
188-
end
189-
190-
# @!macro semaphore_method_try_acquire
191-
def try_acquire(permits = 1, timeout = nil)
192-
unless permits.is_a?(Fixnum) && permits > 0
193-
fail ArgumentError, 'permits must be an integer greater than zero'
194-
end
195-
if timeout.nil?
196-
@semaphore.tryAcquire(permits)
197-
else
198-
@semaphore.tryAcquire(permits,
199-
timeout,
200-
java.util.concurrent.TimeUnit::SECONDS)
201-
end
202-
end
203-
204-
# @!macro semaphore_method_release
205-
def release(permits = 1)
206-
unless permits.is_a?(Fixnum) && permits > 0
207-
fail ArgumentError, 'permits must be an integer greater than zero'
208-
end
209-
@semaphore.release(permits)
210-
true
211-
end
212-
213-
# @!macro semaphore_method_reduce_permits
214-
def reduce_permits(reduction)
215-
unless reduction.is_a?(Fixnum) && reduction >= 0
216-
fail ArgumentError, 'reduction must be an non-negative integer'
217-
end
218-
@semaphore.reducePermits(reduction)
219-
end
220-
end
221-
222-
# @!macro semaphore
223162
class Semaphore < JavaSemaphore
224163
end
225164

0 commit comments

Comments
 (0)