From d8aea9d0f5be3df906ba740a4f967e51028c11e3 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 30 Oct 2025 20:11:51 +0000 Subject: [PATCH 01/16] refactor: group StakeAddressDeltas and include Shelley addresses Signed-off-by: William Hankins --- common/src/stake_addresses.rs | 113 ++++++++++++++++-------- common/src/types.rs | 7 +- modules/accounts_state/src/state.rs | 27 ++++-- modules/stake_delta_filter/src/utils.rs | 60 ++++++++----- 4 files changed, 134 insertions(+), 73 deletions(-) diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index fff09af9..3f1ce5e3 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -497,7 +497,7 @@ impl StakeAddressMap { /// Stake Delta pub fn process_stake_delta(&mut self, stake_delta: &StakeAddressDelta) { // Use the full stake address directly - no need to extract hash! - let stake_address = &stake_delta.address; + let stake_address = &stake_delta.stake_address; // Stake addresses don't need to be registered if they aren't used for // stake or drep delegation, but we need to track them in case they are later @@ -610,7 +610,8 @@ mod tests { // Create an entry but don't register stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 100, }); @@ -686,7 +687,8 @@ mod tests { // Create an unregistered entry with UTXO value stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 100, }); @@ -744,7 +746,8 @@ mod tests { stake_addresses.register_stake_address(&stake_address); let delta = StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 42, }; stake_addresses.process_stake_delta(&delta); @@ -762,12 +765,14 @@ mod tests { stake_addresses.register_stake_address(&stake_address); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 100, }); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: -30, }); @@ -782,13 +787,15 @@ mod tests { stake_addresses.register_stake_address(&stake_address); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 50, }); // Try to subtract more than available stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: -100, }); @@ -808,7 +815,8 @@ mod tests { stake_addresses.register_stake_address(&stake_address); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 42, }); assert_eq!(stake_addresses.get(&stake_address).unwrap().utxo_value, 42); @@ -1018,13 +1026,15 @@ mod tests { stake_addresses.record_stake_delegation(&addr2, &SPO_HASH.to_vec()); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); stake_addresses.add_to_reward(&addr2, 100); @@ -1050,11 +1060,13 @@ mod tests { stake_addresses.record_stake_delegation(&addr2, &SPO_HASH_2.to_vec()); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1072,7 +1084,8 @@ mod tests { stake_addresses.register_stake_address(&addr1); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); @@ -1097,19 +1110,22 @@ mod tests { stake_addresses.record_drep_delegation(&addr3, &DRepChoice::Key(DREP_HASH.to_vec())); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); stake_addresses.add_to_reward(&addr2, 100); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr3.clone(), + stake_address: addr3.clone(), + addresses: Vec::new(), delta: 3000, }); stake_addresses.add_to_reward(&addr3, 150); @@ -1148,13 +1164,15 @@ mod tests { stake_addresses.record_stake_delegation(&addr2, &SPO_HASH_2.to_vec()); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); stake_addresses.add_to_reward(&addr2, 100); @@ -1179,11 +1197,13 @@ mod tests { stake_addresses.record_stake_delegation(&addr2, &SPO_HASH_2.to_vec()); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1206,13 +1226,15 @@ mod tests { stake_addresses.record_stake_delegation(&addr2, &SPO_HASH.to_vec()); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1238,13 +1260,15 @@ mod tests { stake_addresses.register_stake_address(&addr2); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 500); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1264,7 +1288,8 @@ mod tests { stake_addresses.register_stake_address(&addr1); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); @@ -1294,13 +1319,15 @@ mod tests { stake_addresses.register_stake_address(&addr2); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 500); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1318,7 +1345,8 @@ mod tests { stake_addresses.register_stake_address(&addr1); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); @@ -1352,13 +1380,15 @@ mod tests { stake_addresses.register_stake_address(&addr2); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 100); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1378,7 +1408,8 @@ mod tests { stake_addresses.register_stake_address(&addr1); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); @@ -1408,13 +1439,15 @@ mod tests { stake_addresses.register_stake_address(&addr2); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.add_to_reward(&addr1, 100); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); @@ -1432,7 +1465,8 @@ mod tests { stake_addresses.register_stake_address(&addr1); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); @@ -1527,15 +1561,18 @@ mod tests { stake_addresses.record_drep_delegation(&addr3, &DRepChoice::Abstain); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 1000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 2000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { - address: addr3.clone(), + stake_address: addr3.clone(), + addresses: Vec::new(), delta: 3000, }); diff --git a/common/src/types.rs b/common/src/types.rs index e530c97e..5b0d037f 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -201,8 +201,11 @@ pub struct AddressDelta { /// Stake balance change #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct StakeAddressDelta { - /// Address - pub address: StakeAddress, + /// Stake address + pub stake_address: StakeAddress, + + /// Shelley addresses contributing to the delta + pub addresses: Vec, /// Balance change pub delta: i64, diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index a711c347..c58e2e6b 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -1002,7 +1002,8 @@ mod tests { // Pass in deltas let msg = StakeAddressDeltasMessage { deltas: vec![StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 42, }], }; @@ -1089,7 +1090,8 @@ mod tests { // Put some value in let msg1 = StakeAddressDeltasMessage { deltas: vec![StakeAddressDelta { - address: addr1.clone(), + stake_address: addr1.clone(), + addresses: Vec::new(), delta: 42, }], }; @@ -1098,7 +1100,8 @@ mod tests { let msg2 = StakeAddressDeltasMessage { deltas: vec![StakeAddressDelta { - address: addr2.clone(), + stake_address: addr2.clone(), + addresses: Vec::new(), delta: 21, }], }; @@ -1196,7 +1199,8 @@ mod tests { let msg = StakeAddressDeltasMessage { deltas: vec![StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 99, }], }; @@ -1242,7 +1246,8 @@ mod tests { state.register_stake_address(&stake_address, None); let msg = StakeAddressDeltasMessage { deltas: vec![StakeAddressDelta { - address: stake_address.clone(), + stake_address: stake_address.clone(), + addresses: Vec::new(), delta: 99, }], }; @@ -1402,19 +1407,23 @@ mod tests { let deltas = vec![ StakeAddressDelta { - address: spo1, + stake_address: spo1, + addresses: Vec::new(), delta: 100, }, StakeAddressDelta { - address: spo2, + stake_address: spo2, + addresses: Vec::new(), delta: 1_000, }, StakeAddressDelta { - address: spo3, + stake_address: spo3, + addresses: Vec::new(), delta: 10_000, }, StakeAddressDelta { - address: spo4, + stake_address: spo4, + addresses: Vec::new(), delta: 100_000, }, ]; diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index 7855edd9..af84bd54 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -1,7 +1,7 @@ use acropolis_common::{ messages::{AddressDeltasMessage, StakeAddressDeltasMessage}, - Address, AddressDelta, BlockInfo, Era, ShelleyAddressDelegationPart, ShelleyAddressPointer, - StakeAddress, StakeAddressDelta, StakeCredential, + Address, AddressDelta, BlockInfo, Era, ShelleyAddress, ShelleyAddressDelegationPart, + ShelleyAddressPointer, StakeAddress, StakeAddressDelta, StakeCredential, }; use anyhow::{anyhow, Result}; use serde_with::serde_as; @@ -321,8 +321,7 @@ pub fn process_message( block: &BlockInfo, mut tracker: Option<&mut Tracker>, ) -> StakeAddressDeltasMessage { - let mut result = StakeAddressDeltasMessage { deltas: Vec::new() }; - + let mut grouped: HashMap)> = HashMap::new(); for d in delta.deltas.iter() { // Variants to be processed: // 1. Shelley Address delegation is a stake @@ -336,12 +335,12 @@ pub fn process_message( cache.ensure_up_to_date(block, &d.address).unwrap_or_else(|e| error!("{e}")); - let stake_address = match &d.address { + let (stake_address, shelley_opt) = match &d.address { // Not good for staking Address::None | Address::Byron(_) => continue, Address::Shelley(shelley) => { - match &shelley.delegation { + let stake_address = match &shelley.delegation { // Base addresses (stake delegated to itself) ShelleyAddressDelegationPart::StakeKeyHash(keyhash) => StakeAddress { network: shelley.network.clone(), @@ -382,20 +381,33 @@ pub fn process_message( // Enterprise addresses, does not delegate stake ShelleyAddressDelegationPart::None => continue, - } + }; + (stake_address, Some(shelley)) } - Address::Stake(stake_address) => stake_address.clone(), + Address::Stake(stake_address) => (stake_address.clone(), None), }; - let stake_delta = StakeAddressDelta { - address: stake_address, - delta: d.value.lovelace, - }; - result.deltas.push(stake_delta); + let entry = grouped.entry(stake_address).or_insert((0, HashSet::new())); + entry.0 += d.value.lovelace; + + if let Some(shelley) = shelley_opt { + entry.1.insert(shelley.clone()); + } } - result + let deltas = grouped + .into_iter() + .map( + |(stake_address, (delta, shelley_addrs))| StakeAddressDelta { + stake_address, + addresses: shelley_addrs.into_iter().collect(), + delta, + }, + ) + .collect::>(); + + StakeAddressDeltasMessage { deltas } } #[cfg(test)] @@ -551,45 +563,45 @@ mod test { let stake_delta = process_message(&cache, &delta, &block, None); assert_eq!( - stake_delta.deltas.first().unwrap().address.to_string().unwrap(), + stake_delta.deltas.first().unwrap().stake_address.to_string().unwrap(), stake_addr ); assert_eq!( - stake_delta.deltas.get(1).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(1).unwrap().stake_address.to_string().unwrap(), stake_addr ); assert_eq!( - stake_delta.deltas.get(2).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(2).unwrap().stake_address.to_string().unwrap(), script_addr ); assert_eq!( - stake_delta.deltas.get(3).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(3).unwrap().stake_address.to_string().unwrap(), script_addr ); assert_eq!( - stake_delta.deltas.get(4).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(4).unwrap().stake_address.to_string().unwrap(), pointed_addr ); assert_eq!( - stake_delta.deltas.get(5).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(5).unwrap().stake_address.to_string().unwrap(), pointed_addr ); assert_eq!( - stake_delta.deltas.get(6).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(6).unwrap().stake_address.to_string().unwrap(), stake_addr ); assert_eq!( - stake_delta.deltas.get(7).unwrap().address.to_string().unwrap(), + stake_delta.deltas.get(7).unwrap().stake_address.to_string().unwrap(), script_addr ); // additional check: payload conversion correctness assert_eq!( - stake_delta.deltas.first().unwrap().address.credential.to_string().unwrap(), + stake_delta.deltas.first().unwrap().stake_address.credential.to_string().unwrap(), stake_key_hash ); assert_eq!( - stake_delta.deltas.get(2).unwrap().address.credential.to_string().unwrap(), + stake_delta.deltas.get(2).unwrap().stake_address.credential.to_string().unwrap(), script_hash ); From df32d53c8cd7bf693e11993d155b0a742fdb1748 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 30 Oct 2025 23:45:16 +0000 Subject: [PATCH 02/16] feat: address deltas processing and persistence Signed-off-by: William Hankins --- common/src/address.rs | 106 +++++++++++++++++- common/src/queries/accounts.rs | 7 +- .../src/historical_accounts_state.rs | 36 +++--- .../src/immutable_historical_account_store.rs | 57 ++++++++-- .../historical_accounts_state/src/state.rs | 43 ++++++- 5 files changed, 210 insertions(+), 39 deletions(-) diff --git a/common/src/address.rs b/common/src/address.rs index 46cee671..8ad204b1 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -278,7 +278,7 @@ impl ShelleyAddress { Ok(bech32::encode::(hrp, &data)?) } - pub fn to_bytes_key(&self) -> Result> { + pub fn to_bytes_key(&self) -> Vec { let network_bits = match self.network { NetworkId::Mainnet => 1u8, NetworkId::Testnet => 0u8, @@ -325,7 +325,55 @@ impl ShelleyAddress { } } - Ok(data) + data + } + + pub fn from_bytes_key(data: &[u8]) -> Result { + if data.is_empty() { + return Err(anyhow!("empty address bytes")); + } + + let header = data[0]; + let network = match header & 0x0F { + 0 => NetworkId::Testnet, + 1 => NetworkId::Mainnet, + _ => return Err(anyhow!("invalid network bits in header")), + }; + + let payment_bits = (header >> 4) & 0x01; + let delegation_bits = (header >> 5) & 0x03; + + let payment = match payment_bits { + 0 => ShelleyAddressPaymentPart::PaymentKeyHash(data[1..29].to_vec()), + 1 => ShelleyAddressPaymentPart::ScriptHash(data[1..29].to_vec()), + _ => return Err(anyhow!("invalid payment bits")), + }; + + let delegation = match delegation_bits { + 0 => ShelleyAddressDelegationPart::StakeKeyHash(data[29..57].to_vec()), + 1 => ShelleyAddressDelegationPart::ScriptHash(data[29..57].to_vec()), + 2 => { + let mut decoder = VarIntDecoder::new(&data[29..]); + let slot = decoder.read()?; + let tx_index = decoder.read()?; + let cert_index = decoder.read()?; + + ShelleyAddressDelegationPart::Pointer(ShelleyAddressPointer { + slot, + tx_index, + cert_index, + }) + } + 3 => ShelleyAddressDelegationPart::None, + + _ => return Err(anyhow!("invalid delegation bits")), + }; + + Ok(ShelleyAddress { + network, + payment, + delegation, + }) } pub fn stake_address_string(&self) -> Result> { @@ -573,7 +621,7 @@ impl Address { match self { Address::Byron(b) => b.to_bytes_key(), - Address::Shelley(s) => s.to_bytes_key(), + Address::Shelley(s) => Ok(s.to_bytes_key()), Address::Stake(stake) => stake.to_bytes_key(), @@ -1020,4 +1068,56 @@ mod tests { let result = StakeAddress::decode(&mut decoder, &mut ()); assert!(result.is_err()); } + + #[test] + fn test_shelley_address_to_from_bytes_key_roundtrip() { + let payment_hash = vec![0x11; 28]; + let stake_hash = vec![0x22; 28]; + let script_hash = vec![0x33; 28]; + + // Normal address + let addr_base = ShelleyAddress { + network: NetworkId::Mainnet, + payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash.clone()), + delegation: ShelleyAddressDelegationPart::StakeKeyHash(stake_hash.clone()), + }; + let bytes = addr_base.to_bytes_key(); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode base"); + assert_eq!(addr_base, decoded); + + // Script address + let addr_script = ShelleyAddress { + network: NetworkId::Testnet, + payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash.clone()), + delegation: ShelleyAddressDelegationPart::ScriptHash(script_hash.clone()), + }; + let bytes = addr_script.to_bytes_key(); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode script"); + assert_eq!(addr_script, decoded); + + // Pointer address + let pointer = ShelleyAddressPointer { + slot: 1234, + tx_index: 56, + cert_index: 2, + }; + let addr_pointer = ShelleyAddress { + network: NetworkId::Mainnet, + payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash.clone()), + delegation: ShelleyAddressDelegationPart::Pointer(pointer), + }; + let bytes = addr_pointer.to_bytes_key(); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode pointer"); + assert_eq!(addr_pointer, decoded); + + // Enterprise address + let addr_none = ShelleyAddress { + network: NetworkId::Testnet, + payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), + delegation: ShelleyAddressDelegationPart::None, + }; + let bytes = addr_none.to_bytes_key(); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode none"); + assert_eq!(addr_none, decoded); + } } diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 4f8fa2bd..da9e8d79 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use crate::{ - DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier, + DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, RewardType, ShelleyAddress, StakeAddress, + TxIdentifier, }; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = @@ -21,7 +22,7 @@ pub enum AccountsStateQuery { GetAccountDelegationHistory { account: StakeAddress }, GetAccountMIRHistory { account: StakeAddress }, GetAccountWithdrawalHistory { account: StakeAddress }, - GetAccountAssociatedAddresses { stake_key: Vec }, + GetAccountAssociatedAddresses { account: StakeAddress }, GetAccountAssets { stake_key: Vec }, GetAccountAssetsTotals { stake_key: Vec }, GetAccountUTxOs { stake_key: Vec }, @@ -55,7 +56,7 @@ pub enum AccountsStateQueryResponse { AccountDelegationHistory(Vec), AccountMIRHistory(Vec), AccountWithdrawalHistory(Vec), - AccountAssociatedAddresses(AccountAssociatedAddresses), + AccountAssociatedAddresses(Vec), AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), AccountUTxOs(AccountUTxOs), diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 99d07d30..a99695c6 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -26,8 +26,10 @@ mod volatile_historical_accounts; const DEFAULT_REWARDS_SUBSCRIBE_TOPIC: &str = "cardano.stake.reward.deltas"; const DEFAULT_TX_CERTIFICATES_SUBSCRIBE_TOPIC: &str = "cardano.certificates"; const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: &str = "cardano.withdrawals"; -const DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) = - ("address-deltas-subscribe-topic", "cardano.address.delta"); +const DEFAULT_STAKE_ADDRESS_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) = ( + "stake-address-deltas-subscribe-topic", + "cardano.stake.deltas", +); const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); @@ -56,13 +58,11 @@ impl HistoricalAccountsState { mut rewards_subscription: Box>, mut certs_subscription: Box>, mut withdrawals_subscription: Box>, - mut address_deltas_subscription: Box>, + mut stake_address_deltas_subscription: Box>, mut params_subscription: Box>, ) -> Result<()> { let _ = params_subscription.read().await?; info!("Consumed initial genesis params from params_subscription"); - let _ = address_deltas_subscription.read().await?; - info!("Consumed initial address deltas from address_deltas_subscription"); // Background task to persist epochs sequentially const MAX_PENDING_PERSISTS: usize = 1; @@ -82,7 +82,7 @@ impl HistoricalAccountsState { loop { // Create all per-block message futures upfront before processing messages sequentially let certs_message_f = certs_subscription.read(); - let address_deltas_message_f = address_deltas_subscription.read(); + let stake_address_deltas_message_f = stake_address_deltas_subscription.read(); let withdrawals_message_f = withdrawals_subscription.read(); let mut current_block: Option = None; @@ -166,22 +166,19 @@ impl HistoricalAccountsState { } // Handle address deltas - let (_, message) = address_deltas_message_f.await?; + let (_, message) = stake_address_deltas_message_f.await?; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::AddressDeltas(deltas_msg))) => { + Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => { let span = info_span!( "historical_account_state.handle_address_deltas", block = block_info.number ); let _entered = span.enter(); - Self::check_sync(¤t_block, block_info); + Self::check_sync(¤t_block, &block_info); { let mut state = state_mutex.lock().await; - state - .handle_address_deltas(deltas_msg) - .inspect_err(|e| error!("AddressDeltas handling error: {e:#}")) - .ok(); + state.handle_address_deltas(&deltas_msg); } } @@ -249,8 +246,8 @@ impl HistoricalAccountsState { .unwrap_or(DEFAULT_REWARDS_SUBSCRIBE_TOPIC.to_string()); let address_deltas_topic = config - .get_string(DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC.1.to_string()); + .get_string(DEFAULT_STAKE_ADDRESS_DELTAS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_STAKE_ADDRESS_DELTAS_SUBSCRIBE_TOPIC.1.to_string()); let params_topic = config .get_string(DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC.0) @@ -354,6 +351,15 @@ impl HistoricalAccountsState { Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } + AccountsStateQuery::GetAccountAssociatedAddresses { account } => { + match state.lock().await.get_addresses(account).await { + Ok(Some(addresses)) => { + AccountsStateQueryResponse::AccountAssociatedAddresses(addresses) + } + Ok(None) => AccountsStateQueryResponse::NotFound, + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 545b9722..8a14e78c 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, path::Path}; +use std::{ + collections::{HashMap, HashSet}, + hash::Hash, + path::Path, +}; use acropolis_common::{ queries::accounts::{AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationUpdate}, @@ -127,10 +131,12 @@ impl ImmutableHistoricalAccountStore { } // Persist address updates - // TODO: Deduplicate addresses across epochs if config.store_addresses { if let Some(updates) = &entry.addresses { - batch.insert(&self.addresses, epoch_key, to_vec(updates)?); + for address in updates { + let address_key = Self::make_address_key(&account, address.clone()); + batch.insert(&self.addresses, address_key, &[]); + } } } } @@ -250,16 +256,28 @@ impl ImmutableHistoricalAccountStore { Ok((!immutable_withdrawals.is_empty()).then_some(immutable_withdrawals)) } - pub async fn _get_addresses( + pub async fn get_addresses( &self, account: &StakeAddress, - ) -> Result>> { - let mut immutable_addresses = - self.collect_partition::(&self.addresses, account.get_hash())?; + ) -> Result>> { + let prefix = account.get_hash(); + let mut addresses: HashSet = HashSet::new(); + + for result in self.addresses.prefix(&prefix) { + let (key, _) = result?; + let shelley = ShelleyAddress::from_bytes_key(&key[prefix.len()..])?; + addresses.insert(shelley); + } - self.merge_pending(account, |e| e.addresses.as_ref(), &mut immutable_addresses).await; + for block_map in self.pending.lock().await.iter() { + if let Some(entry) = block_map.get(account) { + if let Some(addrs) = &entry.addresses { + addresses.extend(addrs.iter().cloned()); + } + } + } - Ok((!immutable_addresses.is_empty()).then_some(immutable_addresses)) + Ok((!addresses.is_empty()).then_some(addresses)) } fn merge_block_deltas( @@ -281,13 +299,12 @@ impl ImmutableHistoricalAccountStore { ); Self::extend_opt_vec(&mut agg_entry.withdrawal_history, entry.withdrawal_history); Self::extend_opt_vec(&mut agg_entry.mir_history, entry.mir_history); - Self::extend_opt_vec(&mut agg_entry.addresses, entry.addresses); + Self::extend_opt_set(&mut agg_entry.addresses, entry.addresses); } acc }) } - #[allow(dead_code)] fn collect_partition(&self, partition: &Partition, prefix: &[u8]) -> Result> where T: for<'a> minicbor::Decode<'a, ()>, @@ -301,7 +318,6 @@ impl ImmutableHistoricalAccountStore { Ok(out) } - #[allow(dead_code)] async fn merge_pending(&self, account: &StakeAddress, f: F, out: &mut Vec) where F: Fn(&AccountEntry) -> Option<&Vec>, @@ -323,6 +339,12 @@ impl ImmutableHistoricalAccountStore { key } + fn make_address_key(account: &StakeAddress, address: ShelleyAddress) -> Vec { + let mut key = account.get_credential().get_hash(); + key.extend(address.to_bytes_key()); + key + } + fn extend_opt_vec(target: &mut Option>, src: Option>) { if let Some(mut v) = src { if !v.is_empty() { @@ -330,4 +352,15 @@ impl ImmutableHistoricalAccountStore { } } } + + fn extend_opt_set(target: &mut Option>, src: Option>) + where + T: Eq + Hash, + { + if let Some(mut s) = src { + if !s.is_empty() { + target.get_or_insert_with(HashSet::new).extend(s.drain()); + } + } + } } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index ea8f63dc..c3c36761 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -1,11 +1,13 @@ use std::{ + collections::HashSet, path::{Path, PathBuf}, sync::Arc, }; use acropolis_common::{ messages::{ - AddressDeltasMessage, StakeRewardDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, + StakeAddressDeltasMessage, StakeRewardDeltasMessage, TxCertificatesMessage, + WithdrawalsMessage, }, queries::accounts::{ AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, @@ -30,7 +32,7 @@ pub struct AccountEntry { pub registration_history: Option>, pub withdrawal_history: Option>, pub mir_history: Option>, - pub addresses: Option>, + pub addresses: Option>, } #[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] @@ -223,8 +225,16 @@ impl State { } } - pub fn handle_address_deltas(&mut self, _address_deltas: &AddressDeltasMessage) -> Result<()> { - Ok(()) + pub fn handle_address_deltas(&mut self, stake_address_deltas: &StakeAddressDeltasMessage) { + let window = self.volatile.window.back_mut().expect("window should never be empty"); + for delta in stake_address_deltas.deltas.iter() { + window + .entry(delta.stake_address.clone()) + .or_default() + .addresses + .get_or_insert_with(HashSet::new) + .extend(delta.addresses.clone()); + } } pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) { @@ -344,8 +354,29 @@ impl State { } } - pub async fn _get_addresses(&self, _account: StakeAddress) -> Result> { - Ok(Vec::new()) + pub async fn get_addresses( + &self, + account: &StakeAddress, + ) -> Result>> { + let immutable = self.immutable.get_addresses(account).await?; + + let mut volatile = HashSet::new(); + for block_map in self.volatile.window.iter() { + if let Some(entry) = block_map.get(account) { + if let Some(pending) = &entry.addresses { + volatile.extend(pending.iter().cloned()); + } + } + } + + match immutable { + Some(mut addresses) => { + addresses.extend(volatile); + Ok(Some(addresses.into_iter().collect())) + } + None if volatile.is_empty() => Ok(None), + None => Ok(Some(volatile.into_iter().collect())), + } } fn handle_stake_registration_change( From eceb697a7521df7a5eeb88e652d80c876ad3af8f Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 31 Oct 2025 16:50:13 +0000 Subject: [PATCH 03/16] test: update CIP19 message processing test for grouped stake address batching Signed-off-by: William Hankins --- modules/stake_delta_filter/src/utils.rs | 68 ++++++++++++------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index af84bd54..3a4d6daf 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -562,50 +562,50 @@ mod test { let stake_delta = process_message(&cache, &delta, &block, None); + let stake_addr_entry = stake_delta + .deltas + .iter() + .find(|d| d.stake_address.to_string().unwrap() == stake_addr) + .expect("Expected stake_addr not found in deltas"); assert_eq!( - stake_delta.deltas.first().unwrap().stake_address.to_string().unwrap(), - stake_addr - ); - assert_eq!( - stake_delta.deltas.get(1).unwrap().stake_address.to_string().unwrap(), - stake_addr - ); - assert_eq!( - stake_delta.deltas.get(2).unwrap().stake_address.to_string().unwrap(), - script_addr - ); - assert_eq!( - stake_delta.deltas.get(3).unwrap().stake_address.to_string().unwrap(), - script_addr - ); - assert_eq!( - stake_delta.deltas.get(4).unwrap().stake_address.to_string().unwrap(), - pointed_addr - ); - assert_eq!( - stake_delta.deltas.get(5).unwrap().stake_address.to_string().unwrap(), - pointed_addr + stake_addr_entry.addresses.len(), + 2, + "Expected 2 Shelley addresses grouped under stake_addr" ); + + let script_addr_entry = stake_delta + .deltas + .iter() + .find(|d| d.stake_address.to_string().unwrap() == script_addr) + .expect("Expected script_addr not found in deltas"); assert_eq!( - stake_delta.deltas.get(6).unwrap().stake_address.to_string().unwrap(), - stake_addr + script_addr_entry.addresses.len(), + 2, + "Expected 2 Shelley addresses grouped under script_addr" ); - assert_eq!( - stake_delta.deltas.get(7).unwrap().stake_address.to_string().unwrap(), - script_addr + + assert!( + stake_delta.deltas.iter().any(|d| d.stake_address.to_string().unwrap() == pointed_addr), + "Expected pointed_addr not found in deltas" ); // additional check: payload conversion correctness - assert_eq!( - stake_delta.deltas.first().unwrap().stake_address.credential.to_string().unwrap(), - stake_key_hash + assert!( + stake_delta + .deltas + .iter() + .any(|d| d.stake_address.credential.to_string().unwrap() == stake_key_hash), + "Expected stake_key_hash not found in deltas" ); - assert_eq!( - stake_delta.deltas.get(2).unwrap().stake_address.credential.to_string().unwrap(), - script_hash + assert!( + stake_delta + .deltas + .iter() + .any(|d| d.stake_address.credential.to_string().unwrap() == script_hash), + "Expected script_hash not found in deltas" ); - assert_eq!(stake_delta.deltas.len(), 8); + assert_eq!(stake_delta.deltas.len(), 3); Ok(()) } From 19c6211b7a80586996eb17c719d18a9029fdbe94 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 31 Oct 2025 17:15:42 +0000 Subject: [PATCH 04/16] fix: clippy warnings Signed-off-by: William Hankins --- .../src/historical_accounts_state.rs | 4 ++-- .../src/immutable_historical_account_store.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index a99695c6..52a4fbcf 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -175,10 +175,10 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); { let mut state = state_mutex.lock().await; - state.handle_address_deltas(&deltas_msg); + state.handle_address_deltas(deltas_msg); } } diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 53e54ab4..301697c6 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -135,7 +135,7 @@ impl ImmutableHistoricalAccountStore { if let Some(updates) = &entry.addresses { for address in updates { let address_key = Self::make_address_key(&account, address.clone()); - batch.insert(&self.addresses, address_key, &[]); + batch.insert(&self.addresses, address_key, []); } } } From faab2caee286a3a44abe98b165ade73f35f2cd44 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 31 Oct 2025 17:28:18 +0000 Subject: [PATCH 05/16] feat: account addresses REST handler Signed-off-by: William Hankins --- .../rest_blockfrost/src/handlers/accounts.rs | 73 ++++++++++++++++++- .../rest_blockfrost/src/rest_blockfrost.rs | 19 ++++- modules/rest_blockfrost/src/types.rs | 5 ++ 3 files changed, 93 insertions(+), 4 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index bc663b4e..1bb82a6e 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -14,7 +14,8 @@ use caryatid_sdk::Context; use crate::handlers_config::HandlersConfig; use crate::types::{ - AccountRewardREST, AccountWithdrawalREST, DelegationUpdateREST, RegistrationUpdateREST, + AccountAddressREST, AccountRewardREST, AccountWithdrawalREST, DelegationUpdateREST, + RegistrationUpdateREST, }; #[derive(serde::Serialize)] @@ -564,6 +565,76 @@ pub async fn handle_account_rewards_blockfrost( } } +pub async fn handle_account_addresses_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let account = match parse_stake_address(¶ms) { + Ok(addr) => addr, + Err(resp) => return Ok(resp), + }; + + // Prepare the message + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountAssociatedAddresses { account }, + ))); + + // Get addresses from historical accounts state + let addresses = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountAssociatedAddresses(addresses), + )) => Ok(Some(addresses)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::NotFound, + )) => Ok(None), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving account addresses: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving account addresses" + )), + }, + ) + .await?; + + let Some(addresses) = addresses else { + return Ok(RESTResponse::with_text(404, "Account not found")); + }; + + let rest_response = match addresses + .iter() + .map(|r| { + Ok::<_, anyhow::Error>(AccountAddressREST { + address: r.to_string().map_err(|e| anyhow!("invalid address: {e}"))?, + }) + }) + .collect::, _>>() + { + Ok(v) => v, + Err(e) => { + return Ok(RESTResponse::with_text( + 500, + &format!("Failed to convert address entry: {e}"), + )); + } + }; + + match serde_json::to_string_pretty(&rest_response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while serializing addresses: {e}"), + )), + } +} + fn parse_stake_address(params: &[String]) -> Result { let Some(stake_key) = params.first() else { return Err(RESTResponse::with_text( diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index 57b6a7d4..2cdb5570 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -17,9 +17,10 @@ mod types; mod utils; use handlers::{ accounts::{ - handle_account_delegations_blockfrost, handle_account_mirs_blockfrost, - handle_account_registrations_blockfrost, handle_account_rewards_blockfrost, - handle_account_withdrawals_blockfrost, handle_single_account_blockfrost, + handle_account_addresses_blockfrost, handle_account_delegations_blockfrost, + handle_account_mirs_blockfrost, handle_account_registrations_blockfrost, + handle_account_rewards_blockfrost, handle_account_withdrawals_blockfrost, + handle_single_account_blockfrost, }, addresses::{ handle_address_asset_utxos_blockfrost, handle_address_extended_blockfrost, @@ -85,6 +86,10 @@ const DEFAULT_HANDLE_ACCOUNT_REWARDS_TOPIC: (&str, &str) = ( "handle-topic-account-rewards", "rest.get.accounts.*.rewards", ); +const DEFAULT_HANDLE_ACCOUNT_ADDRESSES_TOPIC: (&str, &str) = ( + "handle-topic-account-addresses", + "rest.get.accounts.*.addresses", +); // Blocks topics const DEFAULT_HANDLE_BLOCKS_LATEST_HASH_NUMBER_TOPIC: (&str, &str) = @@ -311,6 +316,14 @@ impl BlockfrostREST { handle_account_rewards_blockfrost, ); + // Handler for /accounts/{stake_address}/addresses + register_handler( + context.clone(), + DEFAULT_HANDLE_ACCOUNT_ADDRESSES_TOPIC, + handlers_config.clone(), + handle_account_addresses_blockfrost, + ); + // Handler for /blocks/latest, /blocks/{hash_or_number} register_handler( context.clone(), diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index f798c6c4..74e68b73 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -894,3 +894,8 @@ impl TryFrom<&AccountReward> for AccountRewardREST { }) } } + +#[derive(Serialize)] +pub struct AccountAddressREST { + pub address: String, +} From 2e3ab67833aa23054d4a306e8279ec531e66d53b Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 31 Oct 2025 17:29:37 +0000 Subject: [PATCH 06/16] fix: clippy Signed-off-by: William Hankins --- common/src/address.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/common/src/address.rs b/common/src/address.rs index ed8d40c2..84b782dc 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -1119,8 +1119,8 @@ mod tests { // Normal address let addr_base = ShelleyAddress { network: NetworkId::Mainnet, - payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash.clone()), - delegation: ShelleyAddressDelegationPart::StakeKeyHash(stake_hash.clone()), + payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), + delegation: ShelleyAddressDelegationPart::StakeKeyHash(stake_hash), }; let bytes = addr_base.to_bytes_key(); let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode base"); @@ -1129,8 +1129,8 @@ mod tests { // Script address let addr_script = ShelleyAddress { network: NetworkId::Testnet, - payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash.clone()), - delegation: ShelleyAddressDelegationPart::ScriptHash(script_hash.clone()), + payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash), + delegation: ShelleyAddressDelegationPart::ScriptHash(script_hash), }; let bytes = addr_script.to_bytes_key(); let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode script"); @@ -1144,7 +1144,7 @@ mod tests { }; let addr_pointer = ShelleyAddress { network: NetworkId::Mainnet, - payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash.clone()), + payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), delegation: ShelleyAddressDelegationPart::Pointer(pointer), }; let bytes = addr_pointer.to_bytes_key(); From c110c302a4dbb1f4d546d310364f75d07006dc7f Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 31 Oct 2025 18:04:23 +0000 Subject: [PATCH 07/16] fix: message sync and use correct key prefix for immutable address getter Signed-off-by: William Hankins --- .../historical_accounts_state/src/historical_accounts_state.rs | 2 ++ .../src/immutable_historical_account_store.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 52a4fbcf..db200fb4 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -63,6 +63,8 @@ impl HistoricalAccountsState { ) -> Result<()> { let _ = params_subscription.read().await?; info!("Consumed initial genesis params from params_subscription"); + let _ = stake_address_deltas_subscription.read().await?; + info!("Consumed initial stake deltas from stake_address_deltas_subscription"); // Background task to persist epochs sequentially const MAX_PENDING_PERSISTS: usize = 1; diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 301697c6..3e8650e7 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -268,7 +268,7 @@ impl ImmutableHistoricalAccountStore { &self, account: &StakeAddress, ) -> Result>> { - let prefix = account.get_hash(); + let prefix = account.to_binary(); let mut addresses: HashSet = HashSet::new(); for result in self.addresses.prefix(&prefix) { From a635dfdf2716031b7cb3842ff4bf6f904ccda475 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 31 Oct 2025 23:53:34 +0000 Subject: [PATCH 08/16] refactor: use constructed address key with positional info Signed-off-by: William Hankins --- .../src/immutable_historical_account_store.rs | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 3e8650e7..14738e0a 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -133,8 +133,10 @@ impl ImmutableHistoricalAccountStore { // Persist address updates if config.store_addresses { if let Some(updates) = &entry.addresses { - for address in updates { - let address_key = Self::make_address_key(&account, address.clone()); + for (index, address) in updates.iter().enumerate() { + let idx = index as u32; + let address_key = + Self::make_address_key(&account, epoch, idx, address.clone()); batch.insert(&self.addresses, address_key, []); } } @@ -347,9 +349,17 @@ impl ImmutableHistoricalAccountStore { key } - fn make_address_key(account: &StakeAddress, address: ShelleyAddress) -> Vec { - let mut key = account.to_binary(); - key.extend(address.to_bytes_key()); + fn make_address_key( + account: &StakeAddress, + epoch: u32, + index: u32, + address: ShelleyAddress, + ) -> Vec { + let mut key = Vec::new(); + key.extend_from_slice(&account.to_binary()); + key.extend_from_slice(&epoch.to_be_bytes()); + key.extend_from_slice(&index.to_be_bytes()); + key.extend_from_slice(&address.to_bytes_key()); key } From 64cf7da893ebc47140aa009f7fa810a2d8ab8867 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Sat, 1 Nov 2025 20:11:09 +0000 Subject: [PATCH 09/16] refactor: add timing info to pause logs Signed-off-by: William Hankins --- .../src/mithril_snapshot_fetcher.rs | 11 ++-- modules/mithril_snapshot_fetcher/src/pause.rs | 64 +++++++++++++------ 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 32c7ceac..2f24647e 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -335,14 +335,11 @@ impl MithrilSnapshotFetcher { // Check pause constraint if pause_constraint.should_pause(&block_info) { - let description = pause_constraint.get_description(); - let next_pause_constraint = pause_constraint.get_next(); - let next_description = next_pause_constraint.get_description(); - if prompt_pause(description, next_description).await { + if prompt_pause(pause_constraint.get_description()).await { info!("Continuing without further pauses..."); pause_constraint = PauseType::NoPause; } else { - pause_constraint = next_pause_constraint; + pause_constraint.next(); } } @@ -433,9 +430,9 @@ impl MithrilSnapshotFetcher { } /// Async helper to prompt user for pause behavior -async fn prompt_pause(description: String, next_description: String) -> bool { +async fn prompt_pause(description: String) -> bool { info!( - "Paused at {description}. Press [Enter] to step to {next_description}, or [c + Enter] to continue without pauses." + "Paused at {description}. Press [Enter] to step to to the next, or [c + Enter] to continue without pauses." ); tokio::task::spawn_blocking(|| { use std::io::{self, BufRead}; diff --git a/modules/mithril_snapshot_fetcher/src/pause.rs b/modules/mithril_snapshot_fetcher/src/pause.rs index 0e57bed1..e1abb649 100644 --- a/modules/mithril_snapshot_fetcher/src/pause.rs +++ b/modules/mithril_snapshot_fetcher/src/pause.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use acropolis_common::BlockInfo; use config::Config; use tracing::{error, info}; @@ -5,8 +7,14 @@ use tracing::{error, info}; #[derive(Debug, Clone, PartialEq)] pub enum PauseType { NoPause, - Epoch(u64), - Block(u64), + Epoch { + number: u64, + start_time: std::time::Instant, + }, + Block { + number: u64, + start_time: std::time::Instant, + }, } impl PauseType { @@ -26,15 +34,22 @@ impl PauseType { let pause_type = parts[0].trim(); let value = parts[1].trim().parse::().ok()?; + let start_time = Instant::now(); match pause_type { "epoch" => { info!("Pausing enabled at epoch {value}"); - Some(PauseType::Epoch(value)) + Some(PauseType::Epoch { + number: value, + start_time, + }) } "block" => { info!("Pausing enabled at block {value}"); - Some(PauseType::Block(value)) + Some(PauseType::Block { + number: value, + start_time, + }) } _ => { error!( @@ -48,29 +63,34 @@ impl PauseType { pub fn should_pause(&self, block_info: &BlockInfo) -> bool { match self { - PauseType::Epoch(target_epoch) => { - if block_info.new_epoch { - return block_info.epoch == *target_epoch; - } - false - } - PauseType::Block(target_block) => block_info.number == *target_block, + PauseType::Epoch { number, .. } => block_info.new_epoch && block_info.epoch == *number, + PauseType::Block { number, .. } => block_info.number == *number, PauseType::NoPause => false, } } - pub fn get_next(&self) -> Self { + pub fn next(&mut self) { match self { - PauseType::Epoch(target_epoch) => PauseType::Epoch(target_epoch + 1), - PauseType::Block(target_block) => PauseType::Block(target_block + 1), - PauseType::NoPause => PauseType::NoPause, + PauseType::Epoch { number, start_time } => { + *number += 1; + *start_time = Instant::now(); + } + PauseType::Block { number, start_time } => { + *number += 1; + *start_time = Instant::now(); + } + PauseType::NoPause => {} } } pub fn get_description(&self) -> String { match self { - PauseType::Epoch(target_epoch) => format!("Epoch {target_epoch}"), - PauseType::Block(target_block) => format!("Block {target_block}"), + PauseType::Epoch { number, start_time } => { + format!("Epoch {number} (started {:?} ago)", start_time.elapsed()) + } + PauseType::Block { number, start_time } => { + format!("Block {number} (started {:?} ago)", start_time.elapsed()) + } PauseType::NoPause => "No pause".to_string(), } } @@ -85,14 +105,20 @@ mod tests { fn test_pause_type_from_config_epoch() { let config = Config::builder().set_override("pause", "epoch:100").unwrap().build().unwrap(); let pause_type = PauseType::from_config(&config, DEFAULT_PAUSE); - assert_eq!(pause_type, Some(PauseType::Epoch(100))); + match pause_type { + Some(PauseType::Epoch { number, .. }) => assert_eq!(number, 100), + _ => panic!("Expected Some(PauseType::Epoch {{ number: 100, .. }})"), + } } #[test] fn test_pause_type_from_config_block() { let config = Config::builder().set_override("pause", "block:100").unwrap().build().unwrap(); let pause_type = PauseType::from_config(&config, DEFAULT_PAUSE); - assert_eq!(pause_type, Some(PauseType::Block(100))); + match pause_type { + Some(PauseType::Block { number, .. }) => assert_eq!(number, 100), + _ => panic!("Expected Some(PauseType::Block {{ number: 100, .. }})"), + } } #[test] From d364c32e694dbfd4812b860e28bce86f6c6f35a9 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 3 Nov 2025 21:15:43 +0000 Subject: [PATCH 10/16] refactor: account addresses getter to remove duplicates Signed-off-by: William Hankins --- .../src/immutable_historical_account_store.rs | 17 +++++++++----- .../historical_accounts_state/src/state.rs | 22 +++++++++---------- modules/stake_delta_filter/src/utils.rs | 15 ++++++++----- processes/omnibus/omnibus.toml | 2 +- processes/omnibus/src/main.rs | 2 +- 5 files changed, 34 insertions(+), 24 deletions(-) diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 14738e0a..0a2b5758 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -269,20 +269,27 @@ impl ImmutableHistoricalAccountStore { pub async fn get_addresses( &self, account: &StakeAddress, - ) -> Result>> { + ) -> Result>> { let prefix = account.to_binary(); - let mut addresses: HashSet = HashSet::new(); + let mut addresses = Vec::new(); + let mut seen = HashSet::new(); for result in self.addresses.prefix(&prefix) { let (key, _) = result?; - let shelley = ShelleyAddress::from_bytes_key(&key[prefix.len()..])?; - addresses.insert(shelley); + let shelley = ShelleyAddress::from_bytes_key(&key[prefix.len() + 8..])?; + if seen.insert(shelley.clone()) { + addresses.push(shelley); + } } for block_map in self.pending.lock().await.iter() { if let Some(entry) = block_map.get(account) { if let Some(addrs) = &entry.addresses { - addresses.extend(addrs.iter().cloned()); + for addr in addrs { + if seen.insert(addr.clone()) { + addresses.push(addr.clone()); + } + } } } } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 9ca247e2..0532892d 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -358,25 +358,25 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_addresses(account).await?; + let mut addresses = match self.immutable.get_addresses(account).await? { + Some(list) => list, + None => Vec::new(), + }; - let mut volatile = HashSet::new(); + let mut seen: HashSet = addresses.iter().cloned().collect(); for block_map in self.volatile.window.iter() { if let Some(entry) = block_map.get(account) { if let Some(pending) = &entry.addresses { - volatile.extend(pending.iter().cloned()); + for addr in pending { + if seen.insert(addr.clone()) { + addresses.push(addr.clone()); + } + } } } } - match immutable { - Some(mut addresses) => { - addresses.extend(volatile); - Ok(Some(addresses.into_iter().collect())) - } - None if volatile.is_empty() => Ok(None), - None => Ok(Some(volatile.into_iter().collect())), - } + Ok((!addresses.is_empty()).then_some(addresses)) } fn handle_stake_registration_change( diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index caeb5b4d..ab311666 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -321,7 +321,8 @@ pub fn process_message( block: &BlockInfo, mut tracker: Option<&mut Tracker>, ) -> StakeAddressDeltasMessage { - let mut grouped: HashMap)> = HashMap::new(); + let mut grouped: HashMap, HashSet)> = + HashMap::new(); for d in delta.deltas.iter() { // Variants to be processed: // 1. Shelley Address delegation is a stake @@ -388,24 +389,26 @@ pub fn process_message( Address::Stake(stake_address) => (stake_address.clone(), None), }; - let entry = grouped.entry(stake_address).or_insert((0, HashSet::new())); + let entry = grouped.entry(stake_address).or_insert((0, Vec::new(), HashSet::new())); entry.0 += d.value.lovelace; if let Some(shelley) = shelley_opt { - entry.1.insert(shelley.clone()); + if entry.2.insert(shelley.clone()) { + entry.1.push(shelley.clone()); + } } } let deltas = grouped .into_iter() .map( - |(stake_address, (delta, shelley_addrs))| StakeAddressDelta { + |(stake_address, (delta, shelley_addrs, _))| StakeAddressDelta { stake_address, - addresses: shelley_addrs.into_iter().collect(), + addresses: shelley_addrs, delta, }, ) - .collect::>(); + .collect(); StakeAddressDeltasMessage { deltas } } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 6922217e..e3cd285d 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -71,7 +71,7 @@ store-registration-history = false store-delegation-history = false store-mir-history = false store-withdrawal-history = false -store-addresses = false +store-addresses = true [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index ab2d4ca2..9ae13de7 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -118,7 +118,7 @@ pub async fn main() -> Result<()> { SPDDState::register(&mut process); DRDDState::register(&mut process); Consensus::register(&mut process); - ChainStore::register(&mut process); + //ChainStore::register(&mut process); Clock::::register(&mut process); RESTServer::::register(&mut process); From d48eec5a16c5be7996d87c7b3ca01bcf3de6cd70 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 3 Nov 2025 22:38:49 +0000 Subject: [PATCH 11/16] fix: use Vec instead of HashSet for addresses field Signed-off-by: William Hankins --- .../src/immutable_historical_account_store.rs | 21 +++++++++++++------ .../historical_accounts_state/src/state.rs | 9 +++----- processes/omnibus/src/main.rs | 2 +- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 0a2b5758..03ffbf80 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -316,7 +316,7 @@ impl ImmutableHistoricalAccountStore { ); Self::extend_opt_vec(&mut agg_entry.withdrawal_history, entry.withdrawal_history); Self::extend_opt_vec(&mut agg_entry.mir_history, entry.mir_history); - Self::extend_opt_set(&mut agg_entry.addresses, entry.addresses); + Self::extend_opt_vec_ordered(&mut agg_entry.addresses, entry.addresses); } acc }) @@ -378,13 +378,22 @@ impl ImmutableHistoricalAccountStore { } } - fn extend_opt_set(target: &mut Option>, src: Option>) + fn extend_opt_vec_ordered(target: &mut Option>, src: Option>) where - T: Eq + Hash, + T: Eq + Hash + Clone, { - if let Some(mut s) = src { - if !s.is_empty() { - target.get_or_insert_with(HashSet::new).extend(s.drain()); + if let Some(src_vec) = src { + if src_vec.is_empty() { + return; + } + + let target_vec = target.get_or_insert_with(Vec::new); + let mut seen: HashSet = target_vec.iter().cloned().collect(); + + for item in src_vec { + if seen.insert(item.clone()) { + target_vec.push(item); + } } } } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 0532892d..bb5c7eac 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -32,7 +32,7 @@ pub struct AccountEntry { pub registration_history: Option>, pub withdrawal_history: Option>, pub mir_history: Option>, - pub addresses: Option>, + pub addresses: Option>, } #[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] @@ -232,7 +232,7 @@ impl State { .entry(delta.stake_address.clone()) .or_default() .addresses - .get_or_insert_with(HashSet::new) + .get_or_insert_with(Vec::new) .extend(delta.addresses.clone()); } } @@ -358,10 +358,7 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let mut addresses = match self.immutable.get_addresses(account).await? { - Some(list) => list, - None => Vec::new(), - }; + let mut addresses = self.immutable.get_addresses(account).await?.unwrap_or_default(); let mut seen: HashSet = addresses.iter().cloned().collect(); for block_map in self.volatile.window.iter() { diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index 9ae13de7..ab2d4ca2 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -118,7 +118,7 @@ pub async fn main() -> Result<()> { SPDDState::register(&mut process); DRDDState::register(&mut process); Consensus::register(&mut process); - //ChainStore::register(&mut process); + ChainStore::register(&mut process); Clock::::register(&mut process); RESTServer::::register(&mut process); From 78aaf96e1a41c3a30e390aa067d8e3d1f26a1623 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 3 Nov 2025 22:41:08 +0000 Subject: [PATCH 12/16] fix: format Signed-off-by: William Hankins --- common/src/queries/accounts.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index c41ea06f..80213455 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; -use crate::{DRepChoice, PoolId, PoolLiveStakeInfo, RewardType, ShelleyAddress, StakeAddress, TxIdentifier}; +use crate::{ + DRepChoice, PoolId, PoolLiveStakeInfo, RewardType, ShelleyAddress, StakeAddress, TxIdentifier, +}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); From 5f6625a926ed4528725e0fc007dc04123bffe9c9 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 4 Nov 2025 00:42:05 +0000 Subject: [PATCH 13/16] fix: replace tuple with helper struct in stake address delta aggregation Signed-off-by: William Hankins --- modules/stake_delta_filter/src/utils.rs | 34 ++++++++++++++++--------- processes/omnibus/omnibus.toml | 2 +- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index ab311666..902c365c 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -311,6 +311,13 @@ impl Tracker { } } +/// Internal helper used during `process_message` aggregation for deduplication. +struct StakeEntry { + delta: i64, + addresses: Vec, + seen: HashSet, + } + /// Iterates through all address deltas in `delta`, leaves only stake addresses /// (and removes all others). If the address is a pointer, tries to resolve it. /// If the pointer is incorrect, then filters it out too (incorrect pointers cannot @@ -321,8 +328,9 @@ pub fn process_message( block: &BlockInfo, mut tracker: Option<&mut Tracker>, ) -> StakeAddressDeltasMessage { - let mut grouped: HashMap, HashSet)> = + let mut grouped: HashMap = HashMap::new(); + for d in delta.deltas.iter() { // Variants to be processed: // 1. Shelley Address delegation is a stake @@ -389,25 +397,27 @@ pub fn process_message( Address::Stake(stake_address) => (stake_address.clone(), None), }; - let entry = grouped.entry(stake_address).or_insert((0, Vec::new(), HashSet::new())); - entry.0 += d.value.lovelace; + let entry = grouped.entry(stake_address).or_insert_with(|| StakeEntry { + delta: 0, + addresses: Vec::new(), + seen: HashSet::new(), + }); + entry.delta += d.value.lovelace; if let Some(shelley) = shelley_opt { - if entry.2.insert(shelley.clone()) { - entry.1.push(shelley.clone()); + if entry.seen.insert(shelley.clone()) { + entry.addresses.push(shelley.clone()); } } } let deltas = grouped .into_iter() - .map( - |(stake_address, (delta, shelley_addrs, _))| StakeAddressDelta { - stake_address, - addresses: shelley_addrs, - delta, - }, - ) + .map(|(stake_address, entry)| StakeAddressDelta { + stake_address, + addresses: entry.addresses, + delta: entry.delta, + }) .collect(); StakeAddressDeltasMessage { deltas } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index e3cd285d..6922217e 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -71,7 +71,7 @@ store-registration-history = false store-delegation-history = false store-mir-history = false store-withdrawal-history = false -store-addresses = true +store-addresses = false [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) From 084ed313445ac7f601b7cab2f1c198dfbe662320 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 4 Nov 2025 00:43:31 +0000 Subject: [PATCH 14/16] fix: format Signed-off-by: William Hankins --- modules/stake_delta_filter/src/utils.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index 902c365c..b5c6f109 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -313,10 +313,10 @@ impl Tracker { /// Internal helper used during `process_message` aggregation for deduplication. struct StakeEntry { - delta: i64, - addresses: Vec, - seen: HashSet, - } + delta: i64, + addresses: Vec, + seen: HashSet, +} /// Iterates through all address deltas in `delta`, leaves only stake addresses /// (and removes all others). If the address is a pointer, tries to resolve it. @@ -328,8 +328,7 @@ pub fn process_message( block: &BlockInfo, mut tracker: Option<&mut Tracker>, ) -> StakeAddressDeltasMessage { - let mut grouped: HashMap = - HashMap::new(); + let mut grouped: HashMap = HashMap::new(); for d in delta.deltas.iter() { // Variants to be processed: From 10c2c09b62ec44bd8ac9955caaa86d9ba29182c0 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 4 Nov 2025 18:34:51 +0000 Subject: [PATCH 15/16] fix: Shelley to/from_bytes_key header byte encoding Signed-off-by: William Hankins --- common/src/address.rs | 96 +++++++++++++++++++++++++++++++++---------- 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/common/src/address.rs b/common/src/address.rs index 7c74f4fa..45802f9f 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -308,8 +308,10 @@ impl ShelleyAddress { let mut data = Vec::new(); - let build_header = - |variant: u8| -> u8 { network_bits | (payment_bits << 4) | (variant << 5) }; + let build_header = |delegation_bits: u8| -> u8 { + let addr_type = ((delegation_bits & 0x03) << 1) | (payment_bits & 0x01); + (addr_type << 4) | (network_bits & 0x0F) + }; match &self.delegation { ShelleyAddressDelegationPart::None => { @@ -357,8 +359,10 @@ impl ShelleyAddress { _ => return Err(anyhow!("invalid network bits in header")), }; - let payment_bits = (header >> 4) & 0x01; - let delegation_bits = (header >> 5) & 0x03; + let addr_type = (header >> 4) & 0x0F; + + let payment_bits = addr_type & 0x01; + let delegation_bits = (addr_type >> 1) & 0x03; let payment_hash = { let mut arr = [0u8; 28]; @@ -1117,49 +1121,97 @@ mod tests { let stake_hash = Hash::new([0x22; 28]); let script_hash = Hash::new([0x33; 28]); - // Normal address - let addr_base = ShelleyAddress { + // (KeyHash, StakeKeyHash) + let type_1 = ShelleyAddress { network: NetworkId::Mainnet, payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), delegation: ShelleyAddressDelegationPart::StakeKeyHash(stake_hash), }; - let bytes = addr_base.to_bytes_key(); + let bytes = type_1.to_bytes_key(); + assert_eq!(bytes[0], 0x01); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode base"); + assert_eq!(type_1, decoded); + + // (ScriptKeyHash, StakeKeyHash) + let type_2 = ShelleyAddress { + network: NetworkId::Mainnet, + payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash), + delegation: ShelleyAddressDelegationPart::StakeKeyHash(stake_hash), + }; + let bytes = type_2.to_bytes_key(); + assert_eq!(bytes[0], 0x11); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode base"); + assert_eq!(type_2, decoded); + + // (KeyHash, ScriptHash) + let type_3 = ShelleyAddress { + network: NetworkId::Mainnet, + payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), + delegation: ShelleyAddressDelegationPart::ScriptHash(stake_hash), + }; + let bytes = type_3.to_bytes_key(); + assert_eq!(bytes[0], 0x21); let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode base"); - assert_eq!(addr_base, decoded); + assert_eq!(type_3, decoded); - // Script address - let addr_script = ShelleyAddress { - network: NetworkId::Testnet, + // (ScriptHash, ScriptHash) + let type_4 = ShelleyAddress { + network: NetworkId::Mainnet, payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash), delegation: ShelleyAddressDelegationPart::ScriptHash(script_hash), }; - let bytes = addr_script.to_bytes_key(); + let bytes = type_4.to_bytes_key(); + assert_eq!(bytes[0], 0x31); let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode script"); - assert_eq!(addr_script, decoded); + assert_eq!(type_4, decoded); - // Pointer address + // (KeyHash, Pointer) let pointer = ShelleyAddressPointer { slot: 1234, tx_index: 56, cert_index: 2, }; - let addr_pointer = ShelleyAddress { + let type_5 = ShelleyAddress { network: NetworkId::Mainnet, payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), + delegation: ShelleyAddressDelegationPart::Pointer(pointer.clone()), + }; + let bytes = type_5.to_bytes_key(); + assert_eq!(bytes[0], 0x41); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode pointer"); + assert_eq!(type_5, decoded); + + // (ScriptHash, Pointer) + let type_6 = ShelleyAddress { + network: NetworkId::Mainnet, + payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash), delegation: ShelleyAddressDelegationPart::Pointer(pointer), }; - let bytes = addr_pointer.to_bytes_key(); + let bytes = type_6.to_bytes_key(); + assert_eq!(bytes[0], 0x51); let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode pointer"); - assert_eq!(addr_pointer, decoded); + assert_eq!(type_6, decoded); - // Enterprise address - let addr_none = ShelleyAddress { - network: NetworkId::Testnet, + // (KeyHash, None) + let type_7 = ShelleyAddress { + network: NetworkId::Mainnet, payment: ShelleyAddressPaymentPart::PaymentKeyHash(payment_hash), delegation: ShelleyAddressDelegationPart::None, }; - let bytes = addr_none.to_bytes_key(); + let bytes = type_7.to_bytes_key(); + assert_eq!(bytes[0], 0x61); + let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode none"); + assert_eq!(type_7, decoded); + + // (ScriptHash, None) + let type_8 = ShelleyAddress { + network: NetworkId::Mainnet, + payment: ShelleyAddressPaymentPart::ScriptHash(payment_hash), + delegation: ShelleyAddressDelegationPart::None, + }; + let bytes = type_8.to_bytes_key(); + assert_eq!(bytes[0], 0x71); let decoded = ShelleyAddress::from_bytes_key(&bytes).expect("decode none"); - assert_eq!(addr_none, decoded); + assert_eq!(type_8, decoded); } } From 39a891918a302e04570b1a9668512a55e80ec6d3 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 5 Nov 2025 03:19:01 +0000 Subject: [PATCH 16/16] fix: reject Shelley addresses with invalid high bit in from_bytes_key Signed-off-by: William Hankins --- common/src/address.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/common/src/address.rs b/common/src/address.rs index 45802f9f..9fb8b03a 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -353,6 +353,11 @@ impl ShelleyAddress { } let header = data[0]; + + if header & 0x80 != 0 { + return Err(anyhow!("invalid header: high bit set")); + } + let network = match header & 0x0F { 0 => NetworkId::Testnet, 1 => NetworkId::Mainnet,