diff --git a/Cargo.toml b/Cargo.toml index 6b3ddcbfdf..ad033071b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,9 +27,9 @@ mimalloc-sys = { version = "0.1.6", optional = true } hoard-sys = { version = "0.1.1", optional = true } lazy_static = "1.1" log = { version = "0.4", features = ["max_level_trace", "release_max_level_off"] } -crossbeam-deque = "0.6" +crossbeam = "0.8.1" num_cpus = "1.8" -enum-map = "0.6.2" +enum-map = "=2.1" downcast-rs = "1.1.1" atomic-traits = "0.2.0" atomic = "0.4.6" @@ -41,7 +41,6 @@ strum = "0.24" strum_macros = "0.24" [dev-dependencies] -crossbeam = "0.7.3" rand = "0.7.3" [features] diff --git a/src/lib.rs b/src/lib.rs index 23c64f70f7..3d35d38cd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ extern crate log; #[cfg(target = "x86_64-unknown-linux-gnu")] extern crate atomic; extern crate atomic_traits; -extern crate crossbeam_deque; +extern crate crossbeam; extern crate num_cpus; #[macro_use] extern crate downcast_rs; diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index ceaae741d7..6054f357b1 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -12,6 +12,7 @@ use crate::scheduler::CoordinatorMessage; use crate::util::VMWorkerThread; use crate::vm::VMBinding; use crate::MMTK; +use atomic::Ordering; use super::{GCWork, GCWorkScheduler, GCWorker}; @@ -66,33 +67,54 @@ impl GCController { } } - /// Coordinate workers to perform GC in response to a GC request. - pub fn do_gc_until_completion(&mut self) { + /// Process a message. Return true if the GC is finished. + fn process_message(&mut self, message: CoordinatorMessage) -> bool { let worker = &mut self.coordinator_worker; let mmtk = self.mmtk; + match message { + CoordinatorMessage::Work(mut work) => { + work.do_work_with_stat(worker, mmtk); + self.scheduler + .pending_messages + .fetch_sub(1, Ordering::SeqCst); + false + } + CoordinatorMessage::Finish => { + self.scheduler + .pending_messages + .fetch_sub(1, Ordering::SeqCst); + // Quit only if all the buckets are empty. + // For concurrent GCs, the coordinator thread may receive this message when + // some buckets are still not empty. Under such case, the coordinator + // should ignore the message. + let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); + self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty() + } + } + } + /// Coordinate workers to perform GC in response to a GC request. + pub fn do_gc_until_completion(&mut self) { // Schedule collection. - ScheduleCollection.do_work_with_stat(worker, mmtk); + ScheduleCollection.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); // Drain the message queue and execute coordinator work. loop { let message = self.receiver.recv().unwrap(); - match message { - CoordinatorMessage::Work(mut work) => { - work.do_work_with_stat(worker, mmtk); - } - CoordinatorMessage::AllWorkerParked | CoordinatorMessage::BucketDrained => { - self.scheduler.update_buckets(); - } - } - let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() { + let finished = self.process_message(message); + if finished { break; } } + debug_assert!(!self.scheduler.worker_group.has_designated_work()); + // Sometimes multiple finish messages will be sent. Skip them. 
for message in self.receiver.try_iter() { - if let CoordinatorMessage::Work(mut work) = message { - work.do_work_with_stat(worker, mmtk); + self.scheduler + .pending_messages + .fetch_sub(1, Ordering::SeqCst); + match message { + CoordinatorMessage::Work(_) => unreachable!(), + CoordinatorMessage::Finish => {} } } self.scheduler.deactivate_all(); @@ -101,7 +123,7 @@ impl GCController { // Otherwise, for generational GCs, workers will receive and process // newly generated remembered-sets from those open buckets. // But these remsets should be preserved until next GC. - EndOfGC.do_work_with_stat(worker, mmtk); + EndOfGC.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); self.scheduler.debug_assert_all_buckets_deactivated(); } diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 71b137faa0..615458f0a1 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -50,8 +50,9 @@ impl GCWork for Prepare { mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] .add(PrepareMutator::::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { - w.local_work_bucket.add(PrepareCollector); + for w in &mmtk.scheduler.worker_group.workers_shared { + let result = w.designated_work.push(Box::new(PrepareCollector)); + debug_assert!(result.is_ok()); } } } @@ -118,8 +119,9 @@ impl GCWork for Release { mmtk.scheduler.work_buckets[WorkBucketStage::Release] .add(ReleaseMutator::::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { - w.local_work_bucket.add(ReleaseCollector); + for w in &mmtk.scheduler.worker_group.workers_shared { + let result = w.designated_work.push(Box::new(ReleaseCollector)); + debug_assert!(result.is_ok()); } } } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index d39538acd2..e4f9d01fc9 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -1,32 +1,33 @@ use super::stat::SchedulerStat; -use super::work_bucket::WorkBucketStage::*; use super::work_bucket::*; -use super::worker::{GCWorker, GCWorkerShared}; +use super::worker::{GCWorker, GCWorkerShared, WorkerGroup}; use super::*; use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; use crate::vm::Collection; use crate::vm::{GCThreadContext, VMBinding}; +use crossbeam::deque::{self, Steal}; +use enum_map::Enum; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Condvar, Mutex}; pub enum CoordinatorMessage { + /// Send a work-packet to the coordinator thread/ Work(Box>), - AllWorkerParked, - BucketDrained, + /// Notify the coordinator thread that all GC tasks are finished. + /// When sending this message, all the work buckets should be + /// empty, and all the workers should be parked. + Finish, } -/// The shared data structure for distributing work packets between worker threads and the coordinator thread. pub struct GCWorkScheduler { - /// Work buckets for worker threads + /// Work buckets pub work_buckets: EnumMap>, - /// Work for the coordinator thread - pub coordinator_work: WorkBucket, - /// The shared parts of GC workers - pub workers_shared: Vec>>, + /// Workers + pub worker_group: Arc>, /// The shared part of the GC worker object of the controller thread coordinator_worker_shared: Arc>, /// Condition Variable for worker synchronization @@ -41,6 +42,8 @@ pub struct GCWorkScheduler { /// the `Closure` bucket multiple times to iteratively discover and process /// more ephemeron objects. 
closure_end: Mutex bool>>>, + /// Counter for pending coordinator messages. + pub(super) pending_messages: AtomicUsize, } // FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet. @@ -51,93 +54,74 @@ unsafe impl Sync for GCWorkScheduler {} impl GCWorkScheduler { pub fn new(num_workers: usize) -> Arc { let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default(); + let worker_group = WorkerGroup::new(num_workers); // Create work buckets for workers. let mut work_buckets = enum_map! { - WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()), - WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()), - WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), }; // Set the open condition of each bucket. { // Unconstrained is always open. Prepare will be opened at the beginning of a GC. 
// This vec will grow for each stage we call with open_next() - let mut open_stages: Vec = vec![Unconstrained, Prepare]; + let first_stw_stage = WorkBucketStage::first_stw_stage(); + let mut open_stages: Vec = vec![first_stw_stage]; // The rest will open after the previous stage is done. - let mut open_next = |s: WorkBucketStage| { - let cur_stages = open_stages.clone(); - work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler| { - let should_open = scheduler.are_buckets_drained(&cur_stages) - && scheduler.all_workers_parked(); - // Additional check before the `RefClosure` bucket opens. - if should_open && s == crate::scheduler::work_bucket::LAST_CLOSURE_BUCKET { - if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() { - if closure_end() { - // Don't open `RefClosure` if `closure_end` added more works to `Closure`. - return false; + let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize); + for stage in stages { + if stage != WorkBucketStage::Unconstrained && stage != first_stw_stage { + let cur_stages = open_stages.clone(); + work_buckets[stage].set_open_condition( + move |scheduler: &GCWorkScheduler| { + let should_open = scheduler.are_buckets_drained(&cur_stages); + // Additional check before the `RefClosure` bucket opens. + if should_open && stage == LAST_CLOSURE_BUCKET { + if let Some(closure_end) = + scheduler.closure_end.lock().unwrap().as_ref() + { + if closure_end() { + // Don't open `LAST_CLOSURE_BUCKET` if `closure_end` added more works to `Closure`. + return false; + } + } } - } - } - should_open - }); - open_stages.push(s); - }; - - open_next(Closure); - open_next(SoftRefClosure); - open_next(WeakRefClosure); - open_next(FinalRefClosure); - open_next(PhantomRefClosure); - open_next(CalculateForwarding); - open_next(SecondRoots); - open_next(RefForwarding); - open_next(FinalizableForwarding); - open_next(Compact); - open_next(Release); - open_next(Final); + should_open + }, + ); + open_stages.push(stage); + } + } } - // Create the work bucket for the controller. - let coordinator_work = WorkBucket::new(true, worker_monitor.clone()); - - // We prepare the shared part of workers, but do not create the actual workers now. - // The shared parts of workers are communication hubs between controller and workers. - let workers_shared = (0..num_workers) - .map(|_| Arc::new(GCWorkerShared::::new(worker_monitor.clone()))) - .collect::>(); - - // Similarly, we create the shared part of the work of the controller, but not the controller itself. - let coordinator_worker_shared = Arc::new(GCWorkerShared::::new(worker_monitor.clone())); + let coordinator_worker_shared = Arc::new(GCWorkerShared::::new(None)); Arc::new(Self { work_buckets, - coordinator_work, - workers_shared, + worker_group, coordinator_worker_shared, worker_monitor, closure_end: Mutex::new(None), + pending_messages: AtomicUsize::new(0), }) } #[inline] pub fn num_workers(&self) -> usize { - self.workers_shared.len() - } - - pub fn all_workers_parked(&self) -> bool { - self.workers_shared.iter().all(|w| w.is_parked()) + self.worker_group.as_ref().worker_count() } /// Create GC threads, including the controller thread and all workers. @@ -148,11 +132,12 @@ impl GCWorkScheduler { // Spawn the controller thread. 
let coordinator_worker = GCWorker::new( mmtk, - 0, + usize::MAX, self.clone(), true, sender.clone(), self.coordinator_worker_shared.clone(), + deque::Worker::new_fifo(), ); let gc_controller = GCController::new( mmtk, @@ -163,18 +148,7 @@ impl GCWorkScheduler { ); VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Controller(gc_controller)); - // Spawn each worker thread. - for (ordinal, shared) in self.workers_shared.iter().enumerate() { - let worker = Box::new(GCWorker::new( - mmtk, - ordinal, - self.clone(), - false, - sender.clone(), - shared.clone(), - )); - VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Worker(worker)); - } + self.worker_group.spawn(mmtk, sender, tls) } /// Schedule all the common work packets @@ -251,6 +225,7 @@ impl GCWorkScheduler { fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool { buckets.iter().all(|&b| self.work_buckets[b].is_drained()) + && self.pending_messages.load(Ordering::SeqCst) == 0 } pub fn on_closure_end(&self, f: Box bool>) { @@ -261,121 +236,188 @@ impl GCWorkScheduler { self.work_buckets.values().all(|bucket| bucket.is_empty()) } - /// Open buckets if their conditions are met - pub fn update_buckets(&self) { + /// Open buckets if their conditions are met. + /// + /// This function should only be called after all the workers are parked. + /// No workers will be waked up by this function. The caller is responsible for that. + /// + /// Return true if there're any non-empty buckets updated. + fn update_buckets(&self) -> bool { let mut buckets_updated = false; - for (id, bucket) in self.work_buckets.iter() { + let mut new_packets = false; + for i in 0..WorkBucketStage::LENGTH { + let id = WorkBucketStage::from_usize(i); if id == WorkBucketStage::Unconstrained { continue; } - buckets_updated |= bucket.update(self); - } - if buckets_updated { - // Notify the workers for new work - let _guard = self.worker_monitor.0.lock().unwrap(); - self.worker_monitor.1.notify_all(); + let bucket = &self.work_buckets[id]; + let bucket_opened = bucket.update(self); + buckets_updated = buckets_updated || bucket_opened; + if bucket_opened { + new_packets = new_packets || !bucket.is_drained(); + // Quit the loop. There'are already new packets in the newly opened buckets. 
+ if new_packets { + break; + } + } } + buckets_updated && new_packets } pub fn deactivate_all(&self) { - for (stage, bucket) in self.work_buckets.iter() { - if stage == WorkBucketStage::Unconstrained { - continue; + self.work_buckets.iter().for_each(|(id, bkt)| { + if id != WorkBucketStage::Unconstrained { + bkt.deactivate(); } - - bucket.deactivate(); - } + }); } pub fn reset_state(&self) { - for (stage, bucket) in self.work_buckets.iter() { - if stage == WorkBucketStage::Unconstrained || stage == WorkBucketStage::Prepare { - continue; + let first_stw_stage = WorkBucketStage::first_stw_stage(); + self.work_buckets.iter().for_each(|(id, bkt)| { + if id != WorkBucketStage::Unconstrained && id != first_stw_stage { + bkt.deactivate(); } - - bucket.deactivate(); - } + }); } pub fn debug_assert_all_buckets_deactivated(&self) { - for (stage, bucket) in self.work_buckets.iter() { - if stage == WorkBucketStage::Unconstrained { - return; - } - - debug_assert!(!bucket.is_activated()); + if cfg!(debug_assertions) { + self.work_buckets.iter().for_each(|(id, bkt)| { + if id != WorkBucketStage::Unconstrained { + assert!(!bkt.is_activated()); + } + }); } } pub fn add_coordinator_work(&self, work: impl CoordinatorWork, worker: &GCWorker) { + self.pending_messages.fetch_add(1, Ordering::SeqCst); worker .sender .send(CoordinatorMessage::Work(Box::new(work))) .unwrap(); } - #[inline] - fn pop_scheduable_work(&self, worker: &GCWorker) -> Option<(Box>, bool)> { - if let Some(work) = worker.shared.local_work_bucket.poll() { - return Some((work, worker.shared.local_work_bucket.is_empty())); + /// Check if all the work buckets are empty + #[inline(always)] + fn all_activated_buckets_are_empty(&self) -> bool { + for bucket in self.work_buckets.values() { + if bucket.is_activated() && !bucket.is_drained() { + return false; + } } + true + } + + /// Get a schedulable work packet without retry. + #[inline(always)] + fn poll_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { + let mut should_retry = false; + // Try find a packet that can be processed only by this worker. + if let Some(w) = worker.shared.designated_work.pop() { + return Steal::Success(w); + } + // Try get a packet from a work bucket. for work_bucket in self.work_buckets.values() { - if let Some(work) = work_bucket.poll() { - return Some((work, work_bucket.is_empty())); + match work_bucket.poll(&worker.local_work_buffer) { + Steal::Success(w) => return Steal::Success(w), + Steal::Retry => should_retry = true, + _ => {} + } + } + // Try steal some packets from any worker + for (id, worker_shared) in self.worker_group.workers_shared.iter().enumerate() { + if id == worker.ordinal { + continue; + } + match worker_shared.stealer.as_ref().unwrap().steal() { + Steal::Success(w) => return Steal::Success(w), + Steal::Retry => should_retry = true, + _ => {} } } - None + if should_retry { + Steal::Retry + } else { + Steal::Empty + } } - /// Get a scheduable work. Called by workers + /// Get a schedulable work packet. #[inline] - pub fn poll(&self, worker: &GCWorker) -> Box> { - let work = if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) { - if bucket_is_empty { - worker - .sender - .send(CoordinatorMessage::BucketDrained) - .unwrap(); + fn poll_schedulable_work(&self, worker: &GCWorker) -> Option>> { + // Loop until we successfully get a packet. 
+ loop { + match self.poll_schedulable_work_once(worker) { + Steal::Success(w) => { + return Some(w); + } + Steal::Retry => { + std::thread::yield_now(); + continue; + } + Steal::Empty => { + return None; + } } - work - } else { - self.poll_slow(worker) - }; - work + } + } + + /// Called by workers to get a schedulable work packet. + /// Park the worker if there're no available packets. + #[inline] + pub fn poll(&self, worker: &GCWorker) -> Box> { + self.poll_schedulable_work(worker) + .unwrap_or_else(|| self.poll_slow(worker)) } #[cold] fn poll_slow(&self, worker: &GCWorker) -> Box> { - debug_assert!(!worker.shared.is_parked()); - let mut guard = self.worker_monitor.0.lock().unwrap(); loop { - debug_assert!(!worker.shared.is_parked()); - if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) { - if bucket_is_empty { - worker - .sender - .send(CoordinatorMessage::BucketDrained) - .unwrap(); - } + let guard = self.worker_monitor.0.lock().unwrap(); + // Retry polling + if let Some(work) = self.poll_schedulable_work(worker) { return work; } - // Park this worker - worker.shared.parked.store(true, Ordering::SeqCst); - if self.all_workers_parked() { - worker - .sender - .send(CoordinatorMessage::AllWorkerParked) - .unwrap(); + // Prepare to park this worker + let all_parked = self.worker_group.inc_parked_workers(); + // If all workers are parked, try activate new buckets + if all_parked { + // If there're any designated work, resume the workers and process them + if self.worker_group.has_designated_work() { + self.worker_group.dec_parked_workers(); + self.worker_monitor.1.notify_all(); + continue; + } + if self.update_buckets() { + // We're not going to sleep since a new bucket is just open. + self.worker_group.dec_parked_workers(); + // We guarantee that we can at least fetch one packet. + let work = self.poll_schedulable_work(worker).unwrap(); + // Optimize for the case that a newly opened bucket only has one packet. + // We only notify_all if there're more than one packets available. + if !self.all_activated_buckets_are_empty() { + // Have more jobs in this buckets. Notify other workers. + self.worker_monitor.1.notify_all(); + } + // Return this packet and execute it. + return work; + } + debug_assert!(!self.worker_group.has_designated_work()); + // The current pause is finished if we can't open more buckets. 
+ self.pending_messages.fetch_add(1, Ordering::SeqCst); + worker.sender.send(CoordinatorMessage::Finish).unwrap(); } // Wait - guard = self.worker_monitor.1.wait(guard).unwrap(); + let _guard = self.worker_monitor.1.wait(guard).unwrap(); // Unpark this worker - worker.shared.parked.store(false, Ordering::SeqCst); + self.worker_group.dec_parked_workers(); } } pub fn enable_stat(&self) { - for worker in &self.workers_shared { + for worker in &self.worker_group.workers_shared { let worker_stat = worker.borrow_stat(); worker_stat.enable(); } @@ -385,7 +427,7 @@ impl GCWorkScheduler { pub fn statistics(&self) -> HashMap { let mut summary = SchedulerStat::default(); - for worker in &self.workers_shared { + for worker in &self.worker_group.workers_shared { let worker_stat = worker.borrow_stat(); summary.merge(&worker_stat); } @@ -396,8 +438,9 @@ impl GCWorkScheduler { pub fn notify_mutators_paused(&self, mmtk: &'static MMTK) { mmtk.plan.base().gc_requester.clear_request(); - debug_assert!(!self.work_buckets[WorkBucketStage::Prepare].is_activated()); - self.work_buckets[WorkBucketStage::Prepare].activate(); + let first_stw_bucket = &self.work_buckets[WorkBucketStage::first_stw_stage()]; + debug_assert!(!first_stw_bucket.is_activated()); + first_stw_bucket.activate(); let _guard = self.worker_monitor.0.lock().unwrap(); self.worker_monitor.1.notify_all(); } diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index 9694708eac..078c6a94ab 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -1,147 +1,198 @@ +use super::worker::WorkerGroup; use super::*; use crate::vm::VMBinding; +use crossbeam::deque::{Injector, Steal, Worker}; use enum_map::Enum; -use spin::RwLock; -use std::cmp; -use std::collections::BinaryHeap; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; -/// A unique work-packet id for each instance of work-packet -#[derive(Eq, PartialEq, Clone, Copy)] -struct WorkUID(u64); - -impl WorkUID { - pub fn new() -> Self { - static WORK_UID: AtomicU64 = AtomicU64::new(0); - Self(WORK_UID.fetch_add(1, Ordering::Relaxed)) - } +struct BucketQueue { + queue: Injector>>, } -struct PrioritizedWork { - priority: usize, - work_uid: WorkUID, - work: Box>, -} - -impl PrioritizedWork { - pub fn new(priority: usize, work: Box>) -> Self { +impl BucketQueue { + fn new() -> Self { Self { - priority, - work, - work_uid: WorkUID::new(), + queue: Injector::new(), } } -} -impl PartialEq for PrioritizedWork { - fn eq(&self, other: &Self) -> bool { - self.priority == other.priority && self.work_uid == other.work_uid + #[inline(always)] + fn is_empty(&self) -> bool { + self.queue.is_empty() } -} -impl Eq for PrioritizedWork {} + #[inline(always)] + fn steal_batch_and_pop( + &self, + dest: &Worker>>, + ) -> Steal>> { + self.queue.steal_batch_and_pop(dest) + } -impl Ord for PrioritizedWork { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.priority.cmp(&other.priority) + #[inline(always)] + fn push(&self, w: Box>) { + self.queue.push(w); } -} -impl PartialOrd for PrioritizedWork { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + #[inline(always)] + fn push_all(&self, ws: Vec>>) { + for w in ws { + self.queue.push(w); + } } } pub struct WorkBucket { active: AtomicBool, - /// A priority queue - queue: RwLock>>, + queue: BucketQueue, + prioritized_queue: Option>, monitor: Arc<(Mutex<()>, Condvar)>, can_open: Option) -> bool) + Send>>, + group: Arc>, 
} impl WorkBucket { - pub const DEFAULT_PRIORITY: usize = 1000; - pub fn new(active: bool, monitor: Arc<(Mutex<()>, Condvar)>) -> Self { + pub fn new( + active: bool, + monitor: Arc<(Mutex<()>, Condvar)>, + group: Arc>, + ) -> Self { Self { active: AtomicBool::new(active), - queue: Default::default(), + queue: BucketQueue::new(), + prioritized_queue: None, monitor, can_open: None, + group, } } + + #[inline(always)] fn notify_one_worker(&self) { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_one() + // If the bucket is not activated, don't notify anyone. + if !self.is_activated() { + return; + } + // Notify one if there're any parked workers. + if self.group.parked_workers() > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_one() + } } - fn notify_all_workers(&self) { - let _guard = self.monitor.0.lock().unwrap(); - self.monitor.1.notify_all() + + #[inline(always)] + pub fn notify_all_workers(&self) { + // If the bucket is not activated, don't notify anyone. + if !self.is_activated() { + return; + } + // Notify all if there're any parked workers. + if self.group.parked_workers() > 0 { + let _guard = self.monitor.0.lock().unwrap(); + self.monitor.1.notify_all() + } } + + #[inline(always)] pub fn is_activated(&self) -> bool { self.active.load(Ordering::SeqCst) } + /// Enable the bucket pub fn activate(&self) { self.active.store(true, Ordering::SeqCst); } + /// Test if the bucket is drained + #[inline(always)] pub fn is_empty(&self) -> bool { - self.queue.read().len() == 0 + self.queue.is_empty() + && self + .prioritized_queue + .as_ref() + .map(|q| q.is_empty()) + .unwrap_or(true) } + + #[inline(always)] pub fn is_drained(&self) -> bool { self.is_activated() && self.is_empty() } + /// Disable the bucket pub fn deactivate(&self) { - debug_assert!( - self.queue.read().is_empty(), - "Bucket not drained before close" - ); - self.active.store(false, Ordering::SeqCst); - } - /// Add a work packet to this bucket, with a given priority - pub fn add_with_priority(&self, priority: usize, work: Box>) { - self.queue - .write() - .push(PrioritizedWork::new(priority, work)); - self.notify_one_worker(); // FIXME: Performance - } - /// Add a work packet to this bucket, with a default priority (1000) + debug_assert!(self.queue.is_empty(), "Bucket not drained before close"); + self.active.store(false, Ordering::Relaxed); + } + + /// Add a work packet to this bucket + /// Panic if this bucket cannot receive prioritized packets. + #[inline(always)] + pub fn add_prioritized(&self, work: Box>) { + self.prioritized_queue.as_ref().unwrap().push(work); + self.notify_one_worker(); + } + + /// Add a work packet to this bucket + #[inline(always)] pub fn add>(&self, work: W) { - self.add_with_priority(Self::DEFAULT_PRIORITY, Box::new(work)); + self.queue.push(Box::new(work)); + self.notify_one_worker(); } - /// Add a boxed work packet to this bucket, with a default priority (1000) + + /// Add a work packet to this bucket + #[inline(always)] pub fn add_boxed(&self, work: Box>) { - self.add_with_priority(Self::DEFAULT_PRIORITY, work); + self.queue.push(work); + self.notify_one_worker(); } - pub fn bulk_add_with_priority(&self, priority: usize, work_vec: Vec>>) { - { - let mut queue = self.queue.write(); - for w in work_vec { - queue.push(PrioritizedWork::new(priority, w)); - } + + /// Add multiple packets with a higher priority. + /// Panic if this bucket cannot receive prioritized packets. 
+ #[inline(always)] + pub fn bulk_add_prioritized(&self, work_vec: Vec>>) { + self.prioritized_queue.as_ref().unwrap().push_all(work_vec); + if self.is_activated() { + self.notify_all_workers(); } - self.notify_all_workers(); // FIXME: Performance } + + /// Add multiple packets + #[inline(always)] pub fn bulk_add(&self, work_vec: Vec>>) { - self.bulk_add_with_priority(1000, work_vec) + if work_vec.is_empty() { + return; + } + self.queue.push_all(work_vec); + if self.is_activated() { + self.notify_all_workers(); + } } - /// Get a work packet (with the greatest priority) from this bucket - pub fn poll(&self) -> Option>> { - if !self.active.load(Ordering::SeqCst) { - return None; + + /// Get a work packet from this bucket + #[inline(always)] + pub fn poll(&self, worker: &Worker>>) -> Steal>> { + if !self.is_activated() || self.is_empty() { + return Steal::Empty; + } + if let Some(prioritized_queue) = self.prioritized_queue.as_ref() { + prioritized_queue + .steal_batch_and_pop(worker) + .or_else(|| self.queue.steal_batch_and_pop(worker)) + } else { + self.queue.steal_batch_and_pop(worker) } - self.queue.write().pop().map(|v| v.work) } + pub fn set_open_condition( &mut self, pred: impl Fn(&GCWorkScheduler) -> bool + Send + 'static, ) { self.can_open = Some(Box::new(pred)); } + + #[inline(always)] pub fn update(&self, scheduler: &GCWorkScheduler) -> bool { if let Some(can_open) = self.can_open.as_ref() { if !self.is_activated() && can_open(scheduler) { @@ -171,4 +222,11 @@ pub enum WorkBucketStage { Final, } +impl WorkBucketStage { + #[inline] + pub fn first_stw_stage() -> Self { + WorkBucketStage::from_usize(1) + } +} + pub const LAST_CLOSURE_BUCKET: WorkBucketStage = WorkBucketStage::PhantomRefClosure; diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 6b2eb1e721..011d8bce19 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -4,31 +4,31 @@ use super::*; use crate::mmtk::MMTK; use crate::util::copy::GCWorkerCopyContext; use crate::util::opaque_pointer::*; -use crate::vm::VMBinding; +use crate::vm::{Collection, GCThreadContext, VMBinding}; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; +use crossbeam::deque::{self, Stealer}; +use crossbeam::queue::ArrayQueue; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::Sender; -use std::sync::{Arc, Condvar, Mutex}; - -const LOCALLY_CACHED_WORKS: usize = 1; +use std::sync::{Arc, Mutex}; /// The part shared between a GCWorker and the scheduler. /// This structure is used for communication, e.g. adding new work packets. pub struct GCWorkerShared { - /// True if the GC worker is parked. - pub parked: AtomicBool, /// Worker-local statistics data. stat: AtomicRefCell>, - /// Incoming work packets to be executed by the current worker. - pub local_work_bucket: WorkBucket, + /// A queue of GCWork that can only be processed by the owned thread. + pub designated_work: ArrayQueue>>, + /// Handle for stealing packets from the current worker + pub stealer: Option>>>, } impl GCWorkerShared { - pub fn new(worker_monitor: Arc<(Mutex<()>, Condvar)>) -> Self { + pub fn new(stealer: Option>>>) -> Self { Self { - parked: AtomicBool::new(true), stat: Default::default(), - local_work_bucket: WorkBucket::new(true, worker_monitor), + designated_work: ArrayQueue::new(16), + stealer, } } } @@ -52,11 +52,10 @@ pub struct GCWorker { /// True if this struct is the embedded GCWorker of the controller thread. 
/// False if this struct belongs to a standalone GCWorker thread. is_coordinator: bool, - /// Cache of work packets created by the current worker. - /// May be flushed to the global pool or executed locally. - local_work_buffer: Vec<(WorkBucketStage, Box>)>, /// Reference to the shared part of the GC worker. It is used for synchronization. pub shared: Arc>, + /// Local work packet queue. + pub local_work_buffer: deque::Worker>>, } unsafe impl Sync for GCWorkerShared {} @@ -67,10 +66,6 @@ const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This the mutator calls harness_begin or harness_end while the GC is running."; impl GCWorkerShared { - pub fn is_parked(&self) -> bool { - self.parked.load(Ordering::SeqCst) - } - pub fn borrow_stat(&self) -> AtomicRef> { self.stat.try_borrow().expect(STAT_BORROWED_MSG) } @@ -88,6 +83,7 @@ impl GCWorker { is_coordinator: bool, sender: Sender>, shared: Arc>, + local_work_buffer: deque::Worker>>, ) -> Self { Self { tls: VMWorkerThread(VMThread::UNINITIALIZED), @@ -98,30 +94,39 @@ impl GCWorker { scheduler, mmtk, is_coordinator, - local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS), shared, + local_work_buffer, } } + const LOCALLY_CACHED_WORK_PACKETS: usize = 16; + + /// Add a work packet to the work queue and mark it with a higher priority. + /// If the bucket is activated, the packet will be pushed to the local queue, otherwise it will be + /// pushed to the global bucket with a higher priority. #[inline] - pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { - if !self.scheduler().work_buckets[bucket].is_activated() { - self.scheduler.work_buckets[bucket].add_with_priority(1000, Box::new(work)); + pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork) { + if !self.scheduler().work_buckets[bucket].is_activated() + || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS + { + self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work)); return; } - self.local_work_buffer.push((bucket, Box::new(work))); - if self.local_work_buffer.len() > LOCALLY_CACHED_WORKS { - self.flush(); - } + self.local_work_buffer.push(Box::new(work)); } - #[cold] - fn flush(&mut self) { - let mut buffer = Vec::with_capacity(LOCALLY_CACHED_WORKS); - std::mem::swap(&mut buffer, &mut self.local_work_buffer); - for (bucket, work) in buffer { - self.scheduler.work_buckets[bucket].add_with_priority(1000, work); + /// Add a work packet to the work queue. + /// If the bucket is activated, the packet will be pushed to the local queue, otherwise it will be + /// pushed to the global bucket. + #[inline] + pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork) { + if !self.scheduler().work_buckets[bucket].is_activated() + || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS + { + self.scheduler.work_buckets[bucket].add(work); + return; } + self.local_work_buffer.push(Box::new(work)); } pub fn is_coordinator(&self) -> bool { @@ -140,22 +145,131 @@ impl GCWorker { work.do_work(self, self.mmtk); } + /// Poll a ready-to-execute work packet in the following order: + /// + /// 1. Any packet that should be processed only by this worker. + /// 2. Poll from the local work queue. + /// 3. Poll from activated global work-buckets + /// 4. 
Steal from other workers + fn poll(&self) -> Box> { + self.shared + .designated_work + .pop() + .or_else(|| self.local_work_buffer.pop()) + .unwrap_or_else(|| self.scheduler().poll(self)) + } + pub fn do_boxed_work(&'static mut self, mut work: Box>) { work.do_work(self, self.mmtk); } + /// Entry of the worker thread. + /// Each worker will keep polling and executing work packets in a loop. pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK) { self.tls = tls; self.copy = crate::plan::create_gc_worker_context(tls, mmtk); - self.shared.parked.store(false, Ordering::SeqCst); loop { - while let Some((bucket, mut work)) = self.local_work_buffer.pop() { - debug_assert!(self.scheduler.work_buckets[bucket].is_activated()); - work.do_work_with_stat(self, mmtk); - } - let mut work = self.scheduler().poll(self); - debug_assert!(!self.shared.is_parked()); + let mut work = self.poll(); work.do_work_with_stat(self, mmtk); } } } + +/// A worker group to manage all the GC workers (except the coordinator worker). +pub struct WorkerGroup { + /// Shared worker data + pub workers_shared: Vec>>, + parked_workers: AtomicUsize, + unspawned_local_work_queues: Mutex>>>>, +} + +impl WorkerGroup { + /// Create a WorkerGroup + pub fn new(num_workers: usize) -> Arc { + let unspawned_local_work_queues = (0..num_workers) + .map(|_| deque::Worker::new_fifo()) + .collect::>(); + + let workers_shared = (0..num_workers) + .map(|i| { + Arc::new(GCWorkerShared::::new(Some( + unspawned_local_work_queues[i].stealer(), + ))) + }) + .collect::>(); + + Arc::new(Self { + workers_shared, + parked_workers: Default::default(), + unspawned_local_work_queues: Mutex::new(unspawned_local_work_queues), + }) + } + + /// Spawn all the worker threads + pub fn spawn( + &self, + mmtk: &'static MMTK, + sender: Sender>, + tls: VMThread, + ) { + let mut unspawned_local_work_queues = self.unspawned_local_work_queues.lock().unwrap(); + // Spawn each worker thread. + for (ordinal, shared) in self.workers_shared.iter().enumerate() { + let worker = Box::new(GCWorker::new( + mmtk, + ordinal, + mmtk.scheduler.clone(), + false, + sender.clone(), + shared.clone(), + unspawned_local_work_queues.pop().unwrap(), + )); + VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Worker(worker)); + } + debug_assert!(unspawned_local_work_queues.is_empty()); + } + + /// Get the number of workers in the group + #[inline(always)] + pub fn worker_count(&self) -> usize { + self.workers_shared.len() + } + + /// Increase the packed-workers counter. + /// Called before a worker is parked. + /// + /// Return true if all the workers are parked. + #[inline(always)] + pub fn inc_parked_workers(&self) -> bool { + let old = self.parked_workers.fetch_add(1, Ordering::SeqCst); + debug_assert!(old < self.worker_count()); + old + 1 == self.worker_count() + } + + /// Decrease the packed-workers counter. + /// Called after a worker is resumed from the parked state. 
+ #[inline(always)] + pub fn dec_parked_workers(&self) { + let old = self.parked_workers.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old <= self.worker_count()); + } + + /// Get the number of parked workers in the group + #[inline(always)] + pub fn parked_workers(&self) -> usize { + self.parked_workers.load(Ordering::SeqCst) + } + + /// Check if all the workers are parked + #[inline(always)] + pub fn all_parked(&self) -> bool { + self.parked_workers() == self.worker_count() + } + + /// Return true if there is any pending designated work + pub fn has_designated_work(&self) -> bool { + self.workers_shared + .iter() + .any(|w| !w.designated_work.is_empty()) + } +} diff --git a/src/util/reference_processor.rs b/src/util/reference_processor.rs index 7ab4c9c31b..7998a7bba9 100644 --- a/src/util/reference_processor.rs +++ b/src/util/reference_processor.rs @@ -96,8 +96,6 @@ impl ReferenceProcessors { /// Scan weak references. pub fn scan_weak_refs<E: ProcessEdgesWork>(&self, trace: &mut E, mmtk: &'static MMTK<E::VM>) { - self.soft - .scan::<E>(trace, mmtk.plan.is_current_gc_nursery()); self.weak .scan::<E>(trace, mmtk.plan.is_current_gc_nursery()); } } diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs index e02e67c4d3..2da10c9a75 100644 --- a/src/util/sanity/sanity_checker.rs +++ b/src/util/sanity/sanity_checker.rs @@ -109,8 +109,9 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P>
{ mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] .add(PrepareMutator::<P::VM>::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { - w.local_work_bucket.add(PrepareCollector); + for w in &mmtk.scheduler.worker_group.workers_shared { + let result = w.designated_work.push(Box::new(PrepareCollector)); + debug_assert!(result.is_ok()); } } } @@ -133,8 +134,9 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P>
{ mmtk.scheduler.work_buckets[WorkBucketStage::Release] .add(ReleaseMutator::<P::VM>::new(mutator)); } - for w in &mmtk.scheduler.workers_shared { - w.local_work_bucket.add(ReleaseCollector); + for w in &mmtk.scheduler.worker_group.workers_shared { + let result = w.designated_work.push(Box::new(ReleaseCollector)); + debug_assert!(result.is_ok()); } } }
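
Note (not part of the patch): the polling logic above is built on the crossbeam::deque primitives that this change pulls in via crossbeam 0.8 — an Injector behind each bucket's BucketQueue, a Worker deque per GC thread (local_work_buffer), and a Stealer per worker (GCWorkerShared::stealer). The sketch below, adapted from the crossbeam-deque documentation, shows that pattern in isolation; the helper name find_task and its free-standing signature are illustrative only, not MMTk API.

use crossbeam::deque::{Injector, Steal, Stealer, Worker};

// Pop local work first, then take a batch from the shared injector,
// then try to steal from a sibling worker; retry while Steal::Retry is returned.
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    local.pop().or_else(|| {
        std::iter::repeat_with(|| {
            global
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        .find(|s| !s.is_retry())
        .and_then(|s| s.success())
    })
}

In the scheduler this corresponds to poll_schedulable_work_once: a worker first drains its designated_work queue, then pulls from each activated bucket via steal_batch_and_pop into its local deque, and finally steals from the other workers' stealers, with Steal::Retry handled by yielding and retrying in poll_schedulable_work.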