Skip to content

Commit e5bd9eb

Browse files
refactor: Remove tracked structs from query outputs (#969)
* refactor: Remove tracked structs from outputs
* clean up
* fix `IdentityMap::is_active`
* Update persistence snapshot
* Perf?
* Other nit
* Remove Deref
* Split `diff_stale_outputs`
* More short circuits
* Remove shrink to fit?
* Undo shrink-to-fit removal
* Pass CompletedQuery to diff_outputs

---------

Co-authored-by: Ibraheem Ahmed <[email protected]>
1 parent a2cd1b8 commit e5bd9eb

File tree

13 files changed

+294
-187
lines changed

13 files changed

+294
-187
lines changed

src/active_query.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ pub(crate) struct ActiveQuery {
3030
/// Inputs: Set of subqueries that were accessed thus far.
3131
/// Outputs: Tracks values written by this query. Could be...
3232
///
33-
/// * tracked structs created
3433
/// * invocations of `specify`
3534
/// * accumulators pushed to
3635
input_outputs: FxIndexSet<QueryEdge>,
@@ -77,10 +76,14 @@ impl ActiveQuery {
7776
untracked_read: bool,
7877
) {
7978
assert!(self.input_outputs.is_empty());
79+
8080
self.input_outputs.extend(edges.iter().cloned());
8181
self.durability = self.durability.min(durability);
8282
self.changed_at = self.changed_at.max(changed_at);
8383
self.untracked_read |= untracked_read;
84+
85+
// Mark all tracked structs from the previous iteration as active.
86+
self.tracked_struct_ids.mark_all_active();
8487
}
8588

8689
pub(super) fn add_read(
@@ -139,10 +142,6 @@ impl ActiveQuery {
139142
}
140143

141144
/// Returns the disambiguator that this query's disambiguator map assigns to the given identity hash.
142-
pub(super) fn is_output(&self, key: DatabaseKeyIndex) -> bool {
143-
self.input_outputs.contains(&QueryEdge::output(key))
144-
}
145-
146145
pub(super) fn disambiguate(&mut self, key: IdentityHash) -> Disambiguator {
147146
self.disambiguator_map.disambiguate(key)
148147
}
@@ -186,7 +185,7 @@ impl ActiveQuery {
186185
}
187186
}
188187

189-
fn top_into_revisions(&mut self) -> QueryRevisions {
188+
fn top_into_revisions(&mut self) -> CompletedQuery {
190189
let &mut Self {
191190
database_key_index: _,
192191
durability,
@@ -213,22 +212,29 @@ impl ActiveQuery {
213212
#[cfg(feature = "accumulator")]
214213
let accumulated_inputs = AtomicInputAccumulatedValues::new(accumulated_inputs);
215214
let verified_final = cycle_heads.is_empty();
215+
let (active_tracked_structs, stale_tracked_structs) = tracked_struct_ids.drain();
216+
216217
let extra = QueryRevisionsExtra::new(
217218
#[cfg(feature = "accumulator")]
218219
mem::take(accumulated),
219-
mem::take(tracked_struct_ids),
220+
active_tracked_structs,
220221
mem::take(cycle_heads),
221222
iteration_count,
222223
);
223224

224-
QueryRevisions {
225+
let revisions = QueryRevisions {
225226
changed_at,
226227
durability,
227228
origin,
228229
#[cfg(feature = "accumulator")]
229230
accumulated_inputs,
230231
verified_final: AtomicBool::new(verified_final),
231232
extra,
233+
};
234+
235+
CompletedQuery {
236+
revisions,
237+
stale_tracked_structs,
232238
}
233239
}
234240

@@ -370,7 +376,7 @@ impl QueryStack {
370376
&mut self,
371377
key: DatabaseKeyIndex,
372378
#[cfg(debug_assertions)] push_len: usize,
373-
) -> QueryRevisions {
379+
) -> CompletedQuery {
374380
#[cfg(debug_assertions)]
375381
assert_eq!(push_len, self.len(), "unbalanced push/pop");
376382
debug_assert_ne!(self.len, 0, "too many pops");
@@ -395,6 +401,16 @@ impl QueryStack {
395401
}
396402
}
397403

404+
/// The state of a completed query.
405+
pub(crate) struct CompletedQuery {
406+
/// Inputs and outputs accumulated during query execution.
407+
pub(crate) revisions: QueryRevisions,
408+
409+
/// The keys of any tracked structs that were created in a previous execution of the
410+
/// query but not the current one, and should be marked as stale.
411+
pub(crate) stale_tracked_structs: Vec<DatabaseKeyIndex>,
412+
}
413+
398414
struct CapturedQuery {
399415
database_key_index: DatabaseKeyIndex,
400416
durability: Durability,

src/function/diff_outputs.rs

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,50 @@
1+
use crate::active_query::CompletedQuery;
12
use crate::function::memo::Memo;
23
use crate::function::{Configuration, IngredientImpl};
34
use crate::hash::FxIndexSet;
45
use crate::zalsa::Zalsa;
5-
use crate::zalsa_local::{output_edges, QueryOriginRef, QueryRevisions};
6-
use crate::{DatabaseKeyIndex, Event, EventKind, Id};
6+
use crate::zalsa_local::{output_edges, QueryOriginRef};
7+
use crate::{DatabaseKeyIndex, Event, EventKind};
78

89
impl<C> IngredientImpl<C>
910
where
1011
C: Configuration,
1112
{
12-
/// Compute the old and new outputs and invoke `remove_stale_output`
13-
/// for each output that was generated before but is not generated now.
14-
///
15-
/// This function takes a `&mut` reference to `revisions` to remove outputs
16-
/// that no longer exist in this revision from [`QueryRevisions::tracked_struct_ids`].
13+
/// Compute the old and new outputs and invoke `remove_stale_output` for each output that
14+
/// was generated before but is not generated now.
1715
pub(super) fn diff_outputs(
1816
&self,
1917
zalsa: &Zalsa,
2018
key: DatabaseKeyIndex,
2119
old_memo: &Memo<'_, C>,
22-
revisions: &mut QueryRevisions,
20+
completed_query: &CompletedQuery,
2321
) {
2422
let (QueryOriginRef::Derived(edges) | QueryOriginRef::DerivedUntracked(edges)) =
2523
old_memo.revisions.origin.as_ref()
2624
else {
2725
return;
2826
};
29-
// Iterate over the outputs of the `old_memo` and put them into a hashset
30-
//
31-
// Ignore key_generation here, because we use the same tracked struct allocation for
32-
// all generations with the same key_index and can't report it as stale
33-
let mut old_outputs: FxIndexSet<_> = output_edges(edges)
34-
.map(|a| (a.ingredient_index(), a.key_index().index()))
35-
.collect();
3627

37-
if old_outputs.is_empty() {
38-
return;
28+
// Note that tracked structs are not stored as direct query outputs, but they are still outputs
29+
// that need to be reported as stale.
30+
for output in &completed_query.stale_tracked_structs {
31+
Self::report_stale_output(zalsa, key, *output);
3932
}
4033

41-
// Iterate over the outputs of the current query
42-
// and remove elements from `old_outputs` when we find them
43-
for new_output in revisions.origin.as_ref().outputs() {
44-
old_outputs.swap_remove(&(
45-
new_output.ingredient_index(),
46-
new_output.key_index().index(),
47-
));
34+
let mut stale_outputs = output_edges(edges).collect::<FxIndexSet<_>>();
35+
36+
if stale_outputs.is_empty() {
37+
return;
4838
}
4939

50-
// Remove the outputs that are no longer present in the current revision
51-
// to prevent that the next revision is seeded with an id mapping that no longer exists.
52-
if let Some(tracked_struct_ids) = revisions.tracked_struct_ids_mut() {
53-
tracked_struct_ids
54-
.retain(|(k, value)| !old_outputs.contains(&(k.ingredient_index(), value.index())));
55-
};
40+
// Preserve any outputs that were recreated in the current revision.
41+
for new_output in completed_query.revisions.origin.as_ref().outputs() {
42+
stale_outputs.swap_remove(&new_output);
43+
}
5644

57-
for (ingredient_index, key_index) in old_outputs {
58-
// SAFETY: key_index acquired from valid output
59-
let id = unsafe { Id::from_index(key_index) };
60-
Self::report_stale_output(zalsa, key, DatabaseKeyIndex::new(ingredient_index, id));
45+
// Any outputs that were created in a previous revision but not the current one are stale.
46+
for output in stale_outputs {
47+
Self::report_stale_output(zalsa, key, output);
6148
}
6249
}
6350

@@ -68,6 +55,7 @@ where
6855
output_key: output,
6956
})
7057
});
58+
7159
output.remove_stale_output(zalsa, key);
7260
}
7361
}

src/function/execute.rs

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use crate::active_query::CompletedQuery;
12
use crate::cycle::{CycleRecoveryStrategy, IterationCount};
23
use crate::function::memo::Memo;
34
use crate::function::{Configuration, IngredientImpl};
45
use crate::sync::atomic::{AtomicBool, Ordering};
56
use crate::zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase};
6-
use crate::zalsa_local::{ActiveQueryGuard, QueryRevisions};
7+
use crate::zalsa_local::ActiveQueryGuard;
78
use crate::{Event, EventKind, Id};
89

910
impl<C> IngredientImpl<C>
@@ -39,15 +40,15 @@ where
3940
});
4041
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
4142

42-
let (new_value, mut revisions) = match C::CYCLE_STRATEGY {
43+
let (new_value, mut completed_query) = match C::CYCLE_STRATEGY {
4344
CycleRecoveryStrategy::Panic => {
4445
Self::execute_query(db, zalsa, active_query, opt_old_memo, id)
4546
}
4647
CycleRecoveryStrategy::FallbackImmediate => {
47-
let (mut new_value, mut revisions) =
48+
let (mut new_value, mut completed_query) =
4849
Self::execute_query(db, zalsa, active_query, opt_old_memo, id);
4950

50-
if let Some(cycle_heads) = revisions.cycle_heads_mut() {
51+
if let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() {
5152
// Did the new result we got depend on our own provisional value, in a cycle?
5253
if cycle_heads.contains(&database_key_index) {
5354
// Ignore the computed value, leave the fallback value there.
@@ -73,14 +74,14 @@ where
7374
.zalsa_local()
7475
.push_query(database_key_index, IterationCount::initial());
7576
new_value = C::cycle_initial(db, C::id_to_input(zalsa, id));
76-
revisions = active_query.pop();
77+
completed_query = active_query.pop();
7778
// We need to set `cycle_heads` and `verified_final` because it needs to propagate to the callers.
7879
// When verifying this, we will see we have fallback and mark ourselves verified.
79-
revisions.set_cycle_heads(cycle_heads);
80-
revisions.verified_final = AtomicBool::new(false);
80+
completed_query.revisions.set_cycle_heads(cycle_heads);
81+
completed_query.revisions.verified_final = AtomicBool::new(false);
8182
}
8283

83-
(new_value, revisions)
84+
(new_value, completed_query)
8485
}
8586
CycleRecoveryStrategy::Fixpoint => self.execute_maybe_iterate(
8687
db,
@@ -97,16 +98,25 @@ where
9798
// really change, even if some of its inputs have. So we can
9899
// "backdate" its `changed_at` revision to be the same as the
99100
// old value.
100-
self.backdate_if_appropriate(old_memo, database_key_index, &mut revisions, &new_value);
101+
self.backdate_if_appropriate(
102+
old_memo,
103+
database_key_index,
104+
&mut completed_query.revisions,
105+
&new_value,
106+
);
101107

102108
// Diff the new outputs with the old, to discard any no-longer-emitted
103109
// outputs, including tracked structs that were not recreated in this execution.
104-
self.diff_outputs(zalsa, database_key_index, old_memo, &mut revisions);
110+
self.diff_outputs(zalsa, database_key_index, old_memo, &completed_query);
105111
}
106112
self.insert_memo(
107113
zalsa,
108114
id,
109-
Memo::new(Some(new_value), zalsa.current_revision(), revisions),
115+
Memo::new(
116+
Some(new_value),
117+
zalsa.current_revision(),
118+
completed_query.revisions,
119+
),
110120
memo_ingredient_index,
111121
)
112122
}
@@ -120,7 +130,7 @@ where
120130
zalsa: &'db Zalsa,
121131
id: Id,
122132
memo_ingredient_index: MemoIngredientIndex,
123-
) -> (C::Output<'db>, QueryRevisions) {
133+
) -> (C::Output<'db>, CompletedQuery) {
124134
let database_key_index = active_query.database_key_index;
125135
let mut iteration_count = IterationCount::initial();
126136
let mut fell_back = false;
@@ -131,11 +141,12 @@ where
131141
let mut opt_last_provisional: Option<&Memo<'db, C>> = None;
132142
loop {
133143
let previous_memo = opt_last_provisional.or(opt_old_memo);
134-
let (mut new_value, mut revisions) =
144+
let (mut new_value, mut completed_query) =
135145
Self::execute_query(db, zalsa, active_query, previous_memo, id);
136146

137147
// Did the new result we got depend on our own provisional value, in a cycle?
138-
if let Some(cycle_heads) = revisions
148+
if let Some(cycle_heads) = completed_query
149+
.revisions
139150
.cycle_heads_mut()
140151
.filter(|cycle_heads| cycle_heads.contains(&database_key_index))
141152
{
@@ -211,14 +222,21 @@ where
211222
})
212223
});
213224
cycle_heads.update_iteration_count(database_key_index, iteration_count);
214-
revisions.update_iteration_count(iteration_count);
225+
completed_query
226+
.revisions
227+
.update_iteration_count(iteration_count);
215228
crate::tracing::debug!(
216-
"{database_key_index:?}: execute: iterate again, revisions: {revisions:#?}"
229+
"{database_key_index:?}: execute: iterate again, revisions: {revisions:#?}",
230+
revisions = &completed_query.revisions
217231
);
218232
opt_last_provisional = Some(self.insert_memo(
219233
zalsa,
220234
id,
221-
Memo::new(Some(new_value), zalsa.current_revision(), revisions),
235+
Memo::new(
236+
Some(new_value),
237+
zalsa.current_revision(),
238+
completed_query.revisions,
239+
),
222240
memo_ingredient_index,
223241
));
224242

@@ -235,15 +253,19 @@ where
235253

236254
if cycle_heads.is_empty() {
237255
// If there are no more cycle heads, we can mark this as verified.
238-
revisions.verified_final.store(true, Ordering::Relaxed);
256+
completed_query
257+
.revisions
258+
.verified_final
259+
.store(true, Ordering::Relaxed);
239260
}
240261
}
241262

242263
crate::tracing::debug!(
243-
"{database_key_index:?}: execute: result.revisions = {revisions:#?}"
264+
"{database_key_index:?}: execute: result.revisions = {revisions:#?}",
265+
revisions = &completed_query.revisions
244266
);
245267

246-
break (new_value, revisions);
268+
break (new_value, completed_query);
247269
}
248270
}
249271

@@ -254,7 +276,7 @@ where
254276
active_query: ActiveQueryGuard<'db>,
255277
opt_old_memo: Option<&Memo<'db, C>>,
256278
id: Id,
257-
) -> (C::Output<'db>, QueryRevisions) {
279+
) -> (C::Output<'db>, CompletedQuery) {
258280
if let Some(old_memo) = opt_old_memo {
259281
// If we already executed this query once, then use the tracked-struct ids from the
260282
// previous execution as the starting point for the new one.

src/function/fetch.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,14 +297,20 @@ where
297297
let active_query =
298298
zalsa_local.push_query(database_key_index, IterationCount::initial());
299299
let fallback_value = C::cycle_initial(db, C::id_to_input(zalsa, id));
300-
let mut revisions = active_query.pop();
301-
revisions.set_cycle_heads(CycleHeads::initial(database_key_index));
300+
let mut completed_query = active_query.pop();
301+
completed_query
302+
.revisions
303+
.set_cycle_heads(CycleHeads::initial(database_key_index));
302304
// We need this for `cycle_heads()` to work. We will unset this in the outer `execute()`.
303-
*revisions.verified_final.get_mut() = false;
305+
*completed_query.revisions.verified_final.get_mut() = false;
304306
self.insert_memo(
305307
zalsa,
306308
id,
307-
Memo::new(Some(fallback_value), zalsa.current_revision(), revisions),
309+
Memo::new(
310+
Some(fallback_value),
311+
zalsa.current_revision(),
312+
completed_query.revisions,
313+
),
308314
memo_ingredient_index,
309315
)
310316
}

src/function/memo.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,15 @@ impl<C: Configuration> crate::table::memo::Memo for Memo<'static, C>
321321
where
322322
C::Output<'static>: Send + Sync + Any,
323323
{
324-
fn origin(&self) -> QueryOriginRef<'_> {
325-
self.revisions.origin.as_ref()
324+
fn remove_outputs(&self, zalsa: &Zalsa, executor: DatabaseKeyIndex) {
325+
for stale_output in self.revisions.origin.as_ref().outputs() {
326+
stale_output.remove_stale_output(zalsa, executor);
327+
}
328+
329+
for (identity, id) in self.revisions.tracked_struct_ids().into_iter().flatten() {
330+
let key = DatabaseKeyIndex::new(identity.ingredient_index(), *id);
331+
key.remove_stale_output(zalsa, executor);
332+
}
326333
}
327334

328335
#[cfg(feature = "salsa_unstable")]

0 commit comments

Comments
 (0)