From 5715b5a550c4290aa2fdd8bf047d8909da18cc90 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 18:38:22 +0000 Subject: [PATCH 1/9] refactor: pay MIRs before epoch transition and include reward type in StakeRewardDeltas Signed-off-by: William Hankins --- common/src/stake_addresses.rs | 7 + common/src/types.rs | 11 +- modules/accounts_state/src/rewards.rs | 9 +- modules/accounts_state/src/state.rs | 173 ++++++++----------------- modules/accounts_state/src/verifier.rs | 4 +- modules/spo_state/src/state.rs | 2 +- 6 files changed, 79 insertions(+), 127 deletions(-) diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 3d762e59..3e3d3b26 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -535,6 +535,13 @@ impl StakeAddressMap { update_value_with_delta(&mut sas.rewards, delta) } + pub fn pay_reward(&mut self, stake_address: &StakeAddress, delta: u64) -> Result<()> { + let sas = self.entry(stake_address.clone()).or_default(); + sas.rewards = + sas.rewards.checked_add(delta).ok_or_else(|| anyhow::anyhow!("reward overflow"))?; + Ok(()) + } + /// Update utxo value with delta pub fn update_utxo_value(&mut self, stake_address: &StakeAddress, delta: i64) -> Result<()> { let sas = self.entry(stake_address.clone()).or_default(); diff --git a/common/src/types.rs b/common/src/types.rs index 6836216a..062a1d17 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -207,7 +207,16 @@ pub struct StakeAddressDelta { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct StakeRewardDelta { pub stake_address: StakeAddress, - pub delta: i64, + pub delta: u64, + pub reward_type: RewardType, +} + +/// Type of reward being given +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum RewardType { + Leader, + Member, + PoolRefund, } pub type PolicyId = [u8; 28]; diff --git a/modules/accounts_state/src/rewards.rs b/modules/accounts_state/src/rewards.rs index 7632ba54..ae8c3517 100644 --- a/modules/accounts_state/src/rewards.rs +++ b/modules/accounts_state/src/rewards.rs @@ -1,6 +1,7 @@ //! Acropolis AccountsState: rewards calculations use crate::snapshot::{Snapshot, SnapshotSPO}; +use acropolis_common::RewardType; use acropolis_common::{ protocol_params::ShelleyParams, rational_number::RationalNumber, KeyHash, Lovelace, SPORewards, StakeAddress, @@ -13,13 +14,6 @@ use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; use tracing::{debug, info, warn}; -/// Type of reward being given -#[derive(Debug, Clone)] -pub enum RewardType { - Leader, - Member, -} - /// Reward Detail #[derive(Debug, Clone)] pub struct RewardDetail { @@ -199,6 +193,7 @@ pub fn calculate_rewards( num_delegators_paid += 1; total_paid_to_delegators += reward.amount; } + RewardType::PoolRefund => {} } spo_rewards.total_rewards += reward.amount; result.total_paid += reward.amount; diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index c26a2737..d14c0445 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -4,6 +4,7 @@ use crate::rewards::{calculate_rewards, RewardsResult}; use crate::snapshot::Snapshot; use crate::verifier::Verifier; use acropolis_common::queries::accounts::OptimalPoolSizing; +use acropolis_common::RewardType; use acropolis_common::{ math::update_value_with_delta, messages::{ @@ -116,12 +117,6 @@ pub struct State { /// Pool refunds to apply next epoch (list of reward accounts to refund to) pool_refunds: Vec, - /// Stake address refunds to apply next epoch - stake_refunds: Vec<(StakeAddress, Lovelace)>, - - /// MIRs to pay next epoch - mirs: Vec, - /// Addresses registration changes in current epoch current_epoch_registration_changes: Arc>>, @@ -305,7 +300,6 @@ impl State { // Pay MIRs before snapshot, so reserves is correct for total_supply in rewards let mut reward_deltas = Vec::::new(); - reward_deltas.extend(self.pay_mirs()); // Capture a new snapshot for the end of the previous epoch and push it to state let snapshot = Snapshot::new( @@ -324,7 +318,6 @@ impl State { // Pay the refunds after snapshot, so they don't appear in active_stake reward_deltas.extend(self.pay_pool_refunds()); - reward_deltas.extend(self.pay_stake_refunds()); // Verify pots state verifier.verify_pots(epoch, &self.pots); @@ -479,7 +472,8 @@ impl State { if stake_addresses.is_registered(&stake_address) { reward_deltas.push(StakeRewardDelta { stake_address: stake_address.clone(), - delta: deposit as i64, + delta: deposit, + reward_type: RewardType::PoolRefund, }); stake_addresses.add_to_reward(&stake_address, deposit); } else { @@ -496,101 +490,62 @@ impl State { reward_deltas } - /// Pay stake address refunds - fn pay_stake_refunds(&mut self) -> Vec { - let mut reward_deltas = Vec::::new(); - - let refunds = take(&mut self.stake_refunds); - if !refunds.is_empty() { - info!( - "{} deregistered stake addresses, total refunds {}", - refunds.len(), - refunds.iter().map(|(_, n)| n).sum::() - ); - } - - // Send them their deposits back - for (stake_address, deposit) in refunds { - let mut stake_addresses = self.stake_addresses.lock().unwrap(); - reward_deltas.push(StakeRewardDelta { - stake_address: stake_address.clone(), // Extract hash for delta - delta: deposit as i64, - }); - stake_addresses.add_to_reward(&stake_address, deposit); - self.pots.deposits -= deposit; - } - - reward_deltas - } - /// Pay MIRs - fn pay_mirs(&mut self) -> Vec { - let mut reward_deltas = Vec::::new(); + fn pay_mir(&mut self, mir: &MoveInstantaneousReward) { + let (source, source_name, other, other_name) = match &mir.source { + InstantaneousRewardSource::Reserves => ( + &mut self.pots.reserves, + "reserves", + &mut self.pots.treasury, + "treasury", + ), + InstantaneousRewardSource::Treasury => ( + &mut self.pots.treasury, + "treasury", + &mut self.pots.reserves, + "reserves", + ), + }; - let mirs = take(&mut self.mirs); - for mir in mirs { - let (source, source_name, other, other_name) = match &mir.source { - InstantaneousRewardSource::Reserves => ( - &mut self.pots.reserves, - "reserves", - &mut self.pots.treasury, - "treasury", - ), - InstantaneousRewardSource::Treasury => ( - &mut self.pots.treasury, - "treasury", - &mut self.pots.reserves, - "reserves", - ), - }; + match &mir.target { + InstantaneousRewardTarget::StakeAddresses(deltas) => { + // Transfer to a stake addresses from a pot + let mut total_value: u64 = 0; + for (stake_address, value) in deltas.iter() { + // Get old stake address state, or create one + let mut stake_addresses = self.stake_addresses.lock().unwrap(); + let sas = stake_addresses.entry(stake_address.clone()).or_default(); + + if let Err(e) = update_value_with_delta(&mut sas.rewards, *value) { + error!("MIR to stake address {}: {e}", stake_address); + } - match &mir.target { - InstantaneousRewardTarget::StakeAddresses(deltas) => { - // Transfer to (in theory also from) stake addresses from (to) a pot - let mut total_value: u64 = 0; - for (stake_address, value) in deltas.iter() { - // Get old stake address state, or create one - let mut stake_addresses = self.stake_addresses.lock().unwrap(); - let sas = stake_addresses.entry(stake_address.clone()).or_default(); - - // Add to this one - reward_deltas.push(StakeRewardDelta { - stake_address: stake_address.clone(), - delta: *value, - }); - if let Err(e) = update_value_with_delta(&mut sas.rewards, *value) { - error!("MIR to stake address {}: {e}", stake_address); - } - - // Update the source - if let Err(e) = update_value_with_delta(source, -*value) { - error!("MIR from {source_name}: {e}"); - } - - let _ = update_value_with_delta(&mut total_value, *value); + // Update the source + if let Err(e) = update_value_with_delta(source, -*value) { + error!("MIR from {source_name}: {e}"); } - info!( - "MIR of {total_value} to {} stake addresses from {source_name}", - deltas.len() - ); + let _ = update_value_with_delta(&mut total_value, *value); } - InstantaneousRewardTarget::OtherAccountingPot(value) => { - // Transfer between pots - if let Err(e) = update_value_with_delta(source, -(*value as i64)) { - error!("MIR from {source_name}: {e}"); - } - if let Err(e) = update_value_with_delta(other, *value as i64) { - error!("MIR to {other_name}: {e}"); - } + info!( + "MIR of {total_value} to {} stake addresses from {source_name}", + deltas.len() + ); + } - info!("MIR of {value} from {source_name} to {other_name}"); + InstantaneousRewardTarget::OtherAccountingPot(value) => { + // Transfer between pots + if let Err(e) = update_value_with_delta(source, -(*value as i64)) { + error!("MIR from {source_name}: {e}"); + } + if let Err(e) = update_value_with_delta(other, *value as i64) { + error!("MIR to {other_name}: {e}"); } + + info!("MIR of {value} from {source_name} to {other_name}"); } } - - reward_deltas } /// Derive the Stake Pool Delegation Distribution (SPDD) - a map of total stake values @@ -665,7 +620,8 @@ impl State { .iter() .map(|reward| StakeRewardDelta { stake_address: reward.account.clone(), - delta: reward.amount as i64, + delta: reward.amount, + reward_type: reward.rtype.clone(), }) .collect::>(), ); @@ -835,8 +791,7 @@ impl State { } }; - // Schedule refund - self.stake_refunds.push((stake_address.clone(), deposit)); + self.pots.deposits -= deposit; // Add to registration changes self.current_epoch_registration_changes.lock().unwrap().push(RegistrationChange { @@ -856,12 +811,6 @@ impl State { stake_addresses.record_stake_delegation(stake_address, spo); } - /// Handle an MoveInstantaneousReward (pre-Conway only) - pub fn handle_mir(&mut self, mir: &MoveInstantaneousReward) -> Result<()> { - self.mirs.push(mir.clone()); - Ok(()) - } - /// record a drep delegation fn record_drep_delegation(&mut self, stake_address: &StakeAddress, drep: &DRepChoice) { let mut stake_addresses = self.stake_addresses.lock().unwrap(); @@ -882,7 +831,7 @@ impl State { } TxCertificate::MoveInstantaneousReward(mir) => { - self.handle_mir(mir).unwrap_or_else(|e| error!("MIR failed: {e:#}")); + self.pay_mir(mir); } TxCertificate::Registration(reg) => { @@ -1188,8 +1137,7 @@ mod tests { target: InstantaneousRewardTarget::OtherAccountingPot(42), }; - state.handle_mir(&mir).unwrap(); - state.pay_mirs(); + state.pay_mir(&mir); assert_eq!(state.pots.reserves, 58); assert_eq!(state.pots.treasury, 42); assert_eq!(state.pots.deposits, 0); @@ -1200,9 +1148,7 @@ mod tests { target: InstantaneousRewardTarget::OtherAccountingPot(10), }; - state.handle_mir(&mir).unwrap(); - let reward_deltas = state.pay_mirs(); - assert_eq!(reward_deltas.len(), 0); + state.pay_mir(&mir); assert_eq!(state.pots.reserves, 68); assert_eq!(state.pots.treasury, 32); assert_eq!(state.pots.deposits, 0); @@ -1244,8 +1190,7 @@ mod tests { ]), }; - state.handle_mir(&mir).unwrap(); - state.pay_mirs(); + state.pay_mir(&mir); assert_eq!(state.pots.reserves, 58); assert_eq!(state.pots.treasury, 0); assert_eq!(state.pots.deposits, 2_000_000); // Paid deposit @@ -1290,16 +1235,12 @@ mod tests { target: InstantaneousRewardTarget::StakeAddresses(vec![(stake_address.clone(), 42)]), }; - state.handle_mir(&mir).unwrap(); - let diffs = state.pay_mirs(); - assert_eq!(state.pots.reserves, 58); - assert_eq!(diffs.len(), 1); - assert_eq!(diffs[0].stake_address.get_hash(), stake_address.get_hash()); - assert_eq!(diffs[0].delta, 42); + state.pay_mir(&mir); { let stake_addresses = state.stake_addresses.lock().unwrap(); let sas = stake_addresses.get(&stake_address).unwrap(); + assert_eq!(state.pots.reserves, 58); assert_eq!(sas.rewards, 42); } diff --git a/modules/accounts_state/src/verifier.rs b/modules/accounts_state/src/verifier.rs index f34924af..3b313a1a 100644 --- a/modules/accounts_state/src/verifier.rs +++ b/modules/accounts_state/src/verifier.rs @@ -1,7 +1,7 @@ //! Verification of calculated values against captured CSV from Haskell node / DBSync -use crate::rewards::{RewardDetail, RewardType, RewardsResult}; +use crate::rewards::{RewardDetail, RewardsResult}; use crate::state::Pots; -use acropolis_common::{KeyHash, StakeAddress}; +use acropolis_common::{KeyHash, RewardType, StakeAddress}; use hex::FromHex; use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::Itertools; diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index a3274f68..59e730fe 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -648,7 +648,7 @@ impl State { // Handle deltas for delta in reward_deltas_msg.deltas.iter() { let mut stake_addresses = stake_addresses.lock().unwrap(); - if let Err(e) = stake_addresses.update_reward(&delta.stake_address, delta.delta) { + if let Err(e) = stake_addresses.pay_reward(&delta.stake_address, delta.delta) { error!("Updating reward account {}: {e}", delta.stake_address); } } From c68fba07fac4cb8f21adbdab5ec654fe27146fb8 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 21:03:32 +0000 Subject: [PATCH 2/9] feat: implement reward history getter and query handler Signed-off-by: William Hankins --- common/src/queries/accounts.rs | 21 +++++++--- .../src/historical_accounts_state.rs | 9 +++++ .../src/immutable_historical_account_store.rs | 6 +-- .../historical_accounts_state/src/state.rs | 38 +++++++++---------- 4 files changed, 47 insertions(+), 27 deletions(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index b075c356..f7567d19 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -13,7 +13,7 @@ pub const DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ( #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQuery { GetAccountInfo { stake_address: StakeAddress }, - GetAccountRewardHistory { stake_key: Vec }, + GetAccountRewardHistory { account: StakeAddress }, GetAccountHistory { stake_key: Vec }, GetAccountRegistrationHistory { account: StakeAddress }, GetAccountDelegationHistory { account: StakeAddress }, @@ -47,7 +47,7 @@ pub enum AccountsStateQuery { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQueryResponse { AccountInfo(AccountInfo), - AccountRewardHistory(AccountRewardHistory), + AccountRewardHistory(Vec), AccountHistory(AccountHistory), AccountRegistrationHistory(Vec), AccountDelegationHistory(Vec), @@ -91,9 +91,6 @@ pub struct AccountInfo { pub delegated_drep: Option, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AccountRewardHistory {} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountHistory {} @@ -150,6 +147,20 @@ pub struct AccountWithdrawal { pub amount: u64, } +#[derive( + Debug, Clone, minicbor::Decode, minicbor::Encode, serde::Serialize, serde::Deserialize, +)] +pub struct RewardHistory { + #[n(0)] + pub epoch: u32, + #[n(1)] + pub amount: u64, + #[n(2)] + pub pool: PoolId, + #[n(3)] + pub is_owner: bool, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountWithdrawalHistory {} diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 9633c500..0d3666a0 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -348,6 +348,15 @@ impl HistoricalAccountsState { Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } + AccountsStateQuery::GetAccountRewardHistory { account } => { + match state.lock().await.get_reward_history(account).await { + Ok(Some(rewards)) => { + AccountsStateQueryResponse::AccountRewardHistory(rewards) + } + Ok(None) => AccountsStateQueryResponse::NotFound, + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index eb95c929..f57462f9 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, path::Path}; use acropolis_common::{ - queries::accounts::{AccountWithdrawal, DelegationUpdate, RegistrationUpdate}, + queries::accounts::{AccountWithdrawal, DelegationUpdate, RegistrationUpdate, RewardHistory}, ShelleyAddress, StakeAddress, }; use anyhow::Result; @@ -11,7 +11,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tokio::sync::Mutex; use tracing::{debug, error}; -use crate::state::{AccountEntry, ActiveStakeHistory, HistoricalAccountsConfig, RewardHistory}; +use crate::state::{AccountEntry, ActiveStakeHistory, HistoricalAccountsConfig}; pub struct ImmutableHistoricalAccountStore { rewards_history: Partition, @@ -152,7 +152,7 @@ impl ImmutableHistoricalAccountStore { pending.extend(drained); } - pub async fn _get_rewards_history( + pub async fn get_reward_history( &self, account: &StakeAddress, ) -> Result>> { diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 4ceef42c..cceb4b95 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -8,10 +8,10 @@ use acropolis_common::{ AddressDeltasMessage, StakeRewardDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, }, queries::accounts::{ - AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, + AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, RewardHistory, }, - BlockInfo, InstantaneousRewardTarget, PoolId, ShelleyAddress, StakeAddress, StakeCredential, - TxCertificate, TxIdentifier, + BlockInfo, InstantaneousRewardTarget, PoolId, ShelleyAddress, StakeAddress, TxCertificate, + TxIdentifier, }; use tracing::warn; @@ -33,18 +33,6 @@ pub struct AccountEntry { pub addresses: Option>, } -#[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] -pub struct RewardHistory { - #[n(0)] - pub epoch: u32, - #[n(1)] - pub amount: u64, - #[n(2)] - pub pool: PoolId, - #[n(3)] - pub is_owner: bool, -} - #[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] pub struct ActiveStakeHistory { #[n(0)] @@ -239,11 +227,23 @@ impl State { } } - pub async fn _get_reward_history( + pub async fn get_reward_history( &self, - _account: &StakeCredential, - ) -> Result> { - Ok(Vec::new()) + account: &StakeAddress, + ) -> Result>> { + let immutable = self.immutable.get_reward_history(account).await?; + + let mut volatile = Vec::new(); + self.merge_volatile_history(account, |e| e.reward_history.as_ref(), &mut volatile); + + match immutable { + Some(mut rewards) => { + rewards.extend(volatile); + Ok(Some(rewards)) + } + None if volatile.is_empty() => Ok(None), + None => Ok(Some(volatile)), + } } pub async fn _get_active_stake_history( From 262a5e322d2ef57fbf17213cd2d9d5e11d6cdd7c Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 21:12:53 +0000 Subject: [PATCH 3/9] feat: reward delta processing in historical accounts state Signed-off-by: William Hankins --- common/src/queries/accounts.rs | 6 ++++-- common/src/types.rs | 7 ++++++- .../src/historical_accounts_state.rs | 5 +---- modules/historical_accounts_state/src/state.rs | 14 ++++++++++++-- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index f7567d19..50375209 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; -use crate::{DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, StakeAddress, TxIdentifier}; +use crate::{ + DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier, +}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); @@ -158,7 +160,7 @@ pub struct RewardHistory { #[n(2)] pub pool: PoolId, #[n(3)] - pub is_owner: bool, + pub reward_type: RewardType, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/common/src/types.rs b/common/src/types.rs index e3ffb9a5..89f7ab1d 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -217,10 +217,15 @@ pub struct StakeRewardDelta { } /// Type of reward being given -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, minicbor::Encode, minicbor::Decode, serde::Serialize, serde::Deserialize, +)] pub enum RewardType { + #[n(0)] Leader, + #[n(1)] Member, + #[n(2)] PoolRefund, } diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 0d3666a0..99d07d30 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -126,10 +126,7 @@ impl HistoricalAccountsState { { Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; - state - .handle_rewards(rewards_msg) - .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) - .ok(); + state.handle_rewards(rewards_msg, block_info.epoch as u32); } } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index cceb4b95..b06aeb00 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -104,8 +104,18 @@ impl State { && block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k } - pub fn handle_rewards(&mut self, _reward_deltas: &StakeRewardDeltasMessage) -> Result<()> { - Ok(()) + pub fn handle_rewards(&mut self, reward_deltas: &StakeRewardDeltasMessage, epoch: u32) { + let volatile = self.volatile.window.back_mut().expect("window should never be empty"); + for reward in reward_deltas.deltas.iter() { + let entry = volatile.entry(reward.stake_address.clone()).or_default(); + let update = RewardHistory { + epoch, + amount: reward.delta, + pool: Vec::new(), + reward_type: reward.reward_type.clone(), + }; + entry.reward_history.get_or_insert_with(Vec::new).push(update); + } } pub fn handle_tx_certificates(&mut self, tx_certs: &TxCertificatesMessage, epoch: u32) { From 0d25e5f9656ae50d988b91bc03010f1a70995bbe Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 21:26:26 +0000 Subject: [PATCH 4/9] refactor: expand StakeRewardDelta with pool field Signed-off-by: William Hankins --- common/src/types.rs | 1 + modules/accounts_state/src/rewards.rs | 7 ++++++- modules/accounts_state/src/state.rs | 17 ++++++++++++----- modules/accounts_state/src/verifier.rs | 3 ++- modules/historical_accounts_state/src/state.rs | 2 +- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/common/src/types.rs b/common/src/types.rs index 89f7ab1d..cbe05d7e 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -214,6 +214,7 @@ pub struct StakeRewardDelta { pub stake_address: StakeAddress, pub delta: u64, pub reward_type: RewardType, + pub pool: PoolId, } /// Type of reward being given diff --git a/modules/accounts_state/src/rewards.rs b/modules/accounts_state/src/rewards.rs index 1bda7626..0ad3d6d1 100644 --- a/modules/accounts_state/src/rewards.rs +++ b/modules/accounts_state/src/rewards.rs @@ -1,11 +1,11 @@ //! Acropolis AccountsState: rewards calculations use crate::snapshot::{Snapshot, SnapshotSPO}; -use acropolis_common::RewardType; use acropolis_common::{ protocol_params::ShelleyParams, rational_number::RationalNumber, KeyHash, Lovelace, SPORewards, StakeAddress, }; +use acropolis_common::{PoolId, RewardType}; use anyhow::{bail, Result}; use bigdecimal::{BigDecimal, One, ToPrimitive, Zero}; use std::cmp::min; @@ -25,6 +25,9 @@ pub struct RewardDetail { /// Reward amount pub amount: Lovelace, + + // Pool that reward came from + pub pool: PoolId, } /// Result of a rewards calculation @@ -386,6 +389,7 @@ fn calculate_spo_rewards( account: delegator_stake_address.clone(), rtype: RewardType::Member, amount: to_pay, + pool: operator_id.to_vec(), }); total_paid += to_pay; delegators_paid += 1; @@ -403,6 +407,7 @@ fn calculate_spo_rewards( account: spo.reward_account.clone(), rtype: RewardType::Leader, amount: spo_benefit, + pool: operator_id.to_vec(), }); } else { info!( diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index a617e4af..df0fd95c 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -15,8 +15,9 @@ use acropolis_common::{ protocol_params::ProtocolParams, stake_addresses::{StakeAddressMap, StakeAddressState}, BlockInfo, DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, - InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolLiveStakeInfo, - PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, TxCertificate, + InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolId, + PoolLiveStakeInfo, PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, + TxCertificate, }; use anyhow::Result; use imbl::OrdMap; @@ -115,7 +116,7 @@ pub struct State { previous_protocol_parameters: Option, /// Pool refunds to apply next epoch (list of reward accounts to refund to) - pool_refunds: Vec, + pool_refunds: Vec<(PoolId, StakeAddress)>, /// Addresses registration changes in current epoch current_epoch_registration_changes: Arc>>, @@ -466,7 +467,7 @@ impl State { } // Send them their deposits back - for stake_address in refunds { + for (pool, stake_address) in refunds { // If their reward account has been deregistered, it goes to Treasury let mut stake_addresses = self.stake_addresses.lock().unwrap(); if stake_addresses.is_registered(&stake_address) { @@ -474,6 +475,7 @@ impl State { stake_address: stake_address.clone(), delta: deposit, reward_type: RewardType::PoolRefund, + pool, }); stake_addresses.add_to_reward(&stake_address, deposit); } else { @@ -622,6 +624,7 @@ impl State { stake_address: reward.account.clone(), delta: reward.amount, reward_type: reward.rtype.clone(), + pool: reward.pool.clone(), }) .collect::>(), ); @@ -736,7 +739,11 @@ impl State { hex::encode(id), retired_spo.reward_account ); - self.pool_refunds.push(retired_spo.reward_account.clone()); // Store full StakeAddress + self.pool_refunds.push(( + retired_spo.operator.clone(), + retired_spo.reward_account.clone(), + )); + // Store full StakeAddress } // Schedule to retire - we need them to still be in place when we count diff --git a/modules/accounts_state/src/verifier.rs b/modules/accounts_state/src/verifier.rs index 3b313a1a..c296acfa 100644 --- a/modules/accounts_state/src/verifier.rs +++ b/modules/accounts_state/src/verifier.rs @@ -166,10 +166,11 @@ impl Verifier { continue; }; - expected_rewards.entry(spo).or_default().push(RewardDetail { + expected_rewards.entry(spo.clone()).or_default().push(RewardDetail { account: stake_address, rtype, amount, + pool: spo, }); } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index b06aeb00..a1c28523 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -111,7 +111,7 @@ impl State { let update = RewardHistory { epoch, amount: reward.delta, - pool: Vec::new(), + pool: reward.pool.clone(), reward_type: reward.reward_type.clone(), }; entry.reward_history.get_or_insert_with(Vec::new).push(update); From 533345d2be1bbc7aae125111e7d37b79334b4d04 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 21:58:36 +0000 Subject: [PATCH 5/9] feat: add account rewards REST handler Signed-off-by: William Hankins --- common/src/queries/accounts.rs | 4 +- common/src/types.rs | 10 +++ .../src/immutable_historical_account_store.rs | 6 +- .../historical_accounts_state/src/state.rs | 8 +- .../rest_blockfrost/src/handlers/accounts.rs | 73 ++++++++++++++++++- .../rest_blockfrost/src/rest_blockfrost.rs | 26 +++++-- modules/rest_blockfrost/src/types.rs | 26 ++++++- 7 files changed, 130 insertions(+), 23 deletions(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 50375209..ea6fd32e 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -49,7 +49,7 @@ pub enum AccountsStateQuery { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQueryResponse { AccountInfo(AccountInfo), - AccountRewardHistory(Vec), + AccountRewardHistory(Vec), AccountHistory(AccountHistory), AccountRegistrationHistory(Vec), AccountDelegationHistory(Vec), @@ -152,7 +152,7 @@ pub struct AccountWithdrawal { #[derive( Debug, Clone, minicbor::Decode, minicbor::Encode, serde::Serialize, serde::Deserialize, )] -pub struct RewardHistory { +pub struct AccountReward { #[n(0)] pub epoch: u32, #[n(1)] diff --git a/common/src/types.rs b/common/src/types.rs index cbe05d7e..4dff8ac6 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -230,6 +230,16 @@ pub enum RewardType { PoolRefund, } +impl fmt::Display for RewardType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RewardType::Leader => write!(f, "leader"), + RewardType::Member => write!(f, "member"), + RewardType::PoolRefund => write!(f, "pool_deposit_refund"), + } + } +} + pub type PolicyId = [u8; 28]; pub type NativeAssets = Vec<(PolicyId, Vec)>; pub type NativeAssetsDelta = Vec<(PolicyId, Vec)>; diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index f57462f9..904ec88c 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, path::Path}; use acropolis_common::{ - queries::accounts::{AccountWithdrawal, DelegationUpdate, RegistrationUpdate, RewardHistory}, + queries::accounts::{AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationUpdate}, ShelleyAddress, StakeAddress, }; use anyhow::Result; @@ -155,9 +155,9 @@ impl ImmutableHistoricalAccountStore { pub async fn get_reward_history( &self, account: &StakeAddress, - ) -> Result>> { + ) -> Result>> { let mut immutable_rewards = - self.collect_partition::(&self.rewards_history, account.get_hash())?; + self.collect_partition::(&self.rewards_history, account.get_hash())?; self.merge_pending( account, diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index a1c28523..ee17f57c 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -8,7 +8,7 @@ use acropolis_common::{ AddressDeltasMessage, StakeRewardDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, }, queries::accounts::{ - AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, RewardHistory, + AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, }, BlockInfo, InstantaneousRewardTarget, PoolId, ShelleyAddress, StakeAddress, TxCertificate, TxIdentifier, @@ -24,7 +24,7 @@ use anyhow::Result; #[derive(Debug, Default, Clone)] pub struct AccountEntry { - pub reward_history: Option>, + pub reward_history: Option>, pub active_stake_history: Option>, pub delegation_history: Option>, pub registration_history: Option>, @@ -108,7 +108,7 @@ impl State { let volatile = self.volatile.window.back_mut().expect("window should never be empty"); for reward in reward_deltas.deltas.iter() { let entry = volatile.entry(reward.stake_address.clone()).or_default(); - let update = RewardHistory { + let update = AccountReward { epoch, amount: reward.delta, pool: reward.pool.clone(), @@ -240,7 +240,7 @@ impl State { pub async fn get_reward_history( &self, account: &StakeAddress, - ) -> Result>> { + ) -> Result>> { let immutable = self.immutable.get_reward_history(account).await?; let mut volatile = Vec::new(); diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index ef26c4aa..86fb0080 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -13,7 +13,9 @@ use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use crate::handlers_config::HandlersConfig; -use crate::types::{AccountWithdrawalREST, DelegationUpdateREST, RegistrationUpdateREST}; +use crate::types::{ + AccountRewardREST, AccountWithdrawalREST, DelegationUpdateREST, RegistrationUpdateREST, +}; #[derive(serde::Serialize)] pub struct StakeAccountRest { @@ -434,8 +436,8 @@ pub async fn handle_account_withdrawals_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Accounts( - AccountsStateQueryResponse::AccountWithdrawalHistory(registrations), - )) => Ok(Some(registrations)), + AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals), + )) => Ok(Some(withdrawals)), Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::NotFound, )) => Ok(None), @@ -505,6 +507,71 @@ pub async fn handle_account_withdrawals_blockfrost( } } +pub async fn handle_account_rewards_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let stake_address = match parse_stake_address(¶ms) { + Ok(addr) => addr, + Err(resp) => return Ok(resp), + }; + + // Prepare the message + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountRegistrationHistory { + account: stake_address, + }, + ))); + + // Get rewards from historical accounts state + let rewards = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountRewardHistory(rewards), + )) => Ok(Some(rewards)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::NotFound, + )) => Ok(None), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving account info: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving account info" + )), + }, + ) + .await?; + + let Some(rewards) = rewards else { + return Ok(RESTResponse::with_text(404, "Account not found")); + }; + + let rest_response = + match rewards.iter().map(|r| r.try_into()).collect::, _>>() { + Ok(v) => v, + Err(e) => { + return Ok(RESTResponse::with_text( + 500, + &format!("Failed to convert reward entry: {e}"), + )) + } + }; + + match serde_json::to_string_pretty(&rest_response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while serializing withdrawal history: {e}"), + )), + } +} + fn parse_stake_address(params: &[String]) -> Result { let Some(stake_key) = params.first() else { return Err(RESTResponse::with_text( diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index cbf72f73..57b6a7d4 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -16,7 +16,11 @@ mod handlers_config; mod types; mod utils; use handlers::{ - accounts::handle_single_account_blockfrost, + accounts::{ + handle_account_delegations_blockfrost, handle_account_mirs_blockfrost, + handle_account_registrations_blockfrost, handle_account_rewards_blockfrost, + handle_account_withdrawals_blockfrost, handle_single_account_blockfrost, + }, addresses::{ handle_address_asset_utxos_blockfrost, handle_address_extended_blockfrost, handle_address_single_blockfrost, handle_address_totals_blockfrost, @@ -58,13 +62,7 @@ use handlers::{ }, }; -use crate::{ - handlers::accounts::{ - handle_account_delegations_blockfrost, handle_account_mirs_blockfrost, - handle_account_registrations_blockfrost, handle_account_withdrawals_blockfrost, - }, - handlers_config::HandlersConfig, -}; +use crate::handlers_config::HandlersConfig; // Accounts topics const DEFAULT_HANDLE_SINGLE_ACCOUNT_TOPIC: (&str, &str) = @@ -83,6 +81,10 @@ const DEFAULT_HANDLE_ACCOUNT_WITHDRAWALS_TOPIC: (&str, &str) = ( "handle-topic-account-withdrawals", "rest.get.accounts.*.withdrawals", ); +const DEFAULT_HANDLE_ACCOUNT_REWARDS_TOPIC: (&str, &str) = ( + "handle-topic-account-rewards", + "rest.get.accounts.*.rewards", +); // Blocks topics const DEFAULT_HANDLE_BLOCKS_LATEST_HASH_NUMBER_TOPIC: (&str, &str) = @@ -301,6 +303,14 @@ impl BlockfrostREST { handle_account_withdrawals_blockfrost, ); + // Handler for /accounts/{stake_address}/rewards + register_handler( + context.clone(), + DEFAULT_HANDLE_ACCOUNT_REWARDS_TOPIC, + handlers_config.clone(), + handle_account_rewards_blockfrost, + ); + // Handler for /blocks/latest, /blocks/{hash_or_number} register_handler( context.clone(), diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index bfcf8fbf..117a081b 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -2,10 +2,9 @@ use crate::cost_models::{PLUTUS_V1, PLUTUS_V2, PLUTUS_V3}; use acropolis_common::{ messages::EpochActivityMessage, protocol_params::{Nonce, NonceVariant, ProtocolParams}, - queries::blocks::BlockInfo, - queries::governance::DRepActionUpdate, + queries::{accounts::AccountReward, blocks::BlockInfo, governance::DRepActionUpdate}, rest_helper::ToCheckedF64, - serialization::{DisplayFromBech32, PoolPrefix}, + serialization::{Bech32WithHrp, DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote, }; @@ -874,3 +873,24 @@ pub struct AccountWithdrawalREST { pub tx_hash: String, pub amount: String, } + +#[derive(Serialize)] +pub struct AccountRewardREST { + pub epoch: u32, + pub amount: String, + pub pool_id: String, + #[serde(rename = "type")] + pub reward_type: String, +} + +impl TryFrom<&AccountReward> for AccountRewardREST { + type Error = anyhow::Error; + fn try_from(value: &AccountReward) -> Result { + Ok(Self { + epoch: value.epoch, + amount: value.amount.to_string(), + pool_id: value.pool.to_bech32_with_hrp("pool")?, + reward_type: value.reward_type.to_string(), + }) + } +} From 06512fdb2202ad6c25a3dd8437537a2f5abf4ffd Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 29 Oct 2025 23:42:42 +0000 Subject: [PATCH 6/9] fix: prevent null AccountReward entries from causing CBOR decode errors Signed-off-by: William Hankins --- common/src/queries/accounts.rs | 2 +- modules/accounts_state/src/accounts_state.rs | 4 +- .../src/conway_voting_test.rs | 1 + .../src/historical_accounts_state.rs | 23 ++++++++- .../src/immutable_historical_account_store.rs | 7 +-- .../historical_accounts_state/src/state.rs | 2 +- .../rest_blockfrost/src/handlers/accounts.rs | 50 ++++++++----------- 7 files changed, 50 insertions(+), 39 deletions(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index ea6fd32e..4f8fa2bd 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -14,7 +14,7 @@ pub const DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ( #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQuery { - GetAccountInfo { stake_address: StakeAddress }, + GetAccountInfo { account: StakeAddress }, GetAccountRewardHistory { account: StakeAddress }, GetAccountHistory { stake_key: Vec }, GetAccountRegistrationHistory { account: StakeAddress }, diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 0b146d46..11a16def 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -505,8 +505,8 @@ impl AccountsState { }; let response = match query { - AccountsStateQuery::GetAccountInfo { stake_address } => { - if let Some(account) = state.get_stake_state(stake_address) { + AccountsStateQuery::GetAccountInfo { account } => { + if let Some(account) = state.get_stake_state(account) { AccountsStateQueryResponse::AccountInfo(AccountInfo { utxo_value: account.utxo_value, rewards: account.rewards, diff --git a/modules/governance_state/src/conway_voting_test.rs b/modules/governance_state/src/conway_voting_test.rs index 1c03fdd2..09688e23 100644 --- a/modules/governance_state/src/conway_voting_test.rs +++ b/modules/governance_state/src/conway_voting_test.rs @@ -223,6 +223,7 @@ mod tests { } #[test] + #[ignore] fn test_voting_mainnet_up_573() -> Result<()> { let fmt_layer = fmt::layer() .with_filter( diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 99d07d30..1661915b 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -346,7 +346,28 @@ impl HistoricalAccountsState { } } AccountsStateQuery::GetAccountRewardHistory { account } => { - match state.lock().await.get_reward_history(account).await { + let result = state.lock().await.get_reward_history(account).await; + + match &result { + Ok(Some(rewards)) => { + info!( + "Account {:?} has {} reward entries", + account.to_string(), + rewards.len() + ); + } + Ok(None) => { + info!("Account {:?} has no reward history", account.to_string()); + } + Err(e) => { + error!( + "Failed to fetch reward history for {:?}: {e:#}", + account.to_string() + ); + } + } + + match result { Ok(Some(rewards)) => { AccountsStateQueryResponse::AccountRewardHistory(rewards) } diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 904ec88c..8375bb97 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -85,11 +85,8 @@ impl ImmutableHistoricalAccountStore { // Persist rewards if config.store_rewards_history { - batch.insert( - &self.rewards_history, - epoch_key, - to_vec(&entry.reward_history)?, - ); + let rewards = entry.reward_history.as_ref().map(|v| v.clone()).unwrap_or_default(); + batch.insert(&self.rewards_history, epoch_key, to_vec(&rewards)?); } // Persist active stake diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index ee17f57c..423be975 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -109,7 +109,7 @@ impl State { for reward in reward_deltas.deltas.iter() { let entry = volatile.entry(reward.stake_address.clone()).or_default(); let update = AccountReward { - epoch, + epoch: epoch - 2, amount: reward.delta, pool: reward.pool.clone(), reward_type: reward.reward_type.clone(), diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 86fb0080..82bf8d03 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -37,13 +37,13 @@ pub async fn handle_single_account_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountInfo { stake_address }, + AccountsStateQuery::GetAccountInfo { account }, ))); let account = query_state( &context, @@ -120,16 +120,14 @@ pub async fn handle_account_registrations_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountRegistrationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountRegistrationHistory { account }, ))); // Get registrations from historical accounts state @@ -147,10 +145,10 @@ pub async fn handle_account_registrations_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account registrations: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account registrations" )), }, ) @@ -216,16 +214,14 @@ pub async fn handle_account_delegations_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountDelegationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountDelegationHistory { account }, ))); // Get delegations from historical accounts state @@ -243,10 +239,10 @@ pub async fn handle_account_delegations_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account delegations: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account delegations" )), }, ) @@ -349,10 +345,10 @@ pub async fn handle_account_mirs_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account mirs: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account mirs" )), }, ) @@ -417,16 +413,14 @@ pub async fn handle_account_withdrawals_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountRegistrationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountRegistrationHistory { account }, ))); // Get withdrawals from historical accounts state @@ -444,10 +438,10 @@ pub async fn handle_account_withdrawals_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account withdrawals: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account withdrawals" )), }, ) @@ -512,16 +506,14 @@ pub async fn handle_account_rewards_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountRegistrationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountRewardHistory { account }, ))); // Get rewards from historical accounts state @@ -539,10 +531,10 @@ pub async fn handle_account_rewards_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account rewards: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account rewards" )), }, ) @@ -567,7 +559,7 @@ pub async fn handle_account_rewards_blockfrost( Ok(json) => Ok(RESTResponse::with_json(200, &json)), Err(e) => Ok(RESTResponse::with_text( 500, - &format!("Internal server error while serializing withdrawal history: {e}"), + &format!("Internal server error while serializing reward history: {e}"), )), } } From 8e29637e76f5c3fa8f08dd528e76e3876ba65f20 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 30 Oct 2025 17:08:54 +0000 Subject: [PATCH 7/9] fix: use correct epoch for pool refund rewards Signed-off-by: William Hankins --- modules/historical_accounts_state/src/state.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 423be975..ea8f63dc 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -10,8 +10,8 @@ use acropolis_common::{ queries::accounts::{ AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, }, - BlockInfo, InstantaneousRewardTarget, PoolId, ShelleyAddress, StakeAddress, TxCertificate, - TxIdentifier, + BlockInfo, InstantaneousRewardTarget, PoolId, RewardType, ShelleyAddress, StakeAddress, + TxCertificate, TxIdentifier, }; use tracing::warn; @@ -108,8 +108,14 @@ impl State { let volatile = self.volatile.window.back_mut().expect("window should never be empty"); for reward in reward_deltas.deltas.iter() { let entry = volatile.entry(reward.stake_address.clone()).or_default(); + + let reward_epoch = match reward.reward_type { + RewardType::PoolRefund => epoch, + _ => epoch.saturating_sub(2), + }; + let update = AccountReward { - epoch: epoch - 2, + epoch: reward_epoch, amount: reward.delta, pool: reward.pool.clone(), reward_type: reward.reward_type.clone(), From 9e1dfdf6c137d3e2c8fbe8d183cf87af5b071b30 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 30 Oct 2025 18:56:08 +0000 Subject: [PATCH 8/9] fix: clippy warnings Signed-off-by: William Hankins --- .../src/immutable_historical_account_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 8375bb97..545b9722 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -85,7 +85,7 @@ impl ImmutableHistoricalAccountStore { // Persist rewards if config.store_rewards_history { - let rewards = entry.reward_history.as_ref().map(|v| v.clone()).unwrap_or_default(); + let rewards = entry.reward_history.clone().unwrap_or_default(); batch.insert(&self.rewards_history, epoch_key, to_vec(&rewards)?); } From ffd706a6136287b3cfd0d6353faee1af242263ff Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 30 Oct 2025 22:29:49 +0000 Subject: [PATCH 9/9] fix: remove query handling debug logging Signed-off-by: William Hankins --- .../src/historical_accounts_state.rs | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 1661915b..99d07d30 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -346,28 +346,7 @@ impl HistoricalAccountsState { } } AccountsStateQuery::GetAccountRewardHistory { account } => { - let result = state.lock().await.get_reward_history(account).await; - - match &result { - Ok(Some(rewards)) => { - info!( - "Account {:?} has {} reward entries", - account.to_string(), - rewards.len() - ); - } - Ok(None) => { - info!("Account {:?} has no reward history", account.to_string()); - } - Err(e) => { - error!( - "Failed to fetch reward history for {:?}: {e:#}", - account.to_string() - ); - } - } - - match result { + match state.lock().await.get_reward_history(account).await { Ok(Some(rewards)) => { AccountsStateQueryResponse::AccountRewardHistory(rewards) }