diff --git a/src/memory_manager.rs b/src/memory_manager.rs
index 7ce6a75966..305ea34f70 100644
--- a/src/memory_manager.rs
+++ b/src/memory_manager.rs
@@ -900,9 +900,3 @@ pub fn add_work_packets(
 ) {
     mmtk.scheduler.work_buckets[bucket].bulk_add(packets)
 }
-
-/// Add a callback to be notified after the transitive closure is finished.
-/// The callback should return true if it add more work packets to the closure bucket.
-pub fn on_closure_end(mmtk: &'static MMTK, f: Box bool>) {
-    mmtk.scheduler.on_closure_end(f)
-}
diff --git a/src/plan/markcompact/global.rs b/src/plan/markcompact/global.rs
index 8a419f52e7..d749a4bc32 100644
--- a/src/plan/markcompact/global.rs
+++ b/src/plan/markcompact/global.rs
@@ -121,10 +121,6 @@ impl Plan for MarkCompact {
         scheduler.work_buckets[WorkBucketStage::PhantomRefClosure]
             .add(PhantomRefProcessing::>::new());
 
-        // VM-specific weak ref processing
-        scheduler.work_buckets[WorkBucketStage::WeakRefClosure]
-            .add(VMProcessWeakRefs::>::new());
-
         use crate::util::reference_processor::RefForwarding;
         scheduler.work_buckets[WorkBucketStage::RefForwarding]
             .add(RefForwarding::>::new());
@@ -147,6 +143,17 @@ impl Plan for MarkCompact {
                 .add(ForwardFinalization::>::new());
         }
 
+        // VM-specific weak ref processing
+        scheduler.work_buckets[WorkBucketStage::VMRefClosure]
+            .set_sentinel(Box::new(VMProcessWeakRefs::>::new()));
+
+        // VM-specific weak ref forwarding
+        scheduler.work_buckets[WorkBucketStage::VMRefForwarding]
+            .add(VMForwardWeakRefs::>::new());
+
+        // VM-specific work after forwarding, e.g. to implement reference enqueuing.
+        scheduler.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::::default());
+
         // Analysis GC work
         #[cfg(feature = "analysis")]
         {
diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs
index 45bd877bd8..d24a1d7678 100644
--- a/src/scheduler/gc_work.rs
+++ b/src/scheduler/gc_work.rs
@@ -108,9 +108,9 @@ impl Release {
 impl GCWork for Release {
     fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) {
         trace!("Release Global");
+        self.plan.base().gc_trigger.policy.on_gc_release(mmtk);
 
-        ::VMCollection::vm_release();
         // We assume this is the only running work packet that accesses plan at the point of execution
         #[allow(clippy::cast_ref_to_mut)]
         let plan_mut: &mut C::PlanType = unsafe { &mut *(self.plan as *const _ as *mut _) };
@@ -252,24 +252,190 @@ impl GCWork for EndOfGC {
 
 impl CoordinatorWork for EndOfGC {}
 
-/// Delegate to the VM binding for reference processing.
+/// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped
+/// `ProcessEdgesWork` instance.
+struct ProcessEdgesWorkTracer {
+    process_edges_work: E,
+    stage: WorkBucketStage,
+}
+
+impl ObjectTracer for ProcessEdgesWorkTracer {
+    /// Forward the `trace_object` call to the underlying `ProcessEdgesWork`,
+    /// and flush as soon as the underlying buffer of `process_edges_work` is full.
+    ///
+    /// This function is inlined because `trace_object` is probably the hottest function in MMTk.
+    /// If this function is called in small closures, please profile the program and make sure the
+    /// closure is inlined, too.
+    #[inline(always)]
+    fn trace_object(&mut self, object: ObjectReference) -> ObjectReference {
+        let result = self.process_edges_work.trace_object(object);
+        self.flush_if_full();
+        result
+    }
+}
+
+impl ProcessEdgesWorkTracer {
+    #[inline(always)]
+    fn flush_if_full(&mut self) {
+        if self.process_edges_work.nodes.is_full() {
+            self.flush();
+        }
+    }
+
+    pub fn flush_if_not_empty(&mut self) {
+        if !self.process_edges_work.nodes.is_empty() {
+            self.flush();
+        }
+    }
+
+    #[cold]
+    fn flush(&mut self) {
+        let next_nodes = self.process_edges_work.pop_nodes();
+        assert!(!next_nodes.is_empty());
+        let work_packet = self.process_edges_work.create_scan_work(next_nodes, false);
+        let worker = self.process_edges_work.worker();
+        worker.scheduler().work_buckets[self.stage].add(work_packet);
+    }
+}
+
+/// This type implements `ObjectTracerContext` by creating a temporary `ProcessEdgesWork` during
+/// the call to `with_tracer`, making use of its `trace_object` method. It then creates work
+/// packets using the methods of the `ProcessEdgesWork`, and adds the work packets to the given
+/// `stage`.
+struct ProcessEdgesWorkTracerContext {
+    stage: WorkBucketStage,
+    phantom_data: PhantomData,
+}
+
+impl Clone for ProcessEdgesWorkTracerContext {
+    fn clone(&self) -> Self {
+        Self { ..*self }
+    }
+}
+
+impl ObjectTracerContext for ProcessEdgesWorkTracerContext {
+    type TracerType = ProcessEdgesWorkTracer;
+
+    fn with_tracer(&self, worker: &mut GCWorker, func: F) -> R
+    where
+        F: FnOnce(&mut Self::TracerType) -> R,
+    {
+        let mmtk = worker.mmtk;
+
+        // Prepare the underlying ProcessEdgesWork
+        let mut process_edges_work = E::new(vec![], false, mmtk);
+        // FIXME: This line allows us to omit the borrowing lifetime of worker.
+        // We should refactor ProcessEdgesWork so that it uses `worker` locally, not as a member.
+        process_edges_work.set_worker(worker);
+
+        // Create the tracer.
+        let mut tracer = ProcessEdgesWorkTracer {
+            process_edges_work,
+            stage: self.stage,
+        };
+
+        // The caller can use the tracer here.
+        let result = func(&mut tracer);
+
+        // Flush the queued nodes.
+        tracer.flush_if_not_empty();
+
+        result
+    }
+}
+
+/// Delegate to the VM binding for weak reference processing.
 ///
 /// Some VMs (e.g. v8) do not have a Java-like global weak reference storage, and the
 /// processing of those weakrefs may be more complex. For such case, we delegate to the
 /// VM binding to process weak references.
-#[derive(Default)]
-pub struct VMProcessWeakRefs(PhantomData);
+///
+/// NOTE: This will replace `{Soft,Weak,Phantom}RefProcessing` and `Finalization` in the future.
+pub struct VMProcessWeakRefs {
+    phantom_data: PhantomData,
+}
 
 impl VMProcessWeakRefs {
     pub fn new() -> Self {
-        Self(PhantomData)
+        Self {
+            phantom_data: PhantomData,
+        }
     }
 }
 
 impl GCWork for VMProcessWeakRefs {
     fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) {
-        trace!("ProcessWeakRefs");
-        ::VMCollection::process_weak_refs(worker); // TODO: Pass a factory/callback to decide what work packet to create.
+        trace!("VMProcessWeakRefs");
+
+        let stage = WorkBucketStage::VMRefClosure;
+
+        let need_to_repeat = {
+            let tracer_factory = ProcessEdgesWorkTracerContext:: {
+                stage,
+                phantom_data: PhantomData,
+            };
+            ::VMScanning::process_weak_refs(worker, tracer_factory)
+        };
+
+        if need_to_repeat {
+            // Schedule Self as the new sentinel so we'll call `process_weak_refs` again after the
+            // current transitive closure.
+            let new_self = Box::new(Self::new());
+
+            worker.scheduler().work_buckets[stage].set_sentinel(new_self);
+        }
+    }
+}
+
+/// Delegate to the VM binding for forwarding weak references.
+///
+/// Some VMs (e.g. v8) do not have a Java-like global weak reference storage, and the
+/// processing of those weakrefs may be more complex. For such cases, we delegate to the
+/// VM binding to forward weak references.
+///
+/// NOTE: This will replace `RefForwarding` and `ForwardFinalization` in the future.
+pub struct VMForwardWeakRefs {
+    phantom_data: PhantomData,
+}
+
+impl VMForwardWeakRefs {
+    pub fn new() -> Self {
+        Self {
+            phantom_data: PhantomData,
+        }
+    }
+}
+
+impl GCWork for VMForwardWeakRefs {
+    fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) {
+        trace!("VMForwardWeakRefs");
+
+        let stage = WorkBucketStage::VMRefForwarding;
+
+        let tracer_factory = ProcessEdgesWorkTracerContext:: {
+            stage,
+            phantom_data: PhantomData,
+        };
+        ::VMScanning::forward_weak_refs(worker, tracer_factory)
+    }
+}
+
+/// This work packet calls `Collection::post_forwarding`.
+///
+/// NOTE: This will replace `RefEnqueue` in the future.
+///
+/// NOTE: Although this work packet runs in parallel with the `Release` work packet, it does not
+/// access the `Plan` instance.
+#[derive(Default)]
+pub struct VMPostForwarding {
+    phantom_data: PhantomData,
+}
+
+impl GCWork for VMPostForwarding {
+    fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) {
+        trace!("VMPostForwarding start");
+        ::VMCollection::post_forwarding(worker.tls);
+        trace!("VMPostForwarding end");
     }
 }
@@ -678,38 +844,22 @@ pub trait ScanObjectsWork: GCWork + Sized {
         // If any object does not support edge-enqueuing, we process them now.
         if !scan_later.is_empty() {
-            // We create an instance of E to use its `trace_object` method and its object queue.
-            let mut process_edges_work = Self::E::new(vec![], false, mmtk);
-            let mut closure = |object| process_edges_work.trace_object(object);
-
-            // Scan objects and trace their edges at the same time.
-            for object in scan_later.iter().copied() {
-                ::VMScanning::scan_object_and_trace_edges(
-                    tls,
-                    object,
-                    &mut closure,
-                );
-                self.post_scan_object(object);
-            }
-
-            // Create work packets to scan adjacent objects. We skip ProcessEdgesWork and create
-            // object-scanning packets directly, because the edges are already traced.
-            if !process_edges_work.nodes.is_empty() {
-                let next_nodes = process_edges_work.nodes.take();
-                let make_packet = |nodes| {
-                    let work_packet = self.make_another(nodes);
-                    memory_manager::add_work_packet(mmtk, WorkBucketStage::Closure, work_packet);
-                };
-
-                // Divide the resulting nodes into appropriately sized packets.
-                if next_nodes.len() <= Self::E::CAPACITY {
-                    make_packet(next_nodes);
-                } else {
-                    for chunk in next_nodes.chunks(Self::E::CAPACITY) {
-                        make_packet(chunk.into());
-                    }
+            let object_tracer_context = ProcessEdgesWorkTracerContext:: {
+                stage: WorkBucketStage::Closure,
+                phantom_data: PhantomData,
+            };
+
+            object_tracer_context.with_tracer(worker, |object_tracer| {
+                // Scan objects and trace their edges at the same time.
+ for object in scan_later.iter().copied() { + ::VMScanning::scan_object_and_trace_edges( + tls, + object, + object_tracer, + ); + self.post_scan_object(object); } - } + }); } } } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 65cfd44df2..83085e520e 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -1,6 +1,6 @@ use super::stat::SchedulerStat; use super::work_bucket::*; -use super::worker::{GCWorker, GCWorkerShared, ThreadId, WorkerGroup}; +use super::worker::{GCWorker, GCWorkerShared, ParkingGuard, ThreadId, WorkerGroup}; use super::*; use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; @@ -33,16 +33,6 @@ pub struct GCWorkScheduler { coordinator_worker_shared: Arc>, /// Condition Variable for worker synchronization pub worker_monitor: Arc<(Mutex<()>, Condvar)>, - /// A callback to be fired after the `Closure` bucket is drained. - /// This callback should return `true` if it adds more work packets to the - /// `Closure` bucket. `WorkBucket::can_open` then consult this return value - /// to prevent the GC from proceeding to the next stage, if we still have - /// `Closure` work to do. - /// - /// We use this callback to process ephemeron objects. `closure_end` can re-enable - /// the `Closure` bucket multiple times to iteratively discover and process - /// more ephemeron objects. - closure_end: Mutex bool>>>, /// Counter for pending coordinator messages. pub(super) pending_coordinator_packets: AtomicUsize, /// How to assign the affinity of each GC thread. Specified by the user. @@ -68,10 +58,12 @@ impl GCWorkScheduler { WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::VMRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::VMRefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), @@ -90,19 +82,7 @@ impl GCWorkScheduler { let cur_stages = open_stages.clone(); work_buckets[stage].set_open_condition( move |scheduler: &GCWorkScheduler| { - let should_open = scheduler.are_buckets_drained(&cur_stages); - // Additional check before the `RefClosure` bucket opens. - if should_open && stage == LAST_CLOSURE_BUCKET { - if let Some(closure_end) = - scheduler.closure_end.lock().unwrap().as_ref() - { - if closure_end() { - // Don't open `LAST_CLOSURE_BUCKET` if `closure_end` added more works to `Closure`. 
-                                return false;
-                            }
-                        }
-                    }
-                    should_open
+                    scheduler.are_buckets_drained(&cur_stages)
                 },
             );
             open_stages.push(stage);
@@ -117,7 +97,6 @@ impl GCWorkScheduler {
             worker_group,
             coordinator_worker_shared,
             worker_monitor,
-            closure_end: Mutex::new(None),
             pending_coordinator_packets: AtomicUsize::new(0),
             affinity,
         })
@@ -204,10 +183,6 @@ impl GCWorkScheduler {
         self.work_buckets[WorkBucketStage::PhantomRefClosure]
             .add(PhantomRefProcessing::::new());
 
-        // VM-specific weak ref processing
-        self.work_buckets[WorkBucketStage::WeakRefClosure]
-            .add(VMProcessWeakRefs::::new());
-
         use crate::util::reference_processor::RefForwarding;
         if plan.constraints().needs_forward_after_liveness {
             self.work_buckets[WorkBucketStage::RefForwarding]
                 .add(RefForwarding::::new());
@@ -230,6 +205,38 @@ impl GCWorkScheduler {
                     .add(ForwardFinalization::::new());
             }
         }
+
+        // We add the VM-specific weak ref processing work regardless of MMTk-side options,
+        // including Options::no_finalizer and Options::no_reference_types.
+        //
+        // VMs need weak reference handling to function properly. The VM may treat weak references
+        // as strong references, but it is not appropriate to simply disable weak reference
+        // handling from MMTk's side. The VM, however, may choose to do nothing in
+        // `Scanning::process_weak_refs` if appropriate.
+        //
+        // It is also not sound for MMTk core to turn off weak
+        // reference processing or finalization alone, because (1) not all VMs have the notion of
+        // weak references or finalizers, so it may not make sense, and (2) the VM may
+        // process them together.
+
+        // VM-specific weak ref processing
+        // The `VMProcessWeakRefs` work packet is set as the sentinel so that it is executed when
+        // the `VMRefClosure` bucket is drained. The VM binding may spawn new work packets into
+        // the `VMRefClosure` bucket, and request another `VMProcessWeakRefs` work packet to be
+        // executed again after this bucket is drained again. Strictly speaking, the first
+        // `VMProcessWeakRefs` packet can be an ordinary packet (doesn't have to be a sentinel)
+        // because there are no other packets in the bucket. We set it as a sentinel for
+        // consistency.
+        self.work_buckets[WorkBucketStage::VMRefClosure]
+            .set_sentinel(Box::new(VMProcessWeakRefs::::new()));
+
+        if plan.constraints().needs_forward_after_liveness {
+            // VM-specific weak ref forwarding
+            self.work_buckets[WorkBucketStage::VMRefForwarding]
+                .add(VMForwardWeakRefs::::new());
+        }
+
+        self.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::::default());
     }
 
     fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool {
@@ -240,14 +247,22 @@ impl GCWorkScheduler {
         buckets.iter().all(|&b| self.work_buckets[b].is_drained())
     }
 
-    pub fn on_closure_end(&self, f: Box bool>) {
-        *self.closure_end.lock().unwrap() = Some(f);
-    }
-
     pub fn all_buckets_empty(&self) -> bool {
         self.work_buckets.values().all(|bucket| bucket.is_empty())
     }
 
+    /// Schedule "sentinel" work packets for all activated buckets.
+    fn schedule_sentinels(&self) -> bool {
+        let mut new_packets = false;
+        for (id, work_bucket) in self.work_buckets.iter() {
+            if work_bucket.is_activated() && work_bucket.maybe_schedule_sentinel() {
+                trace!("Scheduled sentinel packet into {:?}", id);
+                new_packets = true;
+            }
+        }
+        new_packets
+    }
+
     /// Open buckets if their conditions are met.
     ///
     /// This function should only be called after all the workers are parked.
@@ -267,8 +282,15 @@ impl GCWorkScheduler {
                 buckets_updated = buckets_updated || bucket_opened;
                 if bucket_opened {
                     new_packets = new_packets || !bucket.is_drained();
-                    // Quit the loop. There'are already new packets in the newly opened buckets.
                     if new_packets {
+                        // Quit the loop. There are already new packets in the newly opened buckets.
+                        trace!("Found new packets at stage {:?}. Break.", id);
+                        break;
+                    }
+                    new_packets = new_packets || bucket.maybe_schedule_sentinel();
+                    if new_packets {
+                        // Quit the loop. A sentinel packet is added to the newly opened buckets.
+                        trace!("Sentinel is scheduled at stage {:?}. Break.", id);
                         break;
                     }
                 }
@@ -389,15 +411,15 @@ impl GCWorkScheduler {
     fn poll_slow(&self, worker: &GCWorker) -> Box> {
         // Note: The lock is released during `wait` in the loop.
         let mut guard = self.worker_monitor.0.lock().unwrap();
-        loop {
+        'polling_loop: loop {
            // Retry polling
            if let Some(work) = self.poll_schedulable_work(worker) {
                return work;
            }
            // Prepare to park this worker
-            let all_parked = self.worker_group.inc_parked_workers();
+            let parking_guard = ParkingGuard::new(self.worker_group.as_ref());
            // If all workers are parked, try activate new buckets
-            if all_parked {
+            if parking_guard.all_parked() {
                // If there're any designated work, resume the workers and process them
                if self.worker_group.has_designated_work() {
                    assert!(
@@ -407,19 +429,15 @@ impl GCWorkScheduler {
                    self.worker_monitor.1.notify_all();
                    // The current worker is going to wait, because the designated work is not for it.
                } else if self.pending_coordinator_packets.load(Ordering::SeqCst) == 0 {
+                    // See if any bucket has a sentinel.
+                    if self.schedule_sentinels() {
+                        // We're not going to sleep since new work packets are just scheduled.
+                        break 'polling_loop;
+                    }
+                    // Try to open new buckets.
                    if self.update_buckets() {
                        // We're not going to sleep since a new bucket is just open.
-                        self.worker_group.dec_parked_workers();
-                        // We guarantee that we can at least fetch one packet.
-                        let work = self.poll_schedulable_work(worker).unwrap();
-                        // Optimize for the case that a newly opened bucket only has one packet.
-                        // We only notify_all if there're more than one packets available.
-                        if !self.all_activated_buckets_are_empty() {
-                            // Have more jobs in this buckets. Notify other workers.
-                            self.worker_monitor.1.notify_all();
-                        }
-                        // Return this packet and execute it.
-                        return work;
+                        break 'polling_loop;
                    }
                    debug_assert!(!self.worker_group.has_designated_work());
                    // The current pause is finished if we can't open more buckets.
@@ -432,9 +450,19 @@ impl GCWorkScheduler {
            }
            // Wait
            guard = self.worker_monitor.1.wait(guard).unwrap();
-            // Unpark this worker
-            self.worker_group.dec_parked_workers();
+            // The worker is unparked here, when `parking_guard` goes out of scope.
+        }
+
+        // We guarantee that we can at least fetch one packet when we reach here.
+        let work = self.poll_schedulable_work(worker).unwrap();
+        // Optimize for the case that a newly opened bucket only has one packet.
+        // We only notify_all if there is more than one packet available.
+        if !self.all_activated_buckets_are_empty() {
+            // There are more jobs in this bucket. Notify other workers.
+            self.worker_monitor.1.notify_all();
        }
+        // Return this packet and execute it.
+        work
     }
 
     pub fn enable_stat(&self) {
diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index 078c6a94ab..e99ac4c61e 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -49,6 +49,18 @@ pub struct WorkBucket {
     prioritized_queue: Option>,
     monitor: Arc<(Mutex<()>, Condvar)>,
     can_open: Option) -> bool) + Send>>,
+    /// After this bucket is activated and all pending work packets (including the packets in this
+    /// bucket) are drained, this work packet, if it exists, will be added to this bucket. When
+    /// this happens, it will prevent subsequent buckets from being opened.
+    ///
+    /// The sentinel work packet may set another work packet as the new sentinel which will be
+    /// added to this bucket again after all pending work packets are drained. This may happen
+    /// again and again, causing the GC to stay at the same stage and drain work packets in a loop.
+    ///
+    /// This is useful for handling weak references that may expand the transitive closure
+    /// recursively, such as ephemerons and Java-style SoftReference and finalizers. Sentinels
+    /// can be used repeatedly to discover and process more such objects.
+    sentinel: Mutex>>>,
     group: Arc>,
 }
@@ -64,6 +76,7 @@ impl WorkBucket {
             prioritized_queue: None,
             monitor,
             can_open: None,
+            sentinel: Mutex::new(None),
             group,
         }
     }
@@ -192,6 +205,16 @@ impl WorkBucket {
         self.can_open = Some(Box::new(pred));
     }
 
+    pub fn set_sentinel(&self, new_sentinel: Box>) {
+        let mut sentinel = self.sentinel.lock().unwrap();
+        *sentinel = Some(new_sentinel);
+    }
+
+    pub fn has_sentinel(&self) -> bool {
+        let sentinel = self.sentinel.lock().unwrap();
+        sentinel.is_some()
+    }
+
     #[inline(always)]
     pub fn update(&self, scheduler: &GCWorkScheduler) -> bool {
         if let Some(can_open) = self.can_open.as_ref() {
@@ -202,23 +225,81 @@ impl WorkBucket {
         }
         false
     }
+
+    pub fn maybe_schedule_sentinel(&self) -> bool {
+        debug_assert!(
+            self.is_activated(),
+            "Attempted to schedule sentinel work while bucket is not open"
+        );
+        let maybe_sentinel = {
+            let mut sentinel = self.sentinel.lock().unwrap();
+            sentinel.take()
+        };
+        if let Some(work) = maybe_sentinel {
+            // We cannot call `self.add` now, because:
+            // 1. The current function is called only when all workers are parked, and we are
+            //    holding the monitor lock. `self.add` also needs that lock to notify other
+            //    workers. Trying to lock it again will result in deadlock.
+            // 2. After this function returns, the current worker will check if there is pending
+            //    work immediately, and notify other workers.
+            // So we can just "sneak" the sentinel work packet into the current bucket now.
+            self.queue.push(work);
+            true
+        } else {
+            false
+        }
+    }
 }
 
 #[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)]
 pub enum WorkBucketStage {
+    /// This bucket is always open.
     Unconstrained,
+    /// Preparation work. Plans, spaces, GC workers, mutators, etc. should be prepared for GC at
+    /// this stage.
     Prepare,
+    /// Compute the transitive closure following only strong references.
     Closure,
+    /// Handle Java-style soft references, and potentially expand the transitive closure.
     SoftRefClosure,
+    /// Handle Java-style weak references.
     WeakRefClosure,
+    /// Resurrect Java-style finalizable objects, and potentially expand the transitive closure.
     FinalRefClosure,
+    /// Handle Java-style phantom references.
     PhantomRefClosure,
+    /// Let the VM handle VM-specific weak data structures, including weak references, weak
+    /// collections, table of finalizable objects, ephemerons, etc. Potentially expand the
+    /// transitive closure.
+    ///
+    /// NOTE: This stage is intended to replace the Java-specific weak reference handling stages
+    /// above.
+    VMRefClosure,
+    /// Compute the forwarding addresses of objects (mark-compact-only).
     CalculateForwarding,
+    /// Scan roots again to initiate another transitive closure to update roots and references
+    /// after computing the forwarding addresses (mark-compact-only).
     SecondRoots,
+    /// Update Java-style weak references after computing forwarding addresses (mark-compact-only).
+    ///
+    /// NOTE: This stage should be updated to adapt to the VM-side reference handling. It shall
+    /// be kept after removing `{Soft,Weak,Final,Phantom}RefClosure`.
     RefForwarding,
+    /// Update the list of Java-style finalization candidates and finalizable objects after
+    /// computing forwarding addresses (mark-compact-only).
     FinalizableForwarding,
+    /// Let the VM handle the forwarding of reference fields in any VM-specific weak data
+    /// structures, including weak references, weak collections, table of finalizable objects,
+    /// ephemerons, etc., after computing forwarding addresses (mark-compact-only).
+    ///
+    /// NOTE: This stage is intended to replace Java-specific forwarding phases above.
+    VMRefForwarding,
+    /// Compact objects (mark-compact-only).
     Compact,
+    /// Work packets that should be done just before GC ends shall go here. This includes
+    /// releasing resources and setting states in plans, spaces, GC workers, mutators, etc.
     Release,
+    /// Resume mutators and end GC.
     Final,
 }
@@ -228,5 +309,3 @@ impl WorkBucketStage {
         WorkBucketStage::from_usize(1)
     }
 }
-
-pub const LAST_CLOSURE_BUCKET: WorkBucketStage = WorkBucketStage::PhantomRefClosure;
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index cfb12faf0e..109914aff6 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -294,3 +294,29 @@ impl WorkerGroup {
             .any(|w| !w.designated_work.is_empty())
     }
 }
+
+/// This ensures the worker always decrements the parked worker count on all control flow paths.
+pub(crate) struct ParkingGuard<'a, VM: VMBinding> {
+    worker_group: &'a WorkerGroup,
+    all_parked: bool,
+}
+
+impl<'a, VM: VMBinding> ParkingGuard<'a, VM> {
+    pub fn new(worker_group: &'a WorkerGroup) -> Self {
+        let all_parked = worker_group.inc_parked_workers();
+        ParkingGuard {
+            worker_group,
+            all_parked,
+        }
+    }
+
+    pub fn all_parked(&self) -> bool {
+        self.all_parked
+    }
+}
+
+impl<'a, VM: VMBinding> Drop for ParkingGuard<'a, VM> {
+    fn drop(&mut self) {
+        self.worker_group.dec_parked_workers();
+    }
+}
diff --git a/src/vm/collection.rs b/src/vm/collection.rs
index 686acb4e4c..8ff08bcbd7 100644
--- a/src/vm/collection.rs
+++ b/src/vm/collection.rs
@@ -98,9 +98,26 @@ pub trait Collection {
     /// * `tls`: The thread pointer for the current GC thread.
     fn schedule_finalization(_tls: VMWorkerThread) {}
 
-    /// Inform the VM to do its VM-specific release work at the end of a GC.
-    fn vm_release() {}
-
-    /// Delegate to the VM binding for reference processing.
-    fn process_weak_refs(_worker: &mut GCWorker) {} // FIXME: Add an appropriate factory/callback parameter.
+    /// A hook for the VM to do work after forwarding objects.
+    ///
+    /// This function is called after all of the following have finished:
+    /// - The life and death of objects have been determined. Objects determined to be live
+    ///   will not be reclaimed in this GC.
+    /// - Live objects have been moved to their destinations. (copying GC only)
+    /// - References in objects have been updated to point to new addresses. (copying GC only)
+    ///
+    /// And this function may run concurrently with the release work of GC, i.e. freeing the space
+    /// occupied by dead objects.
+    ///
+    /// It is safe for the VM to read and write object fields at this time, although GC has not
+    /// finished yet. GC will be reclaiming spaces of dead objects, but will not damage live
+    /// objects. However, the VM cannot allocate new objects at this time.
+    ///
+    /// One possible use of this hook is enqueuing `{Soft,Weak,Phantom}Reference` instances to
+    /// reference queues (for Java). VMs (including JVM implementations) do not have to handle
+    /// weak references this way, but mmtk-core provides this opportunity.
+    ///
+    /// Arguments:
+    /// * `tls`: The thread pointer for the worker thread performing this call.
+    fn post_forwarding(_tls: VMWorkerThread) {}
 }
diff --git a/src/vm/mod.rs b/src/vm/mod.rs
index ff47f3d3b7..92f95e4546 100644
--- a/src/vm/mod.rs
+++ b/src/vm/mod.rs
@@ -32,6 +32,7 @@ pub use self::reference_glue::Finalizable;
 pub use self::reference_glue::ReferenceGlue;
 pub use self::scanning::EdgeVisitor;
 pub use self::scanning::ObjectTracer;
+pub use self::scanning::ObjectTracerContext;
 pub use self::scanning::RootsWorkFactory;
 pub use self::scanning::Scanning;
diff --git a/src/vm/scanning.rs b/src/vm/scanning.rs
index 4c55db608e..49a3b4eed4 100644
--- a/src/vm/scanning.rs
+++ b/src/vm/scanning.rs
@@ -1,4 +1,5 @@
 use crate::plan::Mutator;
+use crate::scheduler::GCWorker;
 use crate::util::ObjectReference;
 use crate::util::VMWorkerThread;
 use crate::vm::edge_shape::Edge;
@@ -21,6 +22,9 @@ impl EdgeVisitor for F {
 pub trait ObjectTracer {
     /// Call this function for the content of each edge,
     /// and assign the returned value back to the edge.
+    ///
+    /// Note: This function is performance-critical.
+    /// Implementations should consider inlining if necessary.
     fn trace_object(&mut self, object: ObjectReference) -> ObjectReference;
 }
@@ -31,6 +35,44 @@ impl ObjectReference> ObjectTracer for F {
     fn trace_object(&mut self, object: ObjectReference) -> ObjectReference {
         self(object)
     }
 }
 
+/// An `ObjectTracerContext` gives a GC worker temporary access to an `ObjectTracer`, allowing
+/// the GC worker to trace objects. This trait is intended to abstract out the implementation
+/// details of tracing objects, enqueuing objects, and creating work packets that expand the
+/// transitive closure, allowing the VM binding to focus on VM-specific parts.
+///
+/// This trait is used during root scanning and binding-side weak reference processing.
+pub trait ObjectTracerContext: Clone + Send + 'static {
+    /// The concrete `ObjectTracer` type.
+    ///
+    /// FIXME: The current code works because of the unsafe method `ProcessEdgesWork::set_worker`.
+    /// The tracer should borrow the worker passed to `with_tracer` during its lifetime.
+    /// For this reason, `TracerType` should have a `<'w>` lifetime parameter.
+    /// Generic Associated Types (GATs) are already stabilized in Rust 1.65.
+    /// We should update our toolchain version, too.
+    type TracerType: ObjectTracer;
+
+    /// Create a temporary `ObjectTracer` and provide access in the scope of `func`.
+    ///
+    /// When `ObjectTracer::trace_object` is called, if the traced object is visited for the
+    /// first time in this transitive closure, it will be enqueued. After `func` returns, the
+    /// implementation will create work packets to continue computing the transitive closure from
+    /// the newly enqueued objects.
+    ///
+    /// API functions that provide an `ObjectTracerContext` should document
+    /// 1. on which fields the user is supposed to call `ObjectTracer::trace_object`, and
+    /// 2. which work bucket the generated work packet will be added to. Sometimes the user needs
+    ///    to know when the computation of the transitive closure finishes.
+    ///
+    /// Arguments:
+    /// - `worker`: The current GC worker.
+    /// - `func`: A caller-supplied closure in which the created `ObjectTracer` can be used.
+    ///
+    /// Returns: The return value of `func`.
+    fn with_tracer(&self, worker: &mut GCWorker, func: F) -> R
+    where
+        F: FnOnce(&mut Self::TracerType) -> R;
+}
+
 /// Root-scanning methods use this trait to create work packets for processing roots.
 ///
 /// Notes on the required traits:
@@ -179,4 +221,70 @@ pub trait Scanning {
     fn supports_return_barrier() -> bool;
 
     fn prepare_for_roots_re_scanning();
+
+    /// Process weak references.
+    ///
+    /// This function is called after a transitive closure is completed.
+    ///
+    /// MMTk core enables the VM binding to do the following in this function:
+    ///
+    /// 1. Query if an object is already reached in this transitive closure.
+    /// 2. Keep certain objects and their descendants alive.
+    /// 3. Get the new address of objects that are either
+    ///    - already alive before this function is called, or
+    ///    - explicitly kept alive in this function.
+    /// 4. Request this function to be called again after the transitive closure is completed
+    ///    again.
+    ///
+    /// The VM binding can call `ObjectReference::is_reachable()` to query if an object is
+    /// currently reached.
+    ///
+    /// The VM binding can use `tracer_context` to get access to an `ObjectTracer`, and call
+    /// its `trace_object(object)` method to keep `object` and its descendants alive.
+    ///
+    /// The return value of `ObjectTracer::trace_object(object)` is the new address of the given
+    /// `object` if it is moved by the GC.
+    ///
+    /// The VM binding can return `true` from `process_weak_refs` to request `process_weak_refs`
+    /// to be called again after the MMTk core finishes the transitive closure again from the
+    /// objects newly visited by `ObjectTracer::trace_object`. This is useful if a VM supports
+    /// multiple levels of reachability (such as Java) or ephemerons.
+    ///
+    /// Implementation-wise, this function is called as the "sentinel" of the `VMRefClosure` work
+    /// bucket, which means it is called when all work packets in that bucket have finished. The
+    /// `tracer_context` expands the transitive closure by adding more work packets in the same
+    /// bucket. This means if `process_weak_refs` returns true, those work packets will have
+    /// finished (completing the transitive closure) by the time `process_weak_refs` is called
+    /// again. The VM binding can make use of this by adding custom work packets into the
+    /// `VMRefClosure` bucket. When forwarding, the bucket will be `VMRefForwarding` instead.
+    /// See below.
+    ///
+    /// Arguments:
+    /// * `worker`: The current GC worker.
+    /// * `tracer_context`: Use this to get access to an `ObjectTracer` and use it to retain and
+    ///   update weak references.
+    ///
+    /// This function shall return true if it needs to be called again after the GC finishes
+    /// expanding the transitive closure from the objects kept alive.
+    fn process_weak_refs(
+        _worker: &mut GCWorker,
+        _tracer_context: impl ObjectTracerContext,
+    ) -> bool {
+        false
+    }
+
+    /// Forward weak references.
+    ///
+    /// This function will only be called in the forwarding stage when using the mark-compact GC
+    /// algorithm. Mark-compact computes the transitive closure twice during each GC. It marks
+    /// objects in the first transitive closure, and forwards references in the second transitive
+    /// closure.
+    ///
+    /// Arguments:
+    /// * `worker`: The current GC worker.
+    /// * `tracer_context`: Use this to get access to an `ObjectTracer` and use it to update weak
+    ///   references.
+    fn forward_weak_refs(
+        _worker: &mut GCWorker,
+        _tracer_context: impl ObjectTracerContext,
+    ) {
+    }
 }
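Below is an illustrative sketch (not part of the patch above) of how a VM binding might implement the new `Scanning::process_weak_refs` hook introduced in src/vm/scanning.rs. The binding type parameter, the `WeakSlot` struct, and the `process_weak_refs_sketch` function are hypothetical placeholders; only `ObjectTracerContext::with_tracer`, `ObjectTracer::trace_object`, and `ObjectReference::is_reachable` are APIs referenced by this change.

use mmtk::scheduler::GCWorker;
use mmtk::util::ObjectReference;
use mmtk::vm::{ObjectTracer, ObjectTracerContext, VMBinding};

/// Hypothetical binding-side storage for one weak reference slot.
struct WeakSlot {
    referent: Option<ObjectReference>,
}

/// A minimal body for `Scanning::process_weak_refs` under these assumptions:
/// clear dead referents, and update live ones to their (possibly forwarded) addresses.
fn process_weak_refs_sketch<VM: VMBinding>(
    weak_slots: &mut [WeakSlot],
    worker: &mut GCWorker<VM>,
    tracer_context: impl ObjectTracerContext<VM>,
) -> bool {
    tracer_context.with_tracer(worker, |tracer| {
        for slot in weak_slots.iter_mut() {
            if let Some(referent) = slot.referent {
                if referent.is_reachable() {
                    // The referent survived this transitive closure. `trace_object` returns
                    // its new address if it was moved, which we store back into the slot.
                    slot.referent = Some(tracer.trace_object(referent));
                } else {
                    // The referent is dead; clear the weak slot.
                    slot.referent = None;
                }
            }
        }
    });
    // Returning false means we do not need `process_weak_refs` to be called again after the
    // newly expanded closure is drained. Returning true would re-register this hook as the
    // sentinel of the `VMRefClosure` bucket.
    false
}

`forward_weak_refs` has the same shape; its tracer context is bound to the `VMRefForwarding` bucket, and `trace_object` is used only to obtain the forwarded addresses of objects already known to be live.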