From 9c2698c229a350f93300d524675984a7661e5032 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Thu, 23 Oct 2025 16:02:10 -0400 Subject: [PATCH] chore: use clipping on stake_delta_filter --- .../workflows/run-tests-on-push-to-main.yml | 1 + .../src/stake_delta_filter.rs | 8 ++--- modules/stake_delta_filter/src/state.rs | 21 ++++++------ modules/stake_delta_filter/src/utils.rs | 32 ++++++++++--------- 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 129c76cf..9ac03567 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -36,6 +36,7 @@ jobs: --package acropolis_module_mithril_snapshot_fetcher \ --package acropolis_module_snapshot_bootstrapper \ --package acropolis_module_spdd_state \ + --package acropolis_module_stake_delta_filter \ --package acropolis_module_utxo_state - name: Run Build diff --git a/modules/stake_delta_filter/src/stake_delta_filter.rs b/modules/stake_delta_filter/src/stake_delta_filter.rs index 199647b6..034fb3cb 100644 --- a/modules/stake_delta_filter/src/stake_delta_filter.rs +++ b/modules/stake_delta_filter/src/stake_delta_filter.rs @@ -86,9 +86,7 @@ impl StakeDeltaFilterParams { fn conf_enum<'a, T: Deserialize<'a>>(config: &Arc, keydef: (&str, T)) -> Result { if config.get_string(keydef.0).is_ok() { - config - .get::(keydef.0) - .or_else(|e| Err(anyhow!("cannot parse {} value: {e}", keydef.0))) + config.get::(keydef.0).map_err(|e| anyhow!("cannot parse {} value: {e}", keydef.0)) } else { Ok(keydef.1) } @@ -175,9 +173,9 @@ impl StakeDeltaFilter { block = block_info.number ); async { - let msg = process_message(&cache, &delta, &block_info, None); + let msg = process_message(&cache, delta, block_info, None); publisher - .publish(&block_info, msg) + .publish(block_info, msg) .await .unwrap_or_else(|e| error!("Publish error: {e}")) } diff --git a/modules/stake_delta_filter/src/state.rs b/modules/stake_delta_filter/src/state.rs index a45797aa..f048e6ee 100644 --- a/modules/stake_delta_filter/src/state.rs +++ b/modules/stake_delta_filter/src/state.rs @@ -84,18 +84,15 @@ impl State { msg: &TxCertificatesMessage, ) -> Result<()> { for cert in msg.certificates.iter() { - match cert { - TxCertificate::StakeRegistration(reg) => { - let ptr = ShelleyAddressPointer { - slot: block.slot, - tx_index: reg.tx_index, - cert_index: reg.cert_index, - }; - - // Sets pointer; updates max processed slot - self.pointer_cache.set_pointer(ptr, reg.stake_address.clone(), block.slot); - } - _ => (), + if let TxCertificate::StakeRegistration(reg) = cert { + let ptr = ShelleyAddressPointer { + slot: block.slot, + tx_index: reg.tx_index, + cert_index: reg.cert_index, + }; + + // Sets pointer; updates max processed slot + self.pointer_cache.set_pointer(ptr, reg.stake_address.clone(), block.slot); } } Ok(()) diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index 8ad6ae24..71842d89 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -43,10 +43,8 @@ impl PointerCache { } pub fn update_block(&mut self, blk: &BlockInfo) { - if self.conway_start_slot.is_none() { - if blk.era >= Era::Conway { - self.conway_start_slot = Some(blk.slot); - } + if self.conway_start_slot.is_none() && blk.era >= Era::Conway { + self.conway_start_slot = Some(blk.slot); } } @@ -127,7 +125,7 @@ impl PointerCache { pub fn try_save_filtered( &self, file_path: &str, - used_pointers: &Vec, + used_pointers: &[ShelleyAddressPointer], ) -> Result<()> { let mut clean_pointer_cache = PointerCache { max_slot: self.max_slot, @@ -199,19 +197,19 @@ impl Tracker { d: &AddressDelta, sa: Option<&StakeAddress>, ) { - self.occurrence.entry(p.clone()).or_insert(vec![]).push(OccurrenceInfo { + self.occurrence.entry(p.clone()).or_default().push(OccurrenceInfo { block: b.clone(), address_delta: d.clone(), stake_address: sa.cloned(), }); } - fn get_kind(v: &Vec) -> Option { + fn get_kind(v: &[OccurrenceInfo]) -> Option { let mut is_valid = false; let mut is_invalid = false; for event in v.iter() { is_valid |= event.stake_address.is_some(); - is_invalid |= !event.stake_address.is_some(); + is_invalid |= event.stake_address.is_none(); } match (is_valid, is_invalid) { (true, false) => Some(OccurrenceInfoKind::Valid), @@ -226,7 +224,7 @@ impl Tracker { let mut invalid_ptrs = 0; let mut mixed_ptrs = 0; for (_k, v) in self.occurrence.iter() { - if let Some(kind) = Self::get_kind(&v) { + if let Some(kind) = Self::get_kind(v) { match kind { OccurrenceInfoKind::Valid => valid_ptrs += 1, OccurrenceInfoKind::Invalid => invalid_ptrs += 1, @@ -243,7 +241,7 @@ impl Tracker { } fn join_hash_set(hs: HashSet, mid: &str) -> String { - let v = Vec::from_iter(hs.into_iter()); + let v = Vec::from_iter(hs); v.join(mid) } @@ -360,19 +358,23 @@ pub fn process_message( match cache.decode_pointer(ptr) { None => { tracing::warn!("Pointer {ptr:?} is not registered in cache"); - tracker.as_mut().map(|t| t.track(ptr, block, &d, None)); + if let Some(t) = tracker.as_mut() { + t.track(ptr, block, d, None) + } continue; } Some(None) => { - tracker.as_mut().map(|t| t.track(ptr, block, &d, None)); + if let Some(t) = tracker.as_mut() { + t.track(ptr, block, d, None) + } continue; } Some(Some(ref stake_address)) => { - tracker - .as_mut() - .map(|t| t.track(ptr, block, &d, Some(stake_address))); + if let Some(t) = tracker.as_mut() { + t.track(ptr, block, d, Some(stake_address)) + } stake_address.clone() } }