Skip to content

Commit 06b5bfb

Browse files
committed
Create method ThreadPoolExecutor#active_count to expose the number of threads that are actively executing tasks
1 parent da2d27c commit 06b5bfb

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ module Concurrent
3939
# The number of tasks that have been completed by the pool since construction.
4040
# @return [Integer] The number of tasks that have been completed by the pool since construction.
4141

42+
# @!macro thread_pool_executor_method_active_count
43+
# The number of threads that are actively executing tasks.
44+
# @return [Integer] The number of threads that are actively executing tasks.
45+
4246
# @!macro thread_pool_executor_attr_reader_idletime
4347
# The number of seconds that a thread may be idle before being reclaimed.
4448
# @return [Integer] The number of seconds that a thread may be idle before being reclaimed.

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ def completed_task_count
7373
@executor.getCompletedTaskCount
7474
end
7575

76+
# @!macro thread_pool_executor_method_active_count
77+
def active_count
78+
@executor.getActiveCount
79+
end
80+
7681
# @!macro thread_pool_executor_attr_reader_idletime
7782
def idletime
7883
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ def completed_task_count
6161
synchronize { @completed_task_count }
6262
end
6363

64+
# @!macro thread_pool_executor_method_active_count
65+
def active_count
66+
synchronize do
67+
@pool.length - @ready.length
68+
end
69+
end
70+
6471
# @!macro executor_service_method_can_overflow_question
6572
def can_overflow?
6673
synchronize { ns_limited_queue? }

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,32 @@
258258
end
259259
end
260260

261+
context '#active_count' do
262+
subject do
263+
described_class.new(
264+
min_threads: 5,
265+
max_threads: 10,
266+
idletime: 60,
267+
max_queue: 0,
268+
fallback_policy: :discard
269+
)
270+
end
271+
272+
it 'returns the number of threads that are actively executing tasks.' do
273+
latch = Concurrent::CountDownLatch.new(1)
274+
3.times do
275+
subject.post { latch.wait }
276+
end
277+
278+
repeat_until_success do
279+
expect(subject.active_count).to eq 3
280+
end
281+
282+
# release
283+
latch.count_down
284+
end
285+
end
286+
261287
context '#fallback_policy' do
262288

263289
let!(:min_threads){ 1 }
@@ -627,33 +653,33 @@
627653
max_threads: 1,
628654
max_queue: 1,
629655
fallback_policy: :caller_runs)
630-
656+
631657
worker_unblocker = Concurrent::CountDownLatch.new(1)
632658
executor_unblocker = Concurrent::CountDownLatch.new(1)
633659
queue_done = Concurrent::CountDownLatch.new(1)
634-
660+
635661
# Block the worker thread
636662
executor << proc { worker_unblocker.wait }
637-
663+
638664
# Fill the queue
639665
executor << proc { log.push :queued; queue_done.count_down }
640-
666+
641667
# Block in a caller_runs job
642668
caller_runs_thread = Thread.new {
643669
executor << proc { executor_unblocker.wait; log.push :unblocked }
644670
}
645-
671+
646672
# Wait until the caller_runs job is blocked
647673
Thread.pass until caller_runs_thread.status == 'sleep'
648-
674+
649675
# Now unblock the worker thread
650676
worker_unblocker.count_down
651677
queue_done.wait
652678
executor_unblocker.count_down
653-
679+
654680
# Tidy up
655681
caller_runs_thread.join
656-
682+
657683
# We will see the queued jobs run before the caller_runs job unblocks
658684
expect([log.pop, log.pop]).to eq [:queued, :unblocked]
659685
end

0 commit comments

Comments
 (0)