diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 1a12dcc3..a8e221f0 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -39,6 +39,7 @@ jobs: --package acropolis_module_epochs_state \ --package acropolis_module_genesis_bootstrapper \ --package acropolis_module_governance_state \ + --package acropolis_module_historical_accounts_state \ --package acropolis_module_mithril_snapshot_fetcher \ --package acropolis_module_parameters_state \ --package acropolis_module_snapshot_bootstrapper \ diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index be573a13..b075c356 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -18,7 +18,7 @@ pub enum AccountsStateQuery { GetAccountRegistrationHistory { account: StakeAddress }, GetAccountDelegationHistory { account: StakeAddress }, GetAccountMIRHistory { account: StakeAddress }, - GetAccountWithdrawalHistory { stake_key: Vec }, + GetAccountWithdrawalHistory { account: StakeAddress }, GetAccountAssociatedAddresses { stake_key: Vec }, GetAccountAssets { stake_key: Vec }, GetAccountAssetsTotals { stake_key: Vec }, @@ -52,7 +52,7 @@ pub enum AccountsStateQueryResponse { AccountRegistrationHistory(Vec), AccountDelegationHistory(Vec), AccountMIRHistory(Vec), - AccountWithdrawalHistory(AccountWithdrawalHistory), + AccountWithdrawalHistory(Vec), AccountAssociatedAddresses(AccountAssociatedAddresses), AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 3d762e59..b9308ce5 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -867,6 +867,8 @@ mod tests { } mod withdrawal_tests { + use crate::TxIdentifier; + use super::*; #[test] @@ -880,6 +882,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 40, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); @@ -898,6 +901,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 24, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); assert_eq!(stake_addresses.get(&stake_address).unwrap().rewards, 12); @@ -906,6 +910,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 2, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); assert_eq!(stake_addresses.get(&stake_address).unwrap().rewards, 10); @@ -922,6 +927,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 0, + tx_identifier: TxIdentifier::default(), }; stake_addresses.process_withdrawal(&withdrawal); @@ -936,6 +942,7 @@ mod tests { let withdrawal = Withdrawal { address: stake_address.clone(), value: 10, + tx_identifier: TxIdentifier::default(), }; // Should log error but not panic diff --git a/common/src/types.rs b/common/src/types.rs index 89d1bc89..7444afb9 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -610,6 +610,9 @@ pub struct Withdrawal { /// Value to withdraw pub value: Lovelace, + + // Identifier of withdrawal tx + pub tx_identifier: TxIdentifier, } /// Treasury pot account diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index db15508d..96723651 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -1314,6 +1314,7 @@ mod tests { withdrawals: vec![Withdrawal { address: stake_address.clone(), value: 39, + tx_identifier: TxIdentifier::default(), }], }; diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index b64fa5cb..9633c500 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -110,7 +110,7 @@ impl HistoricalAccountsState { if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = params_msg.as_ref() { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; state.volatile.start_new_epoch(block_info.number); if let Some(shelley) = ¶ms.params.shelley { @@ -124,7 +124,7 @@ impl HistoricalAccountsState { CardanoMessage::StakeRewardDeltas(rewards_msg), )) = rewards_msg.as_ref() { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; state .handle_rewards(rewards_msg) @@ -142,12 +142,9 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; - state - .handle_tx_certificates(tx_certs_msg, block_info.epoch as u32) - .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) - .ok(); + state.handle_tx_certificates(tx_certs_msg, block_info.epoch as u32); } _ => error!("Unexpected message type: {certs_message:?}"), @@ -163,12 +160,9 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; - state - .handle_withdrawals(withdrawals_msg) - .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) - .ok(); + state.handle_withdrawals(withdrawals_msg); } _ => error!("Unexpected message type: {message:?}"), @@ -184,7 +178,7 @@ impl HistoricalAccountsState { ); let _entered = span.enter(); - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); { let mut state = state_mutex.lock().await; state @@ -206,8 +200,7 @@ impl HistoricalAccountsState { if should_prune { let (store, cfg) = { - let mut state: tokio::sync::MutexGuard<'_, State> = - state_mutex.lock().await; + let mut state = state_mutex.lock().await; state.prune_volatile().await; (state.immutable.clone(), state.config.clone()) }; @@ -320,7 +313,7 @@ impl HistoricalAccountsState { let response = match query { AccountsStateQuery::GetAccountRegistrationHistory { account } => { - match state.lock().await.get_registration_history(&account).await { + match state.lock().await.get_registration_history(account).await { Ok(Some(registrations)) => { AccountsStateQueryResponse::AccountRegistrationHistory( registrations, @@ -331,7 +324,7 @@ impl HistoricalAccountsState { } } AccountsStateQuery::GetAccountDelegationHistory { account } => { - match state.lock().await.get_delegation_history(&account).await { + match state.lock().await.get_delegation_history(account).await { Ok(Some(delegations)) => { AccountsStateQueryResponse::AccountDelegationHistory(delegations) } @@ -340,13 +333,21 @@ impl HistoricalAccountsState { } } AccountsStateQuery::GetAccountMIRHistory { account } => { - match state.lock().await.get_mir_history(&account).await { + match state.lock().await.get_mir_history(account).await { Ok(Some(mirs)) => AccountsStateQueryResponse::AccountMIRHistory(mirs), Ok(None) => AccountsStateQueryResponse::NotFound, Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } - + AccountsStateQuery::GetAccountWithdrawalHistory { account } => { + match state.lock().await.get_withdrawal_history(account).await { + Ok(Some(withdrawals)) => { + AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals) + } + Ok(None) => AccountsStateQueryResponse::NotFound, + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index ae6fbd0e..eb95c929 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -87,7 +87,7 @@ impl ImmutableHistoricalAccountStore { if config.store_rewards_history { batch.insert( &self.rewards_history, - &epoch_key, + epoch_key, to_vec(&entry.reward_history)?, ); } @@ -96,7 +96,7 @@ impl ImmutableHistoricalAccountStore { if config.store_active_stake_history { batch.insert( &self.active_stake_history, - &epoch_key, + epoch_key, to_vec(&entry.active_stake_history)?, ); } @@ -104,28 +104,28 @@ impl ImmutableHistoricalAccountStore { // Persist account delegation updates if config.store_delegation_history { if let Some(updates) = &entry.delegation_history { - batch.insert(&self.delegation_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.delegation_history, epoch_key, to_vec(updates)?); } } // Persist account registration updates if config.store_registration_history { if let Some(updates) = &entry.registration_history { - batch.insert(&self.registration_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.registration_history, epoch_key, to_vec(updates)?); } } // Persist withdrawal updates if config.store_withdrawal_history { if let Some(updates) = &entry.withdrawal_history { - batch.insert(&self.withdrawal_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.withdrawal_history, epoch_key, to_vec(updates)?); } } // Persist MIR updates if config.store_mir_history { if let Some(updates) = &entry.mir_history { - batch.insert(&self.mir_history, &epoch_key, to_vec(&updates)?); + batch.insert(&self.mir_history, epoch_key, to_vec(updates)?); } } @@ -133,7 +133,7 @@ impl ImmutableHistoricalAccountStore { // TODO: Deduplicate addresses across epochs if config.store_addresses { if let Some(updates) = &entry.addresses { - batch.insert(&self.addresses, &epoch_key, to_vec(&updates)?); + batch.insert(&self.addresses, epoch_key, to_vec(updates)?); } } } @@ -157,7 +157,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_rewards = - self.collect_partition::(&self.rewards_history, &account.get_hash())?; + self.collect_partition::(&self.rewards_history, account.get_hash())?; self.merge_pending( account, @@ -175,7 +175,7 @@ impl ImmutableHistoricalAccountStore { ) -> Result>> { let mut immutable_active_stake = self.collect_partition::( &self.active_stake_history, - &account.get_hash(), + account.get_hash(), )?; self.merge_pending( @@ -194,7 +194,7 @@ impl ImmutableHistoricalAccountStore { ) -> Result>> { let mut immutable_registrations = self.collect_partition::( &self.registration_history, - &account.get_hash(), + account.get_hash(), )?; self.merge_pending( @@ -212,7 +212,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_delegations = self - .collect_partition::(&self.delegation_history, &account.get_hash())?; + .collect_partition::(&self.delegation_history, account.get_hash())?; self.merge_pending( account, @@ -229,21 +229,19 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_mirs = - self.collect_partition::(&self.mir_history, &account.get_hash())?; + self.collect_partition::(&self.mir_history, account.get_hash())?; self.merge_pending(account, |e| e.mir_history.as_ref(), &mut immutable_mirs).await; Ok((!immutable_mirs.is_empty()).then_some(immutable_mirs)) } - pub async fn _get_withdrawal_history( + pub async fn get_withdrawal_history( &self, account: &StakeAddress, ) -> Result>> { - let mut immutable_withdrawals = self.collect_partition::( - &self.withdrawal_history, - &account.get_hash(), - )?; + let mut immutable_withdrawals = self + .collect_partition::(&self.withdrawal_history, account.get_hash())?; self.merge_pending( account, @@ -260,7 +258,7 @@ impl ImmutableHistoricalAccountStore { account: &StakeAddress, ) -> Result>> { let mut immutable_addresses = - self.collect_partition::(&self.addresses, &account.get_hash())?; + self.collect_partition::(&self.addresses, account.get_hash())?; self.merge_pending(account, |e| e.addresses.as_ref(), &mut immutable_addresses).await; diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 89690432..4ceef42c 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -120,25 +120,21 @@ impl State { Ok(()) } - pub fn handle_tx_certificates( - &mut self, - tx_certs: &TxCertificatesMessage, - epoch: u32, - ) -> Result<()> { + pub fn handle_tx_certificates(&mut self, tx_certs: &TxCertificatesMessage, epoch: u32) { // Handle certificates for tx_cert in tx_certs.certificates.iter() { match &tx_cert.cert { // Pre-Conway stake registration/deregistration certs TxCertificate::StakeRegistration(stake_address) => { self.handle_stake_registration_change( - &stake_address, + stake_address, &tx_cert.tx_identifier, RegistrationStatus::Registered, ); } TxCertificate::StakeDeregistration(stake_address) => { self.handle_stake_registration_change( - &stake_address, + stake_address, &tx_cert.tx_identifier, RegistrationStatus::Deregistered, ); @@ -221,15 +217,26 @@ impl State { _ => (), }; } - Ok(()) } pub fn handle_address_deltas(&mut self, _address_deltas: &AddressDeltasMessage) -> Result<()> { Ok(()) } - pub fn handle_withdrawals(&mut self, _withdrawals: &WithdrawalsMessage) -> Result<()> { - Ok(()) + pub fn handle_withdrawals(&mut self, withdrawals_msg: &WithdrawalsMessage) { + let window = self.volatile.window.back_mut().expect("window should never be empty"); + + for w in &withdrawals_msg.withdrawals { + window + .entry(w.address.clone()) + .or_default() + .withdrawal_history + .get_or_insert_with(Vec::new) + .push(AccountWithdrawal { + tx_identifier: w.tx_identifier, + amount: w.value, + }) + } } pub async fn _get_reward_history( @@ -250,10 +257,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_registration_history(&account).await?; + let immutable = self.immutable.get_registration_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.registration_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.registration_history.as_ref(), &mut volatile); match immutable { Some(mut registrations) => { @@ -269,10 +276,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_delegation_history(&account).await?; + let immutable = self.immutable.get_delegation_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.delegation_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.delegation_history.as_ref(), &mut volatile); match immutable { Some(mut delegations) => { @@ -288,10 +295,10 @@ impl State { &self, account: &StakeAddress, ) -> Result>> { - let immutable = self.immutable.get_mir_history(&account).await?; + let immutable = self.immutable.get_mir_history(account).await?; let mut volatile = Vec::new(); - self.merge_volatile_history(&account, |e| e.mir_history.as_ref(), &mut volatile); + self.merge_volatile_history(account, |e| e.mir_history.as_ref(), &mut volatile); match immutable { Some(mut mirs) => { @@ -303,11 +310,22 @@ impl State { } } - pub async fn _get_withdrawal_history( + pub async fn get_withdrawal_history( &self, - _account: &StakeAddress, - ) -> Result> { - Ok(Vec::new()) + account: &StakeAddress, + ) -> Result>> { + let immutable = self.immutable.get_withdrawal_history(account).await?; + + let mut volatile = Vec::new(); + self.merge_volatile_history(account, |e| e.withdrawal_history.as_ref(), &mut volatile); + match immutable { + Some(mut withdrawals) => { + withdrawals.extend(volatile); + Ok(Some(withdrawals)) + } + None if volatile.is_empty() => Ok(None), + None => Ok(Some(volatile)), + } } pub async fn _get_addresses(&self, _account: StakeAddress) -> Result> { diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index f4784615..ef26c4aa 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -410,6 +410,101 @@ pub async fn handle_account_mirs_blockfrost( } } +pub async fn handle_account_withdrawals_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let stake_address = match parse_stake_address(¶ms) { + Ok(addr) => addr, + Err(resp) => return Ok(resp), + }; + + // Prepare the message + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountRegistrationHistory { + account: stake_address, + }, + ))); + + // Get withdrawals from historical accounts state + let withdrawals = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountWithdrawalHistory(registrations), + )) => Ok(Some(registrations)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::NotFound, + )) => Ok(None), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving account info: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving account info" + )), + }, + ) + .await?; + + let Some(withdrawals) = withdrawals else { + return Ok(RESTResponse::with_text(404, "Account not found")); + }; + + // Get TxHashes from TxIdentifiers + let tx_ids: Vec<_> = withdrawals.iter().map(|r| r.tx_identifier.clone()).collect(); + let msg = Arc::new(Message::StateQuery(StateQuery::Blocks( + BlocksStateQuery::GetTransactionHashes { tx_ids }, + ))); + let tx_hashes = query_state( + &context, + &handlers_config.blocks_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::TransactionHashes(TransactionHashes { tx_hashes }), + )) => Ok(tx_hashes), + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while resolving transaction hashes: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while resolving transaction hashes" + )), + }, + ) + .await?; + + let mut rest_response = Vec::new(); + + for w in withdrawals { + let Some(tx_hash) = tx_hashes.get(&w.tx_identifier) else { + return Ok(RESTResponse::with_text( + 500, + "Missing tx hash for withdrawal", + )); + }; + + rest_response.push(AccountWithdrawalREST { + tx_hash: hex::encode(tx_hash), + amount: w.amount.to_string(), + }); + } + + match serde_json::to_string_pretty(&rest_response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while serializing withdrawal history: {e}"), + )), + } +} + fn parse_stake_address(params: &[String]) -> Result { let Some(stake_key) = params.first() else { return Err(RESTResponse::with_text( diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index 66e4763d..cbf72f73 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -61,7 +61,7 @@ use handlers::{ use crate::{ handlers::accounts::{ handle_account_delegations_blockfrost, handle_account_mirs_blockfrost, - handle_account_registrations_blockfrost, + handle_account_registrations_blockfrost, handle_account_withdrawals_blockfrost, }, handlers_config::HandlersConfig, }; @@ -79,6 +79,10 @@ const DEFAULT_HANDLE_ACCOUNT_DELEGATIONS_TOPIC: (&str, &str) = ( ); const DEFAULT_HANDLE_ACCOUNT_MIRS_TOPIC: (&str, &str) = ("handle-topic-account-mirs", "rest.get.accounts.*.mirs"); +const DEFAULT_HANDLE_ACCOUNT_WITHDRAWALS_TOPIC: (&str, &str) = ( + "handle-topic-account-withdrawals", + "rest.get.accounts.*.withdrawals", +); // Blocks topics const DEFAULT_HANDLE_BLOCKS_LATEST_HASH_NUMBER_TOPIC: (&str, &str) = @@ -289,6 +293,14 @@ impl BlockfrostREST { handle_account_mirs_blockfrost, ); + // Handler for /accounts/{stake_address}/withdrawals + register_handler( + context.clone(), + DEFAULT_HANDLE_ACCOUNT_WITHDRAWALS_TOPIC, + handlers_config.clone(), + handle_account_withdrawals_blockfrost, + ); + // Handler for /blocks/latest, /blocks/{hash_or_number} register_handler( context.clone(), diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index bd3025b8..ab93de13 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -286,11 +286,11 @@ impl TxUnpacker { match StakeAddress::from_binary(key) { Ok(stake_address) => { withdrawals.push(Withdrawal { - address: stake_address, - value, - }); - } - + address: stake_address, + value, + tx_identifier + }); + } Err(e) => error!("Bad stake address: {e:#}"), } }