From fac87315242a06ffb6e0b5600680343cd254003e Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 24 Oct 2025 19:49:50 +0000 Subject: [PATCH 1/7] refactor: expand WithdrawalMessage with positional information Signed-off-by: William Hankins --- common/src/messages.rs | 2 +- common/src/types.rs | 6 ++++++ modules/accounts_state/src/state.rs | 12 ++++++++---- modules/historical_accounts_state/src/state.rs | 11 ++++++++++- modules/spo_state/src/state.rs | 2 +- modules/tx_unpacker/src/tx_unpacker.rs | 9 ++++++--- 6 files changed, 32 insertions(+), 10 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 2cd325a6..99303006 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -104,7 +104,7 @@ pub struct AddressDeltasMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct WithdrawalsMessage { /// Set of withdrawals - pub withdrawals: Vec, + pub withdrawals: Vec, } /// Pot deltas message diff --git a/common/src/types.rs b/common/src/types.rs index 67b40e04..2d74a6f9 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -607,6 +607,12 @@ pub struct Withdrawal { pub value: Lovelace, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct WithdrawalWithPos { + pub withdrawal: Withdrawal, + pub tx_identifier: TxIdentifier, +} + /// Treasury pot account #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum Pot { diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index ffea3a0d..d8dd7ac1 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -967,7 +967,7 @@ impl State { pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) -> Result<()> { for withdrawal in withdrawals_msg.withdrawals.iter() { let mut stake_addresses = self.stake_addresses.lock().unwrap(); - stake_addresses.process_withdrawal(withdrawal); + stake_addresses.process_withdrawal(&withdrawal.withdrawal); } Ok(()) @@ -1019,6 +1019,7 @@ mod tests { StakeRegistrationAndStakeAndVoteDelegation, StakeRegistrationAndStakeAndVoteDelegationWithPos, StakeRegistrationAndVoteDelegation, StakeRegistrationAndVoteDelegationWithPos, TxIdentifier, VoteDelegation, Withdrawal, + WithdrawalWithPos, }; // Helper to create a StakeAddress from a byte slice @@ -1331,9 +1332,12 @@ mod tests { // Withdraw most of it let withdrawals = WithdrawalsMessage { - withdrawals: vec![Withdrawal { - address: stake_address.clone(), - value: 39, + withdrawals: vec![WithdrawalWithPos { + withdrawal: Withdrawal { + address: stake_address.clone(), + value: 39, + }, + tx_identifier: TxIdentifier::default(), }], }; diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 576b13c9..1a4edcaf 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -226,7 +226,16 @@ impl State { Ok(()) } - pub fn handle_withdrawals(&mut self, _withdrawals: &WithdrawalsMessage) -> Result<()> { + pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) -> Result<()> { + for withdrawal in &withdrawals_msg.withdrawals { + let volatile = self.volatile.window.back_mut().expect("window should never be empty"); + let entry = volatile.entry(withdrawal.withdrawal.address.clone()).or_default(); + let withdrawal_entry = AccountWithdrawal { + tx_identifier: withdrawal.tx_identifier, + amount: withdrawal.withdrawal.value, + }; + entry.withdrawal_history.get_or_insert(Vec::new()).push(withdrawal_entry) + } Ok(()) } diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 4c0c8b64..3593c92b 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -641,7 +641,7 @@ impl State { }; let mut stake_addresses = stake_addresses.lock().unwrap(); for withdrawal in withdrawals_msg.withdrawals.iter() { - stake_addresses.process_withdrawal(withdrawal); + stake_addresses.process_withdrawal(&withdrawal.withdrawal); } Ok(()) diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index 3be139ce..d776cf2b 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -285,9 +285,12 @@ impl TxUnpacker { for (key, value) in tx_withdrawals { match StakeAddress::from_binary(key) { Ok(stake_address) => { - withdrawals.push(Withdrawal { - address: stake_address, - value, + withdrawals.push(WithdrawalWithPos { + withdrawal: Withdrawal { + address: stake_address, + value + }, + tx_identifier, }); } From fe78ecc1dda30f487cddafc9805c9e893a9fe267 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 24 Oct 2025 20:05:22 +0000 Subject: [PATCH 2/7] implement withdrawal history getter and query handler Signed-off-by: William Hankins --- common/src/queries/accounts.rs | 4 +- .../src/historical_accounts_state.rs | 19 ++++---- .../src/immutable_historical_account_store.rs | 2 +- .../historical_accounts_state/src/state.rs | 44 +++++++++++-------- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index f60e7239..01cce175 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -18,7 +18,7 @@ pub enum AccountsStateQuery { GetAccountRegistrationHistory { account: StakeAddress }, GetAccountDelegationHistory { account: StakeAddress }, GetAccountMIRHistory { account: StakeAddress }, - GetAccountWithdrawalHistory { stake_key: Vec }, + GetAccountWithdrawalHistory { account: StakeAddress }, GetAccountAssociatedAddresses { stake_key: Vec }, GetAccountAssets { stake_key: Vec }, GetAccountAssetsTotals { stake_key: Vec }, @@ -52,7 +52,7 @@ pub enum AccountsStateQueryResponse { AccountRegistrationHistory(Vec), AccountDelegationHistory(Vec), AccountMIRHistory(Vec), - AccountWithdrawalHistory(AccountWithdrawalHistory), + AccountWithdrawalHistory(Vec), AccountAssociatedAddresses(AccountAssociatedAddresses), AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index fb957d60..09ef87ba 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -144,10 +144,7 @@ impl HistoricalAccountsState { Self::check_sync(¤t_block, &block_info); let mut state = state_mutex.lock().await; - state - .handle_tx_certificates(tx_certs_msg, block_info.epoch as u32) - .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) - .ok(); + state.handle_tx_certificates(tx_certs_msg, block_info.epoch as u32); } _ => error!("Unexpected message type: {certs_message:?}"), @@ -165,10 +162,7 @@ impl HistoricalAccountsState { Self::check_sync(¤t_block, &block_info); let mut state = state_mutex.lock().await; - state - .handle_withdrawals(withdrawals_msg) - .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) - .ok(); + state.handle_withdrawals(withdrawals_msg); } _ => error!("Unexpected message type: {message:?}"), @@ -343,7 +337,14 @@ impl HistoricalAccountsState { Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } - + AccountsStateQuery::GetAccountWithdrawalHistory { account } => { + match state.lock().await.get_withdrawal_history(&account).await { + Ok(withdrawals) => { + AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals) + } + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index a2e4b2af..a1e66ffe 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -235,7 +235,7 @@ impl ImmutableHistoricalAccountStore { Ok((!immutable_mirs.is_empty()).then_some(immutable_mirs)) } - pub async fn _get_withdrawal_history( + pub async fn get_withdrawal_history( &self, account: &StakeAddress, ) -> Result>> { diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 1a4edcaf..04c3a312 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -118,11 +118,7 @@ impl State { Ok(()) } - pub fn handle_tx_certificates( - &mut self, - tx_certs: &TxCertificatesMessage, - epoch: u32, - ) -> Result<()> { + pub fn handle_tx_certificates(&mut self, tx_certs: &TxCertificatesMessage, epoch: u32) { // Handle certificates for tx_cert in tx_certs.certificates.iter() { match tx_cert { @@ -219,24 +215,26 @@ impl State { _ => (), }; } - Ok(()) } pub fn handle_address_deltas(&mut self, _address_deltas: &AddressDeltasMessage) -> Result<()> { Ok(()) } - pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) -> Result<()> { - for withdrawal in &withdrawals_msg.withdrawals { - let volatile = self.volatile.window.back_mut().expect("window should never be empty"); - let entry = volatile.entry(withdrawal.withdrawal.address.clone()).or_default(); - let withdrawal_entry = AccountWithdrawal { - tx_identifier: withdrawal.tx_identifier, - amount: withdrawal.withdrawal.value, - }; - entry.withdrawal_history.get_or_insert(Vec::new()).push(withdrawal_entry) + pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) { + let window = self.volatile.window.back_mut().expect("window should never be empty"); + + for w in &withdrawals_msg.withdrawals { + window + .entry(w.withdrawal.address.clone()) + .or_default() + .withdrawal_history + .get_or_insert_with(Vec::new) + .push(AccountWithdrawal { + tx_identifier: w.tx_identifier, + amount: w.withdrawal.value, + }) } - Ok(()) } pub async fn _get_reward_history( @@ -293,11 +291,19 @@ impl State { Ok(mirs) } - pub async fn _get_withdrawal_history( + pub async fn get_withdrawal_history( &self, - _account: &StakeAddress, + account: &StakeAddress, ) -> Result> { - Ok(Vec::new()) + let mut withdrawals = + self.immutable.get_withdrawal_history(&account).await?.unwrap_or_default(); + + self.merge_volatile_history( + &account, + |e| e.withdrawal_history.as_ref(), + &mut withdrawals, + ); + Ok(withdrawals) } pub async fn _get_addresses(&self, _account: StakeAddress) -> Result> { From 37f358b8208ac006fbc7e323dd2f13f8bcf27fd6 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 24 Oct 2025 20:44:01 +0000 Subject: [PATCH 3/7] add account withdrawals REST handler Signed-off-by: William Hankins --- .../rest_blockfrost/src/handlers/accounts.rs | 114 ++++++++++++++++++ .../rest_blockfrost/src/handlers_config.rs | 8 +- .../rest_blockfrost/src/rest_blockfrost.rs | 16 ++- modules/rest_blockfrost/src/types.rs | 6 + 4 files changed, 142 insertions(+), 2 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 7ae84995..e19d0da0 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -3,6 +3,9 @@ use std::sync::Arc; use acropolis_common::messages::{Message, RESTResponse, StateQuery, StateQueryResponse}; use acropolis_common::queries::accounts::{AccountsStateQuery, AccountsStateQueryResponse}; +use acropolis_common::queries::blocks::{ + BlocksStateQuery, BlocksStateQueryResponse, TransactionHashes, +}; use acropolis_common::queries::utils::query_state; use acropolis_common::serialization::Bech32WithHrp; use acropolis_common::{DRepChoice, StakeAddress}; @@ -10,6 +13,7 @@ use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use crate::handlers_config::HandlersConfig; +use crate::types::AccountWithdrawalREST; #[derive(serde::Serialize)] pub struct StakeAccountRest { @@ -124,6 +128,116 @@ pub async fn handle_single_account_blockfrost( } } +pub async fn handle_account_withdrawals_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let Some(stake_key) = params.get(0) else { + return Ok(RESTResponse::with_text( + 400, + "Missing stake address parameter", + )); + }; + + // Convert Bech32 stake address to StakeAddress + let stake_address = match StakeAddress::from_string(&stake_key) { + Ok(addr) => addr, + _ => { + return Ok(RESTResponse::with_text( + 400, + &format!("Not a valid stake address: {stake_key}"), + )); + } + }; + + // Prepare the message + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountRegistrationHistory { + account: stake_address, + }, + ))); + + // Get withdrawals from historical accounts state + let withdrawals = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountWithdrawalHistory(registrations), + )) => Ok(registrations), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::NotFound, + )) => { + return Err(anyhow::anyhow!("Account not found")); + } + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => { + return Err(anyhow::anyhow!( + "Internal server error while retrieving account info: {e}" + )); + } + _ => { + return Err(anyhow::anyhow!( + "Unexpected message type while retrieving account info" + )) + } + }, + ) + .await?; + + // Get TxHashes from TxIdentifiers + let tx_ids: Vec<_> = withdrawals.iter().map(|r| r.tx_identifier.clone()).collect(); + let msg = Arc::new(Message::StateQuery(StateQuery::Blocks( + BlocksStateQuery::GetTransactionHashes { tx_ids }, + ))); + let tx_hashes = query_state( + &context, + &handlers_config.blocks_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::TransactionHashes(TransactionHashes { tx_hashes }), + )) => Ok(tx_hashes), + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while resolving transaction hashes: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while resolving transaction hashes" + )), + }, + ) + .await?; + + let mut rest_response = Vec::new(); + + for w in withdrawals { + let Some(tx_hash) = tx_hashes.get(&w.tx_identifier) else { + return Ok(RESTResponse::with_text( + 500, + "Missing tx hash for withdrawal", + )); + }; + + rest_response.push(AccountWithdrawalREST { + tx_hash: hex::encode(tx_hash), + amount: w.amount.to_string(), + }); + } + + match serde_json::to_string(&rest_response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving DRep delegation distribution: {e}"), + )), + } +} + fn map_drep_choice(drep: &DRepChoice) -> Result { match drep { DRepChoice::Key(hash) => { diff --git a/modules/rest_blockfrost/src/handlers_config.rs b/modules/rest_blockfrost/src/handlers_config.rs index 029541d1..837bfb06 100644 --- a/modules/rest_blockfrost/src/handlers_config.rs +++ b/modules/rest_blockfrost/src/handlers_config.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use acropolis_common::queries::{ - accounts::DEFAULT_ACCOUNTS_QUERY_TOPIC, + accounts::{DEFAULT_ACCOUNTS_QUERY_TOPIC, DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC}, addresses::DEFAULT_ADDRESS_QUERY_TOPIC, assets::{DEFAULT_ASSETS_QUERY_TOPIC, DEFAULT_OFFCHAIN_TOKEN_REGISTRY_URL}, blocks::DEFAULT_BLOCKS_QUERY_TOPIC, @@ -19,6 +19,7 @@ const DEFAULT_EXTERNAL_API_TIMEOUT: (&str, i64) = ("external_api_timeout", 3); / #[derive(Clone)] pub struct HandlersConfig { pub accounts_query_topic: String, + pub historical_accounts_query_topic: String, pub addresses_query_topic: String, pub assets_query_topic: String, pub blocks_query_topic: String, @@ -39,6 +40,10 @@ impl From> for HandlersConfig { .get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_ACCOUNTS_QUERY_TOPIC.1.to_string()); + let historical_accounts_query_topic = config + .get_string(DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC.1.to_string()); + let addresses_query_topic = config .get_string(DEFAULT_ADDRESS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_ADDRESS_QUERY_TOPIC.1.to_string()); @@ -89,6 +94,7 @@ impl From> for HandlersConfig { Self { accounts_query_topic, + historical_accounts_query_topic, addresses_query_topic, assets_query_topic, blocks_query_topic, diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index ac72e60a..30ec46d4 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -58,11 +58,17 @@ use handlers::{ }, }; -use crate::handlers_config::HandlersConfig; +use crate::{ + handlers::accounts::handle_account_withdrawals_blockfrost, handlers_config::HandlersConfig, +}; // Accounts topics const DEFAULT_HANDLE_SINGLE_ACCOUNT_TOPIC: (&str, &str) = ("handle-topic-account-single", "rest.get.accounts.*"); +const DEFAULT_HANDLE_ACCOUNT_WITHDRAWALS_TOPIC: (&str, &str) = ( + "handle-topic-account-withdrawals", + "rest.get.accounts.*.withdrawals", +); // Blocks topics const DEFAULT_HANDLE_BLOCKS_LATEST_HASH_NUMBER_TOPIC: (&str, &str) = @@ -249,6 +255,14 @@ impl BlockfrostREST { handle_single_account_blockfrost, ); + // Handler for /accounts/{stake_address}/withdrawals + register_handler( + context.clone(), + DEFAULT_HANDLE_ACCOUNT_WITHDRAWALS_TOPIC, + handlers_config.clone(), + handle_account_withdrawals_blockfrost, + ); + // Handler for /blocks/latest, /blocks/{hash_or_number} register_handler( context.clone(), diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index b4cd3364..14d2b0bf 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -854,3 +854,9 @@ impl From for AmountList { Self(out) } } + +#[derive(Serialize)] +pub struct AccountWithdrawalREST { + pub tx_hash: String, + pub amount: String, +} From ecc1187e6bca7756c5cadf5f11640c4da613d16a Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 01:12:13 +0000 Subject: [PATCH 4/7] fix: return 404 on NotFound for account withdrawals endpoint Signed-off-by: William Hankins --- .../src/historical_accounts_state.rs | 3 +- .../historical_accounts_state/src/state.rs | 23 ++++++++------- .../rest_blockfrost/src/handlers/accounts.rs | 28 +++++++++---------- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index fd153921..b21ad1f8 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -342,9 +342,10 @@ impl HistoricalAccountsState { } AccountsStateQuery::GetAccountWithdrawalHistory { account } => { match state.lock().await.get_withdrawal_history(&account).await { - Ok(withdrawals) => { + Ok(Some(withdrawals)) => { AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals) } + Ok(None) => AccountsStateQueryResponse::NotFound, Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 154d384b..aa978070 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -313,16 +313,19 @@ impl State { pub async fn get_withdrawal_history( &self, account: &StakeAddress, - ) -> Result> { - let mut withdrawals = - self.immutable.get_withdrawal_history(&account).await?.unwrap_or_default(); - - self.merge_volatile_history( - &account, - |e| e.withdrawal_history.as_ref(), - &mut withdrawals, - ); - Ok(withdrawals) + ) -> Result>> { + let immutable = self.immutable.get_withdrawal_history(&account).await?; + + let mut volatile = Vec::new(); + self.merge_volatile_history(&account, |e| e.withdrawal_history.as_ref(), &mut volatile); + match immutable { + Some(mut withdrawals) => { + withdrawals.extend(volatile); + Ok(Some(withdrawals)) + } + None if volatile.is_empty() => Ok(None), + None => Ok(Some(volatile)), + } } pub async fn _get_addresses(&self, _account: StakeAddress) -> Result> { diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 3e9c953e..ef26c4aa 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -435,28 +435,26 @@ pub async fn handle_account_withdrawals_blockfrost( |message| match message { Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::AccountWithdrawalHistory(registrations), - )) => Ok(registrations), + )) => Ok(Some(registrations)), Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::NotFound, - )) => { - return Err(anyhow::anyhow!("Account not found")); - } + )) => Ok(None), Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), - )) => { - return Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" - )); - } - _ => { - return Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" - )) - } + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving account info: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving account info" + )), }, ) .await?; + let Some(withdrawals) = withdrawals else { + return Ok(RESTResponse::with_text(404, "Account not found")); + }; + // Get TxHashes from TxIdentifiers let tx_ids: Vec<_> = withdrawals.iter().map(|r| r.tx_identifier.clone()).collect(); let msg = Arc::new(Message::StateQuery(StateQuery::Blocks( @@ -502,7 +500,7 @@ pub async fn handle_account_withdrawals_blockfrost( Ok(json) => Ok(RESTResponse::with_json(200, &json)), Err(e) => Ok(RESTResponse::with_text( 500, - &format!("Internal server error while serializing account withdrawal history: {e}"), + &format!("Internal server error while serializing withdrawal history: {e}"), )), } } From d1632ad1054904e8a7303670305bb8c0dbd83b4a Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 01:38:29 +0000 Subject: [PATCH 5/7] refactor: include tx_identifier in Withdrawal instead of using wrapper struct Signed-off-by: William Hankins --- common/src/messages.rs | 2 +- common/src/stake_addresses.rs | 7 +++++++ common/src/types.rs | 5 +---- modules/accounts_state/src/state.rs | 12 +++++------- .../src/historical_accounts_state.rs | 3 +-- modules/historical_accounts_state/src/state.rs | 4 ++-- modules/spo_state/src/state.rs | 2 +- modules/tx_unpacker/src/tx_unpacker.rs | 13 +++++-------- 8 files changed, 23 insertions(+), 25 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 16770523..a214977d 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -105,7 +105,7 @@ pub struct AddressDeltasMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct WithdrawalsMessage { /// Set of withdrawals - pub withdrawals: Vec, + pub withdrawals: Vec, } /// Pot deltas message diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 3d762e59..b9308ce5 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -867,6 +867,8 @@ mod tests { } mod withdrawal_tests { + use crate::TxIdentifier; + use super::*; #[test] @@ -880,6 +882,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 40, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); @@ -898,6 +901,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 24, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); assert_eq!(stake_addresses.get(&stake_address).unwrap().rewards, 12); @@ -906,6 +910,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 2, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); assert_eq!(stake_addresses.get(&stake_address).unwrap().rewards, 10); @@ -922,6 +927,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 0, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); @@ -936,6 +942,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 10, + tx_identifier: TxIdentifier::default(), }; // Should log error but not panic diff --git a/common/src/types.rs b/common/src/types.rs index 6836216a..2e8e60a4 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -605,11 +605,8 @@ pub struct Withdrawal { /// Value to withdraw pub value: Lovelace, -} -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct WithdrawalWithPos { - pub withdrawal: Withdrawal, + // Identifier of withdrawal tx pub tx_identifier: TxIdentifier, } diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 8813af45..afeb5e5e 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -948,7 +948,7 @@ impl State { pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) -> Result<()> { for withdrawal in withdrawals_msg.withdrawals.iter() { let mut stake_addresses = self.stake_addresses.lock().unwrap(); - stake_addresses.process_withdrawal(&withdrawal.withdrawal); + stake_addresses.process_withdrawal(&withdrawal); } Ok(()) @@ -998,7 +998,7 @@ mod tests { PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAndVoteDelegation, StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, - Withdrawal, WithdrawalWithPos, + Withdrawal, }; // Helper to create a StakeAddress from a byte slice @@ -1311,11 +1311,9 @@ mod tests { // Withdraw most of it let withdrawals = WithdrawalsMessage { - withdrawals: vec![WithdrawalWithPos { - withdrawal: Withdrawal { - address: stake_address.clone(), - value: 39, - }, + withdrawals: vec![Withdrawal { + address: stake_address.clone(), + value: 39, tx_identifier: TxIdentifier::default(), }], }; diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index b21ad1f8..02e41e86 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -200,8 +200,7 @@ impl HistoricalAccountsState { if should_prune { let (store, cfg) = { - let mut state: tokio::sync::MutexGuard<'_, State> = - state_mutex.lock().await; + let mut state = state_mutex.lock().await; state.prune_volatile().await; (state.immutable.clone(), state.config.clone()) }; diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index aa978070..6e506d79 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -228,13 +228,13 @@ impl State { for w in &withdrawals_msg.withdrawals { window - .entry(w.withdrawal.address.clone()) + .entry(w.address.clone()) .or_default() .withdrawal_history .get_or_insert_with(Vec::new) .push(AccountWithdrawal { tx_identifier: w.tx_identifier, - amount: w.withdrawal.value, + amount: w.value, }) } } diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 4931c804..49a50df7 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -618,7 +618,7 @@ impl State { }; let mut stake_addresses = stake_addresses.lock().unwrap(); for withdrawal in withdrawals_msg.withdrawals.iter() { - stake_addresses.process_withdrawal(&withdrawal.withdrawal); + stake_addresses.process_withdrawal(&withdrawal); } Ok(()) diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index 41d185ce..ab93de13 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -285,15 +285,12 @@ impl TxUnpacker { for (key, value) in tx_withdrawals { match StakeAddress::from_binary(key) { Ok(stake_address) => { - withdrawals.push(WithdrawalWithPos { - withdrawal: Withdrawal { + withdrawals.push(Withdrawal { address: stake_address, - value - }, - tx_identifier, - }); - } - + value, + tx_identifier + }); + } Err(e) => error!("Bad stake address: {e:#}"), } } From b920cfecaf3e62252b4fe29340dcdc45655bf147 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 16:17:21 +0000 Subject: [PATCH 6/7] fix: clippy warnings Signed-off-by: William Hankins --- .../workflows/run-tests-on-push-to-main.yml | 1 + .../src/historical_accounts_state.rs | 18 +++++------ .../src/immutable_historical_account_store.rs | 32 +++++++++---------- .../historical_accounts_state/src/state.rs | 20 ++++++------ 4 files changed, 35 insertions(+), 36 deletions(-) diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 7bd62060..eea39c8b 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -38,6 +38,7 @@ jobs: --package acropolis_module_drep_state \ --package acropolis_module_epochs_state \ --package acropolis_module_genesis_bootstrapper \ + --package acropolis_module_historical_accounts_state \ --package acropolis_module_mithril_snapshot_fetcher \ --package acropolis_module_snapshot_bootstrapper \ --package acropolis_module_spdd_state \ diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 02e41e86..9633c500 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -110,7 +110,7 @@ impl HistoricalAccountsState { if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = params_msg.as_ref() { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; state.volatile.start_new_epoch(block_info.number); if let Some(shelley) = ¶ms.params.shelley { @@ -124,7 +124,7 @@ impl HistoricalAccountsState { CardanoMessage::StakeRewardDeltas(rewards_msg), )) = rewards_msg.as_ref() { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; state .handle_rewards(rewards_msg) @@ -142,7 +142,7 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; state.handle_tx_certificates(tx_certs_msg, block_info.epoch as u32); } @@ -160,7 +160,7 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; state.handle_withdrawals(withdrawals_msg); } @@ -178,7 +178,7 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); { let mut state = state_mutex.lock().await; state @@ -313,7 +313,7 @@ impl HistoricalAccountsState { let response = match query { AccountsStateQuery::GetAccountRegistrationHistory { account } => { - match state.lock().await.get_registration_history(&account).await { + match state.lock().await.get_registration_history(account).await { Ok(Some(registrations)) => { AccountsStateQueryResponse::AccountRegistrationHistory( registrations, @@ -324,7 +324,7 @@ impl HistoricalAccountsState { } } AccountsStateQuery::GetAccountDelegationHistory { account } => { - match state.lock().await.get_delegation_history(&account).await { + match state.lock().await.get_delegation_history(account).await { Ok(Some(delegations)) => { AccountsStateQueryResponse::AccountDelegationHistory(delegations) } @@ -333,14 +333,14 @@ impl HistoricalAccountsState { } } AccountsStateQuery::GetAccountMIRHistory { account } => { - match state.lock().await.get_mir_history(&account).await { + match state.lock().await.get_mir_history(account).await { Ok(Some(mirs)) => AccountsStateQueryResponse::AccountMIRHistory(mirs), Ok(None) => AccountsStateQueryResponse::NotFound, Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } AccountsStateQuery::GetAccountWithdrawalHistory { account } => { - match state.lock().await.get_withdrawal_history(&account).await { + match state.lock().await.get_withdrawal_history(account).await { Ok(Some(withdrawals)) => { AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals) } diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 5592264a..eb95c929 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -87,7 +87,7 @@ impl ImmutableHistoricalAccountStore { if config.store_rewards_history { batch.insert( &self.rewards_history, - &epoch_key, + epoch_key, to_vec(&entry.reward_history)?, ); } @@ -96,7 +96,7 @@ impl ImmutableHistoricalAccountStore { if config.store_active_stake_history { batch.insert( &self.active_stake_history, - &epoch_key, + epoch_key, to_vec(&entry.active_stake_history)?, ); } @@ -104,28 +104,28 @@ impl ImmutableHistoricalAccountStore { // Persist account delegation updates if config.store_delegation_history { if let Some(updates) = &entry.delegation_history { - batch.insert(&self.delegation_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.delegation_history, epoch_key, to_vec(updates)?); } } // Persist account registration updates if config.store_registration_history { if let Some(updates) = &entry.registration_history { - batch.insert(&self.registration_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.registration_history, epoch_key, to_vec(updates)?); } } // Persist withdrawal updates if config.store_withdrawal_history { if let Some(updates) = &entry.withdrawal_history { - batch.insert(&self.withdrawal_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.withdrawal_history, epoch_key, to_vec(updates)?); } } // Persist MIR updates if config.store_mir_history { if let Some(updates) = &entry.mir_history { - batch.insert(&self.mir_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.mir_history, epoch_key, to_vec(updates)?); } } @@ -133,7 +133,7 @@ impl ImmutableHistoricalAccountStore { // TODO: Deduplicate addresses across epochs if config.store_addresses { if let Some(updates) = &entry.addresses { - batch.insert(&self.addresses, &epoch_key, to_vec(&updates)?); + batch.insert(&self.addresses, epoch_key, to_vec(updates)?); } } } @@ -157,7 +157,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_rewards = - self.collect_partition::(&self.rewards_history, &account.get_hash())?; + self.collect_partition::(&self.rewards_history, account.get_hash())?; self.merge_pending( account, @@ -175,7 +175,7 @@ impl ImmutableHistoricalAccountStore { ) -> Result>> { let mut immutable_active_stake = self.collect_partition::( &self.active_stake_history, - &account.get_hash(), + account.get_hash(), )?; self.merge_pending( @@ -194,7 +194,7 @@ impl ImmutableHistoricalAccountStore { ) -> Result>> { let mut immutable_registrations = self.collect_partition::( &self.registration_history, - &account.get_hash(), + account.get_hash(), )?; self.merge_pending( @@ -212,7 +212,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_delegations = self - .collect_partition::(&self.delegation_history, &account.get_hash())?; + .collect_partition::(&self.delegation_history, account.get_hash())?; self.merge_pending( account, @@ -229,7 +229,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_mirs = - self.collect_partition::(&self.mir_history, &account.get_hash())?; + self.collect_partition::(&self.mir_history, account.get_hash())?; self.merge_pending(account, |e| e.mir_history.as_ref(), &mut immutable_mirs).await; @@ -240,10 +240,8 @@ impl ImmutableHistoricalAccountStore { &self, account: &StakeAddress, ) -> Result>> { - let mut immutable_withdrawals = self.collect_partition::( - &self.withdrawal_history, - &account.get_hash(), - )?; + let mut immutable_withdrawals = self + .collect_partition::(&self.withdrawal_history, account.get_hash())?; self.merge_pending( account, @@ -260,7 +258,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_addresses = - self.collect_partition::(&self.addresses, &account.get_hash())?; + self.collect_partition::(&self.addresses, account.get_hash())?; self.merge_pending(account, |e| e.addresses.as_ref(), &mut immutable_addresses).await; diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 6e506d79..4ceef42c 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -127,14 +127,14 @@ impl State { // Pre-Conway stake registration/deregistration certs TxCertificate::StakeRegistration(stake_address) => { self.handle_stake_registration_change( - &stake_address, + stake_address, &tx_cert.tx_identifier, RegistrationStatus::Registered, ); } TxCertificate::StakeDeregistration(stake_address) => { self.handle_stake_registration_change( - &stake_address, + stake_address, &tx_cert.tx_identifier, RegistrationStatus::Deregistered, ); @@ -257,10 +257,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_registration_history(&account).await?; + let immutable = self.immutable.get_registration_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.registration_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.registration_history.as_ref(), &mut volatile); match immutable { Some(mut registrations) => { @@ -276,10 +276,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_delegation_history(&account).await?; + let immutable = self.immutable.get_delegation_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.delegation_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.delegation_history.as_ref(), &mut volatile); match immutable { Some(mut delegations) => { @@ -295,10 +295,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_mir_history(&account).await?; + let immutable = self.immutable.get_mir_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.mir_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.mir_history.as_ref(), &mut volatile); match immutable { Some(mut mirs) => { @@ -314,10 +314,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_withdrawal_history(&account).await?; + let immutable = self.immutable.get_withdrawal_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.withdrawal_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.withdrawal_history.as_ref(), &mut volatile); match immutable { Some(mut withdrawals) => { withdrawals.extend(volatile); From 840dd010ed845960f5f09b8c5dad899e5c510d5f Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 16:24:42 +0000 Subject: [PATCH 7/7] fix: clippy warnings in accounts and spo state Signed-off-by: William Hankins --- modules/accounts_state/src/state.rs | 2 +- modules/spo_state/src/state.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index afeb5e5e..96723651 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -948,7 +948,7 @@ impl State { pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) -> Result<()> { for withdrawal in withdrawals_msg.withdrawals.iter() { let mut stake_addresses = self.stake_addresses.lock().unwrap(); - stake_addresses.process_withdrawal(&withdrawal); + stake_addresses.process_withdrawal(withdrawal); } Ok(()) diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 49a50df7..8d3b79d6 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -618,7 +618,7 @@ impl State { }; let mut stake_addresses = stake_addresses.lock().unwrap(); for withdrawal in withdrawals_msg.withdrawals.iter() { - stake_addresses.process_withdrawal(&withdrawal); + stake_addresses.process_withdrawal(withdrawal); } Ok(())