diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index b075c356..4f8fa2bd 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; -use crate::{DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, StakeAddress, TxIdentifier}; +use crate::{ + DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier, +}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); @@ -12,8 +14,8 @@ pub const DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ( #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQuery { - GetAccountInfo { stake_address: StakeAddress }, - GetAccountRewardHistory { stake_key: Vec }, + GetAccountInfo { account: StakeAddress }, + GetAccountRewardHistory { account: StakeAddress }, GetAccountHistory { stake_key: Vec }, GetAccountRegistrationHistory { account: StakeAddress }, GetAccountDelegationHistory { account: StakeAddress }, @@ -47,7 +49,7 @@ pub enum AccountsStateQuery { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQueryResponse { AccountInfo(AccountInfo), - AccountRewardHistory(AccountRewardHistory), + AccountRewardHistory(Vec), AccountHistory(AccountHistory), AccountRegistrationHistory(Vec), AccountDelegationHistory(Vec), @@ -91,9 +93,6 @@ pub struct AccountInfo { pub delegated_drep: Option, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AccountRewardHistory {} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountHistory {} @@ -150,6 +149,20 @@ pub struct AccountWithdrawal { pub amount: u64, } +#[derive( + Debug, Clone, minicbor::Decode, minicbor::Encode, serde::Serialize, serde::Deserialize, +)] +pub struct AccountReward { + #[n(0)] + pub epoch: u32, + #[n(1)] + pub amount: u64, + #[n(2)] + pub pool: PoolId, + #[n(3)] + pub reward_type: RewardType, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountWithdrawalHistory {} diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index b9308ce5..fff09af9 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -535,6 +535,13 @@ impl StakeAddressMap { update_value_with_delta(&mut sas.rewards, delta) } + pub fn pay_reward(&mut self, stake_address: &StakeAddress, delta: u64) -> Result<()> { + let sas = self.entry(stake_address.clone()).or_default(); + sas.rewards = + sas.rewards.checked_add(delta).ok_or_else(|| anyhow::anyhow!("reward overflow"))?; + Ok(()) + } + /// Update utxo value with delta pub fn update_utxo_value(&mut self, stake_address: &StakeAddress, delta: i64) -> Result<()> { let sas = self.entry(stake_address.clone()).or_default(); diff --git a/common/src/types.rs b/common/src/types.rs index 0134b2b9..e530c97e 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -212,7 +212,38 @@ pub struct StakeAddressDelta { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct StakeRewardDelta { pub stake_address: StakeAddress, - pub delta: i64, + pub delta: u64, + pub reward_type: RewardType, + pub pool: PoolId, +} + +/// Type of reward being given +#[derive( + Debug, + Clone, + PartialEq, + minicbor::Encode, + minicbor::Decode, + serde::Serialize, + serde::Deserialize, +)] +pub enum RewardType { + #[n(0)] + Leader, + #[n(1)] + Member, + #[n(2)] + PoolRefund, +} + +impl fmt::Display for RewardType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RewardType::Leader => write!(f, "leader"), + RewardType::Member => write!(f, "member"), + RewardType::PoolRefund => write!(f, "pool_deposit_refund"), + } + } } pub type PolicyId = [u8; 28]; diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 0b146d46..11a16def 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -505,8 +505,8 @@ impl AccountsState { }; let response = match query { - AccountsStateQuery::GetAccountInfo { stake_address } => { - if let Some(account) = state.get_stake_state(stake_address) { + AccountsStateQuery::GetAccountInfo { account } => { + if let Some(account) = state.get_stake_state(account) { AccountsStateQueryResponse::AccountInfo(AccountInfo { utxo_value: account.utxo_value, rewards: account.rewards, diff --git a/modules/accounts_state/src/rewards.rs b/modules/accounts_state/src/rewards.rs index 484ad119..5bd3e9f5 100644 --- a/modules/accounts_state/src/rewards.rs +++ b/modules/accounts_state/src/rewards.rs @@ -5,6 +5,7 @@ use acropolis_common::{ protocol_params::ShelleyParams, rational_number::RationalNumber, KeyHash, Lovelace, SPORewards, StakeAddress, }; +use acropolis_common::{PoolId, RewardType}; use anyhow::{bail, Result}; use bigdecimal::{BigDecimal, One, ToPrimitive, Zero}; use std::cmp::min; @@ -13,13 +14,6 @@ use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; use tracing::{debug, info, warn}; -/// Type of reward being given -#[derive(Debug, Clone, PartialEq)] -pub enum RewardType { - Leader, - Member, -} - /// Reward Detail #[derive(Debug, Clone)] pub struct RewardDetail { @@ -31,6 +25,9 @@ pub struct RewardDetail { /// Reward amount pub amount: Lovelace, + + // Pool that reward came from + pub pool: PoolId, } /// Result of a rewards calculation @@ -211,6 +208,7 @@ pub fn calculate_rewards( num_delegators_paid += 1; total_paid_to_delegators += reward.amount; } + RewardType::PoolRefund => {} } spo_rewards.total_rewards += reward.amount; result.total_paid += reward.amount; @@ -391,6 +389,7 @@ fn calculate_spo_rewards( account: delegator_stake_address.clone(), rtype: RewardType::Member, amount: to_pay, + pool: operator_id.to_vec(), }); total_paid += to_pay; delegators_paid += 1; @@ -408,6 +407,7 @@ fn calculate_spo_rewards( account: spo.reward_account.clone(), rtype: RewardType::Leader, amount: spo_benefit, + pool: operator_id.to_vec(), }); } else { info!( diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index f66ab412..a711c347 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -1,9 +1,10 @@ //! Acropolis AccountsState: State storage use crate::monetary::calculate_monetary_change; -use crate::rewards::{calculate_rewards, RewardType, RewardsResult}; +use crate::rewards::{calculate_rewards, RewardsResult}; use crate::snapshot::Snapshot; use crate::verifier::Verifier; use acropolis_common::queries::accounts::OptimalPoolSizing; +use acropolis_common::RewardType; use acropolis_common::{ math::update_value_with_delta, messages::{ @@ -14,8 +15,9 @@ use acropolis_common::{ protocol_params::ProtocolParams, stake_addresses::{StakeAddressMap, StakeAddressState}, BlockInfo, DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, - InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolLiveStakeInfo, - PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, TxCertificate, + InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolId, + PoolLiveStakeInfo, PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, + TxCertificate, }; use anyhow::Result; use imbl::OrdMap; @@ -114,10 +116,7 @@ pub struct State { previous_protocol_parameters: Option, /// Pool refunds to apply next epoch (list of reward accounts to refund to) - pool_refunds: Vec, - - /// MIRs to pay next epoch - mirs: Vec, + pool_refunds: Vec<(PoolId, StakeAddress)>, /// Addresses registration changes in current epoch current_epoch_registration_changes: Arc>>, @@ -302,7 +301,6 @@ impl State { // Pay MIRs before snapshot, so reserves is correct for total_supply in rewards let mut reward_deltas = Vec::::new(); - reward_deltas.extend(self.pay_mirs()); // Capture a new snapshot for the end of the previous epoch and push it to state let snapshot = Snapshot::new( @@ -469,13 +467,15 @@ impl State { } // Send them their deposits back - for stake_address in refunds { + for (pool, stake_address) in refunds { // If their reward account has been deregistered, it goes to Treasury let mut stake_addresses = self.stake_addresses.lock().unwrap(); if stake_addresses.is_registered(&stake_address) { reward_deltas.push(StakeRewardDelta { stake_address: stake_address.clone(), - delta: deposit as i64, + delta: deposit, + reward_type: RewardType::PoolRefund, + pool, }); stake_addresses.add_to_reward(&stake_address, deposit); } else { @@ -493,73 +493,61 @@ impl State { } /// Pay MIRs - fn pay_mirs(&mut self) -> Vec { - let mut reward_deltas = Vec::::new(); - - let mirs = take(&mut self.mirs); - for mir in mirs { - let (source, source_name, other, other_name) = match &mir.source { - InstantaneousRewardSource::Reserves => ( - &mut self.pots.reserves, - "reserves", - &mut self.pots.treasury, - "treasury", - ), - InstantaneousRewardSource::Treasury => ( - &mut self.pots.treasury, - "treasury", - &mut self.pots.reserves, - "reserves", - ), - }; - - match &mir.target { - InstantaneousRewardTarget::StakeAddresses(deltas) => { - // Transfer to (in theory also from) stake addresses from (to) a pot - let mut total_value: u64 = 0; - for (stake_address, value) in deltas.iter() { - // Get old stake address state, or create one - let mut stake_addresses = self.stake_addresses.lock().unwrap(); - let sas = stake_addresses.entry(stake_address.clone()).or_default(); - - // Add to this one - reward_deltas.push(StakeRewardDelta { - stake_address: stake_address.clone(), - delta: *value, - }); - if let Err(e) = update_value_with_delta(&mut sas.rewards, *value) { - error!("MIR to stake address {}: {e}", stake_address); - } + fn pay_mir(&mut self, mir: &MoveInstantaneousReward) { + let (source, source_name, other, other_name) = match &mir.source { + InstantaneousRewardSource::Reserves => ( + &mut self.pots.reserves, + "reserves", + &mut self.pots.treasury, + "treasury", + ), + InstantaneousRewardSource::Treasury => ( + &mut self.pots.treasury, + "treasury", + &mut self.pots.reserves, + "reserves", + ), + }; - // Update the source - if let Err(e) = update_value_with_delta(source, -*value) { - error!("MIR from {source_name}: {e}"); - } + match &mir.target { + InstantaneousRewardTarget::StakeAddresses(deltas) => { + // Transfer to a stake addresses from a pot + let mut total_value: u64 = 0; + for (stake_address, value) in deltas.iter() { + // Get old stake address state, or create one + let mut stake_addresses = self.stake_addresses.lock().unwrap(); + let sas = stake_addresses.entry(stake_address.clone()).or_default(); + + if let Err(e) = update_value_with_delta(&mut sas.rewards, *value) { + error!("MIR to stake address {}: {e}", stake_address); + } - let _ = update_value_with_delta(&mut total_value, *value); + // Update the source + if let Err(e) = update_value_with_delta(source, -*value) { + error!("MIR from {source_name}: {e}"); } - info!( - "MIR of {total_value} to {} stake addresses from {source_name}", - deltas.len() - ); + let _ = update_value_with_delta(&mut total_value, *value); } - InstantaneousRewardTarget::OtherAccountingPot(value) => { - // Transfer between pots - if let Err(e) = update_value_with_delta(source, -(*value as i64)) { - error!("MIR from {source_name}: {e}"); - } - if let Err(e) = update_value_with_delta(other, *value as i64) { - error!("MIR to {other_name}: {e}"); - } + info!( + "MIR of {total_value} to {} stake addresses from {source_name}", + deltas.len() + ); + } - info!("MIR of {value} from {source_name} to {other_name}"); + InstantaneousRewardTarget::OtherAccountingPot(value) => { + // Transfer between pots + if let Err(e) = update_value_with_delta(source, -(*value as i64)) { + error!("MIR from {source_name}: {e}"); + } + if let Err(e) = update_value_with_delta(other, *value as i64) { + error!("MIR to {other_name}: {e}"); } + + info!("MIR of {value} from {source_name} to {other_name}"); } } - - reward_deltas } /// Derive the Stake Pool Delegation Distribution (SPDD) - a map of total stake values @@ -636,7 +624,9 @@ impl State { stake_addresses.add_to_reward(&reward.account, reward.amount); reward_deltas.push(StakeRewardDelta { stake_address: reward.account.clone(), - delta: reward.amount as i64, + delta: reward.amount, + reward_type: reward.rtype.clone(), + pool: reward.pool.clone(), }); } else { warn!( @@ -765,7 +755,11 @@ impl State { hex::encode(id), retired_spo.reward_account ); - self.pool_refunds.push(retired_spo.reward_account.clone()); // Store full StakeAddress + self.pool_refunds.push(( + retired_spo.operator.clone(), + retired_spo.reward_account.clone(), + )); + // Store full StakeAddress } // Schedule to retire - we need them to still be in place when we count @@ -846,12 +840,6 @@ impl State { stake_addresses.record_stake_delegation(stake_address, spo); } - /// Handle an MoveInstantaneousReward (pre-Conway only) - pub fn handle_mir(&mut self, mir: &MoveInstantaneousReward) -> Result<()> { - self.mirs.push(mir.clone()); - Ok(()) - } - /// record a drep delegation fn record_drep_delegation(&mut self, stake_address: &StakeAddress, drep: &DRepChoice) { let mut stake_addresses = self.stake_addresses.lock().unwrap(); @@ -872,7 +860,7 @@ impl State { } TxCertificate::MoveInstantaneousReward(mir) => { - self.handle_mir(mir).unwrap_or_else(|e| error!("MIR failed: {e:#}")); + self.pay_mir(mir); } TxCertificate::Registration(reg) => { @@ -1178,8 +1166,7 @@ mod tests { target: InstantaneousRewardTarget::OtherAccountingPot(42), }; - state.handle_mir(&mir).unwrap(); - state.pay_mirs(); + state.pay_mir(&mir); assert_eq!(state.pots.reserves, 58); assert_eq!(state.pots.treasury, 42); assert_eq!(state.pots.deposits, 0); @@ -1190,9 +1177,7 @@ mod tests { target: InstantaneousRewardTarget::OtherAccountingPot(10), }; - state.handle_mir(&mir).unwrap(); - let reward_deltas = state.pay_mirs(); - assert_eq!(reward_deltas.len(), 0); + state.pay_mir(&mir); assert_eq!(state.pots.reserves, 68); assert_eq!(state.pots.treasury, 32); assert_eq!(state.pots.deposits, 0); @@ -1234,8 +1219,7 @@ mod tests { ]), }; - state.handle_mir(&mir).unwrap(); - state.pay_mirs(); + state.pay_mir(&mir); assert_eq!(state.pots.reserves, 58); assert_eq!(state.pots.treasury, 0); assert_eq!(state.pots.deposits, 2_000_000); // Paid deposit @@ -1280,16 +1264,12 @@ mod tests { target: InstantaneousRewardTarget::StakeAddresses(vec![(stake_address.clone(), 42)]), }; - state.handle_mir(&mir).unwrap(); - let diffs = state.pay_mirs(); - assert_eq!(state.pots.reserves, 58); - assert_eq!(diffs.len(), 1); - assert_eq!(diffs[0].stake_address.get_hash(), stake_address.get_hash()); - assert_eq!(diffs[0].delta, 42); + state.pay_mir(&mir); { let stake_addresses = state.stake_addresses.lock().unwrap(); let sas = stake_addresses.get(&stake_address).unwrap(); + assert_eq!(state.pots.reserves, 58); assert_eq!(sas.rewards, 42); } diff --git a/modules/accounts_state/src/verifier.rs b/modules/accounts_state/src/verifier.rs index 4f4784d4..34b114c0 100644 --- a/modules/accounts_state/src/verifier.rs +++ b/modules/accounts_state/src/verifier.rs @@ -1,7 +1,7 @@ //! Verification of calculated values against captured CSV from Haskell node / DBSync -use crate::rewards::{RewardDetail, RewardType, RewardsResult}; +use crate::rewards::{RewardDetail, RewardsResult}; use crate::state::Pots; -use acropolis_common::{KeyHash, StakeAddress}; +use acropolis_common::{KeyHash, RewardType, StakeAddress}; use hex::FromHex; use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::Itertools; @@ -167,10 +167,11 @@ impl Verifier { continue; }; - expected_rewards.entry(spo).or_default().push(RewardDetail { + expected_rewards.entry(spo.clone()).or_default().push(RewardDetail { account: stake_address, rtype, amount, + pool: spo, }); } diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 9633c500..99d07d30 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -126,10 +126,7 @@ impl HistoricalAccountsState { { Self::check_sync(¤t_block, block_info); let mut state = state_mutex.lock().await; - state - .handle_rewards(rewards_msg) - .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) - .ok(); + state.handle_rewards(rewards_msg, block_info.epoch as u32); } } @@ -348,6 +345,15 @@ impl HistoricalAccountsState { Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } } + AccountsStateQuery::GetAccountRewardHistory { account } => { + match state.lock().await.get_reward_history(account).await { + Ok(Some(rewards)) => { + AccountsStateQueryResponse::AccountRewardHistory(rewards) + } + Ok(None) => AccountsStateQueryResponse::NotFound, + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index eb95c929..545b9722 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, path::Path}; use acropolis_common::{ - queries::accounts::{AccountWithdrawal, DelegationUpdate, RegistrationUpdate}, + queries::accounts::{AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationUpdate}, ShelleyAddress, StakeAddress, }; use anyhow::Result; @@ -11,7 +11,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tokio::sync::Mutex; use tracing::{debug, error}; -use crate::state::{AccountEntry, ActiveStakeHistory, HistoricalAccountsConfig, RewardHistory}; +use crate::state::{AccountEntry, ActiveStakeHistory, HistoricalAccountsConfig}; pub struct ImmutableHistoricalAccountStore { rewards_history: Partition, @@ -85,11 +85,8 @@ impl ImmutableHistoricalAccountStore { // Persist rewards if config.store_rewards_history { - batch.insert( - &self.rewards_history, - epoch_key, - to_vec(&entry.reward_history)?, - ); + let rewards = entry.reward_history.clone().unwrap_or_default(); + batch.insert(&self.rewards_history, epoch_key, to_vec(&rewards)?); } // Persist active stake @@ -152,12 +149,12 @@ impl ImmutableHistoricalAccountStore { pending.extend(drained); } - pub async fn _get_rewards_history( + pub async fn get_reward_history( &self, account: &StakeAddress, - ) -> Result>> { + ) -> Result>> { let mut immutable_rewards = - self.collect_partition::(&self.rewards_history, account.get_hash())?; + self.collect_partition::(&self.rewards_history, account.get_hash())?; self.merge_pending( account, diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 4ceef42c..ea8f63dc 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -8,9 +8,9 @@ use acropolis_common::{ AddressDeltasMessage, StakeRewardDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, }, queries::accounts::{ - AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, + AccountReward, AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, }, - BlockInfo, InstantaneousRewardTarget, PoolId, ShelleyAddress, StakeAddress, StakeCredential, + BlockInfo, InstantaneousRewardTarget, PoolId, RewardType, ShelleyAddress, StakeAddress, TxCertificate, TxIdentifier, }; use tracing::warn; @@ -24,7 +24,7 @@ use anyhow::Result; #[derive(Debug, Default, Clone)] pub struct AccountEntry { - pub reward_history: Option>, + pub reward_history: Option>, pub active_stake_history: Option>, pub delegation_history: Option>, pub registration_history: Option>, @@ -33,18 +33,6 @@ pub struct AccountEntry { pub addresses: Option>, } -#[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] -pub struct RewardHistory { - #[n(0)] - pub epoch: u32, - #[n(1)] - pub amount: u64, - #[n(2)] - pub pool: PoolId, - #[n(3)] - pub is_owner: bool, -} - #[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] pub struct ActiveStakeHistory { #[n(0)] @@ -116,8 +104,24 @@ impl State { && block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k } - pub fn handle_rewards(&mut self, _reward_deltas: &StakeRewardDeltasMessage) -> Result<()> { - Ok(()) + pub fn handle_rewards(&mut self, reward_deltas: &StakeRewardDeltasMessage, epoch: u32) { + let volatile = self.volatile.window.back_mut().expect("window should never be empty"); + for reward in reward_deltas.deltas.iter() { + let entry = volatile.entry(reward.stake_address.clone()).or_default(); + + let reward_epoch = match reward.reward_type { + RewardType::PoolRefund => epoch, + _ => epoch.saturating_sub(2), + }; + + let update = AccountReward { + epoch: reward_epoch, + amount: reward.delta, + pool: reward.pool.clone(), + reward_type: reward.reward_type.clone(), + }; + entry.reward_history.get_or_insert_with(Vec::new).push(update); + } } pub fn handle_tx_certificates(&mut self, tx_certs: &TxCertificatesMessage, epoch: u32) { @@ -239,11 +243,23 @@ impl State { } } - pub async fn _get_reward_history( + pub async fn get_reward_history( &self, - _account: &StakeCredential, - ) -> Result> { - Ok(Vec::new()) + account: &StakeAddress, + ) -> Result>> { + let immutable = self.immutable.get_reward_history(account).await?; + + let mut volatile = Vec::new(); + self.merge_volatile_history(account, |e| e.reward_history.as_ref(), &mut volatile); + + match immutable { + Some(mut rewards) => { + rewards.extend(volatile); + Ok(Some(rewards)) + } + None if volatile.is_empty() => Ok(None), + None => Ok(Some(volatile)), + } } pub async fn _get_active_stake_history( diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 88fdb02f..f7e86265 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -13,7 +13,9 @@ use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use crate::handlers_config::HandlersConfig; -use crate::types::{AccountWithdrawalREST, DelegationUpdateREST, RegistrationUpdateREST}; +use crate::types::{ + AccountRewardREST, AccountWithdrawalREST, DelegationUpdateREST, RegistrationUpdateREST, +}; #[derive(serde::Serialize)] pub struct StakeAccountRest { @@ -35,13 +37,13 @@ pub async fn handle_single_account_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountInfo { stake_address }, + AccountsStateQuery::GetAccountInfo { account }, ))); let account = query_state( &context, @@ -118,16 +120,14 @@ pub async fn handle_account_registrations_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountRegistrationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountRegistrationHistory { account }, ))); // Get registrations from historical accounts state @@ -145,10 +145,10 @@ pub async fn handle_account_registrations_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account registrations: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account registrations" )), }, ) @@ -214,16 +214,14 @@ pub async fn handle_account_delegations_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountDelegationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountDelegationHistory { account }, ))); // Get delegations from historical accounts state @@ -241,10 +239,10 @@ pub async fn handle_account_delegations_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account delegations: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account delegations" )), }, ) @@ -347,10 +345,10 @@ pub async fn handle_account_mirs_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account mirs: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account mirs" )), }, ) @@ -415,16 +413,14 @@ pub async fn handle_account_withdrawals_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let stake_address = match parse_stake_address(¶ms) { + let account = match parse_stake_address(¶ms) { Ok(addr) => addr, Err(resp) => return Ok(resp), }; // Prepare the message let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountRegistrationHistory { - account: stake_address, - }, + AccountsStateQuery::GetAccountRegistrationHistory { account }, ))); // Get withdrawals from historical accounts state @@ -434,18 +430,18 @@ pub async fn handle_account_withdrawals_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Accounts( - AccountsStateQueryResponse::AccountWithdrawalHistory(registrations), - )) => Ok(Some(registrations)), + AccountsStateQueryResponse::AccountWithdrawalHistory(withdrawals), + )) => Ok(Some(withdrawals)), Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::NotFound, )) => Ok(None), Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::Error(e), )) => Err(anyhow::anyhow!( - "Internal server error while retrieving account info: {e}" + "Internal server error while retrieving account withdrawals: {e}" )), _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving account info" + "Unexpected message type while retrieving account withdrawals" )), }, ) @@ -505,6 +501,69 @@ pub async fn handle_account_withdrawals_blockfrost( } } +pub async fn handle_account_rewards_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let account = match parse_stake_address(¶ms) { + Ok(addr) => addr, + Err(resp) => return Ok(resp), + }; + + // Prepare the message + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountRewardHistory { account }, + ))); + + // Get rewards from historical accounts state + let rewards = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountRewardHistory(rewards), + )) => Ok(Some(rewards)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::NotFound, + )) => Ok(None), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving account rewards: {e}" + )), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving account rewards" + )), + }, + ) + .await?; + + let Some(rewards) = rewards else { + return Ok(RESTResponse::with_text(404, "Account not found")); + }; + + let rest_response = + match rewards.iter().map(|r| r.try_into()).collect::, _>>() { + Ok(v) => v, + Err(e) => { + return Ok(RESTResponse::with_text( + 500, + &format!("Failed to convert reward entry: {e}"), + )) + } + }; + + match serde_json::to_string_pretty(&rest_response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while serializing reward history: {e}"), + )), + } +} + fn parse_stake_address(params: &[String]) -> Result { let Some(stake_key) = params.first() else { return Err(RESTResponse::with_text( diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index cbf72f73..57b6a7d4 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -16,7 +16,11 @@ mod handlers_config; mod types; mod utils; use handlers::{ - accounts::handle_single_account_blockfrost, + accounts::{ + handle_account_delegations_blockfrost, handle_account_mirs_blockfrost, + handle_account_registrations_blockfrost, handle_account_rewards_blockfrost, + handle_account_withdrawals_blockfrost, handle_single_account_blockfrost, + }, addresses::{ handle_address_asset_utxos_blockfrost, handle_address_extended_blockfrost, handle_address_single_blockfrost, handle_address_totals_blockfrost, @@ -58,13 +62,7 @@ use handlers::{ }, }; -use crate::{ - handlers::accounts::{ - handle_account_delegations_blockfrost, handle_account_mirs_blockfrost, - handle_account_registrations_blockfrost, handle_account_withdrawals_blockfrost, - }, - handlers_config::HandlersConfig, -}; +use crate::handlers_config::HandlersConfig; // Accounts topics const DEFAULT_HANDLE_SINGLE_ACCOUNT_TOPIC: (&str, &str) = @@ -83,6 +81,10 @@ const DEFAULT_HANDLE_ACCOUNT_WITHDRAWALS_TOPIC: (&str, &str) = ( "handle-topic-account-withdrawals", "rest.get.accounts.*.withdrawals", ); +const DEFAULT_HANDLE_ACCOUNT_REWARDS_TOPIC: (&str, &str) = ( + "handle-topic-account-rewards", + "rest.get.accounts.*.rewards", +); // Blocks topics const DEFAULT_HANDLE_BLOCKS_LATEST_HASH_NUMBER_TOPIC: (&str, &str) = @@ -301,6 +303,14 @@ impl BlockfrostREST { handle_account_withdrawals_blockfrost, ); + // Handler for /accounts/{stake_address}/rewards + register_handler( + context.clone(), + DEFAULT_HANDLE_ACCOUNT_REWARDS_TOPIC, + handlers_config.clone(), + handle_account_rewards_blockfrost, + ); + // Handler for /blocks/latest, /blocks/{hash_or_number} register_handler( context.clone(), diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 77f179f9..56d2eb02 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -2,10 +2,9 @@ use crate::cost_models::{PLUTUS_V1, PLUTUS_V2, PLUTUS_V3}; use acropolis_common::{ messages::EpochActivityMessage, protocol_params::{Nonce, NonceVariant, ProtocolParams}, - queries::blocks::BlockInfo, - queries::governance::DRepActionUpdate, + queries::{accounts::AccountReward, blocks::BlockInfo, governance::DRepActionUpdate}, rest_helper::ToCheckedF64, - serialization::{DisplayFromBech32, PoolPrefix}, + serialization::{Bech32WithHrp, DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote, }; @@ -874,3 +873,24 @@ pub struct AccountWithdrawalREST { pub tx_hash: String, pub amount: String, } + +#[derive(Serialize)] +pub struct AccountRewardREST { + pub epoch: u32, + pub amount: String, + pub pool_id: String, + #[serde(rename = "type")] + pub reward_type: String, +} + +impl TryFrom<&AccountReward> for AccountRewardREST { + type Error = anyhow::Error; + fn try_from(value: &AccountReward) -> Result { + Ok(Self { + epoch: value.epoch, + amount: value.amount.to_string(), + pool_id: value.pool.to_bech32_with_hrp("pool")?, + reward_type: value.reward_type.to_string(), + }) + } +} diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 8d3b79d6..6abc878b 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -650,7 +650,7 @@ impl State { // Handle deltas for delta in reward_deltas_msg.deltas.iter() { let mut stake_addresses = stake_addresses.lock().unwrap(); - if let Err(e) = stake_addresses.update_reward(&delta.stake_address, delta.delta) { + if let Err(e) = stake_addresses.pay_reward(&delta.stake_address, delta.delta) { error!("Updating reward account {}: {e}", delta.stake_address); } }