Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests-on-push-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
--package acropolis_module_mithril_snapshot_fetcher \
--package acropolis_module_snapshot_bootstrapper \
--package acropolis_module_spdd_state \
--package acropolis_module_stake_delta_filter \
--package acropolis_module_utxo_state

- name: Run Build
Expand Down
8 changes: 3 additions & 5 deletions modules/stake_delta_filter/src/stake_delta_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ impl StakeDeltaFilterParams {

fn conf_enum<'a, T: Deserialize<'a>>(config: &Arc<Config>, keydef: (&str, T)) -> Result<T> {
if config.get_string(keydef.0).is_ok() {
config
.get::<T>(keydef.0)
.or_else(|e| Err(anyhow!("cannot parse {} value: {e}", keydef.0)))
config.get::<T>(keydef.0).map_err(|e| anyhow!("cannot parse {} value: {e}", keydef.0))
} else {
Ok(keydef.1)
}
Expand Down Expand Up @@ -175,9 +173,9 @@ impl StakeDeltaFilter {
block = block_info.number
);
async {
let msg = process_message(&cache, &delta, &block_info, None);
let msg = process_message(&cache, delta, block_info, None);
publisher
.publish(&block_info, msg)
.publish(block_info, msg)
.await
.unwrap_or_else(|e| error!("Publish error: {e}"))
}
Expand Down
21 changes: 9 additions & 12 deletions modules/stake_delta_filter/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,15 @@ impl State {
msg: &TxCertificatesMessage,
) -> Result<()> {
for cert in msg.certificates.iter() {
match cert {
TxCertificate::StakeRegistration(reg) => {
let ptr = ShelleyAddressPointer {
slot: block.slot,
tx_index: reg.tx_index,
cert_index: reg.cert_index,
};

// Sets pointer; updates max processed slot
self.pointer_cache.set_pointer(ptr, reg.stake_address.clone(), block.slot);
}
_ => (),
if let TxCertificate::StakeRegistration(reg) = cert {
let ptr = ShelleyAddressPointer {
slot: block.slot,
tx_index: reg.tx_index,
cert_index: reg.cert_index,
};

// Sets pointer; updates max processed slot
self.pointer_cache.set_pointer(ptr, reg.stake_address.clone(), block.slot);
}
}
Ok(())
Expand Down
32 changes: 17 additions & 15 deletions modules/stake_delta_filter/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ impl PointerCache {
}

pub fn update_block(&mut self, blk: &BlockInfo) {
if self.conway_start_slot.is_none() {
if blk.era >= Era::Conway {
self.conway_start_slot = Some(blk.slot);
}
if self.conway_start_slot.is_none() && blk.era >= Era::Conway {
self.conway_start_slot = Some(blk.slot);
}
}

Expand Down Expand Up @@ -127,7 +125,7 @@ impl PointerCache {
pub fn try_save_filtered(
&self,
file_path: &str,
used_pointers: &Vec<ShelleyAddressPointer>,
used_pointers: &[ShelleyAddressPointer],
) -> Result<()> {
let mut clean_pointer_cache = PointerCache {
max_slot: self.max_slot,
Expand Down Expand Up @@ -199,19 +197,19 @@ impl Tracker {
d: &AddressDelta,
sa: Option<&StakeAddress>,
) {
self.occurrence.entry(p.clone()).or_insert(vec![]).push(OccurrenceInfo {
self.occurrence.entry(p.clone()).or_default().push(OccurrenceInfo {
block: b.clone(),
address_delta: d.clone(),
stake_address: sa.cloned(),
});
}

fn get_kind(v: &Vec<OccurrenceInfo>) -> Option<OccurrenceInfoKind> {
fn get_kind(v: &[OccurrenceInfo]) -> Option<OccurrenceInfoKind> {
let mut is_valid = false;
let mut is_invalid = false;
for event in v.iter() {
is_valid |= event.stake_address.is_some();
is_invalid |= !event.stake_address.is_some();
is_invalid |= event.stake_address.is_none();
}
match (is_valid, is_invalid) {
(true, false) => Some(OccurrenceInfoKind::Valid),
Expand All @@ -226,7 +224,7 @@ impl Tracker {
let mut invalid_ptrs = 0;
let mut mixed_ptrs = 0;
for (_k, v) in self.occurrence.iter() {
if let Some(kind) = Self::get_kind(&v) {
if let Some(kind) = Self::get_kind(v) {
match kind {
OccurrenceInfoKind::Valid => valid_ptrs += 1,
OccurrenceInfoKind::Invalid => invalid_ptrs += 1,
Expand All @@ -243,7 +241,7 @@ impl Tracker {
}

fn join_hash_set(hs: HashSet<String>, mid: &str) -> String {
let v = Vec::from_iter(hs.into_iter());
let v = Vec::from_iter(hs);
v.join(mid)
}

Expand Down Expand Up @@ -360,19 +358,23 @@ pub fn process_message(
match cache.decode_pointer(ptr) {
None => {
tracing::warn!("Pointer {ptr:?} is not registered in cache");
tracker.as_mut().map(|t| t.track(ptr, block, &d, None));
if let Some(t) = tracker.as_mut() {
t.track(ptr, block, d, None)
}
continue;
}

Some(None) => {
tracker.as_mut().map(|t| t.track(ptr, block, &d, None));
if let Some(t) = tracker.as_mut() {
t.track(ptr, block, d, None)
}
continue;
}

Some(Some(ref stake_address)) => {
tracker
.as_mut()
.map(|t| t.track(ptr, block, &d, Some(stake_address)));
if let Some(t) = tracker.as_mut() {
t.track(ptr, block, d, Some(stake_address))
}
stake_address.clone()
}
}
Expand Down