5 changes: 2 additions & 3 deletions Cargo.toml
@@ -27,9 +27,9 @@ mimalloc-sys = { version = "0.1.6", optional = true }
 hoard-sys = { version = "0.1.1", optional = true }
 lazy_static = "1.1"
 log = { version = "0.4", features = ["max_level_trace", "release_max_level_off"] }
-crossbeam-deque = "0.6"
+crossbeam = "0.8.1"
 num_cpus = "1.8"
-enum-map = "0.6.2"
+enum-map = "=2.1"
 downcast-rs = "1.1.1"
 atomic-traits = "0.2.0"
 atomic = "0.4.6"
@@ -41,7 +41,6 @@ strum = "0.24"
 strum_macros = "0.24"
 
 [dev-dependencies]
-crossbeam = "0.7.3"
 rand = "0.7.3"
 
 [features]
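The dependency change replaces the standalone crossbeam-deque crate with the umbrella crossbeam crate at 0.8.1, which re-exports the same work-stealing deque API under crossbeam::deque, and drops the older crossbeam 0.7.3 dev-dependency. Below is a minimal sketch of that re-exported API, assuming crossbeam 0.8; the queue contents and variable names are illustrative and not taken from the MMTk sources.

// A minimal sketch of the work-stealing deque API that crossbeam 0.8
// re-exports under crossbeam::deque (previously the standalone
// crossbeam-deque crate). The pushed values here are illustrative only.
use crossbeam::deque::{Injector, Steal, Worker};

fn main() {
    // A global injector plus one per-thread worker deque.
    let injector: Injector<u32> = Injector::new();
    let local: Worker<u32> = Worker::new_fifo();

    injector.push(1);
    local.push(2);

    // Prefer local work; fall back to stealing from the injector.
    let task = local.pop().or_else(|| match injector.steal() {
        Steal::Success(t) => Some(t),
        _ => None,
    });
    assert_eq!(task, Some(2));
}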
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -39,7 +39,7 @@ extern crate log;
 #[cfg(target = "x86_64-unknown-linux-gnu")]
 extern crate atomic;
 extern crate atomic_traits;
-extern crate crossbeam_deque;
+extern crate crossbeam;
 extern crate num_cpus;
 #[macro_use]
 extern crate downcast_rs;
54 changes: 38 additions & 16 deletions src/scheduler/controller.rs
@@ -12,6 +12,7 @@ use crate::scheduler::CoordinatorMessage;
 use crate::util::VMWorkerThread;
 use crate::vm::VMBinding;
 use crate::MMTK;
+use atomic::Ordering;
 
 use super::{GCWork, GCWorkScheduler, GCWorker};
 
@@ -66,33 +67,54 @@ impl<VM: VMBinding> GCController<VM> {
         }
     }
 
-    /// Coordinate workers to perform GC in response to a GC request.
-    pub fn do_gc_until_completion(&mut self) {
+    /// Process a message. Return true if the GC is finished.
+    fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
         let worker = &mut self.coordinator_worker;
         let mmtk = self.mmtk;
+        match message {
+            CoordinatorMessage::Work(mut work) => {
+                work.do_work_with_stat(worker, mmtk);
+                self.scheduler
+                    .pending_messages
+                    .fetch_sub(1, Ordering::SeqCst);
+                false
+            }
+            CoordinatorMessage::Finish => {
+                self.scheduler
+                    .pending_messages
+                    .fetch_sub(1, Ordering::SeqCst);
+                // Quit only if all the buckets are empty.
+                // For concurrent GCs, the coordinator thread may receive this message when
+                // some buckets are still not empty. Under such case, the coordinator
+                // should ignore the message.
+                let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
+                self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty()
+            }
+        }
+    }
+
+    /// Coordinate workers to perform GC in response to a GC request.
+    pub fn do_gc_until_completion(&mut self) {
         // Schedule collection.
-        ScheduleCollection.do_work_with_stat(worker, mmtk);
+        ScheduleCollection.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);
 
         // Drain the message queue and execute coordinator work.
         loop {
             let message = self.receiver.recv().unwrap();
-            match message {
-                CoordinatorMessage::Work(mut work) => {
-                    work.do_work_with_stat(worker, mmtk);
-                }
-                CoordinatorMessage::AllWorkerParked | CoordinatorMessage::BucketDrained => {
-                    self.scheduler.update_buckets();
-                }
-            }
-            let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
-            if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
+            let finished = self.process_message(message);
+            if finished {
                 break;
             }
         }
+        debug_assert!(!self.scheduler.worker_group.has_designated_work());
         // Sometimes multiple finish messages will be sent. Skip them.
         for message in self.receiver.try_iter() {
-            if let CoordinatorMessage::Work(mut work) = message {
-                work.do_work_with_stat(worker, mmtk);
+            self.scheduler
+                .pending_messages
+                .fetch_sub(1, Ordering::SeqCst);
+            match message {
+                CoordinatorMessage::Work(_) => unreachable!(),
+                CoordinatorMessage::Finish => {}
             }
         }
         self.scheduler.deactivate_all();
@@ -101,7 +123,7 @@ impl<VM: VMBinding> GCController<VM> {
         // Otherwise, for generational GCs, workers will receive and process
         // newly generated remembered-sets from those open buckets.
         // But these remsets should be preserved until next GC.
-        EndOfGC.do_work_with_stat(worker, mmtk);
+        EndOfGC.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);
 
         self.scheduler.debug_assert_all_buckets_deactivated();
     }
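The refactored coordinator loop above forwards every message through process_message and treats Finish as a quit signal only once all workers are parked and all buckets are empty, then drains any duplicate Finish messages. Below is a simplified, self-contained sketch of that drain-then-quit pattern using a plain std channel; the Message enum, coordinator_loop function, and work_left predicate are illustrative stand-ins, not MMTk APIs.

// Simplified sketch of the coordinator pattern in the diff above: run
// forwarded work immediately, ignore a premature Finish while work remains,
// and drain any stray Finish messages after deciding to quit.
// All names here are illustrative, not MMTk APIs.
use std::sync::mpsc::{channel, Receiver};

enum Message {
    Work(Box<dyn FnOnce() + Send>),
    Finish,
}

fn coordinator_loop(receiver: &Receiver<Message>, work_left: impl Fn() -> bool) {
    loop {
        match receiver.recv().unwrap() {
            Message::Work(job) => job(),
            // Ignore a premature Finish while buckets still hold work.
            Message::Finish if work_left() => continue,
            Message::Finish => break,
        }
    }
    // Sometimes multiple Finish messages are sent; skip them.
    for message in receiver.try_iter() {
        debug_assert!(matches!(message, Message::Finish));
    }
}

fn main() {
    let (sender, receiver) = channel();
    sender
        .send(Message::Work(Box::new(|| println!("forwarded work"))))
        .unwrap();
    sender.send(Message::Finish).unwrap();
    coordinator_loop(&receiver, || false);
}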
10 changes: 6 additions & 4 deletions src/scheduler/gc_work.rs
@@ -50,8 +50,9 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
                 .add(PrepareMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.workers_shared {
-            w.local_work_bucket.add(PrepareCollector);
+        for w in &mmtk.scheduler.worker_group.workers_shared {
+            let result = w.designated_work.push(Box::new(PrepareCollector));
+            debug_assert!(result.is_ok());
         }
     }
 }
@@ -118,8 +119,9 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Release]
                 .add(ReleaseMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.workers_shared {
-            w.local_work_bucket.add(ReleaseCollector);
+        for w in &mmtk.scheduler.worker_group.workers_shared {
+            let result = w.designated_work.push(Box::new(ReleaseCollector));
+            debug_assert!(result.is_ok());
         }
     }
 }
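Prepare and Release now push PrepareCollector and ReleaseCollector packets onto each worker's designated_work queue instead of a per-worker local bucket, and the push returns a Result that is asserted to succeed. A minimal sketch of that shape follows, assuming a bounded crossbeam::queue::ArrayQueue; the diff does not show the field's actual type, so treat the queue type and the capacity of 16 as illustrative assumptions.

// Minimal sketch of pushing a boxed work packet onto a bounded queue, the
// shape of the designated_work.push(...) calls above. The use of
// crossbeam::queue::ArrayQueue and the capacity of 16 are assumptions for
// illustration; the diff does not show the field's actual type.
use crossbeam::queue::ArrayQueue;

trait GCWork {
    fn do_work(&mut self);
}

struct PrepareCollector;

impl GCWork for PrepareCollector {
    fn do_work(&mut self) {
        println!("prepare one collector");
    }
}

fn main() {
    // A small fixed-capacity queue of work designated for one worker.
    let designated_work: ArrayQueue<Box<dyn GCWork>> = ArrayQueue::new(16);

    // push returns Err with the rejected item only when the queue is full,
    // hence the debug_assert!(result.is_ok()) in the diff.
    let result = designated_work.push(Box::new(PrepareCollector));
    debug_assert!(result.is_ok());

    // A worker later pops and runs its designated packets.
    if let Some(mut work) = designated_work.pop() {
        work.do_work();
    }
}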