
Commit 3d342da: Work stealing

1 parent 519e8f3

4 files changed: +327 -195 lines

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ mimalloc-sys = {version = "0.1.6", optional = true }
 hoard-sys = {version = "0.1.1", optional = true }
 lazy_static = "1.1"
 log = {version = "0.4", features = ["max_level_trace", "release_max_level_off"] }
-crossbeam-deque = "0.6"
+crossbeam-deque = "0.7"
 num_cpus = "1.8"
 enum-map = "0.6.2"
 downcast-rs = "1.1.1"
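
The version bump is load-bearing: the scheduler below starts using crossbeam-deque's `Steal` result type, per-worker deques, and batch stealing directly. A minimal, self-contained sketch of the 0.7 API surface involved (the `find_task` helper is illustrative, not part of this commit):

```rust
use crossbeam_deque::{Steal, Stealer, Worker};

// Illustrative only: pop from the local deque first, then try to steal a
// batch from peers, retrying while any stealer reports contention.
fn find_task<T>(local: &Worker<T>, stealers: &[Stealer<T>]) -> Option<T> {
    local.pop().or_else(|| loop {
        let mut retry = false;
        for s in stealers {
            match s.steal_batch_and_pop(local) {
                Steal::Success(t) => return Some(t),
                Steal::Retry => retry = true,
                Steal::Empty => {}
            }
        }
        if !retry {
            // Every peer was genuinely empty, not just contended.
            return None;
        }
    })
}
```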

src/scheduler/scheduler.rs

Lines changed: 147 additions & 93 deletions
@@ -5,22 +5,21 @@ use super::*;
 use crate::mmtk::MMTK;
 use crate::util::opaque_pointer::*;
 use crate::vm::VMBinding;
+use crossbeam_deque::Steal;
 use enum_map::{enum_map, EnumMap};
 use std::collections::HashMap;
-use std::sync::atomic::Ordering;
 use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::{Arc, Condvar, Mutex, RwLock};
 
 pub enum CoordinatorMessage<VM: VMBinding> {
     Work(Box<dyn CoordinatorWork<VM>>),
     AllWorkerParked,
     BucketDrained,
+    Finish,
 }
 
 pub struct GCWorkScheduler<VM: VMBinding> {
     pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
-    /// Work for the coordinator thread
-    pub coordinator_work: WorkBucket<VM>,
     /// workers
     worker_group: Option<Arc<WorkerGroup<VM>>>,
     /// Condition Variable for worker synchronization
@@ -58,17 +57,16 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
         Arc::new(Self {
             work_buckets: enum_map! {
-                WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
-                WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
+                WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone(), true),
+                WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), false),
+                WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), false),
             },
-            coordinator_work: WorkBucket::new(true, worker_monitor.clone()),
             worker_group: None,
             worker_monitor,
             mmtk: None,
@@ -91,7 +89,6 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         mmtk: &'static MMTK<VM>,
         tls: VMThread,
     ) {
-        use crate::scheduler::work_bucket::WorkBucketStage::*;
         let num_workers = if cfg!(feature = "single_worker") {
             1
         } else {
@@ -103,7 +100,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
 
         self_mut.mmtk = Some(mmtk);
         self_mut.coordinator_worker = Some(RwLock::new(GCWorker::new(
-            0,
+            usize::MAX,
             Arc::downgrade(self),
             true,
             self.channel.0.clone(),
@@ -113,18 +110,28 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             Arc::downgrade(self),
             self.channel.0.clone(),
         ));
+        let group = self_mut.worker_group.as_ref().unwrap().clone();
+        self_mut.work_buckets.values_mut().for_each(|bucket| {
+            bucket.set_group(group.clone());
+        });
         self.worker_group.as_ref().unwrap().spawn_workers(tls, mmtk);
 
         {
             // Unconstrained is always open. Prepare will be opened at the beginning of a GC.
             // This vec will grow for each stage we call with open_next()
-            let mut open_stages: Vec<WorkBucketStage> = vec![Unconstrained, Prepare];
+            let first_stw_stage = self
+                .work_buckets
+                .iter()
+                .skip(1)
+                .next()
+                .map(|(id, _)| id)
+                .unwrap();
+            let mut open_stages: Vec<WorkBucketStage> = vec![first_stw_stage];
             // The rest will open after the previous stage is done.
             let mut open_next = |s: WorkBucketStage| {
                 let cur_stages = open_stages.clone();
                 self_mut.work_buckets[s].set_open_condition(move || {
-                    let should_open =
-                        self.are_buckets_drained(&cur_stages) && self.worker_group().all_parked();
+                    let should_open = self.are_buckets_drained(&cur_stages);
                     // Additional check before the `RefClosure` bucket opens.
                     if should_open && s == WorkBucketStage::RefClosure {
                         if let Some(closure_end) = self.closure_end.lock().unwrap().as_ref() {
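
The new `set_group` calls hand every bucket a reference to the worker group, and the `stealers` map consulted in the polling code further down pairs each worker ordinal with a `Stealer` handle for that worker's local deque. How the group builds those handles is not shown in this diff; a hedged sketch of such wiring with crossbeam-deque (all names hypothetical):

```rust
use crossbeam_deque::{Stealer, Worker};

// Hypothetical wiring: one FIFO deque per worker, plus an (ordinal, Stealer)
// list that lets any worker steal from any other worker's deque.
fn build_deques<T>(num_workers: usize) -> (Vec<Worker<T>>, Vec<(usize, Stealer<T>)>) {
    let locals: Vec<Worker<T>> = (0..num_workers).map(|_| Worker::new_fifo()).collect();
    let stealers = locals
        .iter()
        .enumerate()
        .map(|(ordinal, w)| (ordinal, w.stealer()))
        .collect();
    (locals, stealers)
}
```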
@@ -139,13 +146,11 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
                 open_stages.push(s);
             };
 
-            open_next(Closure);
-            open_next(RefClosure);
-            open_next(CalculateForwarding);
-            open_next(RefForwarding);
-            open_next(Compact);
-            open_next(Release);
-            open_next(Final);
+            for (id, _) in self.work_buckets.iter() {
+                if id != WorkBucketStage::Unconstrained && id != first_stw_stage {
+                    open_next(id);
+                }
+            }
         }
     }
 
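With the hard-coded `open_next(...)` chain gone, the opening order now follows the `WorkBucketStage` declaration order, and the open condition from the previous hunk reduces to "every earlier bucket is drained" (the `all_parked()` term is now enforced by who calls `update_buckets`, in `poll_slow` below). The condition in miniature (illustrative, hypothetical names):

```rust
// Illustrative model: a stage may open once every earlier stage is drained.
// `drained` stands in for `are_buckets_drained(&cur_stages)`.
fn should_open(stage: usize, drained: &[bool]) -> bool {
    drained[..stage].iter().all(|&d| d)
}

fn main() {
    // Unconstrained, Prepare, Closure, RefClosure
    let drained = [true, true, false, false];
    assert!(should_open(2, &drained)); // Closure: all earlier stages drained
    assert!(!should_open(3, &drained)); // RefClosure: Closure not drained yet
}
```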
@@ -234,19 +239,20 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
     }
 
     /// Open buckets if their conditions are met
-    fn update_buckets(&self) {
+    fn update_buckets(&self) -> bool {
         let mut buckets_updated = false;
+        let mut new_packets = false;
         for (id, bucket) in self.work_buckets.iter() {
             if id == WorkBucketStage::Unconstrained {
                 continue;
             }
-            buckets_updated |= bucket.update();
-        }
-        if buckets_updated {
-            // Notify the workers for new work
-            let _guard = self.worker_monitor.0.lock().unwrap();
-            self.worker_monitor.1.notify_all();
+            let current_bucket_updated = bucket.update();
+            buckets_updated |= current_bucket_updated;
+            if current_bucket_updated {
+                new_packets |= !bucket.is_drained();
+            }
         }
+        buckets_updated && new_packets
     }
 
     /// Execute coordinator work, in the controller thread
@@ -269,8 +275,9 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
                 self.process_coordinator_work(work);
             }
             CoordinatorMessage::AllWorkerParked | CoordinatorMessage::BucketDrained => {
-                self.update_buckets();
+                unreachable!()
             }
+            CoordinatorMessage::Finish => {}
         }
         let _guard = self.worker_monitor.0.lock().unwrap();
         if self.worker_group().all_parked() && self.all_buckets_empty() {
@@ -291,36 +298,40 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         if let Some(finalizer) = self.finalizer.lock().unwrap().take() {
             self.process_coordinator_work(finalizer);
         }
-        debug_assert!(!self.work_buckets[WorkBucketStage::Prepare].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::Closure].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::RefClosure].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::CalculateForwarding].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::RefForwarding].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::Compact].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::Release].is_activated());
-        debug_assert!(!self.work_buckets[WorkBucketStage::Final].is_activated());
+        self.assert_all_deactivated();
+    }
+
+    pub fn assert_all_deactivated(&self) {
+        if cfg!(debug_assertions) {
+            self.work_buckets.iter().for_each(|(id, bkt)| {
+                if id != WorkBucketStage::Unconstrained {
+                    assert!(!bkt.is_activated());
+                }
+            });
+        }
     }
 
     pub fn deactivate_all(&self) {
-        self.work_buckets[WorkBucketStage::Prepare].deactivate();
-        self.work_buckets[WorkBucketStage::Closure].deactivate();
-        self.work_buckets[WorkBucketStage::RefClosure].deactivate();
-        self.work_buckets[WorkBucketStage::CalculateForwarding].deactivate();
-        self.work_buckets[WorkBucketStage::RefForwarding].deactivate();
-        self.work_buckets[WorkBucketStage::Compact].deactivate();
-        self.work_buckets[WorkBucketStage::Release].deactivate();
-        self.work_buckets[WorkBucketStage::Final].deactivate();
+        self.work_buckets.iter().for_each(|(id, bkt)| {
+            if id != WorkBucketStage::Unconstrained {
+                bkt.deactivate();
+            }
+        });
     }
 
     pub fn reset_state(&self) {
-        // self.work_buckets[WorkBucketStage::Prepare].deactivate();
-        self.work_buckets[WorkBucketStage::Closure].deactivate();
-        self.work_buckets[WorkBucketStage::RefClosure].deactivate();
-        self.work_buckets[WorkBucketStage::CalculateForwarding].deactivate();
-        self.work_buckets[WorkBucketStage::RefForwarding].deactivate();
-        self.work_buckets[WorkBucketStage::Compact].deactivate();
-        self.work_buckets[WorkBucketStage::Release].deactivate();
-        self.work_buckets[WorkBucketStage::Final].deactivate();
+        let first_stw_stage = self
+            .work_buckets
+            .iter()
+            .skip(1)
+            .next()
+            .map(|(id, _)| id)
+            .unwrap();
+        self.work_buckets.iter().for_each(|(id, bkt)| {
+            if id != WorkBucketStage::Unconstrained && id != first_stw_stage {
+                bkt.deactivate();
+            }
+        });
     }
 
     pub fn add_coordinator_work(&self, work: impl CoordinatorWork<VM>, worker: &GCWorker<VM>) {
@@ -330,34 +341,72 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             .unwrap();
     }
 
-    #[inline]
-    fn pop_scheduable_work(&self, worker: &GCWorker<VM>) -> Option<(Box<dyn GCWork<VM>>, bool)> {
-        if let Some(work) = worker.local_work_bucket.poll() {
-            return Some((work, worker.local_work_bucket.is_empty()));
+    #[inline(always)]
+    fn all_activated_buckets_are_empty(&self) -> bool {
+        for bucket in self.work_buckets.values() {
+            if bucket.is_activated() && !bucket.is_drained() {
+                return false;
+            }
+        }
+        true
+    }
+
+    #[inline(always)]
+    fn pop_schedulable_work_once(&self, worker: &GCWorker<VM>) -> Steal<Box<dyn GCWork<VM>>> {
+        let mut retry = false;
+        match worker.local_work_bucket.poll_no_batch() {
+            Steal::Success(w) => return Steal::Success(w),
+            Steal::Retry => retry = true,
+            _ => {}
        }
         for work_bucket in self.work_buckets.values() {
-            if let Some(work) = work_bucket.poll() {
-                return Some((work, work_bucket.is_empty()));
+            match work_bucket.poll(&worker.local_work_buffer) {
+                Steal::Success(w) => return Steal::Success(w),
+                Steal::Retry => retry = true,
+                _ => {}
+            }
+        }
+        for (id, stealer) in &self.worker_group().stealers {
+            if *id == worker.ordinal {
+                continue;
+            }
+            match stealer.steal() {
+                Steal::Success(w) => return Steal::Success(w),
+                Steal::Retry => retry = true,
+                _ => {}
             }
         }
-        None
+        if retry {
+            Steal::Retry
+        } else {
+            Steal::Empty
+        }
     }
 
-    /// Get a scheduable work. Called by workers
     #[inline]
-    pub fn poll(&self, worker: &GCWorker<VM>) -> Box<dyn GCWork<VM>> {
-        let work = if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) {
-            if bucket_is_empty {
-                worker
-                    .sender
-                    .send(CoordinatorMessage::BucketDrained)
-                    .unwrap();
+    fn pop_schedulable_work(&self, worker: &GCWorker<VM>) -> Option<Box<dyn GCWork<VM>>> {
+        loop {
+            std::hint::spin_loop();
+            match self.pop_schedulable_work_once(worker) {
+                Steal::Success(w) => {
+                    return Some(w);
+                }
+                Steal::Retry => {
+                    // std::thread::yield_now();
+                    continue;
+                }
+                Steal::Empty => {
+                    return None;
+                }
             }
         }
-            work
-        } else {
-            self.poll_slow(worker)
-        };
-        work
+    }
+
+    /// Get a schedulable work. Called by workers
+    #[inline]
+    pub fn poll(&self, worker: &GCWorker<VM>) -> Box<dyn GCWork<VM>> {
+        self.pop_schedulable_work(worker)
+            .unwrap_or_else(|| self.poll_slow(worker))
     }
 
     #[cold]
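
Note how `pop_schedulable_work_once` folds its three sources (local deque, shared buckets, peers' deques) into one `Steal` value: a `Retry` observed anywhere must dominate `Empty` in the final result, otherwise a momentarily contended deque would be misreported as empty and a worker could park while work is still in flight. The same folding in isolation (illustrative helper, not part of the commit):

```rust
use crossbeam_deque::Steal;

// Fold a sequence of steal attempts: the first success wins; otherwise
// Retry dominates Empty, so callers know another pass may still find work.
fn fold_attempts<T>(attempts: impl IntoIterator<Item = Steal<T>>) -> Steal<T> {
    let mut retry = false;
    for attempt in attempts {
        match attempt {
            Steal::Success(t) => return Steal::Success(t),
            Steal::Retry => retry = true,
            Steal::Empty => {}
        }
    }
    if retry { Steal::Retry } else { Steal::Empty }
}
```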
@@ -366,27 +415,31 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         let mut guard = self.worker_monitor.0.lock().unwrap();
         loop {
             debug_assert!(!worker.is_parked());
-            if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) {
-                if bucket_is_empty {
-                    worker
-                        .sender
-                        .send(CoordinatorMessage::BucketDrained)
-                        .unwrap();
-                }
+            if let Some(work) = self.pop_schedulable_work(worker) {
                 return work;
             }
             // Park this worker
-            worker.parked.store(true, Ordering::SeqCst);
-            if self.worker_group().all_parked() {
-                worker
-                    .sender
-                    .send(CoordinatorMessage::AllWorkerParked)
-                    .unwrap();
+            let all_parked = self.worker_group().inc_parked_workers();
+            if all_parked {
+                if self.update_buckets() {
+                    self.worker_group().dec_parked_workers();
+                    // We guarantee that we can at least fetch one packet.
+                    let work = self.pop_schedulable_work(worker).unwrap();
+                    // Optimize for the case that a newly opened bucket only has one packet.
+                    if !self.all_activated_buckets_are_empty() {
+                        // There are more jobs in these buckets. Notify other workers.
+                        self.worker_monitor.1.notify_all();
+                    }
+                    return work;
+                }
+                worker.sender.send(CoordinatorMessage::Finish).unwrap();
             }
             // Wait
+            // println!("[{}] sleep", worker.ordinal);
             guard = self.worker_monitor.1.wait(guard).unwrap();
+            // println!("[{}] wake", worker.ordinal);
             // Unpark this worker
-            worker.parked.store(false, Ordering::SeqCst);
+            self.worker_group().dec_parked_workers();
         }
     }
 
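
`inc_parked_workers` and `dec_parked_workers` replace the old per-worker `parked` flag: parking becomes a group-level count, the last worker to park learns so atomically, and that worker opens the next buckets itself instead of bouncing an `AllWorkerParked` message off the coordinator. The counter's implementation is not part of this diff; one plausible sketch (hypothetical):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the WorkerGroup bookkeeping used above.
struct WorkerGroup {
    parked_workers: AtomicUsize,
    worker_count: usize,
}

impl WorkerGroup {
    /// Park one worker; true iff this call parked the last running worker.
    fn inc_parked_workers(&self) -> bool {
        let prev = self.parked_workers.fetch_add(1, Ordering::SeqCst);
        debug_assert!(prev < self.worker_count);
        prev + 1 == self.worker_count
    }

    /// Unpark one worker.
    fn dec_parked_workers(&self) {
        let prev = self.parked_workers.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(prev > 0);
    }
}
```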

@@ -410,8 +463,9 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
 
     pub fn notify_mutators_paused(&self, mmtk: &'static MMTK<VM>) {
         mmtk.plan.base().control_collector_context.clear_request();
-        debug_assert!(!self.work_buckets[WorkBucketStage::Prepare].is_activated());
-        self.work_buckets[WorkBucketStage::Prepare].activate();
+        let first_stw_bucket = self.work_buckets.values().skip(1).next().unwrap();
+        debug_assert!(!first_stw_bucket.is_activated());
+        first_stw_bucket.activate();
         let _guard = self.worker_monitor.0.lock().unwrap();
         self.worker_monitor.1.notify_all();
     }
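
The `values().skip(1).next()` idiom is sound because `EnumMap` iterates in enum-declaration order, so index 1 is always the first stop-the-world stage whatever it is named; this is what lets the scheduler drop its hard-coded `Prepare` references. A standalone check with a hypothetical miniature of `WorkBucketStage`:

```rust
use enum_map::{enum_map, Enum, EnumMap};

// Hypothetical miniature of WorkBucketStage; only the ordering matters.
#[derive(Enum, Clone, Copy, Debug, PartialEq)]
enum Stage {
    Unconstrained,
    Prepare,
    Closure,
}

fn main() {
    let buckets: EnumMap<Stage, &str> = enum_map! {
        Stage::Unconstrained => "always open",
        Stage::Prepare => "first STW stage",
        Stage::Closure => "later STW stage",
    };
    // EnumMap iterates in declaration order, so .nth(1) is the first
    // stage after Unconstrained (equivalent to .skip(1).next()).
    let (first_stw_stage, _) = buckets.iter().nth(1).unwrap();
    assert_eq!(first_stw_stage, Stage::Prepare);
}
```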
