diff --git a/Cargo.lock b/Cargo.lock
index 8113613e..b1600a3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -48,6 +48,7 @@ dependencies = [
  "config",
  "csv",
  "dashmap",
+ "fjall",
  "hex",
  "imbl",
  "itertools 0.14.0",
diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs
index e91dc99e..65ae1bdb 100644
--- a/common/src/queries/accounts.rs
+++ b/common/src/queries/accounts.rs
@@ -25,6 +25,8 @@ pub enum AccountsStateQuery {
     // Epochs-related queries
     GetActiveStakes {},
+    GetSPDDByEpoch { epoch: u64 },
+    GetSPDDByEpochAndPool { epoch: u64, pool_id: KeyHash },
 
     // Pools related queries
     GetOptimalPoolSizing,
@@ -57,6 +59,10 @@ pub enum AccountsStateQueryResponse {
     // Epochs-related responses
     ActiveStakes(u64),
+    /// Vec<(PoolId, StakeKey, ActiveStakeAmount)>
+    SPDDByEpoch(Vec<(KeyHash, KeyHash, u64)>),
+    /// Vec<(StakeKey, ActiveStakeAmount)>
+    SPDDByEpochAndPool(Vec<(KeyHash, u64)>),
 
     // Pools-related responses
     OptimalPoolSizing(Option<OptimalPoolSizing>),
diff --git a/common/src/queries/parameters.rs b/common/src/queries/parameters.rs
index 6abcc111..03e26408 100644
--- a/common/src/queries/parameters.rs
+++ b/common/src/queries/parameters.rs
@@ -7,12 +7,15 @@ pub const DEFAULT_PARAMETERS_QUERY_TOPIC: (&str, &str) =
 pub enum ParametersStateQuery {
     GetLatestEpochParameters,
     GetEpochParameters { epoch_number: u64 },
+    GetNetworkName,
 }
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub enum ParametersStateQueryResponse {
     LatestEpochParameters(ProtocolParams),
     EpochParameters(ProtocolParams),
+    NetworkName(String),
+    NotFound,
     Error(String),
 }
diff --git a/common/src/serialization.rs b/common/src/serialization.rs
index 94f44510..a8767e21 100644
--- a/common/src/serialization.rs
+++ b/common/src/serialization.rs
@@ -1,7 +1,9 @@
+use std::marker::PhantomData;
+
 use anyhow::anyhow;
 use bech32::{Bech32, Hrp};
-use serde::{ser::SerializeMap, Serializer};
-use serde_with::{ser::SerializeAsWrap, SerializeAs};
+use serde::{ser::SerializeMap, Deserialize, Serializer};
+use serde_with::{ser::SerializeAsWrap, DeserializeAs, SerializeAs};
 
 pub struct SerializeMapAs<KAs, VAs>(std::marker::PhantomData<(KAs, VAs)>);
 
@@ -26,6 +28,61 @@ where
     }
 }
 
+// Marker types for different HRP prefixes
+pub struct PoolPrefix;
+pub struct StakePrefix;
+pub struct AddrPrefix;
+
+// Trait to get the HRP string from the marker types
+pub trait HrpPrefix {
+    const HRP: &'static str;
+}
+
+impl HrpPrefix for PoolPrefix {
+    const HRP: &'static str = "pool";
+}
+
+impl HrpPrefix for StakePrefix {
+    const HRP: &'static str = "stake";
+}
+
+impl HrpPrefix for AddrPrefix {
+    const HRP: &'static str = "addr";
+}
+
+// Generic Bech32 converter with HRP parameter
+pub struct DisplayFromBech32<PREFIX>(PhantomData<PREFIX>);
+
+// Serialization implementation
+impl<PREFIX> SerializeAs<Vec<u8>> for DisplayFromBech32<PREFIX>
+where
+    PREFIX: HrpPrefix,
+{
+    fn serialize_as<S>(source: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let bech32_string =
+            source.to_bech32_with_hrp(PREFIX::HRP).map_err(serde::ser::Error::custom)?;
+
+        serializer.serialize_str(&bech32_string)
+    }
+}
+
+// Deserialization implementation
+impl<'de, PREFIX> DeserializeAs<'de, Vec<u8>> for DisplayFromBech32<PREFIX>
+where
+    PREFIX: HrpPrefix,
+{
+    fn deserialize_as<D>(deserializer: D) -> Result<Vec<u8>, D::Error>
+    where
+        D: serde::de::Deserializer<'de>,
+    {
+        let s = String::deserialize(deserializer)?;
+        Vec::<u8>::from_bech32_with_hrp(&s, PREFIX::HRP).map_err(serde::de::Error::custom)
+    }
+}
+
 pub trait Bech32WithHrp {
     fn to_bech32_with_hrp(&self, hrp: &str) -> Result<String, anyhow::Error>;
     fn from_bech32_with_hrp(s: &str, expected_hrp: &str) -> Result<Vec<u8>, anyhow::Error>;
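Reviewer note: a minimal sketch (not part of the diff) of how the new `DisplayFromBech32` adapter is meant to be used with `serde_with`; the struct and field names here are illustrative only, and the import path assumes the `acropolis_common` crate layout shown above.

```rust
use acropolis_common::serialization::{DisplayFromBech32, PoolPrefix};
use serde::Serialize;
use serde_with::serde_as;

#[serde_as]
#[derive(Serialize)]
struct PoolRef {
    // 28-byte pool key hash, emitted as a "pool1..." Bech32 string;
    // the DeserializeAs impl parses it back and rejects any other HRP.
    #[serde_as(as = "DisplayFromBech32<PoolPrefix>")]
    pool_id: Vec<u8>,
}
```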
diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs
index 8658f675..955a5110 100644
--- a/common/src/stake_addresses.rs
+++ b/common/src/stake_addresses.rs
@@ -279,7 +279,7 @@ impl StakeAddressMap {
     /// Derive the Stake Pool Delegation Distribution (SPDD) - a map of total stake values
     /// (both with and without rewards) for each active SPO
     /// And Stake Pool Reward State (rewards and delegators_count for each pool)
-    /// Key of returned map is the SPO 'operator' ID
+    /// Map<SPO Operator ID, DelegatedStake>
     pub fn generate_spdd(&self) -> BTreeMap<KeyHash, DelegatedStake> {
         // Shareable Dashmap with referenced keys
         let spo_stakes = DashMap::<KeyHash, DelegatedStake>::new();
@@ -317,6 +317,24 @@ impl StakeAddressMap {
         spo_stakes.iter().map(|entry| (entry.key().clone(), entry.value().clone())).collect()
     }
 
+    /// Dump current Stake Pool Delegation Distribution State
+    /// Map<PoolId, Vec<(Stake Key, Active Stake Amount)>>
+    pub fn dump_spdd_state(&self) -> HashMap<KeyHash, Vec<(KeyHash, u64)>> {
+        let entries: Vec<_> = self
+            .inner
+            .par_iter()
+            .filter_map(|(key, sas)| {
+                sas.delegated_spo.as_ref().map(|spo| (spo.clone(), (key.clone(), sas.utxo_value)))
+            })
+            .collect();
+
+        let mut result: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
+        for (spo, entry) in entries {
+            result.entry(spo).or_default().push(entry);
+        }
+        result
+    }
+
     /// Derive the DRep Delegation Distribution (DRDD) - the total amount
     /// delegated to each DRep, including the special "abstain" and "no confidence" dreps.
     pub fn generate_drdd(
diff --git a/common/src/types.rs b/common/src/types.rs
index 898ca34a..1a74f072 100644
--- a/common/src/types.rs
+++ b/common/src/types.rs
@@ -318,6 +318,8 @@ impl Default for UTXODelta {
 /// Key hash used for pool IDs etc.
 pub type KeyHash = Vec<u8>;
 
+pub type PoolId = Vec<u8>;
+
 /// Script identifier
 pub type ScriptHash = KeyHash;
diff --git a/modules/accounts_state/.gitignore b/modules/accounts_state/.gitignore
new file mode 100644
index 00000000..fffd97c5
--- /dev/null
+++ b/modules/accounts_state/.gitignore
@@ -0,0 +1 @@
+*_db
\ No newline at end of file
diff --git a/modules/accounts_state/Cargo.toml b/modules/accounts_state/Cargo.toml
index ec784be1..8bbf4909 100644
--- a/modules/accounts_state/Cargo.toml
+++ b/modules/accounts_state/Cargo.toml
@@ -25,6 +25,7 @@ serde_json = { workspace = true }
 serde_with = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+fjall = "2.11.2"
 rayon = "1.10.0"
 csv = "1.3.1"
 itertools = "0.14.0"
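Reviewer note: `fjall` (the new dependency above) is an embedded LSM key-value store. A minimal round-trip sketch, using only calls that also appear in the new store module below (`Config::new(..).open()`, `open_partition`, `insert`, `get`); the path and partition name are placeholders.

```rust
use fjall::{Config, PartitionCreateOptions};

fn main() -> fjall::Result<()> {
    // Opens (or creates) a keyspace directory, then a named partition within it
    let keyspace = Config::new("./fjall_demo_db").open()?;
    let part = keyspace.open_partition("demo", PartitionCreateOptions::default())?;

    part.insert(b"key", b"value")?;
    assert_eq!(part.get(b"key")?.as_deref(), Some(b"value".as_ref()));
    Ok(())
}
```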
diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs
index 98573e70..32fccd43 100644
--- a/modules/accounts_state/src/accounts_state.rs
+++ b/modules/accounts_state/src/accounts_state.rs
@@ -33,6 +33,9 @@ use acropolis_common::queries::accounts::{
 };
 use verifier::Verifier;
 
+use crate::spo_distribution_store::SPDDStore;
+mod spo_distribution_store;
+
 const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state";
 const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity";
 const DEFAULT_TX_CERTIFICATES_TOPIC: &str = "cardano.certificates";
@@ -46,6 +49,10 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards";
 const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";
 const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas";
 
+const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false);
+const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db");
+const DEFAULT_SPDD_RETENTION_EPOCHS: &str = "spdd-retention-epochs";
+
 /// Accounts State module
 #[module(
     message_type(Message),
@@ -58,6 +65,7 @@ impl AccountsState {
     /// Async run loop
     async fn run(
         history: Arc<Mutex<StateHistory<State>>>,
+        spdd_store: Option<Arc<Mutex<SPDDStore>>>,
         mut drep_publisher: DRepDistributionPublisher,
         mut spo_publisher: SPODistributionPublisher,
         mut spo_rewards_publisher: SPORewardsPublisher,
@@ -144,6 +152,11 @@ impl AccountsState {
             let ea_message_f = ea_subscription.read();
             let params_message_f = parameters_subscription.read();
 
+            let spdd_store_guard = match spdd_store.as_ref() {
+                Some(s) => Some(s.lock().await),
+                None => None,
+            };
+
             // Handle DRep
             let (_, message) = dreps_message_f.await?;
             match message.as_ref() {
@@ -185,6 +198,17 @@ impl AccountsState {
                     if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
                         error!("Error publishing SPO stake distribution: {e:#}")
                     }
+
+                    // Store SPDD history, if enabled
+                    let spdd_state = state.dump_spdd_state();
+                    if let Some(mut spdd_store) = spdd_store_guard {
+                        // Active stakes snapshotted at the beginning of epoch i apply to epoch i + 1
+                        if let Err(e) =
+                            spdd_store.store_spdd(block_info.epoch + 1, spdd_state)
+                        {
+                            error!("Error storing SPDD state: {e:#}")
+                        }
+                    }
                 }
                 .instrument(span)
                 .await;
@@ -396,6 +420,42 @@ impl AccountsState {
             .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string());
         info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'");
 
+        // Store SPDD history config
+        let store_spdd_history =
+            config.get_bool(DEFAULT_STORE_SPDD_HISTORY.0).unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1);
+        info!("Store SPDD history: {}", store_spdd_history);
+
+        let spdd_db_path =
+            config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string());
+
+        // Convert to absolute path if relative
+        let spdd_db_path = if std::path::Path::new(&spdd_db_path).is_absolute() {
+            spdd_db_path
+        } else {
+            let current_dir = std::env::current_dir()
+                .map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?;
+            current_dir.join(&spdd_db_path).to_string_lossy().to_string()
+        };
+
+        // Get retention epochs configuration (None = unlimited)
+        let spdd_retention_epochs = config
+            .get_int(DEFAULT_SPDD_RETENTION_EPOCHS)
+            .ok()
+            .and_then(|v| if v > 0 { Some(v as u64) } else { None });
+        info!("SPDD retention epochs: {:?}", spdd_retention_epochs);
+
+        if store_spdd_history {
+            info!("SPDD database path: {}", spdd_db_path);
+            match spdd_retention_epochs {
+                Some(epochs) => info!(
+                    "SPDD retention: {} epochs (~{} GB max)",
+                    epochs,
+                    (epochs as f64 * 0.12).ceil()
+                ),
+                None => info!("SPDD retention: unlimited (no automatic pruning)"),
+            }
+        }
+
         // Query topics
         let accounts_query_topic = config
            .get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0)
@@ -415,16 +475,28 @@ impl AccountsState {
             verifier.read_rewards(&verify_rewards_files);
         }
 
-        // Create history
+        // History
         let history = Arc::new(Mutex::new(StateHistory::<State>::new(
             "AccountsState",
             StateHistoryStore::default_block_store(),
         )));
-        let history_account_state = history.clone();
+        let history_query = history.clone();
         let history_tick = history.clone();
 
+        // SPDD store
+        let spdd_store = if store_spdd_history {
+            Some(Arc::new(Mutex::new(SPDDStore::load(
+                std::path::Path::new(&spdd_db_path),
+                spdd_retention_epochs,
+            )?)))
+        } else {
+            None
+        };
+        let spdd_store_query = spdd_store.clone();
+
         context.handle(&accounts_query_topic, move |message| {
-            let history = history_account_state.clone();
+            let history = history_query.clone();
+            let spdd_store = spdd_store_query.clone();
             async move {
                 let Message::StateQuery(StateQuery::Accounts(query)) = message.as_ref() else {
                     return Arc::new(Message::StateQueryResponse(StateQueryResponse::Accounts(
@@ -435,6 +507,10 @@ impl AccountsState {
                 };
 
                 let guard = history.lock().await;
+                let spdd_store_guard = match spdd_store.as_ref() {
+                    Some(s) => Some(s.lock().await),
+                    None => None,
+                };
                 let state = match guard.current() {
                     Some(s) => s,
                     None => {
@@ -539,6 +615,32 @@ impl AccountsState {
                         }
                     }
 
+                    AccountsStateQuery::GetSPDDByEpoch { epoch } => match spdd_store_guard {
+                        Some(spdd_store) => match spdd_store.query_by_epoch(*epoch) {
+                            Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result),
+                            Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
+                        },
+                        None => AccountsStateQueryResponse::Error(
+                            "SPDD store is not enabled".to_string(),
+                        ),
+                    },
+
+                    AccountsStateQuery::GetSPDDByEpochAndPool { epoch, pool_id } => {
+                        match spdd_store_guard {
+                            Some(spdd_store) => {
+                                match spdd_store.query_by_epoch_and_pool(*epoch, pool_id) {
+                                    Ok(result) => {
+                                        AccountsStateQueryResponse::SPDDByEpochAndPool(result)
+                                    }
+                                    Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
+                                }
+                            }
+                            None => AccountsStateQueryResponse::Error(
+                                "SPDD store is not enabled".to_string(),
+                            ),
+                        }
+                    }
+
                     _ => AccountsStateQueryResponse::Error(format!(
                         "Unimplemented query variant: {:?}",
                         query
@@ -595,6 +697,7 @@ impl AccountsState {
         context.run(async move {
             Self::run(
                 history,
+                spdd_store,
                 drep_publisher,
                 spo_publisher,
                 spo_rewards_publisher,
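Reviewer note: a back-of-envelope check of the "~0.12 GB per epoch" factor used in the retention log message above, based on the record counts and sizes stated in the comments of the new store module (~1.3M delegations per epoch, 72 bytes per record). The numbers here are the source's own assumptions, not measurements.

```rust
fn main() {
    let records_per_epoch: u64 = 1_300_000; // typical epoch, per the BATCH_SIZE comment
    let bytes_per_record: u64 = 64 + 8;     // 64-byte key (8 + 28 + 28) + 8-byte amount
    let gb = (records_per_epoch * bytes_per_record) as f64 / 1e9;
    // ~0.094 GB raw; the 0.12 GB/epoch figure leaves headroom for LSM/index overhead
    println!("{gb:.3} GB raw per epoch");
}
```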
diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs
new file mode 100644
index 00000000..0dcde887
--- /dev/null
+++ b/modules/accounts_state/src/spo_distribution_store.rs
@@ -0,0 +1,283 @@
+use std::collections::HashMap;
+
+use acropolis_common::{AddrKeyhash, PoolId};
+use anyhow::Result;
+use fjall::{Config, Keyspace, PartitionCreateOptions};
+
+const POOL_ID_LEN: usize = 28;
+const STAKE_KEY_LEN: usize = 28;
+const EPOCH_LEN: usize = 8;
+const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LEN + STAKE_KEY_LEN;
+
+// Batch size balances commit overhead vs memory usage
+// ~720KB per batch (72 bytes × 10,000)
+// ~130 commits for a typical epoch (~1.3M delegations)
+const BATCH_SIZE: usize = 10_000;
+
+fn encode_key(epoch: u64, pool_id: &PoolId, stake_key: &AddrKeyhash) -> Vec<u8> {
+    let mut key = Vec::with_capacity(TOTAL_KEY_LEN);
+    key.extend_from_slice(&epoch.to_be_bytes());
+    key.extend_from_slice(pool_id);
+    key.extend_from_slice(stake_key);
+    key
+}
+
+fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec<u8> {
+    let mut prefix = Vec::with_capacity(EPOCH_LEN + POOL_ID_LEN);
+    prefix.extend_from_slice(&epoch.to_be_bytes());
+    prefix.extend_from_slice(pool_id);
+    prefix
+}
+
+fn decode_key(key: &[u8]) -> Result<(u64, PoolId, AddrKeyhash)> {
+    let epoch = u64::from_be_bytes(key[..EPOCH_LEN].try_into()?);
+    let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LEN].to_vec();
+    let stake_key = key[EPOCH_LEN + POOL_ID_LEN..].to_vec();
+    Ok((epoch, pool_id, stake_key))
+}
+
+/// Encode epoch completion marker key
+fn encode_epoch_marker(epoch: u64) -> Vec<u8> {
+    epoch.to_be_bytes().to_vec()
+}
+
+pub struct SPDDStore {
+    keyspace: Keyspace,
+    /// Partition for all SPDD data
+    /// Key format: epoch(8 bytes) + pool_id + stake_key
+    /// Value: amount(8 bytes)
+    spdd: fjall::PartitionHandle,
+    /// Partition for epoch completion markers
+    /// Key format: epoch(8 bytes)
+    /// Value: "complete"
+    epoch_markers: fjall::PartitionHandle,
+    /// Maximum number of epochs to retain (None = unlimited)
+    retention_epochs: Option<u64>,
+}
+
+impl SPDDStore {
+    #[allow(dead_code)]
+    pub fn new(
+        path: impl AsRef<std::path::Path>,
+        retention_epochs: Option<u64>,
+    ) -> fjall::Result<Self> {
+        let path = path.as_ref();
+        if path.exists() {
+            std::fs::remove_dir_all(path)?;
+        }
+
+        let keyspace = Config::new(path).open()?;
+        let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
+        let epoch_markers =
+            keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?;
+
+        Ok(Self {
+            keyspace,
+            spdd,
+            epoch_markers,
+            retention_epochs,
+        })
+    }
+
+    pub fn load(
+        path: impl AsRef<std::path::Path>,
+        retention_epochs: Option<u64>,
+    ) -> fjall::Result<Self> {
+        let path = path.as_ref();
+
+        let keyspace = Config::new(path).open()?;
+        let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
+        let epoch_markers =
+            keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?;
+
+        Ok(Self {
+            keyspace,
+            spdd,
+            epoch_markers,
+            retention_epochs,
+        })
+    }
+
+    pub fn is_epoch_complete(&self, epoch: u64) -> fjall::Result<bool> {
+        let marker_key = encode_epoch_marker(epoch);
+        Ok(matches!(self.epoch_markers.get(&marker_key)?, Some(value) if value.eq(b"complete")))
+    }
+
+    pub fn store_spdd(
+        &mut self,
+        epoch: u64,
+        spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>>,
+    ) -> fjall::Result<()> {
+        if self.is_epoch_complete(epoch)? {
+            return Ok(());
+        }
+        self.remove_epoch_data(epoch)?;
+
+        let mut batch = self.keyspace.batch();
+        let mut count = 0;
+        for (pool_id, delegations) in spdd_state {
+            for (stake_key, amount) in delegations {
+                let key = encode_key(epoch, &pool_id, &stake_key);
+                let value = amount.to_be_bytes();
+                batch.insert(&self.spdd, key, value);
+
+                count += 1;
+                if count >= BATCH_SIZE {
+                    batch.commit()?;
+                    batch = self.keyspace.batch();
+                    count = 0;
+                }
+            }
+        }
+        if count > 0 {
+            batch.commit()?;
+        }
+
+        // Mark epoch as complete (single key operation)
+        let marker_key = encode_epoch_marker(epoch);
+        self.epoch_markers.insert(marker_key, b"complete")?;
+
+        if let Some(retention) = self.retention_epochs {
+            if epoch >= retention {
+                let keep_from_epoch = epoch - retention + 1;
+                self.prune_epochs_before(keep_from_epoch)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn remove_epoch_data(&self, epoch: u64) -> fjall::Result<u64> {
+        // Remove the epoch marker first - if the process fails midway, the epoch stays marked incomplete
+        let marker_key = encode_epoch_marker(epoch);
+        self.epoch_markers.remove(marker_key)?;
+
+        let prefix = epoch.to_be_bytes();
+        let mut batch = self.keyspace.batch();
+        let mut deleted_count = 0;
+        let mut total_deleted_count: u64 = 0;
+
+        for item in self.spdd.prefix(prefix) {
+            let (key, _) = item?;
+            batch.remove(&self.spdd, key);
+            total_deleted_count += 1;
+
+            deleted_count += 1;
+            if deleted_count >= BATCH_SIZE {
+                batch.commit()?;
+                batch = self.keyspace.batch();
+                deleted_count = 0;
+            }
+        }
+
+        if deleted_count > 0 {
+            batch.commit()?;
+        }
+
+        Ok(total_deleted_count)
+    }
+
+    pub fn prune_epochs_before(&self, before_epoch: u64) -> fjall::Result<u64> {
+        let mut deleted_epochs: u64 = 0;
+
+        for epoch in (0..before_epoch).rev() {
+            let deleted_count = self.remove_epoch_data(epoch)?;
+            if deleted_count == 0 {
+                break;
+            }
+            deleted_epochs += 1;
+        }
+        Ok(deleted_epochs)
+    }
+
+    pub fn query_by_epoch(&self, epoch: u64) -> Result<Vec<(PoolId, AddrKeyhash, u64)>> {
+        if !self.is_epoch_complete(epoch)? {
+            return Err(anyhow::anyhow!("Epoch SPDD data is not complete"));
+        }
+
+        let prefix = epoch.to_be_bytes();
+        let mut result = Vec::new();
+        for item in self.spdd.prefix(prefix) {
+            let (key, value) = item?;
+            let (_, pool_id, stake_key) = decode_key(&key)?;
+            let amount = u64::from_be_bytes(value.as_ref().try_into()?);
+            result.push((pool_id, stake_key, amount));
+        }
+        Ok(result)
+    }
+
+    pub fn query_by_epoch_and_pool(
+        &self,
+        epoch: u64,
+        pool_id: &PoolId,
+    ) -> Result<Vec<(AddrKeyhash, u64)>> {
+        if !self.is_epoch_complete(epoch)? {
+            return Err(anyhow::anyhow!("Epoch SPDD data is not complete"));
+        }
+
+        let prefix = encode_epoch_pool_prefix(epoch, pool_id);
+        let mut result = Vec::new();
+        for item in self.spdd.prefix(prefix) {
+            let (key, value) = item?;
+            let (_, _, stake_key) = decode_key(&key)?;
+            let amount = u64::from_be_bytes(value.as_ref().try_into()?);
+            result.push((stake_key, amount));
+        }
+        Ok(result)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const DB_PATH: &str = "spdd_db";
+
+    #[test]
+    fn test_store_spdd_state() {
+        let mut spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None)
+            .expect("Failed to create SPDD store");
+        let mut spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>> = HashMap::new();
+        spdd_state.insert(
+            vec![0x01; 28],
+            vec![(vec![0x10; 28], 100), (vec![0x11; 28], 150)],
+        );
+        spdd_state.insert(
+            vec![0x02; 28],
+            vec![(vec![0x20; 28], 200), (vec![0x21; 28], 250)],
+        );
+        assert!(spdd_store.store_spdd(1, spdd_state).is_ok());
+
+        let result = spdd_store.query_by_epoch(1).unwrap();
+        assert_eq!(result.len(), 4);
+        let result = spdd_store.query_by_epoch_and_pool(1, &vec![0x01; 28]).unwrap();
+        assert_eq!(result.len(), 2);
+        let result = spdd_store.query_by_epoch_and_pool(1, &vec![0x02; 28]).unwrap();
+        assert_eq!(result.len(), 2);
+    }
+
+    #[test]
+    fn test_prune_old_epochs() {
+        let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2))
+            .expect("Failed to create SPDD store");
+
+        for epoch in 1..=3 {
+            let mut spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>> = HashMap::new();
+            spdd_state.insert(
+                vec![epoch as u8; 28],
+                vec![(vec![0x10; 28], epoch * 100), (vec![0x11; 28], epoch * 150)],
+            );
+            spdd_store.store_spdd(epoch, spdd_state).expect("Failed to store SPDD state");
+        }
+
+        assert!(!spdd_store.is_epoch_complete(1).unwrap());
+        assert!(spdd_store.is_epoch_complete(2).unwrap());
+        assert!(spdd_store.is_epoch_complete(3).unwrap());
+
+        assert!(spdd_store.query_by_epoch(1).is_err());
+        let result = spdd_store.query_by_epoch(2).unwrap();
+        assert_eq!(result.len(), 2);
+        let result = spdd_store.query_by_epoch(3).unwrap();
+        assert_eq!(result.len(), 2);
+    }
+}
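Reviewer note: the fixed-width, big-endian key layout above is what lets both query shapes run as plain prefix scans. A self-contained restatement of the encoding (stand-in for the module's `encode_key`, with fixed-size arrays instead of the `PoolId`/`AddrKeyhash` aliases) showing the two prefixes:

```rust
// epoch(8, big-endian) + pool_id(28) + stake_key(28) = 64-byte key
fn encode_key(epoch: u64, pool_id: &[u8; 28], stake_key: &[u8; 28]) -> Vec<u8> {
    let mut key = Vec::with_capacity(64);
    key.extend_from_slice(&epoch.to_be_bytes()); // big-endian keeps epochs bytewise-ordered
    key.extend_from_slice(pool_id);
    key.extend_from_slice(stake_key);
    key
}

fn main() {
    let pool = [0x01u8; 28];
    let stake = [0x10u8; 28];
    let key = encode_key(500, &pool, &stake);

    // GetSPDDByEpoch scans everything under the 8-byte epoch prefix...
    assert!(key.starts_with(&500u64.to_be_bytes()));

    // ...and GetSPDDByEpochAndPool narrows to the 36-byte epoch+pool prefix
    let mut prefix = 500u64.to_be_bytes().to_vec();
    prefix.extend_from_slice(&pool);
    assert!(key.starts_with(&prefix));
}
```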
diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs
index 97c626e3..fe3384be 100644
--- a/modules/accounts_state/src/state.rs
+++ b/modules/accounts_state/src/state.rs
@@ -605,6 +605,11 @@ impl State {
         stake_addresses.generate_spdd()
     }
 
+    pub fn dump_spdd_state(&self) -> HashMap<KeyHash, Vec<(KeyHash, u64)>> {
+        let stake_addresses = self.stake_addresses.lock().unwrap();
+        stake_addresses.dump_spdd_state()
+    }
+
     /// Derive the DRep Delegation Distribution (DRDD) - the total amount
     /// delegated to each DRep, including the special "abstain" and "no confidence" dreps.
     pub fn generate_drdd(&self) -> DRepDelegationDistribution {
diff --git a/modules/parameters_state/src/parameters_state.rs b/modules/parameters_state/src/parameters_state.rs
index 18b1a2d4..187295ec 100644
--- a/modules/parameters_state/src/parameters_state.rs
+++ b/modules/parameters_state/src/parameters_state.rs
@@ -196,6 +196,11 @@ impl ParametersState {
                         }
                     }
                 }
+                ParametersStateQuery::GetNetworkName => {
+                    ParametersStateQueryResponse::NetworkName(
+                        lock.get_current_state().network_name.clone(),
+                    )
+                }
             };
             Arc::new(Message::StateQueryResponse(StateQueryResponse::Parameters(
                 response,
diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs
index 4486fadb..ddb7b003 100644
--- a/modules/rest_blockfrost/src/handlers/epochs.rs
+++ b/modules/rest_blockfrost/src/handlers/epochs.rs
@@ -1,6 +1,8 @@
 use crate::{
     handlers_config::HandlersConfig,
-    types::{EpochActivityRest, ProtocolParamsRest},
+    types::{
+        EpochActivityRest, ProtocolParamsRest, SPDDByEpochAndPoolItemRest, SPDDByEpochItemRest,
+    },
 };
 use acropolis_common::{
     messages::{Message, RESTResponse, StateQuery, StateQueryResponse},
@@ -13,6 +15,7 @@ use acropolis_common::{
         utils::query_state,
     },
     serialization::Bech32WithHrp,
+    AddressNetwork, StakeAddress, StakeAddressPayload,
 };
 use anyhow::{anyhow, Result};
 use caryatid_sdk::Context;
@@ -253,6 +256,10 @@ pub async fn handle_epoch_params_blockfrost(
             "Protocol parameters not found for requested epoch",
         )),
         ParametersStateQueryResponse::Error(msg) => Ok(RESTResponse::with_text(400, &msg)),
+        _ => Ok(RESTResponse::with_text(
+            500,
+            "Unexpected message type while retrieving parameters",
+        )),
     }
 }
 
@@ -395,19 +402,268 @@ pub async fn handle_epoch_previous_blockfrost(
 }
 
 pub async fn handle_epoch_total_stakes_blockfrost(
-    _context: Arc<Context<Message>>,
-    _params: Vec<String>,
-    _handlers_config: Arc<HandlersConfig>,
+    context: Arc<Context<Message>>,
+    params: Vec<String>,
+    handlers_config: Arc<HandlersConfig>,
 ) -> Result<RESTResponse> {
-    Ok(RESTResponse::with_text(501, "Not implemented"))
+    if params.len() != 1 {
+        return Ok(RESTResponse::with_text(
+            400,
+            "Expected one parameter: an epoch number",
+        ));
+    }
+    let param = &params[0];
+
+    let epoch_number = match param.parse::<u64>() {
+        Ok(num) => num,
+        Err(_) => {
+            return Ok(RESTResponse::with_text(
+                400,
+                "Invalid epoch number parameter",
+            ));
+        }
+    };
+
+    // Query latest epoch from epochs-state
+    let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(
+        EpochsStateQuery::GetLatestEpoch,
+    )));
+    let latest_epoch = query_state(
+        &context,
+        &handlers_config.epochs_query_topic,
+        latest_epoch_msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Epochs(
+                EpochsStateQueryResponse::LatestEpoch(res),
+            )) => Ok(res.epoch.epoch),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving latest epoch"
+            )),
+        },
+    )
+    .await?;
+
+    if epoch_number > latest_epoch {
+        return Ok(RESTResponse::with_text(404, "Epoch not found"));
+    }
+
+    // Query current network from parameters-state
+    let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters(
+        ParametersStateQuery::GetNetworkName,
+    )));
+    let current_network = query_state(
+        &context,
+        &handlers_config.parameters_query_topic,
+        current_network_msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Parameters(
+                ParametersStateQueryResponse::NetworkName(network),
+            )) => Ok(network),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving current network"
+            )),
+        },
+    )
+    .await?;
+
+    let network = match current_network.as_str() {
+        "mainnet" => AddressNetwork::Main,
+        "testnet" => AddressNetwork::Test,
+        unknown => {
+            return Ok(RESTResponse::with_text(
+                500,
+                format!("Internal server error while retrieving current network: {unknown}")
+                    .as_str(),
+            ))
+        }
+    };
+
+    // Query SPDD by epoch from accounts-state
+    let msg = Arc::new(Message::StateQuery(StateQuery::Accounts(
+        AccountsStateQuery::GetSPDDByEpoch {
+            epoch: epoch_number,
+        },
+    )));
+    let spdd = query_state(
+        &context,
+        &handlers_config.accounts_query_topic,
+        msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::SPDDByEpoch(res),
+            )) => Ok(res),
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::Error(e),
+            )) => Err(anyhow::anyhow!(
+                "Internal server error while retrieving SPDD by epoch: {e}"
+            )),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving SPDD by epoch"
+            )),
+        },
+    )
+    .await?;
+
+    let spdd_response = spdd
+        .into_iter()
+        .map(|(pool_id, stake_key_hash, amount)| {
+            let stake_address = StakeAddress {
+                network: network.clone(),
+                payload: StakeAddressPayload::StakeKeyHash(stake_key_hash),
+            }
+            .to_string()
+            .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?;
+            Ok(SPDDByEpochItemRest {
+                pool_id,
+                stake_address,
+                amount,
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    match serde_json::to_string_pretty(&spdd_response) {
+        Ok(json) => Ok(RESTResponse::with_json(200, &json)),
+        Err(e) => Ok(RESTResponse::with_text(
+            500,
+            &format!("Failed to serialize SPDD by epoch: {e}"),
+        )),
+    }
 }
 
 pub async fn handle_epoch_pool_stakes_blockfrost(
-    _context: Arc<Context<Message>>,
-    _params: Vec<String>,
-    _handlers_config: Arc<HandlersConfig>,
+    context: Arc<Context<Message>>,
+    params: Vec<String>,
+    handlers_config: Arc<HandlersConfig>,
 ) -> Result<RESTResponse> {
-    Ok(RESTResponse::with_text(501, "Not implemented"))
+    if params.len() != 2 {
+        return Ok(RESTResponse::with_text(
+            400,
+            "Expected two parameters: an epoch number and a pool ID",
+        ));
+    }
+    let param = &params[0];
+    let pool_id = &params[1];
+
+    let epoch_number = match param.parse::<u64>() {
+        Ok(num) => num,
+        Err(_) => {
+            return Ok(RESTResponse::with_text(
+                400,
+                "Invalid epoch number parameter",
+            ));
+        }
+    };
+
+    let Ok(pool_id) = Vec::<u8>::from_bech32_with_hrp(pool_id, "pool") else {
+        return Ok(RESTResponse::with_text(
+            400,
+            &format!("Invalid Bech32 stake pool ID: {pool_id}"),
+        ));
+    };
+
+    // Query latest epoch from epochs-state
+    let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(
+        EpochsStateQuery::GetLatestEpoch,
+    )));
+    let latest_epoch = query_state(
+        &context,
+        &handlers_config.epochs_query_topic,
+        latest_epoch_msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Epochs(
+                EpochsStateQueryResponse::LatestEpoch(res),
+            )) => Ok(res.epoch.epoch),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving latest epoch"
+            )),
+        },
+    )
+    .await?;
+
+    if epoch_number > latest_epoch {
+        return Ok(RESTResponse::with_text(404, "Epoch not found"));
+    }
+
+    // Query current network from parameters-state
+    let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters(
+        ParametersStateQuery::GetNetworkName,
+    )));
+    let current_network = query_state(
+        &context,
+        &handlers_config.parameters_query_topic,
+        current_network_msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Parameters(
+                ParametersStateQueryResponse::NetworkName(network),
+            )) => Ok(network),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving current network"
+            )),
+        },
+    )
+    .await?;
+
+    let network = match current_network.as_str() {
+        "mainnet" => AddressNetwork::Main,
+        "testnet" => AddressNetwork::Test,
+        unknown => {
+            return Ok(RESTResponse::with_text(
+                500,
+                format!("Internal server error while retrieving current network: {unknown}")
+                    .as_str(),
+            ))
+        }
+    };
+
+    // Query SPDD by epoch and pool from accounts-state
+    let msg = Arc::new(Message::StateQuery(StateQuery::Accounts(
+        AccountsStateQuery::GetSPDDByEpochAndPool {
+            epoch: epoch_number,
+            pool_id,
+        },
+    )));
+    let spdd = query_state(
+        &context,
+        &handlers_config.accounts_query_topic,
+        msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::SPDDByEpochAndPool(res),
+            )) => Ok(res),
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::Error(e),
+            )) => Err(anyhow::anyhow!(
+                "Internal server error while retrieving SPDD by epoch and pool: {e}"
+            )),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving SPDD by epoch and pool"
+            )),
+        },
+    )
+    .await?;
+
+    let spdd_response = spdd
+        .into_iter()
+        .map(|(stake_key_hash, amount)| {
+            let stake_address = StakeAddress {
+                network: network.clone(),
+                payload: StakeAddressPayload::StakeKeyHash(stake_key_hash),
+            }
+            .to_string()
+            .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?;
+
+            Ok(SPDDByEpochAndPoolItemRest {
+                stake_address,
+                amount,
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    match serde_json::to_string_pretty(&spdd_response) {
+        Ok(json) => Ok(RESTResponse::with_json(200, &json)),
+        Err(e) => Ok(RESTResponse::with_text(
+            500,
+            &format!("Failed to serialize SPDD by epoch and pool: {e}"),
+        )),
+    }
 }
 
 pub async fn handle_epoch_total_blocks_blockfrost(
diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs
index 6730eaf7..e7dd37ca 100644
--- a/modules/rest_blockfrost/src/types.rs
+++ b/modules/rest_blockfrost/src/types.rs
@@ -4,9 +4,11 @@ use acropolis_common::{
     protocol_params::{Nonce, NonceVariant, ProtocolParams},
     queries::governance::DRepActionUpdate,
     rest_helper::ToCheckedF64,
+    serialization::{DisplayFromBech32, PoolPrefix},
     AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset,
     PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote,
 };
+use anyhow::Result;
 use num_traits::ToPrimitive;
 use rust_decimal::Decimal;
 use serde::Serialize;
@@ -50,6 +52,26 @@ impl From<EpochActivity> for EpochActivityRest {
     }
 }
 
+// REST response structure for /epochs/{number}/stakes
+#[serde_as]
+#[derive(Serialize)]
+pub struct SPDDByEpochItemRest {
+    pub stake_address: String,
+    #[serde_as(as = "DisplayFromBech32<PoolPrefix>")]
+    pub pool_id: KeyHash,
+    #[serde_as(as = "DisplayFromStr")]
+    pub amount: u64,
+}
+
+// REST response structure for /epochs/{number}/stakes/{pool_id}
+#[serde_as]
+#[derive(Serialize)]
+pub struct SPDDByEpochAndPoolItemRest {
+    pub stake_address: String,
+    #[serde_as(as = "DisplayFromStr")]
+    pub amount: u64,
+}
+
 // REST response structure for /governance/dreps
 #[derive(Serialize)]
 pub struct DRepsListREST {
diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore
index e35753a2..4c2d2d58 100644
--- a/processes/omnibus/.gitignore
+++ b/processes/omnibus/.gitignore
@@ -2,3 +2,6 @@ downloads
 sled-immutable-utxos
 fjall-immutable-utxos
 cache
+
+# DB files
+*_db
\ No newline at end of file
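Reviewer note: a minimal sketch of what the `serde_with` annotations on the new REST structs in types.rs produce on the wire. The struct here is a self-contained stand-in, not the real `SPDDByEpochAndPoolItemRest`, and the stake address value is a placeholder; the point is that `DisplayFromStr` renders the lovelace amount as a JSON string, Blockfrost-style.

```rust
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Serialize)]
struct Item {
    stake_address: String,
    #[serde_as(as = "DisplayFromStr")]
    amount: u64, // serialized via Display, i.e. as a string
}

fn main() {
    let item = Item { stake_address: "stake1uxyz...".into(), amount: 4_200_000 };
    // Prints: {"stake_address":"stake1uxyz...","amount":"4200000"}
    println!("{}", serde_json::to_string(&item).unwrap());
}
```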
diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml
index 4fa7998e..9d45bf81 100644
--- a/processes/omnibus/omnibus.toml
+++ b/processes/omnibus/omnibus.toml
@@ -83,6 +83,14 @@ write-full-cache = "false"
 store-history = false
 
 [module.accounts-state]
+# Store SPDD history
+# Enables the /epochs/{number}/stakes and /epochs/{number}/stakes/{pool_id} endpoints
+store-spdd-history = false
+# SPDD database path (only used when store-spdd-history is enabled)
+spdd-db-path = "./spdd_db"
+# Number of epochs to retain in SPDD history (leave unset for unlimited)
+# Example: spdd-retention-epochs = 73
+
 # Verify against captured CSV
 verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv"
 verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv"