Skip to content
34 changes: 25 additions & 9 deletions src/active_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub(crate) struct ActiveQuery {
/// Inputs: Set of subqueries that were accessed thus far.
/// Outputs: Tracks values written by this query. Could be...
///
/// * tracked structs created
/// * invocations of `specify`
/// * accumulators pushed to
input_outputs: FxIndexSet<QueryEdge>,
Expand Down Expand Up @@ -77,10 +76,14 @@ impl ActiveQuery {
untracked_read: bool,
) {
assert!(self.input_outputs.is_empty());

self.input_outputs.extend(edges.iter().cloned());
self.durability = self.durability.min(durability);
self.changed_at = self.changed_at.max(changed_at);
self.untracked_read |= untracked_read;

// Mark all tracked structs from the previous iteration as active.
self.tracked_struct_ids.mark_all_active();
}

pub(super) fn add_read(
Expand Down Expand Up @@ -139,10 +142,6 @@ impl ActiveQuery {
}

/// True if the given key was output by this query.
pub(super) fn is_output(&self, key: DatabaseKeyIndex) -> bool {
self.input_outputs.contains(&QueryEdge::output(key))
}

pub(super) fn disambiguate(&mut self, key: IdentityHash) -> Disambiguator {
self.disambiguator_map.disambiguate(key)
}
Expand Down Expand Up @@ -186,7 +185,7 @@ impl ActiveQuery {
}
}

fn top_into_revisions(&mut self) -> QueryRevisions {
fn top_into_revisions(&mut self) -> CompletedQuery {
let &mut Self {
database_key_index: _,
durability,
Expand All @@ -213,22 +212,29 @@ impl ActiveQuery {
#[cfg(feature = "accumulator")]
let accumulated_inputs = AtomicInputAccumulatedValues::new(accumulated_inputs);
let verified_final = cycle_heads.is_empty();
let (active_tracked_structs, stale_tracked_structs) = tracked_struct_ids.drain();

let extra = QueryRevisionsExtra::new(
#[cfg(feature = "accumulator")]
mem::take(accumulated),
mem::take(tracked_struct_ids),
active_tracked_structs,
mem::take(cycle_heads),
iteration_count,
);

QueryRevisions {
let revisions = QueryRevisions {
changed_at,
durability,
origin,
#[cfg(feature = "accumulator")]
accumulated_inputs,
verified_final: AtomicBool::new(verified_final),
extra,
};

CompletedQuery {
revisions,
stale_tracked_structs,
}
}

Expand Down Expand Up @@ -370,7 +376,7 @@ impl QueryStack {
&mut self,
key: DatabaseKeyIndex,
#[cfg(debug_assertions)] push_len: usize,
) -> QueryRevisions {
) -> CompletedQuery {
#[cfg(debug_assertions)]
assert_eq!(push_len, self.len(), "unbalanced push/pop");
debug_assert_ne!(self.len, 0, "too many pops");
Expand All @@ -395,6 +401,16 @@ impl QueryStack {
}
}

/// The state of a completed query.
pub(crate) struct CompletedQuery {
/// Inputs and outputs accumulated during query execution.
pub(crate) revisions: QueryRevisions,

/// The keys of any tracked structs that were created in a previous execution of the
/// query but not the current one, and should be marked as stale.
pub(crate) stale_tracked_structs: Vec<DatabaseKeyIndex>,
}

struct CapturedQuery {
database_key_index: DatabaseKeyIndex,
durability: Durability,
Expand Down
56 changes: 22 additions & 34 deletions src/function/diff_outputs.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,50 @@
use crate::active_query::CompletedQuery;
use crate::function::memo::Memo;
use crate::function::{Configuration, IngredientImpl};
use crate::hash::FxIndexSet;
use crate::zalsa::Zalsa;
use crate::zalsa_local::{output_edges, QueryOriginRef, QueryRevisions};
use crate::{DatabaseKeyIndex, Event, EventKind, Id};
use crate::zalsa_local::{output_edges, QueryOriginRef};
use crate::{DatabaseKeyIndex, Event, EventKind};

impl<C> IngredientImpl<C>
where
C: Configuration,
{
/// Compute the old and new outputs and invoke `remove_stale_output`
/// for each output that was generated before but is not generated now.
///
/// This function takes a `&mut` reference to `revisions` to remove outputs
/// that no longer exist in this revision from [`QueryRevisions::tracked_struct_ids`].
/// Compute the old and new outputs and invoke `remove_stale_output` for each output that
/// was generated before but is not generated now.
pub(super) fn diff_outputs(
&self,
zalsa: &Zalsa,
key: DatabaseKeyIndex,
old_memo: &Memo<'_, C>,
revisions: &mut QueryRevisions,
completed_query: &CompletedQuery,
) {
let (QueryOriginRef::Derived(edges) | QueryOriginRef::DerivedUntracked(edges)) =
old_memo.revisions.origin.as_ref()
else {
return;
};
// Iterate over the outputs of the `old_memo` and put them into a hashset
//
// Ignore key_generation here, because we use the same tracked struct allocation for
// all generations with the same key_index and can't report it as stale
let mut old_outputs: FxIndexSet<_> = output_edges(edges)
.map(|a| (a.ingredient_index(), a.key_index().index()))
.collect();

if old_outputs.is_empty() {
return;
// Note that tracked structs are not stored as direct query outputs, but they are still outputs
// that need to be reported as stale.
for output in &completed_query.stale_tracked_structs {
Self::report_stale_output(zalsa, key, *output);
}

// Iterate over the outputs of the current query
// and remove elements from `old_outputs` when we find them
for new_output in revisions.origin.as_ref().outputs() {
old_outputs.swap_remove(&(
new_output.ingredient_index(),
new_output.key_index().index(),
));
let mut stale_outputs = output_edges(edges).collect::<FxIndexSet<_>>();

if stale_outputs.is_empty() {
return;
}

// Remove the outputs that are no longer present in the current revision
// to prevent that the next revision is seeded with an id mapping that no longer exists.
if let Some(tracked_struct_ids) = revisions.tracked_struct_ids_mut() {
tracked_struct_ids
.retain(|(k, value)| !old_outputs.contains(&(k.ingredient_index(), value.index())));
};
// Preserve any outputs that were recreated in the current revision.
for new_output in completed_query.revisions.origin.as_ref().outputs() {
stale_outputs.swap_remove(&new_output);
}

for (ingredient_index, key_index) in old_outputs {
// SAFETY: key_index acquired from valid output
let id = unsafe { Id::from_index(key_index) };
Self::report_stale_output(zalsa, key, DatabaseKeyIndex::new(ingredient_index, id));
// Any outputs that were created in a previous revision but not the current one are stale.
for output in stale_outputs {
Self::report_stale_output(zalsa, key, output);
}
}

Expand All @@ -68,6 +55,7 @@ where
output_key: output,
})
});

output.remove_stale_output(zalsa, key);
}
}
64 changes: 43 additions & 21 deletions src/function/execute.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::active_query::CompletedQuery;
use crate::cycle::{CycleRecoveryStrategy, IterationCount};
use crate::function::memo::Memo;
use crate::function::{Configuration, IngredientImpl};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase};
use crate::zalsa_local::{ActiveQueryGuard, QueryRevisions};
use crate::zalsa_local::ActiveQueryGuard;
use crate::{Event, EventKind, Id};

impl<C> IngredientImpl<C>
Expand Down Expand Up @@ -39,15 +40,15 @@ where
});
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);

let (new_value, mut revisions) = match C::CYCLE_STRATEGY {
let (new_value, mut completed_query) = match C::CYCLE_STRATEGY {
CycleRecoveryStrategy::Panic => {
Self::execute_query(db, zalsa, active_query, opt_old_memo, id)
}
CycleRecoveryStrategy::FallbackImmediate => {
let (mut new_value, mut revisions) =
let (mut new_value, mut completed_query) =
Self::execute_query(db, zalsa, active_query, opt_old_memo, id);

if let Some(cycle_heads) = revisions.cycle_heads_mut() {
if let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() {
// Did the new result we got depend on our own provisional value, in a cycle?
if cycle_heads.contains(&database_key_index) {
// Ignore the computed value, leave the fallback value there.
Expand All @@ -73,14 +74,14 @@ where
.zalsa_local()
.push_query(database_key_index, IterationCount::initial());
new_value = C::cycle_initial(db, C::id_to_input(zalsa, id));
revisions = active_query.pop();
completed_query = active_query.pop();
// We need to set `cycle_heads` and `verified_final` because it needs to propagate to the callers.
// When verifying this, we will see we have fallback and mark ourselves verified.
revisions.set_cycle_heads(cycle_heads);
revisions.verified_final = AtomicBool::new(false);
completed_query.revisions.set_cycle_heads(cycle_heads);
completed_query.revisions.verified_final = AtomicBool::new(false);
}

(new_value, revisions)
(new_value, completed_query)
}
CycleRecoveryStrategy::Fixpoint => self.execute_maybe_iterate(
db,
Expand All @@ -97,16 +98,25 @@ where
// really change, even if some of its inputs have. So we can
// "backdate" its `changed_at` revision to be the same as the
// old value.
self.backdate_if_appropriate(old_memo, database_key_index, &mut revisions, &new_value);
self.backdate_if_appropriate(
old_memo,
database_key_index,
&mut completed_query.revisions,
&new_value,
);

// Diff the new outputs with the old, to discard any no-longer-emitted
// outputs and update the tracked struct IDs for seeding the next revision.
self.diff_outputs(zalsa, database_key_index, old_memo, &mut revisions);
self.diff_outputs(zalsa, database_key_index, old_memo, &completed_query);
}
self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), zalsa.current_revision(), revisions),
Memo::new(
Some(new_value),
zalsa.current_revision(),
completed_query.revisions,
),
memo_ingredient_index,
)
}
Expand All @@ -120,7 +130,7 @@ where
zalsa: &'db Zalsa,
id: Id,
memo_ingredient_index: MemoIngredientIndex,
) -> (C::Output<'db>, QueryRevisions) {
) -> (C::Output<'db>, CompletedQuery) {
let database_key_index = active_query.database_key_index;
let mut iteration_count = IterationCount::initial();
let mut fell_back = false;
Expand All @@ -131,11 +141,12 @@ where
let mut opt_last_provisional: Option<&Memo<'db, C>> = None;
loop {
let previous_memo = opt_last_provisional.or(opt_old_memo);
let (mut new_value, mut revisions) =
let (mut new_value, mut completed_query) =
Self::execute_query(db, zalsa, active_query, previous_memo, id);

// Did the new result we got depend on our own provisional value, in a cycle?
if let Some(cycle_heads) = revisions
if let Some(cycle_heads) = completed_query
.revisions
.cycle_heads_mut()
.filter(|cycle_heads| cycle_heads.contains(&database_key_index))
{
Expand Down Expand Up @@ -211,14 +222,21 @@ where
})
});
cycle_heads.update_iteration_count(database_key_index, iteration_count);
revisions.update_iteration_count(iteration_count);
completed_query
.revisions
.update_iteration_count(iteration_count);
crate::tracing::debug!(
"{database_key_index:?}: execute: iterate again, revisions: {revisions:#?}"
"{database_key_index:?}: execute: iterate again, revisions: {revisions:#?}",
revisions = &completed_query.revisions
);
opt_last_provisional = Some(self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), zalsa.current_revision(), revisions),
Memo::new(
Some(new_value),
zalsa.current_revision(),
completed_query.revisions,
),
memo_ingredient_index,
));

Expand All @@ -235,15 +253,19 @@ where

if cycle_heads.is_empty() {
// If there are no more cycle heads, we can mark this as verified.
revisions.verified_final.store(true, Ordering::Relaxed);
completed_query
.revisions
.verified_final
.store(true, Ordering::Relaxed);
}
}

crate::tracing::debug!(
"{database_key_index:?}: execute: result.revisions = {revisions:#?}"
"{database_key_index:?}: execute: result.revisions = {revisions:#?}",
revisions = &completed_query.revisions
);

break (new_value, revisions);
break (new_value, completed_query);
}
}

Expand All @@ -254,7 +276,7 @@ where
active_query: ActiveQueryGuard<'db>,
opt_old_memo: Option<&Memo<'db, C>>,
id: Id,
) -> (C::Output<'db>, QueryRevisions) {
) -> (C::Output<'db>, CompletedQuery) {
if let Some(old_memo) = opt_old_memo {
// If we already executed this query once, then use the tracked-struct ids from the
// previous execution as the starting point for the new one.
Expand Down
14 changes: 10 additions & 4 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,20 @@ where
let active_query =
zalsa_local.push_query(database_key_index, IterationCount::initial());
let fallback_value = C::cycle_initial(db, C::id_to_input(zalsa, id));
let mut revisions = active_query.pop();
revisions.set_cycle_heads(CycleHeads::initial(database_key_index));
let mut completed_query = active_query.pop();
completed_query
.revisions
.set_cycle_heads(CycleHeads::initial(database_key_index));
// We need this for `cycle_heads()` to work. We will unset this in the outer `execute()`.
*revisions.verified_final.get_mut() = false;
*completed_query.revisions.verified_final.get_mut() = false;
self.insert_memo(
zalsa,
id,
Memo::new(Some(fallback_value), zalsa.current_revision(), revisions),
Memo::new(
Some(fallback_value),
zalsa.current_revision(),
completed_query.revisions,
),
memo_ingredient_index,
)
}
Expand Down
11 changes: 9 additions & 2 deletions src/function/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,15 @@ impl<C: Configuration> crate::table::memo::Memo for Memo<'static, C>
where
C::Output<'static>: Send + Sync + Any,
{
fn origin(&self) -> QueryOriginRef<'_> {
self.revisions.origin.as_ref()
fn remove_outputs(&self, zalsa: &Zalsa, executor: DatabaseKeyIndex) {
for stale_output in self.revisions.origin.as_ref().outputs() {
stale_output.remove_stale_output(zalsa, executor);
}

for (identity, id) in self.revisions.tracked_struct_ids().into_iter().flatten() {
let key = DatabaseKeyIndex::new(identity.ingredient_index(), *id);
key.remove_stale_output(zalsa, executor);
}
}

#[cfg(feature = "salsa_unstable")]
Expand Down
Loading