Merged

29 commits
2d4d55b
WIP: Generalise Collection::process_weak_refs
wks Nov 14, 2022
d69338d
Add VMRefClosure and VMRefForwarding stages.
wks Nov 16, 2022
5003220
Use "boss" work to repeat a stage.
wks Nov 16, 2022
eedeeea
Change name
wks Nov 16, 2022
24e5bab
Ref forwarding for mark-compact.
wks Nov 16, 2022
da8a052
Various fixes
wks Nov 17, 2022
acb9bf5
Make logging slightly less verbose.
wks Nov 17, 2022
86e64a1
Bucket updating, vm_prepare/release, tls
wks Nov 17, 2022
792c361
Fix clippy warnings and formatting
wks Nov 17, 2022
abde77a
Typo, stage, options
wks Nov 18, 2022
147d76b
Rename "boss" to "sentinel".
wks Nov 21, 2022
ebb8b95
Move forwarding and nursery into context
wks Nov 22, 2022
b79c041
Merge branch 'master' into gen-weakref-api
wks Dec 5, 2022
91acf85
Merge branch 'master' into gen-weakref-api
wks Dec 6, 2022
5aec01c
Merge branch 'master' into gen-weakref-api
wks Dec 7, 2022
ba194ba
Split context and tracer
wks Dec 9, 2022
165b3fa
Make process_weak_refs parallelizable
wks Dec 14, 2022
b6829d1
Merge branch 'master' into gen-weakref-api
wks Dec 19, 2022
d41c004
Split, rename and more code reuse.
wks Dec 21, 2022
bd18421
Remove vm_prepare
wks Dec 21, 2022
0104959
Merge branch 'master' into gen-weakref-api
wks Dec 21, 2022
a8b4204
VMForwardWeakRefs is no longer a sentinel
wks Dec 22, 2022
d09bb79
Merge branch 'master' into gen-weakref-api
wks Dec 22, 2022
ec513bd
Force inlining the trace_object function.
wks Jan 5, 2023
0e5fee1
Do not split nodes into chunk.
wks Jan 5, 2023
381d5cd
Auto flush ProcessEdgesWork when full
wks Jan 6, 2023
4d6856f
VMPostForwarding work packet
wks Jan 11, 2023
3591280
Add comments on inlining.
wks Jan 12, 2023
8f62b5c
Merge branch 'master' into gen-weakref-api
wks Jan 13, 2023
6 changes: 0 additions & 6 deletions src/memory_manager.rs
@@ -900,9 +900,3 @@ pub fn add_work_packets<VM: VMBinding>(
) {
mmtk.scheduler.work_buckets[bucket].bulk_add(packets)
}

/// Add a callback to be notified after the transitive closure is finished.
/// The callback should return true if it adds more work packets to the closure bucket.
pub fn on_closure_end<VM: VMBinding>(mmtk: &'static MMTK<VM>, f: Box<dyn Send + Fn() -> bool>) {
mmtk.scheduler.on_closure_end(f)
}
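For context, the removed on_closure_end hook let a binding register a callback that ran whenever the closure bucket drained; returning true re-opened the bucket for another round. Below is a hypothetical sketch of old-style binding code using the deleted API exactly as declared above (the my_vm items are invented for illustration); the rest of this PR replaces this mechanism with bucket sentinels and Scanning::process_weak_refs.

// Hypothetical usage of the removed API. The callback re-ran after each
// transitive closure until it returned false.
memory_manager::on_closure_end(
    mmtk,
    Box::new(|| {
        // my_vm::scan_next_weak_batch() is an invented placeholder for
        // binding-side weak-table scanning that may add more work packets.
        let added_more_work = my_vm::scan_next_weak_batch();
        added_more_work // true => run the closure (and this callback) again
    }),
);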
15 changes: 11 additions & 4 deletions src/plan/markcompact/global.rs
@@ -121,10 +121,6 @@ impl<VM: VMBinding> Plan for MarkCompact<VM> {
scheduler.work_buckets[WorkBucketStage::PhantomRefClosure]
.add(PhantomRefProcessing::<MarkingProcessEdges<VM>>::new());

// VM-specific weak ref processing
scheduler.work_buckets[WorkBucketStage::WeakRefClosure]
.add(VMProcessWeakRefs::<MarkingProcessEdges<VM>>::new());

use crate::util::reference_processor::RefForwarding;
scheduler.work_buckets[WorkBucketStage::RefForwarding]
.add(RefForwarding::<ForwardingProcessEdges<VM>>::new());
@@ -147,6 +143,17 @@ impl<VM: VMBinding> Plan for MarkCompact<VM> {
.add(ForwardFinalization::<ForwardingProcessEdges<VM>>::new());
}

// VM-specific weak ref processing
scheduler.work_buckets[WorkBucketStage::VMRefClosure]
.set_sentinel(Box::new(VMProcessWeakRefs::<MarkingProcessEdges<VM>>::new()));

// VM-specific weak ref forwarding
scheduler.work_buckets[WorkBucketStage::VMRefForwarding]
.add(VMForwardWeakRefs::<ForwardingProcessEdges<VM>>::new());

// VM-specific work after forwarding, e.g. to implement reference enqueuing.
scheduler.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::<VM>::default());

// Analysis GC work
#[cfg(feature = "analysis")]
{
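The set_sentinel call above relies on the sentinel mechanism this PR introduces (see commits 5003220 and 147d76b): unlike a packet added with add, a sentinel runs only once the bucket has otherwise drained, so VMProcessWeakRefs always observes a completed transitive closure. The following is an illustrative sketch of those semantics; the names are simplified and do not match mmtk-core's actual WorkBucket.

// Simplified model of a work bucket with a sentinel (illustration only).
struct Bucket {
    packets: Vec<Box<dyn FnOnce() + Send>>,
    sentinel: Option<Box<dyn FnOnce() + Send>>,
}

impl Bucket {
    fn poll(&mut self) -> Option<Box<dyn FnOnce() + Send>> {
        if let Some(packet) = self.packets.pop() {
            return Some(packet); // ordinary packets (and packets they spawn) drain first
        }
        // Only when the bucket is completely empty does the sentinel run.
        // It may re-open the bucket, e.g. to repeat the closure stage, or
        // (as in VMProcessWeakRefs) install a new sentinel for another round.
        self.sentinel.take()
    }
}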
226 changes: 188 additions & 38 deletions src/scheduler/gc_work.rs
@@ -108,9 +108,9 @@ impl<C: GCWorkContext> Release<C> {
impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
fn do_work(&mut self, worker: &mut GCWorker<C::VM>, mmtk: &'static MMTK<C::VM>) {
trace!("Release Global");

self.plan.base().gc_trigger.policy.on_gc_release(mmtk);

<C::VM as VMBinding>::VMCollection::vm_release();
// We assume this is the only running work packet that accesses plan at the point of execution
#[allow(clippy::cast_ref_to_mut)]
let plan_mut: &mut C::PlanType = unsafe { &mut *(self.plan as *const _ as *mut _) };
@@ -252,24 +252,190 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {

impl<VM: VMBinding> CoordinatorWork<VM> for EndOfGC {}

/// Delegate to the VM binding for reference processing.
/// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped
/// `ProcessEdgesWork` instance.
struct ProcessEdgesWorkTracer<E: ProcessEdgesWork> {
process_edges_work: E,
stage: WorkBucketStage,
}

impl<E: ProcessEdgesWork> ObjectTracer for ProcessEdgesWorkTracer<E> {
/// Forward the `trace_object` call to the underlying `ProcessEdgesWork`,
/// and flush as soon as the underlying buffer of `process_edges_work` is full.
///
/// This function is inlined because `trace_object` is probably the hottest function in MMTk.
/// If this function is called in small closures, please profile the program and make sure the
/// closure is inlined, too.
#[inline(always)]
fn trace_object(&mut self, object: ObjectReference) -> ObjectReference {
let result = self.process_edges_work.trace_object(object);
self.flush_if_full();
result
}
}

impl<E: ProcessEdgesWork> ProcessEdgesWorkTracer<E> {
#[inline(always)]
fn flush_if_full(&mut self) {
if self.process_edges_work.nodes.is_full() {
self.flush();
}
}

pub fn flush_if_not_empty(&mut self) {
if !self.process_edges_work.nodes.is_empty() {
self.flush();
}
}

#[cold]
fn flush(&mut self) {
let next_nodes = self.process_edges_work.pop_nodes();
assert!(!next_nodes.is_empty());
let work_packet = self.process_edges_work.create_scan_work(next_nodes, false);
let worker = self.process_edges_work.worker();
worker.scheduler().work_buckets[self.stage].add(work_packet);
}
}

/// This type implements `ObjectTracerContext` by creating a temporary `ProcessEdgesWork` during
/// the call to `with_tracer`, making use of its `trace_object` method. It then creates work
/// packets using the methods of the `ProcessEdgesWork` and adds them to the given `stage`.
struct ProcessEdgesWorkTracerContext<E: ProcessEdgesWork> {
stage: WorkBucketStage,
phantom_data: PhantomData<E>,
}

impl<E: ProcessEdgesWork> Clone for ProcessEdgesWorkTracerContext<E> {
fn clone(&self) -> Self {
Self { ..*self }
}
}

impl<E: ProcessEdgesWork> ObjectTracerContext<E::VM> for ProcessEdgesWorkTracerContext<E> {
type TracerType = ProcessEdgesWorkTracer<E>;

fn with_tracer<R, F>(&self, worker: &mut GCWorker<E::VM>, func: F) -> R
where
F: FnOnce(&mut Self::TracerType) -> R,
{
let mmtk = worker.mmtk;

// Prepare the underlying ProcessEdgesWork
let mut process_edges_work = E::new(vec![], false, mmtk);
// FIXME: This line allows us to omit the borrowing lifetime of worker.
// We should refactor ProcessEdgesWork so that it uses `worker` locally, not as a member.
process_edges_work.set_worker(worker);

// Create the tracer.
let mut tracer = ProcessEdgesWorkTracer {
process_edges_work,
stage: self.stage,
};

// The caller can use the tracer here.
let result = func(&mut tracer);

// Flush the queued nodes.
tracer.flush_if_not_empty();

result
}
}
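On the binding side, Scanning::process_weak_refs receives this context as an impl ObjectTracerContext<VM> and calls with_tracer to retain or update referents. A minimal hypothetical sketch follows, assuming an invented MyVM binding with invented WeakEntry and MY_WEAK_TABLE storage; the is_reachable query is likewise an assumption about the reachability API.

// Hypothetical binding-side implementation (MyVM, WeakEntry and
// MY_WEAK_TABLE are invented for illustration).
fn process_weak_refs(
    worker: &mut GCWorker<MyVM>,
    tracer_context: impl ObjectTracerContext<MyVM>,
) -> bool {
    tracer_context.with_tracer(worker, |tracer| {
        for entry in MY_WEAK_TABLE.lock().unwrap().iter_mut() {
            if entry.referent.is_reachable() {
                // Keep the referent and record its (possibly moved) address.
                entry.referent = tracer.trace_object(entry.referent);
            } else {
                entry.clear(); // the referent is dead; clear the weak reference
            }
        }
    });
    false // no further rounds needed; do not re-run after the next closure
}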

/// Delegate to the VM binding for weak reference processing.
///
/// Some VMs (e.g. V8) do not have a Java-like global weak reference storage, and the
/// processing of their weak references may be more complex. For such cases, we delegate
/// to the VM binding to process weak references.
#[derive(Default)]
pub struct VMProcessWeakRefs<E: ProcessEdgesWork>(PhantomData<E>);
///
/// NOTE: This will replace `{Soft,Weak,Phantom}RefProcessing` and `Finalization` in the future.
pub struct VMProcessWeakRefs<E: ProcessEdgesWork> {
phantom_data: PhantomData<E>,
}

impl<E: ProcessEdgesWork> VMProcessWeakRefs<E> {
pub fn new() -> Self {
Self(PhantomData)
Self {
phantom_data: PhantomData,
}
}
}

impl<E: ProcessEdgesWork> GCWork<E::VM> for VMProcessWeakRefs<E> {
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, _mmtk: &'static MMTK<E::VM>) {
trace!("ProcessWeakRefs");
<E::VM as VMBinding>::VMCollection::process_weak_refs(worker); // TODO: Pass a factory/callback to decide what work packet to create.
trace!("VMProcessWeakRefs");

let stage = WorkBucketStage::VMRefClosure;

let need_to_repeat = {
let tracer_factory = ProcessEdgesWorkTracerContext::<E> {
stage,
phantom_data: PhantomData,
};
<E::VM as VMBinding>::VMScanning::process_weak_refs(worker, tracer_factory)
};

if need_to_repeat {
// Schedule Self as the new sentinel so we'll call `process_weak_refs` again after the
// current transitive closure.
let new_self = Box::new(Self::new());

worker.scheduler().work_buckets[stage].set_sentinel(new_self);
Review comment (Member):

If I understand correctly, this is used for performing weak-ref tracing in a loop. You can use the sentinel packet to start the transitive closure again and again until reaching a steady state.

However, I think OpenJDK may also use the sentinel packet as a "state transition" to start a new phase for a different type of weak-ref processing. For example, OpenJDK does soft-ref processing first (which starts a round of closure), and then phantom processing later (another round of closure). If the VM were able to set a different sentinel packet itself, rather than new_self, it could set a phantom sentinel packet after soft-ref processing to start phantom processing. Is that doable?

Reply (Collaborator, Author):

To implement state transitions, we can embed a state machine in the weak reference processor and multiplex the operations for different states through the single Scanning::process_weak_refs API. See: https://github.com/wks/mmtk-openjdk/blob/gen-weakref-api/mmtk/src/weak_processor/mod.rs#L61

It is possible to add a way to specify the next work packet to be scheduled as the "sentinel" packet, but that may be more complicated (as it has to expose details such as "sentinel") than simply returning true and letting the function be called again.

}
}
}
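As the review discussion above suggests, a binding that needs multiple weak-ref phases, each with its own transitive closure (e.g. soft refs before phantom refs), can return true and drive a state machine through this single entry point, as in the author's mmtk-openjdk branch. A condensed hypothetical sketch (Phase, PHASE, MyVM and the per-phase helpers are all invented for illustration):

enum Phase { Soft, Phantom, Done }

fn process_weak_refs(
    worker: &mut GCWorker<MyVM>,
    tracer_context: impl ObjectTracerContext<MyVM>,
) -> bool {
    let mut phase = PHASE.lock().unwrap();
    match *phase {
        Phase::Soft => {
            // Retain soft referents; tracing them expands the closure.
            retain_soft_referents(worker, tracer_context);
            *phase = Phase::Phantom;
            true // repeat: run another closure, then call this function again
        }
        Phase::Phantom => {
            clear_phantom_referents(worker, tracer_context);
            *phase = Phase::Done;
            false // steady state reached; the VMRefClosure stage can end
        }
        Phase::Done => false,
    }
}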

/// Delegate to the VM binding for forwarding weak references.
///
/// Some VMs (e.g. V8) do not have a Java-like global weak reference storage, and the
/// forwarding of their weak references may be more complex. For such cases, we delegate
/// to the VM binding to forward weak references.
///
/// NOTE: This will replace `RefForwarding` and `ForwardFinalization` in the future.
pub struct VMForwardWeakRefs<E: ProcessEdgesWork> {
phantom_data: PhantomData<E>,
}

impl<E: ProcessEdgesWork> VMForwardWeakRefs<E> {
pub fn new() -> Self {
Self {
phantom_data: PhantomData,
}
}
}

impl<E: ProcessEdgesWork> GCWork<E::VM> for VMForwardWeakRefs<E> {
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, _mmtk: &'static MMTK<E::VM>) {
trace!("VMForwardWeakRefs");

let stage = WorkBucketStage::VMRefForwarding;

let tracer_factory = ProcessEdgesWorkTracerContext::<E> {
stage,
phantom_data: PhantomData,
};
<E::VM as VMBinding>::VMScanning::forward_weak_refs(worker, tracer_factory)
}
}
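The forwarding counterpart on the binding side walks the same storage and replaces each surviving reference with its forwarded address; for a moving plan, trace_object on a live object returns its new location. A hypothetical sketch, reusing the invented MY_WEAK_TABLE from the earlier example:

fn forward_weak_refs(
    worker: &mut GCWorker<MyVM>,
    tracer_context: impl ObjectTracerContext<MyVM>,
) {
    tracer_context.with_tracer(worker, |tracer| {
        for entry in MY_WEAK_TABLE.lock().unwrap().iter_mut() {
            if !entry.is_cleared() {
                // For a moving plan this returns the forwarded address.
                entry.referent = tracer.trace_object(entry.referent);
            }
        }
    });
}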

/// This work packet calls `Collection::post_forwarding`.
///
/// NOTE: This will replace `RefEnqueue` in the future.
///
/// NOTE: Although this work packet runs in parallel with the `Release` work packet, it does not
/// access the `Plan` instance.

Review comment (Member):

We cannot guarantee this. The binding may access the plan in their implementation of Collection::post_forwarding().

Reply (Collaborator, Author):

Indeed. While Collection::post_forwarding() doesn't expose the Plan to the binding, and the Plan field is declared as pub(crate), many functions in the memory_manager module access the plan instance.

The timing of RefEnqueue (now VMPostForwarding) should be after all references are forwarded. That includes the RefForwarding and the FinalizerForwarding buckets (both subsumed by the new VMRefForwarding bucket). And it doesn't need to access the MMTk instance. However, it seems impossible to prevent the binding from accessing the plan in our current program structure. As long as the binding has a &'static MMTK, it can call functions in memory_manager and indirectly access the plan.

I think what we can do is tell the VM binding not to access the plan indirectly. But even that sounds like a bad idea, because we haven't exposed Plan anyway, and the binding has no way of knowing which API functions indirectly access the Plan. So maybe we can provide the Collection::post_forwarding() hook like this to make sure our ref-processing code in mmtk-core can be implemented in the binding. But we should refactor mmtk-core in the future to solve the "plan vs plan_mut" problem at the root, so we can actually prevent the binding from accidentally accessing the plan.

#[derive(Default)]
pub struct VMPostForwarding<VM: VMBinding> {
phantom_data: PhantomData<VM>,
}

impl<VM: VMBinding> GCWork<VM> for VMPostForwarding<VM> {
fn do_work(&mut self, worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
trace!("VMPostForwarding start");
<VM as VMBinding>::VMCollection::post_forwarding(worker.tls);
trace!("VMPostForwarding end");
}
}
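On the binding side, Collection::post_forwarding is the natural hook for work that must observe fully forwarded references, such as Java-style reference enqueuing. A hypothetical sketch (the my_vm queueing helpers are invented for illustration):

fn post_forwarding(_tls: VMWorkerThread) {
    // All weak references have been processed and forwarded by this point.
    // Hand the references cleared during VMRefClosure over to the runtime's
    // notification mechanism, e.g. a Java-style ReferenceQueue.
    for reference in my_vm::take_cleared_references() {
        my_vm::enqueue_reference(reference);
    }
}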

@@ -678,38 +844,22 @@ pub trait ScanObjectsWork<VM: VMBinding>: GCWork<VM> + Sized {

// If any object does not support edge-enqueuing, we process them now.
if !scan_later.is_empty() {
// We create an instance of E to use its `trace_object` method and its object queue.
let mut process_edges_work = Self::E::new(vec![], false, mmtk);
let mut closure = |object| process_edges_work.trace_object(object);

// Scan objects and trace their edges at the same time.
for object in scan_later.iter().copied() {
<VM as VMBinding>::VMScanning::scan_object_and_trace_edges(
tls,
object,
&mut closure,
);
self.post_scan_object(object);
}

// Create work packets to scan adjacent objects. We skip ProcessEdgesWork and create
// object-scanning packets directly, because the edges are already traced.
if !process_edges_work.nodes.is_empty() {
let next_nodes = process_edges_work.nodes.take();
let make_packet = |nodes| {
let work_packet = self.make_another(nodes);
memory_manager::add_work_packet(mmtk, WorkBucketStage::Closure, work_packet);
};

// Divide the resulting nodes into appropriately sized packets.
if next_nodes.len() <= Self::E::CAPACITY {
make_packet(next_nodes);
} else {
for chunk in next_nodes.chunks(Self::E::CAPACITY) {
make_packet(chunk.into());
}
let object_tracer_context = ProcessEdgesWorkTracerContext::<Self::E> {
stage: WorkBucketStage::Closure,
phantom_data: PhantomData,
};

object_tracer_context.with_tracer(worker, |object_tracer| {
// Scan objects and trace their edges at the same time.
for object in scan_later.iter().copied() {
<VM as VMBinding>::VMScanning::scan_object_and_trace_edges(
tls,
object,
object_tracer,
);
self.post_scan_object(object);
}
}
});
}
}
}