Commit 43a6755
Work stealing (#507)
* work stealing
* Fix CI
* Add comments
* optimizations
* wip
* optimize
* revert
* Revert "revert". This reverts commit c63b94c.
* minor
* minor
* Fix CI
* minor
* refactor local work queues
* Fix work buckets enumeration
* Fix
* WIP: Fix designated work
* Fix Cargo.toml
* WIP
* Fix CI
* minor
* Fix jikes
* minor
* cleanup
* seqcst
1 parent 28e1af1 commit 43a6755

9 files changed: +543 -305 lines changed

Cargo.toml

Lines changed: 2 additions & 3 deletions
@@ -27,9 +27,9 @@ mimalloc-sys = { version = "0.1.6", optional = true }
 hoard-sys = { version = "0.1.1", optional = true }
 lazy_static = "1.1"
 log = { version = "0.4", features = ["max_level_trace", "release_max_level_off"] }
-crossbeam-deque = "0.6"
+crossbeam = "0.8.1"
 num_cpus = "1.8"
-enum-map = "0.6.2"
+enum-map = "=2.1"
 downcast-rs = "1.1.1"
 atomic-traits = "0.2.0"
 atomic = "0.4.6"
@@ -41,7 +41,6 @@ strum = "0.24"
 strum_macros = "0.24"
 
 [dev-dependencies]
-crossbeam = "0.7.3"
 rand = "0.7.3"
 
 [features]
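The dependency change replaces the standalone crossbeam-deque 0.6 crate with the umbrella crossbeam 0.8.1 crate (and drops the older crossbeam 0.7.3 dev-dependency), which re-exports the work-stealing deque types the new scheduler builds on. For orientation, here is a minimal sketch of crossbeam 0.8's deque API; it is illustrative only, and the task type and lookup order are assumptions, not MMTk's actual scheduler code.

// Illustrative only: how crossbeam 0.8's work-stealing deque API fits together.
use crossbeam::deque::{Injector, Steal, Worker};

fn main() {
    // A global injector plus one local deque per worker thread.
    let injector: Injector<u32> = Injector::new();
    let local: Worker<u32> = Worker::new_fifo();
    let stealer = local.stealer(); // handed to other threads in a real scheduler

    for task in 0..4 {
        injector.push(task);
    }

    // Typical lookup order: local deque first, then the global injector
    // (and, in a full scheduler, other workers' stealers last).
    let next = local.pop().or_else(|| {
        std::iter::repeat_with(|| injector.steal_batch_and_pop(&local))
            .find(|s| !s.is_retry())
            .and_then(Steal::success)
    });
    println!("first task: {:?}", next);
    let _ = stealer.steal(); // other threads would call this concurrently
}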

src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ extern crate log;
 #[cfg(target = "x86_64-unknown-linux-gnu")]
 extern crate atomic;
 extern crate atomic_traits;
-extern crate crossbeam_deque;
+extern crate crossbeam;
 extern crate num_cpus;
 #[macro_use]
 extern crate downcast_rs;

src/scheduler/controller.rs

Lines changed: 38 additions & 16 deletions
@@ -12,6 +12,7 @@ use crate::scheduler::CoordinatorMessage;
 use crate::util::VMWorkerThread;
 use crate::vm::VMBinding;
 use crate::MMTK;
+use atomic::Ordering;
 
 use super::{GCWork, GCWorkScheduler, GCWorker};
 
@@ -66,33 +67,54 @@ impl<VM: VMBinding> GCController<VM> {
         }
     }
 
-    /// Coordinate workers to perform GC in response to a GC request.
-    pub fn do_gc_until_completion(&mut self) {
+    /// Process a message. Return true if the GC is finished.
+    fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
         let worker = &mut self.coordinator_worker;
         let mmtk = self.mmtk;
+        match message {
+            CoordinatorMessage::Work(mut work) => {
+                work.do_work_with_stat(worker, mmtk);
+                self.scheduler
+                    .pending_messages
+                    .fetch_sub(1, Ordering::SeqCst);
+                false
+            }
+            CoordinatorMessage::Finish => {
+                self.scheduler
+                    .pending_messages
+                    .fetch_sub(1, Ordering::SeqCst);
+                // Quit only if all the buckets are empty.
+                // For concurrent GCs, the coordinator thread may receive this message when
+                // some buckets are still not empty. Under such case, the coordinator
+                // should ignore the message.
+                let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
+                self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty()
+            }
+        }
+    }
 
+    /// Coordinate workers to perform GC in response to a GC request.
+    pub fn do_gc_until_completion(&mut self) {
         // Schedule collection.
-        ScheduleCollection.do_work_with_stat(worker, mmtk);
+        ScheduleCollection.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);
 
         // Drain the message queue and execute coordinator work.
         loop {
             let message = self.receiver.recv().unwrap();
-            match message {
-                CoordinatorMessage::Work(mut work) => {
-                    work.do_work_with_stat(worker, mmtk);
-                }
-                CoordinatorMessage::AllWorkerParked | CoordinatorMessage::BucketDrained => {
-                    self.scheduler.update_buckets();
-                }
-            }
-            let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
-            if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
+            let finished = self.process_message(message);
+            if finished {
                 break;
             }
         }
+        debug_assert!(!self.scheduler.worker_group.has_designated_work());
+        // Sometimes multiple finish messages will be sent. Skip them.
         for message in self.receiver.try_iter() {
-            if let CoordinatorMessage::Work(mut work) = message {
-                work.do_work_with_stat(worker, mmtk);
+            self.scheduler
+                .pending_messages
+                .fetch_sub(1, Ordering::SeqCst);
+            match message {
+                CoordinatorMessage::Work(_) => unreachable!(),
+                CoordinatorMessage::Finish => {}
             }
         }
         self.scheduler.deactivate_all();
@@ -101,7 +123,7 @@ impl<VM: VMBinding> GCController<VM> {
         // Otherwise, for generational GCs, workers will receive and process
         // newly generated remembered-sets from those open buckets.
         // But these remsets should be preserved until next GC.
-        EndOfGC.do_work_with_stat(worker, mmtk);
+        EndOfGC.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);
 
         self.scheduler.debug_assert_all_buckets_deactivated();
     }
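A pattern worth noting in this diff: every received CoordinatorMessage is paired with a fetch_sub on scheduler.pending_messages, which implies senders increment the counter before sending so the scheduler can tell whether coordinator messages are still in flight. A minimal sketch of that handshake under those assumptions, using a plain mpsc channel and an AtomicUsize (the Message type and function names here are illustrative, not MMTk's API):

// Sketch of the pending-message handshake suggested by the diff: the sender
// bumps a shared counter before sending, the receiver decrements after
// receiving, so the counter never under-counts in-flight messages.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};

enum Message {
    Work(u32),
    Finish,
}

fn send_message(pending: &AtomicUsize, tx: &Sender<Message>, msg: Message) {
    // Increment *before* sending.
    pending.fetch_add(1, Ordering::SeqCst);
    tx.send(msg).unwrap();
}

fn drain(pending: &AtomicUsize, rx: &Receiver<Message>) {
    for msg in rx.try_iter() {
        // Mirror of the diff: decrement once per received message.
        pending.fetch_sub(1, Ordering::SeqCst);
        match msg {
            Message::Work(n) => println!("work item {n}"),
            Message::Finish => println!("finish"),
        }
    }
}

fn main() {
    let pending = AtomicUsize::new(0);
    let (tx, rx) = channel();
    send_message(&pending, &tx, Message::Work(1));
    send_message(&pending, &tx, Message::Finish);
    drain(&pending, &rx);
    assert_eq!(pending.load(Ordering::SeqCst), 0);
}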

src/scheduler/gc_work.rs

Lines changed: 6 additions & 4 deletions
@@ -50,8 +50,9 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
                 .add(PrepareMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.workers_shared {
-            w.local_work_bucket.add(PrepareCollector);
+        for w in &mmtk.scheduler.worker_group.workers_shared {
+            let result = w.designated_work.push(Box::new(PrepareCollector));
+            debug_assert!(result.is_ok());
         }
     }
 }
@@ -118,8 +119,9 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Release]
                 .add(ReleaseMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.workers_shared {
-            w.local_work_bucket.add(ReleaseCollector);
+        for w in &mmtk.scheduler.worker_group.workers_shared {
+            let result = w.designated_work.push(Box::new(ReleaseCollector));
+            debug_assert!(result.is_ok());
         }
     }
 }
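Here each worker's designated_work queue replaces the old per-worker local_work_bucket, and push returns a Result that the caller debug_assert!s, which matches the semantics of a bounded lock-free queue such as crossbeam's ArrayQueue. A minimal sketch of those semantics, assuming ArrayQueue (the capacity and element type below are made up for illustration):

// Illustrative sketch of the bounded-queue semantics implied by
// `designated_work.push(...)` returning a Result.
use crossbeam::queue::ArrayQueue;

fn main() {
    // push() fails (handing back the rejected element) once the queue is
    // full, which is why the caller can debug_assert!(result.is_ok()).
    let designated_work: ArrayQueue<Box<str>> = ArrayQueue::new(2);
    assert!(designated_work.push("PrepareCollector".into()).is_ok());
    assert!(designated_work.push("ReleaseCollector".into()).is_ok());
    assert!(designated_work.push("Overflow".into()).is_err());

    // A worker drains its own designated queue before taking shared work.
    while let Some(w) = designated_work.pop() {
        println!("designated: {w}");
    }
}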
