From 5fe9c591577a567a19dd23fc7eca0dfd7dce8766 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 12 May 2022 16:37:04 +1000 Subject: [PATCH 01/24] work stealing --- Cargo.toml | 3 +- src/lib.rs | 2 +- src/scheduler/controller.rs | 6 +- src/scheduler/gc_work.rs | 8 +- src/scheduler/scheduler.rs | 255 ++++++++++++++++-------------- src/scheduler/work_bucket.rs | 214 ++++++++++++++++--------- src/scheduler/worker.rs | 142 +++++++++++++---- src/util/sanity/sanity_checker.rs | 4 +- 8 files changed, 402 insertions(+), 232 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 51cb2cc69a..e64e96fc69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ mimalloc-sys = {version = "0.1.6", optional = true } hoard-sys = {version = "0.1.1", optional = true } lazy_static = "1.1" log = {version = "0.4", features = ["max_level_trace", "release_max_level_off"] } -crossbeam-deque = "0.6" +crossbeam = "0.8.1" num_cpus = "1.8" enum-map = "0.6.2" downcast-rs = "1.1.1" @@ -38,7 +38,6 @@ pfm = {version = "0.1.0-beta.1", optional = true} atomic_refcell = "0.1.7" [dev-dependencies] -crossbeam = "0.7.3" rand = "0.7.3" [features] diff --git a/src/lib.rs b/src/lib.rs index 2d6c6f43c6..4192e7061e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,7 +43,7 @@ extern crate log; #[cfg(target = "x86_64-unknown-linux-gnu")] extern crate atomic; extern crate atomic_traits; -extern crate crossbeam_deque; +extern crate crossbeam; extern crate num_cpus; #[macro_use] extern crate downcast_rs; diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index ceaae741d7..9cfabe9dd8 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -81,12 +81,10 @@ impl GCController { CoordinatorMessage::Work(mut work) => { work.do_work_with_stat(worker, mmtk); } - CoordinatorMessage::AllWorkerParked | CoordinatorMessage::BucketDrained => { - self.scheduler.update_buckets(); - } + CoordinatorMessage::Finish => {} } let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() { + if self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty() { break; } } diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index e20a8c6c62..aa1581112b 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -50,8 +50,8 @@ impl GCWork for Prepare { mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] .add(PrepareMutator::::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { - w.local_work_bucket.add(PrepareCollector); + for w in &mmtk.scheduler.worker_group.workers_shared { + w.local_work.push(Box::new(PrepareCollector)); } } } @@ -118,8 +118,8 @@ impl GCWork for Release { mmtk.scheduler.work_buckets[WorkBucketStage::Release] .add(ReleaseMutator::::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { - w.local_work_bucket.add(ReleaseCollector); + for w in &mmtk.scheduler.worker_group.workers_shared { + w.local_work.push(Box::new(ReleaseCollector)); } } } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index d39538acd2..587caf0422 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -1,12 +1,12 @@ use super::stat::SchedulerStat; -use super::work_bucket::WorkBucketStage::*; use super::work_bucket::*; -use super::worker::{GCWorker, GCWorkerShared}; +use super::worker::{GCWorker, GCWorkerShared, WorkerGroup}; use super::*; use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; use crate::vm::Collection; use crate::vm::{GCThreadContext, 
VMBinding}; +use crossbeam::deque::Steal; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -15,18 +15,14 @@ use std::sync::{Arc, Condvar, Mutex}; pub enum CoordinatorMessage { Work(Box>), - AllWorkerParked, - BucketDrained, + Finish, } -/// The shared data structure for distributing work packets between worker threads and the coordinator thread. pub struct GCWorkScheduler { - /// Work buckets for worker threads + /// Work buckets pub work_buckets: EnumMap>, - /// Work for the coordinator thread - pub coordinator_work: WorkBucket, - /// The shared parts of GC workers - pub workers_shared: Vec>>, + /// workers + pub worker_group: Arc>, /// The shared part of the GC worker object of the controller thread coordinator_worker_shared: Arc>, /// Condition Variable for worker synchronization @@ -74,18 +70,27 @@ impl GCWorkScheduler { { // Unconstrained is always open. Prepare will be opened at the beginning of a GC. // This vec will grow for each stage we call with open_next() - let mut open_stages: Vec = vec![Unconstrained, Prepare]; + let first_stw_stage = work_buckets + .iter() + .skip(1) + .next() + .map(|(id, _)| id) + .unwrap(); + let mut open_stages: Vec = vec![first_stw_stage]; // The rest will open after the previous stage is done. + let stages = work_buckets + .iter() + .map(|(stage, _)| stage) + .collect::>(); let mut open_next = |s: WorkBucketStage| { let cur_stages = open_stages.clone(); work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler| { - let should_open = scheduler.are_buckets_drained(&cur_stages) - && scheduler.all_workers_parked(); + let should_open = scheduler.are_buckets_drained(&cur_stages); // Additional check before the `RefClosure` bucket opens. if should_open && s == crate::scheduler::work_bucket::LAST_CLOSURE_BUCKET { if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() { if closure_end() { - // Don't open `RefClosure` if `closure_end` added more works to `Closure`. + // Don't open `LAST_CLOSURE_BUCKET` if `closure_end` added more works to `Closure`. return false; } } @@ -95,36 +100,23 @@ impl GCWorkScheduler { open_stages.push(s); }; - open_next(Closure); - open_next(SoftRefClosure); - open_next(WeakRefClosure); - open_next(FinalRefClosure); - open_next(PhantomRefClosure); - open_next(CalculateForwarding); - open_next(SecondRoots); - open_next(RefForwarding); - open_next(FinalizableForwarding); - open_next(Compact); - open_next(Release); - open_next(Final); + for stages in stages { + if stages != WorkBucketStage::Unconstrained && stages != first_stw_stage { + open_next(stages); + } + } } - // Create the work bucket for the controller. - let coordinator_work = WorkBucket::new(true, worker_monitor.clone()); - - // We prepare the shared part of workers, but do not create the actual workers now. - // The shared parts of workers are communication hubs between controller and workers. - let workers_shared = (0..num_workers) - .map(|_| Arc::new(GCWorkerShared::::new(worker_monitor.clone()))) - .collect::>(); + let coordinator_worker_shared = Arc::new(GCWorkerShared::::new()); - // Similarly, we create the shared part of the work of the controller, but not the controller itself. 
- let coordinator_worker_shared = Arc::new(GCWorkerShared::::new(worker_monitor.clone())); + let worker_group = WorkerGroup::new(num_workers); + work_buckets.values_mut().for_each(|bucket| { + bucket.set_group(worker_group.clone()); + }); Arc::new(Self { work_buckets, - coordinator_work, - workers_shared, + worker_group, coordinator_worker_shared, worker_monitor, closure_end: Mutex::new(None), @@ -133,11 +125,7 @@ impl GCWorkScheduler { #[inline] pub fn num_workers(&self) -> usize { - self.workers_shared.len() - } - - pub fn all_workers_parked(&self) -> bool { - self.workers_shared.iter().all(|w| w.is_parked()) + self.worker_group.as_ref().worker_count() } /// Create GC threads, including the controller thread and all workers. @@ -163,18 +151,7 @@ impl GCWorkScheduler { ); VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Controller(gc_controller)); - // Spawn each worker thread. - for (ordinal, shared) in self.workers_shared.iter().enumerate() { - let worker = Box::new(GCWorker::new( - mmtk, - ordinal, - self.clone(), - false, - sender.clone(), - shared.clone(), - )); - VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Worker(worker)); - } + self.worker_group.spawn(mmtk, sender, tls) } /// Schedule all the common work packets @@ -262,48 +239,52 @@ impl GCWorkScheduler { } /// Open buckets if their conditions are met - pub fn update_buckets(&self) { + fn update_buckets(&self) -> bool { let mut buckets_updated = false; + let mut new_packets = false; for (id, bucket) in self.work_buckets.iter() { if id == WorkBucketStage::Unconstrained { continue; } - buckets_updated |= bucket.update(self); - } - if buckets_updated { - // Notify the workers for new work - let _guard = self.worker_monitor.0.lock().unwrap(); - self.worker_monitor.1.notify_all(); + let bucket_opened = bucket.update(self); + buckets_updated |= bucket_opened; + if bucket_opened { + new_packets |= !bucket.is_drained(); + } } + buckets_updated && new_packets } pub fn deactivate_all(&self) { - for (stage, bucket) in self.work_buckets.iter() { - if stage == WorkBucketStage::Unconstrained { - continue; + self.work_buckets.iter().for_each(|(id, bkt)| { + if id != WorkBucketStage::Unconstrained { + bkt.deactivate(); } - - bucket.deactivate(); - } + }); } pub fn reset_state(&self) { - for (stage, bucket) in self.work_buckets.iter() { - if stage == WorkBucketStage::Unconstrained || stage == WorkBucketStage::Prepare { - continue; + let first_stw_stage = self + .work_buckets + .iter() + .skip(1) + .next() + .map(|(id, _)| id) + .unwrap(); + self.work_buckets.iter().for_each(|(id, bkt)| { + if id != WorkBucketStage::Unconstrained && id != first_stw_stage { + bkt.deactivate(); } - - bucket.deactivate(); - } + }); } pub fn debug_assert_all_buckets_deactivated(&self) { - for (stage, bucket) in self.work_buckets.iter() { - if stage == WorkBucketStage::Unconstrained { - return; - } - - debug_assert!(!bucket.is_activated()); + if cfg!(debug_assertions) { + self.work_buckets.iter().for_each(|(id, bkt)| { + if id != WorkBucketStage::Unconstrained { + assert!(!bkt.is_activated()); + } + }); } } @@ -314,34 +295,70 @@ impl GCWorkScheduler { .unwrap(); } - #[inline] - fn pop_scheduable_work(&self, worker: &GCWorker) -> Option<(Box>, bool)> { - if let Some(work) = worker.shared.local_work_bucket.poll() { - return Some((work, worker.shared.local_work_bucket.is_empty())); + #[inline(always)] + fn all_activated_buckets_are_empty(&self) -> bool { + for bucket in self.work_buckets.values() { + if bucket.is_activated() && !bucket.is_drained() { 
+ return false; + } + } + true + } + + #[inline(always)] + fn pop_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { + let mut retry = false; + match worker.shared.local_work.pop() { + Some(w) => return Steal::Success(w), + _ => {} } for work_bucket in self.work_buckets.values() { - if let Some(work) = work_bucket.poll() { - return Some((work, work_bucket.is_empty())); + match work_bucket.poll(&worker.shared.local_work_buffer) { + Steal::Success(w) => return Steal::Success(w), + Steal::Retry => retry = true, + _ => {} + } + } + for (id, stealer) in &self.worker_group.stealers { + if *id == worker.ordinal { + continue; + } + match stealer.steal() { + Steal::Success(w) => return Steal::Success(w), + Steal::Retry => retry = true, + _ => {} } } - None + if retry { + Steal::Retry + } else { + Steal::Empty + } } - /// Get a scheduable work. Called by workers #[inline] - pub fn poll(&self, worker: &GCWorker) -> Box> { - let work = if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) { - if bucket_is_empty { - worker - .sender - .send(CoordinatorMessage::BucketDrained) - .unwrap(); + fn pop_schedulable_work(&self, worker: &GCWorker) -> Option>> { + loop { + match self.pop_schedulable_work_once(worker) { + Steal::Success(w) => { + return Some(w); + } + Steal::Retry => { + std::thread::yield_now(); + continue; + } + Steal::Empty => { + return None; + } } - work - } else { - self.poll_slow(worker) - }; - work + } + } + + /// Get a schedulable work. Called by workers + #[inline] + pub fn poll(&self, worker: &GCWorker) -> Box> { + self.pop_schedulable_work(worker) + .unwrap_or_else(|| self.poll_slow(worker)) } #[cold] @@ -350,32 +367,35 @@ impl GCWorkScheduler { let mut guard = self.worker_monitor.0.lock().unwrap(); loop { debug_assert!(!worker.shared.is_parked()); - if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) { - if bucket_is_empty { - worker - .sender - .send(CoordinatorMessage::BucketDrained) - .unwrap(); - } + if let Some(work) = self.pop_schedulable_work(worker) { return work; } // Park this worker - worker.shared.parked.store(true, Ordering::SeqCst); - if self.all_workers_parked() { - worker - .sender - .send(CoordinatorMessage::AllWorkerParked) - .unwrap(); + let all_parked = self.worker_group.inc_parked_workers(); + if all_parked { + if self.update_buckets() { + self.worker_group.dec_parked_workers(); + // We guarantee that we can at least fetch one packet. + let work = self.pop_schedulable_work(worker).unwrap(); + // Optimize for the case that a newly opened bucket only has one packet. + if !self.all_activated_buckets_are_empty() { + // Have more jobs in this bucket. Notify other workers.
+ self.worker_monitor.1.notify_all(); + } + return work; + } + worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait guard = self.worker_monitor.1.wait(guard).unwrap(); // Unpark this worker + self.worker_group.dec_parked_workers(); worker.shared.parked.store(false, Ordering::SeqCst); } } pub fn enable_stat(&self) { - for worker in &self.workers_shared { + for worker in &self.worker_group.workers_shared { let worker_stat = worker.borrow_stat(); worker_stat.enable(); } @@ -385,7 +405,7 @@ impl GCWorkScheduler { pub fn statistics(&self) -> HashMap { let mut summary = SchedulerStat::default(); - for worker in &self.workers_shared { + for worker in &self.worker_group.workers_shared { let worker_stat = worker.borrow_stat(); summary.merge(&worker_stat); } @@ -396,8 +416,9 @@ impl GCWorkScheduler { pub fn notify_mutators_paused(&self, mmtk: &'static MMTK) { mmtk.plan.base().gc_requester.clear_request(); - debug_assert!(!self.work_buckets[WorkBucketStage::Prepare].is_activated()); - self.work_buckets[WorkBucketStage::Prepare].activate(); + let first_stw_bucket = self.work_buckets.values().skip(1).next().unwrap(); + debug_assert!(!first_stw_bucket.is_activated()); + first_stw_bucket.activate(); let _guard = self.worker_monitor.0.lock().unwrap(); self.worker_monitor.1.notify_all(); } diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index 269d2f4f07..4f01e2fbf8 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -1,143 +1,215 @@ +use super::worker::WorkerGroup; use super::*; use crate::vm::VMBinding; +use crossbeam::deque::{Injector, Steal, Worker}; use enum_map::Enum; -use spin::RwLock; -use std::cmp; -use std::collections::BinaryHeap; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; -/// A unique work-packet id for each instance of work-packet -#[derive(Eq, PartialEq, Clone, Copy)] -struct WorkUID(u64); - -impl WorkUID { - pub fn new() -> Self { - static WORK_UID: AtomicU64 = AtomicU64::new(0); - Self(WORK_UID.fetch_add(1, Ordering::Relaxed)) - } -} - -struct PrioritizedWork { - priority: usize, - work_uid: WorkUID, - work: Box>, +struct BucketQueue { + queue: Injector>>, } -impl PrioritizedWork { - pub fn new(priority: usize, work: Box>) -> Self { +impl BucketQueue { + fn new() -> Self { Self { - priority, - work, - work_uid: WorkUID::new(), + queue: Injector::new(), } } -} -impl PartialEq for PrioritizedWork { - fn eq(&self, other: &Self) -> bool { - self.priority == other.priority && self.work_uid == other.work_uid + #[inline(always)] + fn is_empty(&self) -> bool { + self.queue.is_empty() } -} -impl Eq for PrioritizedWork {} + #[inline(always)] + fn steal_batch_and_pop( + &self, + dest: &Worker>>, + ) -> Steal>> { + self.queue.steal_batch_and_pop(dest) + } -impl Ord for PrioritizedWork { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.priority.cmp(&other.priority) + #[inline(always)] + fn push(&self, w: Box>) { + self.queue.push(w); } -} -impl PartialOrd for PrioritizedWork { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + #[inline(always)] + fn push_all(&self, ws: Vec>>) { + for w in ws { + self.queue.push(w); + } } } pub struct WorkBucket { active: AtomicBool, - /// A priority queue - queue: RwLock>>, + queue: BucketQueue, + prioritized_queue: Option>, monitor: Arc<(Mutex<()>, Condvar)>, can_open: Option) -> bool) + Send>>, + group: Option>>, } impl WorkBucket { pub const DEFAULT_PRIORITY: usize = 
1000; + pub fn new(active: bool, monitor: Arc<(Mutex<()>, Condvar)>) -> Self { Self { active: AtomicBool::new(active), - queue: Default::default(), + queue: BucketQueue::new(), + prioritized_queue: None, monitor, can_open: None, + group: None, } } + + pub fn set_group(&mut self, group: Arc>) { + self.group = Some(group) + } + + #[inline(always)] + fn parked_workers(&self) -> Option { + Some(self.group.as_ref()?.parked_workers()) + } + + #[inline(always)] fn notify_one_worker(&self) { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_one() + if !self.is_activated() { + return; + } + if let Some(parked) = self.parked_workers() { + if parked > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_one() + } + } } - fn notify_all_workers(&self) { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_all() + + #[inline(always)] + pub fn force_notify_all_workers(&self) { + if let Some(parked) = self.parked_workers() { + if parked > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_all() + } + } } + + #[inline(always)] + pub fn notify_all_workers(&self) { + if !self.is_activated() { + return; + } + if let Some(parked) = self.parked_workers() { + if parked > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_all() + } + } + } + + #[inline(always)] pub fn is_activated(&self) -> bool { self.active.load(Ordering::SeqCst) } + /// Enable the bucket pub fn activate(&self) { self.active.store(true, Ordering::SeqCst); } + /// Test if the bucket is drained + #[inline(always)] pub fn is_empty(&self) -> bool { - self.queue.read().len() == 0 + self.queue.is_empty() + && self + .prioritized_queue + .as_ref() + .map(|q| q.is_empty()) + .unwrap_or(true) } + + #[inline(always)] pub fn is_drained(&self) -> bool { self.is_activated() && self.is_empty() } + /// Disable the bucket pub fn deactivate(&self) { - debug_assert!( - self.queue.read().is_empty(), - "Bucket not drained before close" - ); + debug_assert!(self.queue.is_empty(), "Bucket not drained before close"); self.active.store(false, Ordering::SeqCst); } - /// Add a work packet to this bucket, with a given priority - pub fn add_with_priority(&self, priority: usize, work: Box>) { - self.queue - .write() - .push(PrioritizedWork::new(priority, work)); - self.notify_one_worker(); // FIXME: Performance + + #[inline(always)] + pub fn add_prioritized(&self, work: Box>) { + self.prioritized_queue.as_ref().unwrap().push(work); + if self.is_activated() && self.parked_workers().map(|c| c > 0).unwrap_or(true) { + self.notify_one_worker(); + } } - /// Add a work packet to this bucket, with a default priority (1000) + + /// Add a work packet to this bucket + #[inline(always)] pub fn add>(&self, work: W) { - self.add_with_priority(Self::DEFAULT_PRIORITY, Box::new(work)); + self.queue.push(Box::new(work)); + if self.is_activated() && self.parked_workers().map(|c| c > 0).unwrap_or(true) { + self.notify_one_worker(); + } } - pub fn bulk_add_with_priority(&self, priority: usize, work_vec: Vec>>) { - { - let mut queue = self.queue.write(); - for w in work_vec { - queue.push(PrioritizedWork::new(priority, w)); - } + + #[inline(always)] + pub fn add_dyn(&self, work: Box>) { + self.queue.push(work); + if self.is_activated() && self.parked_workers().map(|c| c > 0).unwrap_or(true) { + self.notify_one_worker(); + } + } + + #[inline(always)] + pub fn bulk_add_prioritized(&self, work_vec: Vec>>) { + self.prioritized_queue.as_ref().unwrap().push_all(work_vec); + if 
self.is_activated() { + self.notify_all_workers(); } - self.notify_all_workers(); // FIXME: Performance } + + #[inline(always)] pub fn bulk_add(&self, work_vec: Vec>>) { - self.bulk_add_with_priority(1000, work_vec) + if work_vec.is_empty() { + return; + } + self.queue.push_all(work_vec); + if self.is_activated() { + self.notify_all_workers(); + } } - /// Get a work packet (with the greatest priority) from this bucket - pub fn poll(&self) -> Option>> { - if !self.active.load(Ordering::SeqCst) { - return None; + + /// Get a work packet from this bucket + #[inline(always)] + pub fn poll(&self, worker: &Worker>>) -> Steal>> { + if !self.active.load(Ordering::SeqCst) || self.is_empty() { + return Steal::Empty; + } + if let Some(prioritized_queue) = self.prioritized_queue.as_ref() { + prioritized_queue + .steal_batch_and_pop(worker) + .or_else(|| self.queue.steal_batch_and_pop(worker)) + } else { + self.queue.steal_batch_and_pop(worker) } - self.queue.write().pop().map(|v| v.work) } + pub fn set_open_condition( &mut self, pred: impl Fn(&GCWorkScheduler) -> bool + Send + 'static, ) { self.can_open = Some(Box::new(pred)); } + + #[inline(always)] pub fn update(&self, scheduler: &GCWorkScheduler) -> bool { if let Some(can_open) = self.can_open.as_ref() { if !self.is_activated() && can_open(scheduler) { diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index ce7eb02f61..4a45d7a49e 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -4,13 +4,12 @@ use super::*; use crate::mmtk::MMTK; use crate::util::copy::GCWorkerCopyContext; use crate::util::opaque_pointer::*; -use crate::vm::VMBinding; +use crate::vm::{Collection, GCThreadContext, VMBinding}; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; +use crossbeam::deque::{Stealer, Worker}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::Sender; -use std::sync::{Arc, Condvar, Mutex}; - -const LOCALLY_CACHED_WORKS: usize = 1; +use std::sync::Arc; /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. @@ -19,16 +18,19 @@ pub struct GCWorkerShared { pub parked: AtomicBool, /// Worker-local statistics data. stat: AtomicRefCell>, - /// Incoming work packets to be executed by the current worker. - pub local_work_bucket: WorkBucket, + pub local_work: Worker>>, + /// Cache of work packets created by the current worker. + /// May be flushed to the global pool or executed locally. + pub local_work_buffer: Worker>>, } impl GCWorkerShared { - pub fn new(worker_monitor: Arc<(Mutex<()>, Condvar)>) -> Self { + pub fn new() -> Self { Self { parked: AtomicBool::new(true), stat: Default::default(), - local_work_bucket: WorkBucket::new(true, worker_monitor), + local_work: Worker::new_fifo(), + local_work_buffer: Worker::new_fifo(), } } } @@ -52,9 +54,6 @@ pub struct GCWorker { /// True if this struct is the embedded GCWorker of the controller thread. /// False if this struct belongs to a standalone GCWorker thread. is_coordinator: bool, - /// Cache of work packets created by the current worker. - /// May be flushed to the global pool or executed locally. - local_work_buffer: Vec<(WorkBucketStage, Box>)>, /// Reference to the shared part of the GC worker. It is used for synchronization. 
pub shared: Arc>, } @@ -98,30 +97,30 @@ impl GCWorker { scheduler, mmtk, is_coordinator, - local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS), shared, } } #[inline] - pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { - if !self.scheduler().work_buckets[bucket].is_activated() { - self.scheduler.work_buckets[bucket].add_with_priority(1000, Box::new(work)); + pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork) { + if !self.scheduler().work_buckets[bucket].is_activated() + || !self.shared.local_work_buffer.is_empty() + { + self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work)); return; } - self.local_work_buffer.push((bucket, Box::new(work))); - if self.local_work_buffer.len() > LOCALLY_CACHED_WORKS { - self.flush(); - } + self.shared.local_work_buffer.push(Box::new(work)); } - #[cold] - fn flush(&mut self) { - let mut buffer = Vec::with_capacity(LOCALLY_CACHED_WORKS); - std::mem::swap(&mut buffer, &mut self.local_work_buffer); - for (bucket, work) in buffer { - self.scheduler.work_buckets[bucket].add_with_priority(1000, work); + #[inline] + pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { + if !self.scheduler().work_buckets[bucket].is_activated() + || !self.shared.local_work_buffer.is_empty() + { + self.scheduler.work_buckets[bucket].add(work); + return; } + self.shared.local_work_buffer.push(Box::new(work)); } pub fn is_coordinator(&self) -> bool { @@ -140,18 +139,99 @@ impl GCWorker { work.do_work(self, self.mmtk); } + fn poll(&self) -> Box> { + self.shared + .local_work + .pop() + .or_else(|| { + self.shared + .local_work_buffer + .pop() + .or_else(|| Some(self.scheduler().poll(self))) + }) + .unwrap() + } + pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK) { self.tls = tls; self.copy = crate::plan::create_gc_worker_context(tls, mmtk); self.shared.parked.store(false, Ordering::SeqCst); loop { - while let Some((bucket, mut work)) = self.local_work_buffer.pop() { - debug_assert!(self.scheduler.work_buckets[bucket].is_activated()); - work.do_work_with_stat(self, mmtk); - } - let mut work = self.scheduler().poll(self); + let mut work = self.poll(); debug_assert!(!self.shared.is_parked()); work.do_work_with_stat(self, mmtk); } } } + +pub struct WorkerGroup { + pub workers_shared: Vec>>, + pub stealers: Vec<(usize, Stealer>>)>, + parked_workers: AtomicUsize, +} + +impl WorkerGroup { + pub fn new(num_workers: usize) -> Arc { + let workers_shared = (0..num_workers) + .map(|_| Arc::new(GCWorkerShared::::new())) + .collect::>(); + + let stealers = workers_shared + .iter() + .zip(0..num_workers) + .map(|(w, ordinal)| (ordinal, w.local_work_buffer.stealer())) + .collect(); + + Arc::new(Self { + workers_shared, + stealers, + parked_workers: Default::default(), + }) + } + + pub fn spawn( + &self, + mmtk: &'static MMTK, + sender: Sender>, + tls: VMThread, + ) { + // Spawn each worker thread. 
+ for (ordinal, shared) in self.workers_shared.iter().enumerate() { + let worker = Box::new(GCWorker::new( + mmtk, + ordinal, + mmtk.scheduler.clone(), + false, + sender.clone(), + shared.clone(), + )); + VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Worker(worker)); + } + } + + #[inline(always)] + pub fn worker_count(&self) -> usize { + self.workers_shared.len() + } + + #[inline(always)] + pub fn inc_parked_workers(&self) -> bool { + let old = self.parked_workers.fetch_add(1, Ordering::SeqCst); + old + 1 == self.worker_count() + } + + #[inline(always)] + pub fn dec_parked_workers(&self) { + self.parked_workers.fetch_sub(1, Ordering::SeqCst); + } + + #[inline(always)] + pub fn parked_workers(&self) -> usize { + self.parked_workers.load(Ordering::SeqCst) + } + + #[inline(always)] + pub fn all_parked(&self) -> bool { + self.parked_workers() == self.worker_count() + } +} diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs index e02e67c4d3..cae2ada132 100644 --- a/src/util/sanity/sanity_checker.rs +++ b/src/util/sanity/sanity_checker.rs @@ -109,7 +109,7 @@ impl GCWork for SanityPrepare

{ mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] .add(PrepareMutator::::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { + for w in &mmtk.scheduler.worker_group.workers_shared { w.local_work_bucket.add(PrepareCollector); } } @@ -133,7 +133,7 @@ impl GCWork for SanityRelease

{ mmtk.scheduler.work_buckets[WorkBucketStage::Release] .add(ReleaseMutator::::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { + for w in &mmtk.scheduler.worker_group.workers_shared { w.local_work_bucket.add(ReleaseCollector); } } From 126f465d809333904a34536104bdd1965fe427e0 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 12 May 2022 16:43:27 +1000 Subject: [PATCH 02/24] Fix CI --- src/scheduler/scheduler.rs | 26 +++++++------------------- src/scheduler/worker.rs | 11 ++++++++--- src/util/sanity/sanity_checker.rs | 4 ++-- 3 files changed, 17 insertions(+), 24 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 587caf0422..97e8d1b639 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -70,12 +70,7 @@ impl GCWorkScheduler { { // Unconstrained is always open. Prepare will be opened at the beginning of a GC. // This vec will grow for each stage we call with open_next() - let first_stw_stage = work_buckets - .iter() - .skip(1) - .next() - .map(|(id, _)| id) - .unwrap(); + let first_stw_stage = work_buckets.iter().nth(1).map(|(id, _)| id).unwrap(); let mut open_stages: Vec = vec![first_stw_stage]; // The rest will open after the previous stage is done. let stages = work_buckets @@ -264,13 +259,7 @@ impl GCWorkScheduler { } pub fn reset_state(&self) { - let first_stw_stage = self - .work_buckets - .iter() - .skip(1) - .next() - .map(|(id, _)| id) - .unwrap(); + let first_stw_stage = self.work_buckets.iter().nth(1).map(|(id, _)| id).unwrap(); self.work_buckets.iter().for_each(|(id, bkt)| { if id != WorkBucketStage::Unconstrained && id != first_stw_stage { bkt.deactivate(); @@ -308,9 +297,8 @@ impl GCWorkScheduler { #[inline(always)] fn pop_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { let mut retry = false; - match worker.shared.local_work.pop() { - Some(w) => return Steal::Success(w), - _ => {} + if let Some(w) = worker.shared.local_work.pop() { + return Steal::Success(w); } for work_bucket in self.work_buckets.values() { match work_bucket.poll(&worker.shared.local_work_buffer) { @@ -319,8 +307,8 @@ impl GCWorkScheduler { _ => {} } } - for (id, stealer) in &self.worker_group.stealers { - if *id == worker.ordinal { + for (id, stealer) in self.worker_group.stealers.iter().enumerate() { + if id == worker.ordinal { continue; } match stealer.steal() { @@ -416,7 +404,7 @@ impl GCWorkScheduler { pub fn notify_mutators_paused(&self, mmtk: &'static MMTK) { mmtk.plan.base().gc_requester.clear_request(); - let first_stw_bucket = self.work_buckets.values().skip(1).next().unwrap(); + let first_stw_bucket = self.work_buckets.values().nth(1).unwrap(); debug_assert!(!first_stw_bucket.is_activated()); first_stw_bucket.activate(); let _guard = self.worker_monitor.0.lock().unwrap(); diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 4a45d7a49e..5aacc842ac 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -24,6 +24,12 @@ pub struct GCWorkerShared { pub local_work_buffer: Worker>>, } +impl Default for GCWorkerShared { + fn default() -> Self { + Self::new() + } +} + impl GCWorkerShared { pub fn new() -> Self { Self { @@ -166,7 +172,7 @@ impl GCWorker { pub struct WorkerGroup { pub workers_shared: Vec>>, - pub stealers: Vec<(usize, Stealer>>)>, + pub stealers: Vec>>>, parked_workers: AtomicUsize, } @@ -178,8 +184,7 @@ impl WorkerGroup { let stealers = workers_shared .iter() - .zip(0..num_workers) - .map(|(w, ordinal)| (ordinal, w.local_work_buffer.stealer())) + .map(|worker| 
worker.local_work_buffer.stealer()) .collect(); Arc::new(Self { diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs index cae2ada132..10069b895e 100644 --- a/src/util/sanity/sanity_checker.rs +++ b/src/util/sanity/sanity_checker.rs @@ -110,7 +110,7 @@ impl GCWork for SanityPrepare

{ .add(PrepareMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - w.local_work_bucket.add(PrepareCollector); + w.local_work.push(Box::new(PrepareCollector)); } } } @@ -134,7 +134,7 @@ impl GCWork for SanityRelease

{ .add(ReleaseMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - w.local_work_bucket.add(ReleaseCollector); + w.local_work.push(Box::new(ReleaseCollector)); } } From 4d3e75d7d30e14ae450adccad60d48e55ff66f78 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Fri, 13 May 2022 18:58:54 +1000 Subject: [PATCH 03/24] Add comments --- src/scheduler/scheduler.rs | 33 +++++++++++++++++++++++++-------- src/scheduler/work_bucket.rs | 20 ++++++++++---------- src/scheduler/worker.rs | 32 ++++++++++++++++++++++++++++++-- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/src/scheduler/scheduler.rs index 97e8d1b639..8df0e5d817 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -21,7 +21,7 @@ pub enum CoordinatorMessage { pub struct GCWorkScheduler { /// Work buckets pub work_buckets: EnumMap>, - /// workers + /// Workers pub worker_group: Arc>, /// The shared part of the GC worker object of the controller thread coordinator_worker_shared: Arc>, @@ -233,7 +233,12 @@ impl GCWorkScheduler { self.work_buckets.values().all(|bucket| bucket.is_empty()) } - /// Open buckets if their conditions are met + /// Open buckets if their conditions are met. + /// + /// This function should only be called after all the workers are parked. + /// No workers will be woken up by this function. The caller is responsible for that. + /// + /// Return true if any newly opened bucket is non-empty. fn update_buckets(&self) -> bool { let mut buckets_updated = false; let mut new_packets = false; @@ -284,6 +289,7 @@ impl GCWorkScheduler { .unwrap(); } + /// Check if all the work buckets are empty #[inline(always)] fn all_activated_buckets_are_empty(&self) -> bool { for bucket in self.work_buckets.values() { @@ -294,38 +300,44 @@ impl GCWorkScheduler { true } + /// Get a schedulable work packet without retry. #[inline(always)] fn pop_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { - let mut retry = false; + let mut should_retry = false; // Try to find a packet that can be processed only by this worker. if let Some(w) = worker.shared.local_work.pop() { return Steal::Success(w); } + // Try to get a packet from a work bucket. for work_bucket in self.work_buckets.values() { match work_bucket.poll(&worker.shared.local_work_buffer) { Steal::Success(w) => return Steal::Success(w), - Steal::Retry => retry = true, + Steal::Retry => should_retry = true, _ => {} } } + // Try to steal some packets from other workers for (id, stealer) in self.worker_group.stealers.iter().enumerate() { if id == worker.ordinal { continue; } match stealer.steal() { Steal::Success(w) => return Steal::Success(w), - Steal::Retry => retry = true, + Steal::Retry => should_retry = true, _ => {} } } - if retry { + if should_retry { Steal::Retry } else { Steal::Empty } } + /// Get a schedulable work packet. #[inline] fn pop_schedulable_work(&self, worker: &GCWorker) -> Option>> { + // Loop until we successfully get a packet. loop { match self.pop_schedulable_work_once(worker) { Steal::Success(w) => { return Some(w); } Steal::Retry => { std::thread::yield_now(); continue; } Steal::Empty => { return None; } } } } - /// Get a schedulable work. Called by workers + /// Called by workers to get a schedulable work packet. + /// Park the worker if there're no available packets.
#[inline] pub fn poll(&self, worker: &GCWorker) -> Box> { self.pop_schedulable_work(worker) .unwrap_or_else(|| self.poll_slow(worker)) } @@ -358,20 +371,24 @@ impl GCWorkScheduler { let mut guard = self.worker_monitor.0.lock().unwrap(); loop { debug_assert!(!worker.shared.is_parked()); if let Some(work) = self.pop_schedulable_work(worker) { return work; } - // Park this worker + // Prepare to park this worker let all_parked = self.worker_group.inc_parked_workers(); + // If all workers are parked, try to activate new buckets if all_parked { if self.update_buckets() { self.worker_group.dec_parked_workers(); // We guarantee that we can at least fetch one packet. let work = self.pop_schedulable_work(worker).unwrap(); // Optimize for the case that a newly opened bucket only has one packet. + // We only notify_all if there's more than one packet available. if !self.all_activated_buckets_are_empty() { // Have more jobs in this bucket. Notify other workers. self.worker_monitor.1.notify_all(); } + // Return this packet and execute it. return work; } + // The current pause is finished if we can't open more buckets. worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait diff --git a/src/scheduler/work_bucket.rs index 4f01e2fbf8..6527510984 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -77,9 +77,11 @@ impl WorkBucket { #[inline(always)] fn notify_one_worker(&self) { + // If the bucket is not activated, don't notify anyone. if !self.is_activated() { return; } + // Notify one if there're any parked workers. if let Some(parked) = self.parked_workers() { if parked > 0 { let _guard = self.monitor.0.lock().unwrap(); @@ -88,21 +90,13 @@ impl WorkBucket { } } - #[inline(always)] - pub fn force_notify_all_workers(&self) { - if let Some(parked) = self.parked_workers() { - if parked > 0 { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_all() - } - } - } - #[inline(always)] pub fn notify_all_workers(&self) { + // If the bucket is not activated, don't notify anyone. if !self.is_activated() { return; } + // Notify all if there're any parked workers. if let Some(parked) = self.parked_workers() { if parked > 0 { let _guard = self.monitor.0.lock().unwrap(); self.monitor.1.notify_all() @@ -143,6 +137,8 @@ impl WorkBucket { self.active.store(false, Ordering::SeqCst); } + /// Add a work packet to this bucket + /// Panic if this bucket cannot receive prioritized packets. #[inline(always)] pub fn add_prioritized(&self, work: Box>) { self.prioritized_queue.as_ref().unwrap().push(work); @@ -160,6 +156,7 @@ impl WorkBucket { } } + /// Add a work packet to this bucket #[inline(always)] pub fn add_dyn(&self, work: Box>) { self.queue.push(work); @@ -168,6 +165,8 @@ impl WorkBucket { } } + /// Add multiple packets with a higher priority. + /// Panic if this bucket cannot receive prioritized packets. #[inline(always)] pub fn bulk_add_prioritized(&self, work_vec: Vec>>) { self.prioritized_queue.as_ref().unwrap().push_all(work_vec); @@ -176,6 +175,7 @@ impl WorkBucket { } } + /// Add multiple packets #[inline(always)] pub fn bulk_add(&self, work_vec: Vec>>) { if work_vec.is_empty() { diff --git a/src/scheduler/worker.rs index 5aacc842ac..dd12acce0c 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -18,9 +18,9 @@ pub struct GCWorkerShared { pub parked: AtomicBool, /// Worker-local statistics data. stat: AtomicRefCell>, + /// A queue of GCWork that can only be processed by the owning thread. pub local_work: Worker>>, - /// Cache of work packets created by the current worker. - /// May be flushed to the global pool or executed locally.
+ /// Local work packet queue. pub local_work_buffer: Worker>>, } @@ -107,6 +107,9 @@ impl GCWorker { } } + /// Add a work packet to the work queue and mark it with a higher priority. + /// If the bucket is activated, the packet will be pushed to the local queue, otherwise it will be + /// pushed to the global bucket with a higher priority. #[inline] pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork) { if !self.scheduler().work_buckets[bucket].is_activated() @@ -118,6 +121,9 @@ impl GCWorker { self.shared.local_work_buffer.push(Box::new(work)); } + /// Add a work packet to the work queue. + /// If the bucket is activated, the packet will be pushed to the local queue, otherwise it will be + /// pushed to the global bucket. #[inline] pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { if !self.scheduler().work_buckets[bucket].is_activated() @@ -145,6 +151,12 @@ impl GCWorker { work.do_work(self, self.mmtk); } + /// Poll a ready-to-execute work packet in the following order: + /// + /// 1. Any packet that should be processed only by this worker. + /// 2. Poll from the local work queue. + /// 3. Poll from activated global work-buckets. + /// 4. Steal from other workers. fn poll(&self) -> Box> { self.shared .local_work .pop() .or_else(|| { self.shared .local_work_buffer .pop() .or_else(|| Some(self.scheduler().poll(self))) }) .unwrap() } + /// Entry of the worker thread. + /// Each worker will keep polling and executing work packets in a loop. pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK) { self.tls = tls; self.copy = crate::plan::create_gc_worker_context(tls, mmtk); self.shared.parked.store(false, Ordering::SeqCst); loop { let mut work = self.poll(); debug_assert!(!self.shared.is_parked()); work.do_work_with_stat(self, mmtk); } } } +/// A worker group to manage all the GC workers (except the coordinator worker). pub struct WorkerGroup { + /// Shared worker data pub workers_shared: Vec>>, + /// Handles for stealing packets from workers pub stealers: Vec>>>, parked_workers: AtomicUsize, } impl WorkerGroup { + /// Create a WorkerGroup pub fn new(num_workers: usize) -> Arc { let workers_shared = (0..num_workers) .map(|_| Arc::new(GCWorkerShared::::new())) .collect::>(); @@ -194,6 +212,7 @@ impl WorkerGroup { }) } + /// Spawn all the worker threads pub fn spawn( &self, mmtk: &'static MMTK, sender: Sender>, tls: VMThread, ) { @@ -214,27 +233,36 @@ impl WorkerGroup { } } + /// Get the number of workers in the group #[inline(always)] pub fn worker_count(&self) -> usize { self.workers_shared.len() } + /// Increase the parked-workers counter. + /// Called before a worker is parked. + /// + /// Return true if all the workers are parked. #[inline(always)] pub fn inc_parked_workers(&self) -> bool { let old = self.parked_workers.fetch_add(1, Ordering::SeqCst); old + 1 == self.worker_count() } + /// Decrease the parked-workers counter. + /// Called after a worker is resumed from the parked state.
#[inline(always)] pub fn dec_parked_workers(&self) { self.parked_workers.fetch_sub(1, Ordering::SeqCst); } + /// Get the number of parked workers in the group #[inline(always)] pub fn parked_workers(&self) -> usize { self.parked_workers.load(Ordering::SeqCst) } + /// Check if all the workers are parked #[inline(always)] pub fn all_parked(&self) -> bool { self.parked_workers() == self.worker_count() } From 80b8497ffec6752b971cbe23a5908e84dd533544 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Sat, 14 May 2022 20:28:33 +1000 Subject: [PATCH 04/24] optimizations --- src/scheduler/scheduler.rs | 48 +++++++++++++++---------------- src/scheduler/work_bucket.rs | 55 +++++++++++++----------------------- src/scheduler/worker.rs | 6 ++-- 3 files changed, 47 insertions(+), 62 deletions(-) diff --git a/src/scheduler/scheduler.rs index 8df0e5d817..a1618d0851 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -47,23 +47,24 @@ unsafe impl Sync for GCWorkScheduler {} impl GCWorkScheduler { pub fn new(num_workers: usize) -> Arc { let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default(); + let worker_group = WorkerGroup::new(num_workers); // Create work buckets for workers. let mut work_buckets = enum_map! { - WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()), - WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::FinalizableForwarding =>
WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), }; // Set the open condition of each bucket. @@ -104,11 +105,6 @@ impl GCWorkScheduler { let coordinator_worker_shared = Arc::new(GCWorkerShared::::new()); - let worker_group = WorkerGroup::new(num_workers); - work_buckets.values_mut().for_each(|bucket| { - bucket.set_group(worker_group.clone()); - }); - Arc::new(Self { work_buckets, worker_group, @@ -302,7 +298,7 @@ impl GCWorkScheduler { /// Get a schedulable work packet without retry. #[inline(always)] - fn pop_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { + fn poll_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { let mut should_retry = false; // Try to find a packet that can be processed only by this worker. if let Some(w) = worker.shared.local_work.pop() { @@ -336,10 +332,10 @@ impl GCWorkScheduler { /// Get a schedulable work packet. #[inline] - fn pop_schedulable_work(&self, worker: &GCWorker) -> Option>> { + fn poll_schedulable_work(&self, worker: &GCWorker) -> Option>> { // Loop until we successfully get a packet. loop { - match self.pop_schedulable_work_once(worker) { + match self.poll_schedulable_work_once(worker) { Steal::Success(w) => { return Some(w); } @@ -358,7 +354,7 @@ impl GCWorkScheduler { /// Called by workers to get a schedulable work packet. /// Park the worker if there're no available packets. #[inline] pub fn poll(&self, worker: &GCWorker) -> Box> { - self.pop_schedulable_work(worker) + self.poll_schedulable_work(worker) .unwrap_or_else(|| self.poll_slow(worker)) } @@ -368,7 +364,8 @@ impl GCWorkScheduler { let mut guard = self.worker_monitor.0.lock().unwrap(); loop { debug_assert!(!worker.shared.is_parked()); - if let Some(work) = self.pop_schedulable_work(worker) { + // Retry polling + if let Some(work) = self.poll_schedulable_work(worker) { return work; } // Prepare to park this worker @@ -376,9 +373,10 @@ impl GCWorkScheduler { // If all workers are parked, try to activate new buckets if all_parked { if self.update_buckets() { + // We're not going to sleep since a new bucket was just opened. self.worker_group.dec_parked_workers(); // We guarantee that we can at least fetch one packet. - let work = self.pop_schedulable_work(worker).unwrap(); + let work = self.poll_schedulable_work(worker).unwrap(); // Optimize for the case that a newly opened bucket only has one packet. // We only notify_all if there's more than one packet available.
if !self.all_activated_buckets_are_empty() { // Have more jobs in this bucket. Notify other workers. self.worker_monitor.1.notify_all(); } // Return this packet and execute it. return work; } // The current pause is finished if we can't open more buckets. worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait diff --git a/src/scheduler/work_bucket.rs index 6527510984..d9aa4160c8 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -49,32 +49,27 @@ pub struct WorkBucket { prioritized_queue: Option>, monitor: Arc<(Mutex<()>, Condvar)>, can_open: Option) -> bool) + Send>>, - group: Option>>, + group: Arc>, } impl WorkBucket { pub const DEFAULT_PRIORITY: usize = 1000; - pub fn new(active: bool, monitor: Arc<(Mutex<()>, Condvar)>) -> Self { + pub fn new( + active: bool, + monitor: Arc<(Mutex<()>, Condvar)>, + group: Arc>, + ) -> Self { Self { active: AtomicBool::new(active), queue: BucketQueue::new(), prioritized_queue: None, monitor, can_open: None, - group: None, + group, } } - pub fn set_group(&mut self, group: Arc>) { - self.group = Some(group) - } - - #[inline(always)] - fn parked_workers(&self) -> Option { - Some(self.group.as_ref()?.parked_workers()) - } - #[inline(always)] fn notify_one_worker(&self) { // If the bucket is not activated, don't notify anyone. @@ -82,11 +77,9 @@ impl WorkBucket { return; } // Notify one if there're any parked workers. - if let Some(parked) = self.parked_workers() { - if parked > 0 { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_one() - } + if self.group.parked_workers() > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_one() } } @@ -97,22 +90,20 @@ impl WorkBucket { return; } // Notify all if there're any parked workers. - if let Some(parked) = self.parked_workers() { - if parked > 0 { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_all() - } + if self.group.parked_workers() > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_all() } } #[inline(always)] pub fn is_activated(&self) -> bool { - self.active.load(Ordering::SeqCst) + self.active.load(Ordering::Relaxed) } /// Enable the bucket pub fn activate(&self) { - self.active.store(true, Ordering::SeqCst); + self.active.store(true, Ordering::Relaxed); } /// Test if the bucket is drained @@ -134,7 +125,7 @@ impl WorkBucket { /// Disable the bucket pub fn deactivate(&self) { debug_assert!(self.queue.is_empty(), "Bucket not drained before close"); - self.active.store(false, Ordering::SeqCst); + self.active.store(false, Ordering::Relaxed); } /// Add a work packet to this bucket @@ -142,27 +133,21 @@ impl WorkBucket { #[inline(always)] pub fn add_prioritized(&self, work: Box>) { self.prioritized_queue.as_ref().unwrap().push(work); - if self.is_activated() && self.parked_workers().map(|c| c > 0).unwrap_or(true) { - self.notify_one_worker(); - } + self.notify_one_worker(); } /// Add a work packet to this bucket #[inline(always)] pub fn add>(&self, work: W) { self.queue.push(Box::new(work)); - if self.is_activated() && self.parked_workers().map(|c| c > 0).unwrap_or(true) { - self.notify_one_worker(); - } + self.notify_one_worker(); } /// Add a work packet to this bucket #[inline(always)] pub fn add_dyn(&self, work: Box>) { self.queue.push(work); - if self.is_activated() && self.parked_workers().map(|c| c > 0).unwrap_or(true) { - self.notify_one_worker(); - } + self.notify_one_worker(); } /// Add multiple packets with a higher priority.
@@ -190,7 +175,7 @@ impl WorkBucket { /// Get a work packet from this bucket #[inline(always)] pub fn poll(&self, worker: &Worker>>) -> Steal>> { - if !self.active.load(Ordering::SeqCst) || self.is_empty() { + if !self.is_activated() || self.is_empty() { return Steal::Empty; } if let Some(prioritized_queue) = self.prioritized_queue.as_ref() { diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index dd12acce0c..77f7e1b20a 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -11,6 +11,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::Sender; use std::sync::Arc; +const LOCALLY_CACHED_PACKETS: usize = 16; + /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. pub struct GCWorkerShared { @@ -113,7 +115,7 @@ impl GCWorker { #[inline] pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork) { if !self.scheduler().work_buckets[bucket].is_activated() - || !self.shared.local_work_buffer.is_empty() + || self.shared.local_work_buffer.len() >= LOCALLY_CACHED_PACKETS { self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work)); return; @@ -127,7 +129,7 @@ impl GCWorker { #[inline] pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { if !self.scheduler().work_buckets[bucket].is_activated() - || !self.shared.local_work_buffer.is_empty() + || self.shared.local_work_buffer.len() >= LOCALLY_CACHED_PACKETS { self.scheduler.work_buckets[bucket].add(work); return; From e6e1d6aa7ae5a333f1001a0402c2dff824b1bd55 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Mon, 16 May 2022 10:32:39 +1000 Subject: [PATCH 05/24] wip --- src/scheduler/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 77f7e1b20a..bfd9f7c813 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -11,7 +11,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::Sender; use std::sync::Arc; -const LOCALLY_CACHED_PACKETS: usize = 16; +const LOCALLY_CACHED_PACKETS: usize = 1; /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. 
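For orientation between these patches: the queue shape the series converges on is one shared crossbeam `Injector` per bucket, a `Worker` deque owned by each GC worker, and a `Stealer` handle per worker held by its peers. The following is a minimal, self-contained sketch of that polling order (own deque first, then the bucket injector via a batched steal, then stealing from peers), assuming crossbeam 0.8; `Packet` and `find_packet` are illustrative stand-ins, not MMTk's actual `GCWork` types or APIs.

use crossbeam::deque::{Injector, Steal, Stealer, Worker};

type Packet = String; // stand-in for a boxed GC work packet

fn find_packet(
    local: &Worker<Packet>,
    bucket: &Injector<Packet>,
    stealers: &[Stealer<Packet>],
) -> Option<Packet> {
    // 1. Fast path: pop from this worker's own deque.
    if let Some(p) = local.pop() {
        return Some(p);
    }
    loop {
        let mut retry = false;
        // 2. Move a batch from the shared bucket queue into the local deque
        //    and return one packet (cf. BucketQueue::steal_batch_and_pop).
        match bucket.steal_batch_and_pop(local) {
            Steal::Success(p) => return Some(p),
            Steal::Retry => retry = true,
            Steal::Empty => {}
        }
        // 3. Fall back to stealing a packet from another worker's deque.
        for s in stealers {
            match s.steal() {
                Steal::Success(p) => return Some(p),
                Steal::Retry => retry = true,
                Steal::Empty => {}
            }
        }
        if !retry {
            return None; // nothing anywhere; the real scheduler parks the worker here
        }
        std::thread::yield_now(); // contended: back off briefly and retry
    }
}

fn main() {
    let local = Worker::new_fifo();
    let bucket = Injector::new();
    bucket.push("scan-roots".to_string());
    assert_eq!(find_packet(&local, &bucket, &[]), Some("scan-roots".to_string()));
}

The `Steal::Retry` handling mirrors the loop in scheduler.rs above: a contended steal is not treated as "empty", so a worker only parks once every queue has reported genuinely empty.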
From 8c0a0297d33d8cec3aa9f2adce9cb06bab50e342 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Mon, 16 May 2022 12:39:27 +1000 Subject: [PATCH 06/24] optimize --- src/scheduler/gc_work.rs | 6 ++++-- src/scheduler/work_bucket.rs | 11 +++++++++++ src/scheduler/worker.rs | 27 +++++++++++++++++---------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/scheduler/gc_work.rs index aa1581112b..8dbfd478e3 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -51,7 +51,8 @@ impl GCWork for Prepare { .add(PrepareMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - w.local_work.push(Box::new(PrepareCollector)); + let result = w.local_work.push(Box::new(PrepareCollector)); + debug_assert!(result.is_ok()); } } } @@ -119,7 +120,8 @@ impl GCWork for Release { .add(ReleaseMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - w.local_work.push(Box::new(ReleaseCollector)); + let result = w.local_work.push(Box::new(ReleaseCollector)); + debug_assert!(result.is_ok()); } } } diff --git a/src/scheduler/work_bucket.rs index d9aa4160c8..b3834f24b7 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -150,6 +150,17 @@ impl WorkBucket { self.notify_one_worker(); } + /// Add a work packet to this bucket, but do not notify any workers. + /// Used internally by the scheduler and workers. + #[inline(always)] + pub(super) fn add_dyn_no_notify(&self, work: Box>, prioritized: bool) { + if !prioritized { + self.queue.push(work); + } else { + self.prioritized_queue.as_ref().unwrap().push(work); + } + } + /// Add multiple packets with a higher priority. /// Panic if this bucket cannot receive prioritized packets. #[inline(always)] diff --git a/src/scheduler/worker.rs index bfd9f7c813..d26006402c 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -7,12 +7,11 @@ use super::*; use crate::mmtk::MMTK; use crate::util::copy::GCWorkerCopyContext; use crate::util::opaque_pointer::*; use crate::vm::{Collection, GCThreadContext, VMBinding}; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; use crossbeam::deque::{Stealer, Worker}; +use crossbeam::queue::ArrayQueue; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::Sender; use std::sync::Arc; -const LOCALLY_CACHED_PACKETS: usize = 1; /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. pub struct GCWorkerShared { @@ -21,7 +20,7 @@ pub struct GCWorkerShared { /// Worker-local statistics data. stat: AtomicRefCell>, /// A queue of GCWork that can only be processed by the owning thread. - pub local_work: Worker>>, + pub local_work: ArrayQueue>>, /// Local work packet queue. pub local_work_buffer: Worker>>, } @@ -36,7 +36,7 @@ impl GCWorkerShared { Self { parked: AtomicBool::new(true), stat: Default::default(), - local_work: Worker::new_fifo(), + local_work: ArrayQueue::new(16), local_work_buffer: Worker::new_fifo(), } } } @@ -114,13 +113,17 @@ impl GCWorker { /// pushed to the global bucket with a higher priority.
#[inline] pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork) { - if !self.scheduler().work_buckets[bucket].is_activated() - || self.shared.local_work_buffer.len() >= LOCALLY_CACHED_PACKETS - { + if !self.scheduler().work_buckets[bucket].is_activated() { self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work)); return; } self.shared.local_work_buffer.push(Box::new(work)); + if self.shared.local_work_buffer.len() > 512 { + while let Some(w) = self.shared.local_work_buffer.pop() { + self.scheduler.work_buckets[bucket].add_dyn_no_notify(w, true); + } + self.scheduler.work_buckets[bucket].notify_all_workers(); + } } /// Add a work packet to the work queue. @@ -128,13 +131,17 @@ impl GCWorker { /// pushed to the global bucket. #[inline] pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { - if !self.scheduler().work_buckets[bucket].is_activated() - || self.shared.local_work_buffer.len() >= LOCALLY_CACHED_PACKETS - { + if !self.scheduler().work_buckets[bucket].is_activated() { self.scheduler.work_buckets[bucket].add(work); return; } self.shared.local_work_buffer.push(Box::new(work)); + if self.shared.local_work_buffer.len() > 512 { + while let Some(w) = self.shared.local_work_buffer.pop() { + self.scheduler.work_buckets[bucket].add_dyn_no_notify(w, false); + } + self.scheduler.work_buckets[bucket].notify_all_workers(); + } } pub fn is_coordinator(&self) -> bool { From c63b94c11f1f2f9164c5523e2be31b78b6981c27 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 19 May 2022 13:14:19 +1000 Subject: [PATCH 07/24] revert --- src/scheduler/scheduler.rs | 34 +++++++++++++++++++--------------- src/scheduler/work_bucket.rs | 23 ++++++++++++++--------- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index a1618d0851..1b99e86ffe 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -47,24 +47,23 @@ unsafe impl Sync for GCWorkScheduler {} impl GCWorkScheduler { pub fn new(num_workers: usize) -> Arc { let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default(); - let worker_group = WorkerGroup::new(num_workers); // Create work buckets for workers. let mut work_buckets = enum_map! 
{ - WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()), + WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()), }; // Set the open condition of each bucket. 
@@ -105,6 +104,11 @@ impl GCWorkScheduler { let coordinator_worker_shared = Arc::new(GCWorkerShared::::new()); + let worker_group = WorkerGroup::new(num_workers); + work_buckets.values_mut().for_each(|bucket| { + bucket.set_group(worker_group.clone()); + }); + Arc::new(Self { work_buckets, worker_group, diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index b3834f24b7..ed0a268ab6 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -49,27 +49,32 @@ pub struct WorkBucket { prioritized_queue: Option>, monitor: Arc<(Mutex<()>, Condvar)>, can_open: Option) -> bool) + Send>>, - group: Arc>, + group: Option>>, } impl WorkBucket { pub const DEFAULT_PRIORITY: usize = 1000; - pub fn new( - active: bool, - monitor: Arc<(Mutex<()>, Condvar)>, - group: Arc>, - ) -> Self { + pub fn new(active: bool, monitor: Arc<(Mutex<()>, Condvar)>) -> Self { Self { active: AtomicBool::new(active), queue: BucketQueue::new(), prioritized_queue: None, monitor, can_open: None, - group, + group: None, } } + pub fn set_group(&mut self, group: Arc>) { + self.group = Some(group) + } + + #[inline(always)] + fn parked_workers(&self) -> usize { + self.group.as_ref().unwrap().parked_workers() + } + #[inline(always)] fn notify_one_worker(&self) { // If the bucket is not activated, don't notify anyone. @@ -77,7 +82,7 @@ impl WorkBucket { return; } // Notify one if there're any parked workers. - if self.group.parked_workers() > 0 { + if self.parked_workers() > 0 { let _guard = self.monitor.0.lock().unwrap(); self.monitor.1.notify_one() } @@ -90,7 +95,7 @@ impl WorkBucket { return; } // Notify all if there're any parked workers. - if self.group.parked_workers() > 0 { + if self.parked_workers() > 0 { let _guard = self.monitor.0.lock().unwrap(); self.monitor.1.notify_all() } From e20f4c8378456906bb246fa6ced7f081cb12c918 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 19 May 2022 15:18:03 +1000 Subject: [PATCH 08/24] Revert "revert" This reverts commit c63b94c11f1f2f9164c5523e2be31b78b6981c27. --- src/scheduler/scheduler.rs | 34 +++++++++++++++------------------- src/scheduler/work_bucket.rs | 23 +++++++++-------------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 1b99e86ffe..a1618d0851 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -47,23 +47,24 @@ unsafe impl Sync for GCWorkScheduler {} impl GCWorkScheduler { pub fn new(num_workers: usize) -> Arc { let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default(); + let worker_group = WorkerGroup::new(num_workers); // Create work buckets for workers. let mut work_buckets = enum_map! 
{ - WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()), - WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), }; // Set the open condition of each bucket. 
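
Note on the "open condition" referred to in the comment above: every bucket except the first stop-the-world stage gets a closure that returns true only once all previously chained stages are drained. A standalone sketch of that chaining, where `Stage` and `Scheduler` are simplified stand-ins for illustration and not the mmtk-core API:

    // Sketch: a stage may open only once every earlier stage is drained.
    #[derive(Clone, Copy, PartialEq)]
    enum Stage {
        Prepare,
        Closure,
        Release,
    }

    struct Scheduler {
        // (stage, packets remaining), kept in stage order.
        remaining: Vec<(Stage, usize)>,
    }

    impl Scheduler {
        // Mirrors `are_buckets_drained`: every listed stage has no packets left.
        fn are_drained(&self, stages: &[Stage]) -> bool {
            self.remaining
                .iter()
                .filter(|(s, _)| stages.contains(s))
                .all(|(_, n)| *n == 0)
        }

        // Mirrors the chained open conditions: check all earlier stages.
        fn can_open(&self, stage: Stage) -> bool {
            let earlier: Vec<Stage> = self
                .remaining
                .iter()
                .map(|(s, _)| *s)
                .take_while(|s| *s != stage)
                .collect();
            self.are_drained(&earlier)
        }
    }

    fn main() {
        let sched = Scheduler {
            remaining: vec![(Stage::Prepare, 0), (Stage::Closure, 2), (Stage::Release, 0)],
        };
        assert!(sched.can_open(Stage::Closure)); // Prepare is drained.
        assert!(!sched.can_open(Stage::Release)); // Closure still holds packets.
    }
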
@@ -104,11 +105,6 @@ impl GCWorkScheduler { let coordinator_worker_shared = Arc::new(GCWorkerShared::::new()); - let worker_group = WorkerGroup::new(num_workers); - work_buckets.values_mut().for_each(|bucket| { - bucket.set_group(worker_group.clone()); - }); - Arc::new(Self { work_buckets, worker_group, diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index ed0a268ab6..b3834f24b7 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -49,32 +49,27 @@ pub struct WorkBucket { prioritized_queue: Option>, monitor: Arc<(Mutex<()>, Condvar)>, can_open: Option) -> bool) + Send>>, - group: Option>>, + group: Arc>, } impl WorkBucket { pub const DEFAULT_PRIORITY: usize = 1000; - pub fn new(active: bool, monitor: Arc<(Mutex<()>, Condvar)>) -> Self { + pub fn new( + active: bool, + monitor: Arc<(Mutex<()>, Condvar)>, + group: Arc>, + ) -> Self { Self { active: AtomicBool::new(active), queue: BucketQueue::new(), prioritized_queue: None, monitor, can_open: None, - group: None, + group, } } - pub fn set_group(&mut self, group: Arc>) { - self.group = Some(group) - } - - #[inline(always)] - fn parked_workers(&self) -> usize { - self.group.as_ref().unwrap().parked_workers() - } - #[inline(always)] fn notify_one_worker(&self) { // If the bucket is not activated, don't notify anyone. @@ -82,7 +77,7 @@ impl WorkBucket { return; } // Notify one if there're any parked workers. - if self.parked_workers() > 0 { + if self.group.parked_workers() > 0 { let _guard = self.monitor.0.lock().unwrap(); self.monitor.1.notify_one() } @@ -95,7 +90,7 @@ impl WorkBucket { return; } // Notify all if there're any parked workers. - if self.parked_workers() > 0 { + if self.group.parked_workers() > 0 { let _guard = self.monitor.0.lock().unwrap(); self.monitor.1.notify_all() } From 08502b60cbae4c720791db0df0c1b03a50a865f3 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 19 May 2022 21:59:54 +1000 Subject: [PATCH 09/24] minor --- src/scheduler/scheduler.rs | 2 +- src/scheduler/worker.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index a1618d0851..f29c4a9a7c 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -393,7 +393,7 @@ impl GCWorkScheduler { guard = self.worker_monitor.1.wait(guard).unwrap(); // Unpark this worker self.worker_group.dec_parked_workers(); - worker.shared.parked.store(false, Ordering::SeqCst); + worker.shared.parked.store(false, Ordering::Relaxed); } } diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index d26006402c..f25960bc30 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -74,7 +74,7 @@ const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This impl GCWorkerShared { pub fn is_parked(&self) -> bool { - self.parked.load(Ordering::SeqCst) + self.parked.load(Ordering::Relaxed) } pub fn borrow_stat(&self) -> AtomicRef> { @@ -184,7 +184,7 @@ impl GCWorker { pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK) { self.tls = tls; self.copy = crate::plan::create_gc_worker_context(tls, mmtk); - self.shared.parked.store(false, Ordering::SeqCst); + self.shared.parked.store(false, Ordering::Relaxed); loop { let mut work = self.poll(); debug_assert!(!self.shared.is_parked()); @@ -254,7 +254,7 @@ impl WorkerGroup { /// Return true if all the workers are parked. 
    #[inline(always)]
     pub fn inc_parked_workers(&self) -> bool {
-        let old = self.parked_workers.fetch_add(1, Ordering::SeqCst);
+        let old = self.parked_workers.fetch_add(1, Ordering::Relaxed);
         old + 1 == self.worker_count()
     }
 
@@ -262,13 +262,13 @@
     /// Called after a worker is resumed from the parked state.
     #[inline(always)]
     pub fn dec_parked_workers(&self) {
-        self.parked_workers.fetch_sub(1, Ordering::SeqCst);
+        self.parked_workers.fetch_sub(1, Ordering::Relaxed);
     }
 
     /// Get the number of parked workers in the group
     #[inline(always)]
     pub fn parked_workers(&self) -> usize {
-        self.parked_workers.load(Ordering::SeqCst)
+        self.parked_workers.load(Ordering::Relaxed)
     }
 
     /// Check if all the workers are parked

From f7a941eb0c1ddfee676a440debfca5d664a6c6bd Mon Sep 17 00:00:00 2001
From: Wenyu Zhao
Date: Mon, 23 May 2022 14:43:46 +1000
Subject: [PATCH 10/24] minor

---
 src/scheduler/work_bucket.rs | 11 -----------
 src/scheduler/worker.rs      | 22 ++++++++--------------
 2 files changed, 8 insertions(+), 25 deletions(-)

diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index 9f3f79e672..15349fa6b1 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -150,17 +150,6 @@ impl<VM: VMBinding> WorkBucket<VM> {
         self.notify_one_worker();
     }
 
-    /// Add a work packet to this bucket, but not notify any workers.
-    /// Used internally by the scheduler and workers.
-    #[inline(always)]
-    pub(super) fn add_dyn_no_notify(&self, work: Box<dyn GCWork<VM>>, prioritized: bool) {
-        if !prioritized {
-            self.queue.push(work);
-        } else {
-            self.prioritized_queue.as_ref().unwrap().push(work);
-        }
-    }
-
     /// Add multiple packets with a higher priority.
     /// Panic if this bucket cannot receive prioritized packets.
     #[inline(always)]
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index 22fec85480..1505d6b922 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -108,22 +108,20 @@ impl<VM: VMBinding> GCWorker<VM> {
         }
     }
 
+    const LOCALLY_CACHED_WORK_PACKETS: usize = 16;
+
     /// Add a work packet to the work queue and mark it with a higher priority.
     /// If the bucket is activated, the packet will be pushed to the local queue, otherwise it will be
     /// pushed to the global bucket with a higher priority.
     #[inline]
     pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
-        if !self.scheduler().work_buckets[bucket].is_activated() {
+        if !self.scheduler().work_buckets[bucket].is_activated()
+            || self.shared.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
+        {
             self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work));
             return;
         }
         self.shared.local_work_buffer.push(Box::new(work));
-        if self.shared.local_work_buffer.len() > 512 {
-            while let Some(w) = self.shared.local_work_buffer.pop() {
-                self.scheduler.work_buckets[bucket].add_dyn_no_notify(w, true);
-            }
-            self.scheduler.work_buckets[bucket].notify_all_workers();
-        }
     }
 
     /// Add a work packet to the work queue.
     /// If the bucket is activated, the packet will be pushed to the local queue, otherwise it will be
     /// pushed to the global bucket.
    #[inline]
     pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
-        if !self.scheduler().work_buckets[bucket].is_activated() {
+        if !self.scheduler().work_buckets[bucket].is_activated()
+            || self.shared.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
+        {
             self.scheduler.work_buckets[bucket].add(work);
             return;
         }
         self.shared.local_work_buffer.push(Box::new(work));
-        if self.shared.local_work_buffer.len() > 512 {
-            while let Some(w) = self.shared.local_work_buffer.pop() {
-                self.scheduler.work_buckets[bucket].add_dyn_no_notify(w, false);
-            }
-            self.scheduler.work_buckets[bucket].notify_all_workers();
-        }
     }
 
     pub fn is_coordinator(&self) -> bool {

From fae3b20d9d1b8f804852d4a546f99377b8019cae Mon Sep 17 00:00:00 2001
From: Wenyu Zhao
Date: Mon, 23 May 2022 14:44:56 +1000
Subject: [PATCH 11/24] Fix CI

---
 src/util/sanity/sanity_checker.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs
index 10069b895e..f333338421 100644
--- a/src/util/sanity/sanity_checker.rs
+++ b/src/util/sanity/sanity_checker.rs
@@ -110,7 +110,8 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P> {
                .add(PrepareMutator::<P::VM>::new(mutator));
         }
         for w in &mmtk.scheduler.worker_group.workers_shared {
-            w.local_work.push(Box::new(PrepareCollector));
+            let result = w.local_work.push(Box::new(PrepareCollector));
+            debug_assert!(result.is_ok());
         }
     }
 }
@@ -134,7 +135,8 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P>
{ .add(ReleaseMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - w.local_work.push(Box::new(ReleaseCollector)); + let result = w.local_work.push(Box::new(ReleaseCollector)); + debug_assert!(result.is_ok()); } } } From 7402d7ba838f3faa35a2c32187196a72f9d67d4f Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Tue, 24 May 2022 11:07:22 +1000 Subject: [PATCH 12/24] minor --- src/scheduler/controller.rs | 20 +++++++++++++------- src/scheduler/scheduler.rs | 20 ++++++++++---------- src/scheduler/work_bucket.rs | 2 -- src/scheduler/worker.rs | 23 ++++++++--------------- 4 files changed, 31 insertions(+), 34 deletions(-) diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index 9cfabe9dd8..8b7f30731a 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -76,16 +76,22 @@ impl GCController { // Drain the message queue and execute coordinator work. loop { - let message = self.receiver.recv().unwrap(); - match message { + match self.receiver.recv().unwrap() { CoordinatorMessage::Work(mut work) => { work.do_work_with_stat(worker, mmtk); } - CoordinatorMessage::Finish => {} - } - let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - if self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty() { - break; + CoordinatorMessage::Finish => { + // Quit only if all the buckets are empty. + // For concurrent GCs, the coordinator thread may receive this message when + // some buckets are still not empty. Under such case, the coordinator + // should ignore the message. + let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); + if self.scheduler.worker_group.all_parked() + && self.scheduler.all_buckets_empty() + { + break; + } + } } } for message in self.receiver.try_iter() { diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index f29c4a9a7c..d514e33dd4 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -9,12 +9,15 @@ use crate::vm::{GCThreadContext, VMBinding}; use crossbeam::deque::Steal; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; -use std::sync::atomic::Ordering; use std::sync::mpsc::channel; use std::sync::{Arc, Condvar, Mutex}; pub enum CoordinatorMessage { + /// Send a work-packet to the coordinator thread/ Work(Box>), + /// Notify the coordinator thread that all GC tasks are finished. + /// When sending this message, all the work buckets should be + /// empty, and all the workers should be parked. Finish, } @@ -96,9 +99,9 @@ impl GCWorkScheduler { open_stages.push(s); }; - for stages in stages { - if stages != WorkBucketStage::Unconstrained && stages != first_stw_stage { - open_next(stages); + for stage in stages { + if stage != WorkBucketStage::Unconstrained && stage != first_stw_stage { + open_next(stage); } } } @@ -127,7 +130,7 @@ impl GCWorkScheduler { // Spawn the controller thread. let coordinator_worker = GCWorker::new( mmtk, - 0, + usize::MAX, self.clone(), true, sender.clone(), @@ -360,10 +363,8 @@ impl GCWorkScheduler { #[cold] fn poll_slow(&self, worker: &GCWorker) -> Box> { - debug_assert!(!worker.shared.is_parked()); let mut guard = self.worker_monitor.0.lock().unwrap(); loop { - debug_assert!(!worker.shared.is_parked()); // Retry polling if let Some(work) = self.poll_schedulable_work(worker) { return work; @@ -378,7 +379,7 @@ impl GCWorkScheduler { // We guarantee that we can at least fetch one packet. 
let work = self.poll_schedulable_work(worker).unwrap(); // Optimize for the case that a newly opened bucket only has one packet. - // We only notify_all if there're motr than one packets available. + // We only notify_all if there're more than one packets available. if !self.all_activated_buckets_are_empty() { // Have more jobs in this buckets. Notify other workers. self.worker_monitor.1.notify_all(); @@ -386,14 +387,13 @@ impl GCWorkScheduler { // Return this packet and execute it. return work; } - // The current pause is finished if if we can't open more buckets. + // The current pause is finished if we can't open more buckets. worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait guard = self.worker_monitor.1.wait(guard).unwrap(); // Unpark this worker self.worker_group.dec_parked_workers(); - worker.shared.parked.store(false, Ordering::Relaxed); } } diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index 15349fa6b1..11bc67412d 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -53,8 +53,6 @@ pub struct WorkBucket { } impl WorkBucket { - pub const DEFAULT_PRIORITY: usize = 1000; - pub fn new( active: bool, monitor: Arc<(Mutex<()>, Condvar)>, diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 1505d6b922..dabfb35230 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -6,23 +6,21 @@ use crate::util::copy::GCWorkerCopyContext; use crate::util::opaque_pointer::*; use crate::vm::{Collection, GCThreadContext, VMBinding}; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; -use crossbeam::deque::{Stealer, Worker}; +use crossbeam::deque::{self, Stealer}; use crossbeam::queue::ArrayQueue; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::Sender; use std::sync::Arc; /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. pub struct GCWorkerShared { - /// True if the GC worker is parked. - pub parked: AtomicBool, /// Worker-local statistics data. stat: AtomicRefCell>, /// A queue of GCWork that can only be processed by the owned thread. pub local_work: ArrayQueue>>, /// Local work packet queue. - pub local_work_buffer: Worker>>, + pub local_work_buffer: deque::Worker>>, } impl Default for GCWorkerShared { @@ -34,10 +32,9 @@ impl Default for GCWorkerShared { impl GCWorkerShared { pub fn new() -> Self { Self { - parked: AtomicBool::new(true), stat: Default::default(), local_work: ArrayQueue::new(16), - local_work_buffer: Worker::new_fifo(), + local_work_buffer: deque::Worker::new_fifo(), } } } @@ -73,10 +70,6 @@ const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This the mutator calls harness_begin or harness_end while the GC is running."; impl GCWorkerShared { - pub fn is_parked(&self) -> bool { - self.parked.load(Ordering::Relaxed) - } - pub fn borrow_stat(&self) -> AtomicRef> { self.stat.try_borrow().expect(STAT_BORROWED_MSG) } @@ -154,7 +147,7 @@ impl GCWorker { work.do_work(self, self.mmtk); } - /// Poll a ready-to-execute work pakcet in the following order: + /// Poll a ready-to-execute work packet in the following order: /// /// 1. Any packet that should be processed only by this worker. /// 2. Poll from the local work queue. 
@@ -182,10 +175,8 @@ impl GCWorker { pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK) { self.tls = tls; self.copy = crate::plan::create_gc_worker_context(tls, mmtk); - self.shared.parked.store(false, Ordering::Relaxed); loop { let mut work = self.poll(); - debug_assert!(!self.shared.is_parked()); work.do_work_with_stat(self, mmtk); } } @@ -253,6 +244,7 @@ impl WorkerGroup { #[inline(always)] pub fn inc_parked_workers(&self) -> bool { let old = self.parked_workers.fetch_add(1, Ordering::Relaxed); + debug_assert!(old < self.worker_count()); old + 1 == self.worker_count() } @@ -260,7 +252,8 @@ impl WorkerGroup { /// Called after a worker is resumed from the parked state. #[inline(always)] pub fn dec_parked_workers(&self) { - self.parked_workers.fetch_sub(1, Ordering::Relaxed); + let old = self.parked_workers.fetch_sub(1, Ordering::Relaxed); + debug_assert!(old <= self.worker_count()); } /// Get the number of parked workers in the group From c7884b6669d9970454f1693ef8754a907ba3574f Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Tue, 24 May 2022 12:49:16 +1000 Subject: [PATCH 13/24] refactor local work queues --- src/scheduler/gc_work.rs | 4 +- src/scheduler/scheduler.rs | 13 +++--- src/scheduler/worker.rs | 66 +++++++++++++++---------------- src/util/sanity/sanity_checker.rs | 4 +- 4 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index aa71a0a6fd..615458f0a1 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -51,7 +51,7 @@ impl GCWork for Prepare { .add(PrepareMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - let result = w.local_work.push(Box::new(PrepareCollector)); + let result = w.designated_work.push(Box::new(PrepareCollector)); debug_assert!(result.is_ok()); } } @@ -120,7 +120,7 @@ impl GCWork for Release { .add(ReleaseMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - let result = w.local_work.push(Box::new(ReleaseCollector)); + let result = w.designated_work.push(Box::new(ReleaseCollector)); debug_assert!(result.is_ok()); } } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index d514e33dd4..dedb54a5a4 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -6,7 +6,7 @@ use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; use crate::vm::Collection; use crate::vm::{GCThreadContext, VMBinding}; -use crossbeam::deque::Steal; +use crossbeam::deque::{self, Steal}; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; use std::sync::mpsc::channel; @@ -106,7 +106,7 @@ impl GCWorkScheduler { } } - let coordinator_worker_shared = Arc::new(GCWorkerShared::::new()); + let coordinator_worker_shared = Arc::new(GCWorkerShared::::new(None)); Arc::new(Self { work_buckets, @@ -135,6 +135,7 @@ impl GCWorkScheduler { true, sender.clone(), self.coordinator_worker_shared.clone(), + deque::Worker::new_fifo(), ); let gc_controller = GCController::new( mmtk, @@ -304,23 +305,23 @@ impl GCWorkScheduler { fn poll_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { let mut should_retry = false; // Try find a packet that can be processed only by this worker. - if let Some(w) = worker.shared.local_work.pop() { + if let Some(w) = worker.shared.designated_work.pop() { return Steal::Success(w); } // Try get a packet from a work bucket. 
for work_bucket in self.work_buckets.values() { - match work_bucket.poll(&worker.shared.local_work_buffer) { + match work_bucket.poll(&worker.local_work_buffer) { Steal::Success(w) => return Steal::Success(w), Steal::Retry => should_retry = true, _ => {} } } // Try steal some packets from any worker - for (id, stealer) in self.worker_group.stealers.iter().enumerate() { + for (id, worker_shared) in self.worker_group.workers_shared.iter().enumerate() { if id == worker.ordinal { continue; } - match stealer.steal() { + match worker_shared.stealer.as_ref().unwrap().steal() { Steal::Success(w) => return Steal::Success(w), Steal::Retry => should_retry = true, _ => {} diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index dabfb35230..02a844ceff 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -10,7 +10,7 @@ use crossbeam::deque::{self, Stealer}; use crossbeam::queue::ArrayQueue; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::Sender; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. @@ -18,23 +18,17 @@ pub struct GCWorkerShared { /// Worker-local statistics data. stat: AtomicRefCell>, /// A queue of GCWork that can only be processed by the owned thread. - pub local_work: ArrayQueue>>, - /// Local work packet queue. - pub local_work_buffer: deque::Worker>>, -} - -impl Default for GCWorkerShared { - fn default() -> Self { - Self::new() - } + pub designated_work: ArrayQueue>>, + /// Handle for stealing packets from the current worker + pub stealer: Option>>>, } impl GCWorkerShared { - pub fn new() -> Self { + pub fn new(stealer: Option>>>) -> Self { Self { stat: Default::default(), - local_work: ArrayQueue::new(16), - local_work_buffer: deque::Worker::new_fifo(), + designated_work: ArrayQueue::new(16), + stealer, } } } @@ -60,6 +54,8 @@ pub struct GCWorker { is_coordinator: bool, /// Reference to the shared part of the GC worker. It is used for synchronization. pub shared: Arc>, + /// Local work packet queue. + pub local_work_buffer: deque::Worker>>, } unsafe impl Sync for GCWorkerShared {} @@ -87,6 +83,7 @@ impl GCWorker { is_coordinator: bool, sender: Sender>, shared: Arc>, + local_work_buffer: deque::Worker>>, ) -> Self { Self { tls: VMWorkerThread(VMThread::UNINITIALIZED), @@ -98,6 +95,7 @@ impl GCWorker { mmtk, is_coordinator, shared, + local_work_buffer, } } @@ -109,12 +107,12 @@ impl GCWorker { #[inline] pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork) { if !self.scheduler().work_buckets[bucket].is_activated() - || self.shared.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS + || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS { self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work)); return; } - self.shared.local_work_buffer.push(Box::new(work)); + self.local_work_buffer.push(Box::new(work)); } /// Add a work packet to the work queue. 
@@ -123,12 +121,12 @@ impl GCWorker { #[inline] pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { if !self.scheduler().work_buckets[bucket].is_activated() - || self.shared.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS + || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS { self.scheduler.work_buckets[bucket].add(work); return; } - self.shared.local_work_buffer.push(Box::new(work)); + self.local_work_buffer.push(Box::new(work)); } pub fn is_coordinator(&self) -> bool { @@ -155,15 +153,10 @@ impl GCWorker { /// 4. Steal from other workers fn poll(&self) -> Box> { self.shared - .local_work + .designated_work .pop() - .or_else(|| { - self.shared - .local_work_buffer - .pop() - .or_else(|| Some(self.scheduler().poll(self))) - }) - .unwrap() + .or_else(|| self.local_work_buffer.pop()) + .unwrap_or_else(|| self.scheduler().poll(self)) } pub fn do_boxed_work(&'static mut self, mut work: Box>) { @@ -186,27 +179,29 @@ impl GCWorker { pub struct WorkerGroup { /// Shared worker data pub workers_shared: Vec>>, - /// Handles for stealing packets from workers - pub stealers: Vec>>>, parked_workers: AtomicUsize, + unspawned_local_work_queues: Mutex>>>>, } impl WorkerGroup { /// Create a WorkerGroup pub fn new(num_workers: usize) -> Arc { - let workers_shared = (0..num_workers) - .map(|_| Arc::new(GCWorkerShared::::new())) + let unspawned_local_work_queues = (0..num_workers) + .map(|_| deque::Worker::new_fifo()) .collect::>(); - let stealers = workers_shared - .iter() - .map(|worker| worker.local_work_buffer.stealer()) - .collect(); + let workers_shared = (0..num_workers) + .map(|i| { + Arc::new(GCWorkerShared::::new(Some( + unspawned_local_work_queues[i].stealer(), + ))) + }) + .collect::>(); Arc::new(Self { workers_shared, - stealers, parked_workers: Default::default(), + unspawned_local_work_queues: Mutex::new(unspawned_local_work_queues), }) } @@ -217,6 +212,7 @@ impl WorkerGroup { sender: Sender>, tls: VMThread, ) { + let mut unspawned_local_work_queues = self.unspawned_local_work_queues.lock().unwrap(); // Spawn each worker thread. for (ordinal, shared) in self.workers_shared.iter().enumerate() { let worker = Box::new(GCWorker::new( @@ -226,9 +222,11 @@ impl WorkerGroup { false, sender.clone(), shared.clone(), + unspawned_local_work_queues.pop().unwrap(), )); VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Worker(worker)); } + debug_assert!(unspawned_local_work_queues.is_empty()); } /// Get the number of workers in the group diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs index f333338421..2da10c9a75 100644 --- a/src/util/sanity/sanity_checker.rs +++ b/src/util/sanity/sanity_checker.rs @@ -110,7 +110,7 @@ impl GCWork for SanityPrepare
<P> {
                .add(PrepareMutator::<P::VM>::new(mutator));
         }
         for w in &mmtk.scheduler.worker_group.workers_shared {
-            let result = w.local_work.push(Box::new(PrepareCollector));
+            let result = w.designated_work.push(Box::new(PrepareCollector));
             debug_assert!(result.is_ok());
         }
     }
 }
@@ -135,7 +135,7 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P>
{ .add(ReleaseMutator::::new(mutator)); } for w in &mmtk.scheduler.worker_group.workers_shared { - let result = w.local_work.push(Box::new(ReleaseCollector)); + let result = w.designated_work.push(Box::new(ReleaseCollector)); debug_assert!(result.is_ok()); } } From 3f5bb03728a195cd3a37e0add5387f13ddea31b5 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Tue, 24 May 2022 13:53:52 +1000 Subject: [PATCH 14/24] Fix work buckets enumeration --- Cargo.toml | 2 +- src/scheduler/scheduler.rs | 54 ++++++++++++++++++------------------ src/scheduler/work_bucket.rs | 7 +++++ 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b304c7bc6e..e728e206fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ lazy_static = "1.1" log = {version = "0.4", features = ["max_level_trace", "release_max_level_off"] } crossbeam = "0.8.1" num_cpus = "1.8" -enum-map = "0.6.2" +enum-map = "=2.1" downcast-rs = "1.1.1" atomic-traits = "0.2.0" atomic = "0.4.6" diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index dedb54a5a4..6d9dafe96b 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -7,6 +7,7 @@ use crate::util::opaque_pointer::*; use crate::vm::Collection; use crate::vm::{GCThreadContext, VMBinding}; use crossbeam::deque::{self, Steal}; +use enum_map::Enum; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; use std::sync::mpsc::channel; @@ -74,34 +75,31 @@ impl GCWorkScheduler { { // Unconstrained is always open. Prepare will be opened at the beginning of a GC. // This vec will grow for each stage we call with open_next() - let first_stw_stage = work_buckets.iter().nth(1).map(|(id, _)| id).unwrap(); + let first_stw_stage = WorkBucketStage::first_stw_stage(); let mut open_stages: Vec = vec![first_stw_stage]; // The rest will open after the previous stage is done. - let stages = work_buckets - .iter() - .map(|(stage, _)| stage) - .collect::>(); - let mut open_next = |s: WorkBucketStage| { - let cur_stages = open_stages.clone(); - work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler| { - let should_open = scheduler.are_buckets_drained(&cur_stages); - // Additional check before the `RefClosure` bucket opens. - if should_open && s == crate::scheduler::work_bucket::LAST_CLOSURE_BUCKET { - if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() { - if closure_end() { - // Don't open `LAST_CLOSURE_BUCKET` if `closure_end` added more works to `Closure`. - return false; - } - } - } - should_open - }); - open_stages.push(s); - }; - + let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize); for stage in stages { if stage != WorkBucketStage::Unconstrained && stage != first_stw_stage { - open_next(stage); + let cur_stages = open_stages.clone(); + work_buckets[stage].set_open_condition( + move |scheduler: &GCWorkScheduler| { + let should_open = scheduler.are_buckets_drained(&cur_stages); + // Additional check before the `RefClosure` bucket opens. + if should_open && stage == LAST_CLOSURE_BUCKET { + if let Some(closure_end) = + scheduler.closure_end.lock().unwrap().as_ref() + { + if closure_end() { + // Don't open `LAST_CLOSURE_BUCKET` if `closure_end` added more works to `Closure`. 
+ return false; + } + } + } + should_open + }, + ); + open_stages.push(stage); } } } @@ -242,10 +240,12 @@ impl GCWorkScheduler { fn update_buckets(&self) -> bool { let mut buckets_updated = false; let mut new_packets = false; - for (id, bucket) in self.work_buckets.iter() { + for i in 0..WorkBucketStage::LENGTH { + let id = WorkBucketStage::from_usize(i); if id == WorkBucketStage::Unconstrained { continue; } + let bucket = &self.work_buckets[id]; let bucket_opened = bucket.update(self); buckets_updated |= bucket_opened; if bucket_opened { @@ -264,7 +264,7 @@ impl GCWorkScheduler { } pub fn reset_state(&self) { - let first_stw_stage = self.work_buckets.iter().nth(1).map(|(id, _)| id).unwrap(); + let first_stw_stage = WorkBucketStage::first_stw_stage(); self.work_buckets.iter().for_each(|(id, bkt)| { if id != WorkBucketStage::Unconstrained && id != first_stw_stage { bkt.deactivate(); @@ -420,7 +420,7 @@ impl GCWorkScheduler { pub fn notify_mutators_paused(&self, mmtk: &'static MMTK) { mmtk.plan.base().gc_requester.clear_request(); - let first_stw_bucket = self.work_buckets.values().nth(1).unwrap(); + let first_stw_bucket = &self.work_buckets[WorkBucketStage::first_stw_stage()]; debug_assert!(!first_stw_bucket.is_activated()); first_stw_bucket.activate(); let _guard = self.worker_monitor.0.lock().unwrap(); diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index 11bc67412d..9a9a3f41d0 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -222,4 +222,11 @@ pub enum WorkBucketStage { Final, } +impl WorkBucketStage { + #[inline] + pub fn first_stw_stage() -> Self { + WorkBucketStage::from_usize(1) + } +} + pub const LAST_CLOSURE_BUCKET: WorkBucketStage = WorkBucketStage::PhantomRefClosure; From 8f7db595be9ed1b5ee3e4fa347a704d5f0ca6422 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 26 May 2022 20:04:10 +1000 Subject: [PATCH 15/24] Fix --- src/util/reference_processor.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/util/reference_processor.rs b/src/util/reference_processor.rs index 7ab4c9c31b..7998a7bba9 100644 --- a/src/util/reference_processor.rs +++ b/src/util/reference_processor.rs @@ -96,8 +96,6 @@ impl ReferenceProcessors { /// Scan weak references. 
pub fn scan_weak_refs(&self, trace: &mut E, mmtk: &'static MMTK) { - self.soft - .scan::(trace, mmtk.plan.is_current_gc_nursery()); self.weak .scan::(trace, mmtk.plan.is_current_gc_nursery()); } From 49775b43355ac41722d2587664cabf82a91398fc Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Wed, 1 Jun 2022 19:39:55 +1000 Subject: [PATCH 16/24] WIP: Fix designated work --- src/scheduler/controller.rs | 1 + src/scheduler/scheduler.rs | 11 +++++++++-- src/scheduler/worker.rs | 7 +++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index 8b7f30731a..1c9d470618 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -94,6 +94,7 @@ impl GCController { } } } + debug_assert!(!self.scheduler.worker_group.has_designated_work()); for message in self.receiver.try_iter() { if let CoordinatorMessage::Work(mut work) = message { work.do_work_with_stat(worker, mmtk); diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 6d9dafe96b..24a026fce4 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -364,8 +364,8 @@ impl GCWorkScheduler { #[cold] fn poll_slow(&self, worker: &GCWorker) -> Box> { - let mut guard = self.worker_monitor.0.lock().unwrap(); loop { + let guard = self.worker_monitor.0.lock().unwrap(); // Retry polling if let Some(work) = self.poll_schedulable_work(worker) { return work; @@ -374,6 +374,12 @@ impl GCWorkScheduler { let all_parked = self.worker_group.inc_parked_workers(); // If all workers are parked, try activate new buckets if all_parked { + // If there're any designated work, resume the workers and process them. + if self.worker_group.has_designated_work() { + self.worker_group.dec_parked_workers(); + self.worker_monitor.1.notify_all(); + continue; + } if self.update_buckets() { // We're not going to sleep since a new bucket is just open. self.worker_group.dec_parked_workers(); @@ -388,11 +394,12 @@ impl GCWorkScheduler { // Return this packet and execute it. return work; } + debug_assert!(!self.worker_group.has_designated_work()); // The current pause is finished if we can't open more buckets. 
worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait - guard = self.worker_monitor.1.wait(guard).unwrap(); + let _ = self.worker_monitor.1.wait(guard).unwrap(); // Unpark this worker self.worker_group.dec_parked_workers(); } diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 02a844ceff..c0479f45c1 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -265,4 +265,11 @@ impl WorkerGroup { pub fn all_parked(&self) -> bool { self.parked_workers() == self.worker_count() } + + /// Return true if there're any pending designated work + pub fn has_designated_work(&self) -> bool { + self.workers_shared + .iter() + .any(|w| !w.designated_work.is_empty()) + } } From ce40169a07d14b7fb117e14e9dccb23d906296c8 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Wed, 1 Jun 2022 19:45:33 +1000 Subject: [PATCH 17/24] Fix Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index eeb24b049a..ad033071b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ mimalloc-sys = { version = "0.1.6", optional = true } hoard-sys = { version = "0.1.1", optional = true } lazy_static = "1.1" log = { version = "0.4", features = ["max_level_trace", "release_max_level_off"] } -crossbeam-deque = "0.6" +crossbeam = "0.8.1" num_cpus = "1.8" enum-map = "=2.1" downcast-rs = "1.1.1" From 4743e16cfee7b9d445a33b6e4fd92b27b151f3cd Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 2 Jun 2022 09:47:46 +1000 Subject: [PATCH 18/24] WIP --- src/scheduler/scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 24a026fce4..113fe8e26e 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -374,7 +374,7 @@ impl GCWorkScheduler { let all_parked = self.worker_group.inc_parked_workers(); // If all workers are parked, try activate new buckets if all_parked { - // If there're any designated work, resume the workers and process them. 
+ // If there're any designated work, resume the workers and process them if self.worker_group.has_designated_work() { self.worker_group.dec_parked_workers(); self.worker_monitor.1.notify_all(); From 49e792f3ec8126ebed6022255674bd900c2404c9 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Thu, 2 Jun 2022 12:06:17 +1000 Subject: [PATCH 19/24] Fix CI --- src/scheduler/scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 113fe8e26e..5e262586ca 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -399,7 +399,7 @@ impl GCWorkScheduler { worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait - let _ = self.worker_monitor.1.wait(guard).unwrap(); + let _guard = self.worker_monitor.1.wait(guard).unwrap(); // Unpark this worker self.worker_group.dec_parked_workers(); } From 48b1e415e5cf9d1d53f9b5fecbde301d429de645 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Sat, 4 Jun 2022 00:50:14 +1000 Subject: [PATCH 20/24] minor --- src/scheduler/scheduler.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 5e262586ca..4dd6b428c3 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -247,9 +247,13 @@ impl GCWorkScheduler { } let bucket = &self.work_buckets[id]; let bucket_opened = bucket.update(self); - buckets_updated |= bucket_opened; + buckets_updated = buckets_updated || bucket_opened; if bucket_opened { - new_packets |= !bucket.is_drained(); + new_packets = new_packets || !bucket.is_drained(); + // Quit the loop. There'are already new packets in the newly opened buckets. + if new_packets { + break; + } } } buckets_updated && new_packets From db098b27968c134219c1a2ee8867f98e74ab202a Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Sat, 4 Jun 2022 10:25:56 +1000 Subject: [PATCH 21/24] Fix jikes --- src/scheduler/controller.rs | 58 +++++++++++++++++++++++-------------- src/scheduler/scheduler.rs | 11 +++++++ 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index 1c9d470618..b9da24bca8 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -12,6 +12,7 @@ use crate::scheduler::CoordinatorMessage; use crate::util::VMWorkerThread; use crate::vm::VMBinding; use crate::MMTK; +use atomic::Ordering; use super::{GCWork, GCWorkScheduler, GCWorker}; @@ -66,38 +67,51 @@ impl GCController { } } - /// Coordinate workers to perform GC in response to a GC request. - pub fn do_gc_until_completion(&mut self) { + /// Process a message. Return true if the GC is finished. + fn process_message(&mut self, message: CoordinatorMessage) -> bool { let worker = &mut self.coordinator_worker; let mmtk = self.mmtk; + self.scheduler + .completed_messages + .fetch_add(1, Ordering::SeqCst); + match message { + CoordinatorMessage::Work(mut work) => { + work.do_work_with_stat(worker, mmtk); + false + } + CoordinatorMessage::Finish => { + // Quit only if all the buckets are empty. + // For concurrent GCs, the coordinator thread may receive this message when + // some buckets are still not empty. Under such case, the coordinator + // should ignore the message. + let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); + self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty() + } + } + } + /// Coordinate workers to perform GC in response to a GC request. 
+ pub fn do_gc_until_completion(&mut self) { // Schedule collection. - ScheduleCollection.do_work_with_stat(worker, mmtk); + ScheduleCollection.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); // Drain the message queue and execute coordinator work. loop { - match self.receiver.recv().unwrap() { - CoordinatorMessage::Work(mut work) => { - work.do_work_with_stat(worker, mmtk); - } - CoordinatorMessage::Finish => { - // Quit only if all the buckets are empty. - // For concurrent GCs, the coordinator thread may receive this message when - // some buckets are still not empty. Under such case, the coordinator - // should ignore the message. - let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - if self.scheduler.worker_group.all_parked() - && self.scheduler.all_buckets_empty() - { - break; - } - } + let message = self.receiver.recv().unwrap(); + let finished = self.process_message(message); + if finished { + break; } } debug_assert!(!self.scheduler.worker_group.has_designated_work()); + // Sometimes multiple finish messages will be sent. Skip them. for message in self.receiver.try_iter() { - if let CoordinatorMessage::Work(mut work) = message { - work.do_work_with_stat(worker, mmtk); + self.scheduler + .completed_messages + .fetch_add(1, Ordering::SeqCst); + match message { + CoordinatorMessage::Work(_) => unreachable!(), + CoordinatorMessage::Finish => {} } } self.scheduler.deactivate_all(); @@ -106,7 +120,7 @@ impl GCController { // Otherwise, for generational GCs, workers will receive and process // newly generated remembered-sets from those open buckets. // But these remsets should be preserved until next GC. - EndOfGC.do_work_with_stat(worker, mmtk); + EndOfGC.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); self.scheduler.debug_assert_all_buckets_deactivated(); } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 4dd6b428c3..0be6495f9a 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -10,6 +10,7 @@ use crossbeam::deque::{self, Steal}; use enum_map::Enum; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Condvar, Mutex}; @@ -41,6 +42,10 @@ pub struct GCWorkScheduler { /// the `Closure` bucket multiple times to iteratively discover and process /// more ephemeron objects. closure_end: Mutex bool>>>, + /// Counter for messages sent to the controller. + pub(super) issued_messages: AtomicUsize, + /// Counter for messages completed by the controller. + pub(super) completed_messages: AtomicUsize, } // FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet. 
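
Note on the two counters introduced in this patch: every message sent to the coordinator bumps `issued_messages`, and the coordinator bumps `completed_messages` once it has processed the message, so `are_buckets_drained` can treat "a coordinator message is still in flight" the same as "a bucket still has packets". A self-contained sketch of the idea using only std types (the names are illustrative, and the two counters are collapsed into the single `pending_messages` counter that PATCH 23 below switches to):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;

    fn main() {
        let pending = Arc::new(AtomicUsize::new(0));
        let (tx, rx) = channel::<&'static str>();

        // Worker side: count the message as pending *before* sending it, so a
        // quiescence check can never observe "nothing pending" while a message
        // is still in flight.
        let p = pending.clone();
        let sender = thread::spawn(move || {
            for msg in ["work", "work", "finish"] {
                p.fetch_add(1, Ordering::SeqCst);
                tx.send(msg).unwrap();
            }
        });

        // Coordinator side: count it as done only after processing it.
        for msg in rx.iter() {
            println!("processing {msg}");
            pending.fetch_sub(1, Ordering::SeqCst);
        }
        sender.join().unwrap();

        // A drained-and-quiet check would also require this to be zero.
        assert_eq!(pending.load(Ordering::SeqCst), 0);
    }
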
@@ -112,6 +117,8 @@ impl GCWorkScheduler { coordinator_worker_shared, worker_monitor, closure_end: Mutex::new(None), + issued_messages: AtomicUsize::new(0), + completed_messages: AtomicUsize::new(0), }) } @@ -221,6 +228,8 @@ impl GCWorkScheduler { fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool { buckets.iter().all(|&b| self.work_buckets[b].is_drained()) + && self.issued_messages.load(Ordering::Relaxed) + == self.completed_messages.load(Ordering::Relaxed) } pub fn on_closure_end(&self, f: Box bool>) { @@ -287,6 +296,7 @@ impl GCWorkScheduler { } pub fn add_coordinator_work(&self, work: impl CoordinatorWork, worker: &GCWorker) { + self.issued_messages.fetch_add(1, Ordering::SeqCst); worker .sender .send(CoordinatorMessage::Work(Box::new(work))) @@ -400,6 +410,7 @@ impl GCWorkScheduler { } debug_assert!(!self.worker_group.has_designated_work()); // The current pause is finished if we can't open more buckets. + self.issued_messages.fetch_add(1, Ordering::SeqCst); worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait From 34e15e2f03dc81b96459e4c5a630aabd596bde61 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Sat, 4 Jun 2022 10:28:40 +1000 Subject: [PATCH 22/24] minor --- src/scheduler/controller.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index b9da24bca8..ba22f77415 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -71,15 +71,18 @@ impl GCController { fn process_message(&mut self, message: CoordinatorMessage) -> bool { let worker = &mut self.coordinator_worker; let mmtk = self.mmtk; - self.scheduler - .completed_messages - .fetch_add(1, Ordering::SeqCst); match message { CoordinatorMessage::Work(mut work) => { work.do_work_with_stat(worker, mmtk); + self.scheduler + .completed_messages + .fetch_add(1, Ordering::SeqCst); false } CoordinatorMessage::Finish => { + self.scheduler + .completed_messages + .fetch_add(1, Ordering::SeqCst); // Quit only if all the buckets are empty. // For concurrent GCs, the coordinator thread may receive this message when // some buckets are still not empty. Under such case, the coordinator From 091dacf1eeb1a966def27613dad7176eeff53357 Mon Sep 17 00:00:00 2001 From: Wenyu Zhao Date: Sat, 4 Jun 2022 13:16:50 +1000 Subject: [PATCH 23/24] cleanup --- src/scheduler/controller.rs | 12 ++++++------ src/scheduler/scheduler.rs | 16 ++++++---------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index ba22f77415..6054f357b1 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -75,14 +75,14 @@ impl GCController { CoordinatorMessage::Work(mut work) => { work.do_work_with_stat(worker, mmtk); self.scheduler - .completed_messages - .fetch_add(1, Ordering::SeqCst); + .pending_messages + .fetch_sub(1, Ordering::SeqCst); false } CoordinatorMessage::Finish => { self.scheduler - .completed_messages - .fetch_add(1, Ordering::SeqCst); + .pending_messages + .fetch_sub(1, Ordering::SeqCst); // Quit only if all the buckets are empty. // For concurrent GCs, the coordinator thread may receive this message when // some buckets are still not empty. Under such case, the coordinator @@ -110,8 +110,8 @@ impl GCController { // Sometimes multiple finish messages will be sent. Skip them. 
for message in self.receiver.try_iter() { self.scheduler - .completed_messages - .fetch_add(1, Ordering::SeqCst); + .pending_messages + .fetch_sub(1, Ordering::SeqCst); match message { CoordinatorMessage::Work(_) => unreachable!(), CoordinatorMessage::Finish => {} diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 0be6495f9a..58d2f08d1f 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -42,10 +42,8 @@ pub struct GCWorkScheduler { /// the `Closure` bucket multiple times to iteratively discover and process /// more ephemeron objects. closure_end: Mutex bool>>>, - /// Counter for messages sent to the controller. - pub(super) issued_messages: AtomicUsize, - /// Counter for messages completed by the controller. - pub(super) completed_messages: AtomicUsize, + /// Counter for pending coordinator messages. + pub(super) pending_messages: AtomicUsize, } // FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet. @@ -117,8 +115,7 @@ impl GCWorkScheduler { coordinator_worker_shared, worker_monitor, closure_end: Mutex::new(None), - issued_messages: AtomicUsize::new(0), - completed_messages: AtomicUsize::new(0), + pending_messages: AtomicUsize::new(0), }) } @@ -228,8 +225,7 @@ impl GCWorkScheduler { fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool { buckets.iter().all(|&b| self.work_buckets[b].is_drained()) - && self.issued_messages.load(Ordering::Relaxed) - == self.completed_messages.load(Ordering::Relaxed) + && self.pending_messages.load(Ordering::Relaxed) == 0 } pub fn on_closure_end(&self, f: Box bool>) { @@ -296,7 +292,7 @@ impl GCWorkScheduler { } pub fn add_coordinator_work(&self, work: impl CoordinatorWork, worker: &GCWorker) { - self.issued_messages.fetch_add(1, Ordering::SeqCst); + self.pending_messages.fetch_add(1, Ordering::SeqCst); worker .sender .send(CoordinatorMessage::Work(Box::new(work))) @@ -410,7 +406,7 @@ impl GCWorkScheduler { } debug_assert!(!self.worker_group.has_designated_work()); // The current pause is finished if we can't open more buckets. 
-        self.issued_messages.fetch_add(1, Ordering::SeqCst);
+        self.pending_messages.fetch_add(1, Ordering::SeqCst);
         worker.sender.send(CoordinatorMessage::Finish).unwrap();
     }
     // Wait

From 42d3540d36693d49a20653a2fc043e71e4e95acd Mon Sep 17 00:00:00 2001
From: Wenyu Zhao
Date: Sat, 4 Jun 2022 20:56:11 +1000
Subject: [PATCH 24/24] seqcst

---
 src/scheduler/scheduler.rs   | 2 +-
 src/scheduler/work_bucket.rs | 4 ++--
 src/scheduler/worker.rs      | 6 +++---
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs
index 58d2f08d1f..e4f9d01fc9 100644
--- a/src/scheduler/scheduler.rs
+++ b/src/scheduler/scheduler.rs
@@ -225,7 +225,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
 
     fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool {
         buckets.iter().all(|&b| self.work_buckets[b].is_drained())
-            && self.pending_messages.load(Ordering::Relaxed) == 0
+            && self.pending_messages.load(Ordering::SeqCst) == 0
     }
 
     pub fn on_closure_end(&self, f: Box<dyn Send + Fn() -> bool>) {
diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index 9a9a3f41d0..078c6a94ab 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -96,12 +96,12 @@ impl<VM: VMBinding> WorkBucket<VM> {
 
     #[inline(always)]
     pub fn is_activated(&self) -> bool {
-        self.active.load(Ordering::Relaxed)
+        self.active.load(Ordering::SeqCst)
     }
 
     /// Enable the bucket
     pub fn activate(&self) {
-        self.active.store(true, Ordering::Relaxed);
+        self.active.store(true, Ordering::SeqCst);
     }
 
     /// Test if the bucket is drained
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index c0479f45c1..011d8bce19 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -241,7 +241,7 @@ impl WorkerGroup {
     /// Return true if all the workers are parked.
     #[inline(always)]
     pub fn inc_parked_workers(&self) -> bool {
-        let old = self.parked_workers.fetch_add(1, Ordering::Relaxed);
+        let old = self.parked_workers.fetch_add(1, Ordering::SeqCst);
         debug_assert!(old < self.worker_count());
         old + 1 == self.worker_count()
     }
@@ -250,14 +250,14 @@
     /// Called after a worker is resumed from the parked state.
     #[inline(always)]
     pub fn dec_parked_workers(&self) {
-        let old = self.parked_workers.fetch_sub(1, Ordering::Relaxed);
+        let old = self.parked_workers.fetch_sub(1, Ordering::SeqCst);
         debug_assert!(old <= self.worker_count());
     }
 
     /// Get the number of parked workers in the group
     #[inline(always)]
     pub fn parked_workers(&self) -> usize {
-        self.parked_workers.load(Ordering::Relaxed)
+        self.parked_workers.load(Ordering::SeqCst)
     }
 
     /// Check if all the workers are parked
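
With the series applied, each worker polls for work in a fixed order: its designated-work queue first, then its local deque, then the shared buckets, and finally stealing from other workers. A runnable sketch of that order using the same crossbeam primitives the patches use; the shared buckets are elided and the types are simplified stand-ins, not the mmtk-core API:

    use crossbeam::deque::{Steal, Stealer, Worker};
    use crossbeam::queue::ArrayQueue;

    struct WorkerQueues {
        // Packets only this worker may run (cf. `designated_work`).
        designated: ArrayQueue<&'static str>,
        // Locally cached packets (cf. `local_work_buffer`).
        local: Worker<&'static str>,
        // Handles for stealing from the other workers' deques.
        peers: Vec<Stealer<&'static str>>,
    }

    impl WorkerQueues {
        fn poll(&self) -> Option<&'static str> {
            // 1. Designated packets, e.g. a per-worker PrepareCollector.
            if let Some(w) = self.designated.pop() {
                return Some(w);
            }
            // 2. The worker's own deque.
            if let Some(w) = self.local.pop() {
                return Some(w);
            }
            // (3. The shared work buckets would be polled here.)
            // 4. Steal from peers, retrying on contention.
            for peer in &self.peers {
                loop {
                    match peer.steal() {
                        Steal::Success(w) => return Some(w),
                        Steal::Retry => continue,
                        Steal::Empty => break,
                    }
                }
            }
            None
        }
    }

    fn main() {
        let other = Worker::new_fifo();
        other.push("stolen-packet");
        let me = WorkerQueues {
            designated: ArrayQueue::new(16),
            local: Worker::new_fifo(),
            peers: vec![other.stealer()],
        };
        me.designated.push("designated-packet").unwrap();
        assert_eq!(me.poll(), Some("designated-packet"));
        assert_eq!(me.poll(), Some("stolen-packet"));
        assert_eq!(me.poll(), None);
    }
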