From 726705468eed4565af94cd5cd753991df0310517 Mon Sep 17 00:00:00 2001
From: Golddy Dev
Date: Mon, 13 Oct 2025 19:52:45 +0200
Subject: [PATCH 01/13] feat: implement SPDD history storage and querying
 functionality in accounts state module

- Added SPDDStore to store Stake Pool Delegation Distribution data using fjall.
- Introduced methods to store and query SPDD by epoch and pool ID.
- Updated AccountsState to handle new SPDD queries.
- Enhanced REST API to support fetching SPDD data by epoch and pool.
- Added configuration option to enable SPDD history storage.
- Added serialize-as functions for Bech32.
- Included tests for SPDDStore functionality.
---
 Cargo.lock                                    |   1 +
 common/src/queries/accounts.rs                |   6 +
 common/src/serialization.rs                   |  61 +++++-
 common/src/stake_addresses.rs                 |  20 +-
 modules/accounts_state/.gitignore             |   1 +
 modules/accounts_state/Cargo.toml             |   1 +
 modules/accounts_state/src/accounts_state.rs  |  78 +++++++-
 .../src/spo_distribution_store.rs             | 175 +++++++++++++++++
 modules/accounts_state/src/state.rs           |   5 +
 .../rest_blockfrost/src/handlers/epochs.rs    | 180 +++++++++++++++++-
 modules/rest_blockfrost/src/types.rs          |  42 ++++
 processes/omnibus/.gitignore                  |   3 +
 processes/omnibus/omnibus.toml                |   3 +
 13 files changed, 562 insertions(+), 14 deletions(-)
 create mode 100644 modules/accounts_state/.gitignore
 create mode 100644 modules/accounts_state/src/spo_distribution_store.rs

diff --git a/Cargo.lock b/Cargo.lock
index 12ac5e1a..2cc5d98c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -45,6 +45,7 @@ dependencies = [
  "caryatid_sdk",
  "chrono",
  "config",
+ "fjall",
  "hex",
  "imbl",
  "serde",

diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs
index e91dc99e..65ae1bdb 100644
--- a/common/src/queries/accounts.rs
+++ b/common/src/queries/accounts.rs
@@ -25,6 +25,8 @@ pub enum AccountsStateQuery {
     // Epochs-related queries
     GetActiveStakes {},
+    GetSPDDByEpoch { epoch: u64 },
+    GetSPDDByEpochAndPool { epoch: u64, pool_id: KeyHash },

     // Pools related queries
     GetOptimalPoolSizing,
@@ -57,6 +59,10 @@ pub enum AccountsStateQueryResponse {
     // Epochs-related responses
     ActiveStakes(u64),
+    /// Vec<(PoolId, StakeKey, ActiveStakeAmount)>
+    SPDDByEpoch(Vec<(KeyHash, KeyHash, u64)>),
+    /// Vec<(StakeKey, ActiveStakeAmount)>
+    SPDDByEpochAndPool(Vec<(KeyHash, u64)>),

     // Pools-related responses
     OptimalPoolSizing(Option<OptimalPoolSizing>),

diff --git a/common/src/serialization.rs b/common/src/serialization.rs
index 94f44510..a8767e21 100644
--- a/common/src/serialization.rs
+++ b/common/src/serialization.rs
@@ -1,7 +1,9 @@
+use std::marker::PhantomData;
+
 use anyhow::anyhow;
 use bech32::{Bech32, Hrp};
-use serde::{ser::SerializeMap, Serializer};
-use serde_with::{ser::SerializeAsWrap, SerializeAs};
+use serde::{ser::SerializeMap, Deserialize, Serializer};
+use serde_with::{ser::SerializeAsWrap, DeserializeAs, SerializeAs};

 pub struct SerializeMapAs<KAs, VAs>(std::marker::PhantomData<(KAs, VAs)>);

@@ -26,6 +28,61 @@ where
     }
 }

+// Marker types for different HRP prefixes
+pub struct PoolPrefix;
+pub struct StakePrefix;
+pub struct AddrPrefix;
+
+// Trait to get HRP string from marker types
+pub trait HrpPrefix {
+    const HRP: &'static str;
+}
+
+impl HrpPrefix for PoolPrefix {
+    const HRP: &'static str = "pool";
+}
+
+impl HrpPrefix for StakePrefix {
+    const HRP: &'static str = "stake";
+}
+
+impl HrpPrefix for AddrPrefix {
+    const HRP: &'static str = "addr";
+}
+
+// Generic Bech32 converter with HRP parameter
+pub struct DisplayFromBech32<PREFIX>(PhantomData<PREFIX>);
+
+// Serialization implementation
+impl<PREFIX> SerializeAs<Vec<u8>> for DisplayFromBech32<PREFIX>
+where
+    PREFIX: HrpPrefix,
+{
+    fn serialize_as<S>(source: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let bech32_string =
+            source.to_bech32_with_hrp(PREFIX::HRP).map_err(serde::ser::Error::custom)?;
+
+        serializer.serialize_str(&bech32_string)
+    }
+}
+
+// Deserialization implementation
+impl<'de, PREFIX> DeserializeAs<'de, Vec<u8>> for DisplayFromBech32<PREFIX>
+where
+    PREFIX: HrpPrefix,
+{
+    fn deserialize_as<D>(deserializer: D) -> Result<Vec<u8>, D::Error>
+    where
+        D: serde::de::Deserializer<'de>,
+    {
+        let s = String::deserialize(deserializer)?;
+        Vec::<u8>::from_bech32_with_hrp(&s, PREFIX::HRP).map_err(serde::de::Error::custom)
+    }
+}
+
 pub trait Bech32WithHrp {
     fn to_bech32_with_hrp(&self, hrp: &str) -> Result<String, anyhow::Error>;
     fn from_bech32_with_hrp(s: &str, expected_hrp: &str) -> Result<Vec<u8>, anyhow::Error>;

diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs
index 8658f675..955a5110 100644
--- a/common/src/stake_addresses.rs
+++ b/common/src/stake_addresses.rs
@@ -279,7 +279,7 @@ impl StakeAddressMap {
     /// Derive the Stake Pool Delegation Distribution (SPDD) - a map of total stake values
     /// (both with and without rewards) for each active SPO
     /// And Stake Pool Reward State (rewards and delegators_count for each pool)
-    /// Key of returned map is the SPO 'operator' ID
+    /// <PoolId -> DelegatedStake>
     pub fn generate_spdd(&self) -> BTreeMap<KeyHash, DelegatedStake> {
         // Shareable Dashmap with referenced keys
         let spo_stakes = DashMap::<KeyHash, DelegatedStake>::new();
@@ -317,6 +317,24 @@ impl StakeAddressMap {
         spo_stakes.iter().map(|entry| (entry.key().clone(), entry.value().clone())).collect()
     }

+    /// Dump current Stake Pool Delegation Distribution State
+    /// <PoolId -> (Stake Key, Active Stakes Amount)>
+    pub fn dump_spdd_state(&self) -> HashMap<KeyHash, Vec<(KeyHash, u64)>> {
+        let entries: Vec<_> = self
+            .inner
+            .par_iter()
+            .filter_map(|(key, sas)| {
+                sas.delegated_spo.as_ref().map(|spo| (spo.clone(), (key.clone(), sas.utxo_value)))
+            })
+            .collect();
+
+        let mut result: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
+        for (spo, entry) in entries {
+            result.entry(spo).or_default().push(entry);
+        }
+        result
+    }
+
     /// Derive the DRep Delegation Distribution (DRDD) - the total amount
     /// delegated to each DRep, including the special "abstain" and "no confidence" dreps.
pub fn generate_drdd( diff --git a/modules/accounts_state/.gitignore b/modules/accounts_state/.gitignore new file mode 100644 index 00000000..9dec22b5 --- /dev/null +++ b/modules/accounts_state/.gitignore @@ -0,0 +1 @@ +spdd-db/ \ No newline at end of file diff --git a/modules/accounts_state/Cargo.toml b/modules/accounts_state/Cargo.toml index 171fa9f7..7e511ccc 100644 --- a/modules/accounts_state/Cargo.toml +++ b/modules/accounts_state/Cargo.toml @@ -24,6 +24,7 @@ serde_json = { workspace = true } serde_with = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +fjall = "2.11.2" [lib] path = "src/accounts_state.rs" diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 5a0a51f6..a432d9fb 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -31,6 +31,9 @@ use acropolis_common::queries::accounts::{ AccountInfo, AccountsStateQuery, AccountsStateQueryResponse, }; +use crate::spo_distribution_store::SPDDStore; +mod spo_distribution_store; + const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state"; const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity"; const DEFAULT_TX_CERTIFICATES_TOPIC: &str = "cardano.certificates"; @@ -44,6 +47,8 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; +const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false); + /// Accounts State module #[module( message_type(Message), @@ -56,6 +61,7 @@ impl AccountsState { /// Async run loop async fn run( history: Arc>>, + spdd_store: Option>>, mut drep_publisher: DRepDistributionPublisher, mut spo_publisher: SPODistributionPublisher, mut spo_rewards_publisher: SPORewardsPublisher, @@ -136,6 +142,11 @@ impl AccountsState { let ea_message_f = ea_subscription.read(); let params_message_f = parameters_subscription.read(); + let spdd_store_guard = match spdd_store.as_ref() { + Some(s) => Some(s.lock().await), + None => None, + }; + // Handle DRep let (_, message) = dreps_message_f.await?; match message.as_ref() { @@ -177,6 +188,17 @@ impl AccountsState { if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { error!("Error publishing SPO stake distribution: {e:#}") } + + // if we store spdd history + let spdd_state = state.dump_spdd_state(); + if let Some(spdd_store) = spdd_store_guard { + // active stakes taken at beginning of epoch i is for epoch + 1 + if let Err(e) = + spdd_store.store_spdd(block_info.epoch + 1, spdd_state) + { + error!("Error storing SPDD state: {e:#}") + } + } } .instrument(span) .await; @@ -400,6 +422,11 @@ impl AccountsState { .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string()); info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'"); + // store spdd history config + let store_spdd_history = + config.get_bool(DEFAULT_STORE_SPDD_HISTORY.0).unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1); + info!("Store SPDD history: {}", store_spdd_history); + // Query topics let accounts_query_topic = config .get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0) @@ -411,11 +438,23 @@ impl AccountsState { "AccountsState", StateHistoryStore::default_block_store(), ))); - let history_account_state = history.clone(); + let history_query = history.clone(); let history_tick = history.clone(); + // Create spdd history + // Return Err if failed to load SPDD 
store
+        let spdd_store = if store_spdd_history {
+            Some(Arc::new(Mutex::new(SPDDStore::load(
+                std::path::Path::new("spdd_db"),
+            )?)))
+        } else {
+            None
+        };
+        let spdd_store_query = spdd_store.clone();
+
         context.handle(&accounts_query_topic, move |message| {
-            let history = history_account_state.clone();
+            let history = history_query.clone();
+            let spdd_store = spdd_store_query.clone();
             async move {
                 let Message::StateQuery(StateQuery::Accounts(query)) = message.as_ref() else {
                     return Arc::new(Message::StateQueryResponse(StateQueryResponse::Accounts(
@@ -426,6 +465,10 @@ impl AccountsState {
                 };

                 let guard = history.lock().await;
+                let spdd_store_guard = match spdd_store.as_ref() {
+                    Some(s) => Some(s.lock().await),
+                    None => None,
+                };
                 let state = match guard.current() {
                     Some(s) => s,
                     None => {
@@ -530,6 +573,36 @@ impl AccountsState {
                         }
                     }

+                    AccountsStateQuery::GetSPDDByEpoch { epoch } => match spdd_store_guard {
+                        Some(spdd_store) => {
+                            let result = spdd_store.query_by_epoch(*epoch);
+                            match result {
+                                Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result),
+                                Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
+                            }
+                        }
+                        None => AccountsStateQueryResponse::Error(
+                            "SPDD store is not enabled".to_string(),
+                        ),
+                    },
+
+                    AccountsStateQuery::GetSPDDByEpochAndPool { epoch, pool_id } => {
+                        match spdd_store_guard {
+                            Some(spdd_store) => {
+                                let result = spdd_store.query_by_epoch_and_pool(*epoch, pool_id);
+                                match result {
+                                    Ok(result) => {
+                                        AccountsStateQueryResponse::SPDDByEpochAndPool(result)
+                                    }
+                                    Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
+                                }
+                            }
+                            None => AccountsStateQueryResponse::Error(
+                                "SPDD store is not enabled".to_string(),
+                            ),
+                        }
+                    }
+
                     _ => AccountsStateQueryResponse::Error(format!(
                         "Unimplemented query variant: {:?}",
                         query
@@ -586,6 +659,7 @@ impl AccountsState {
         context.run(async move {
             Self::run(
                 history,
+                spdd_store,
                 drep_publisher,
                 spo_publisher,
                 spo_rewards_publisher,

diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs
new file mode 100644
index 00000000..b4d5e91c
--- /dev/null
+++ b/modules/accounts_state/src/spo_distribution_store.rs
@@ -0,0 +1,175 @@
+use std::collections::HashMap;
+
+use acropolis_common::KeyHash;
+use fjall::{Config, Keyspace, PartitionCreateOptions};
+
+const POOL_ID_LEN: usize = 28;
+const STAKE_KEY_LEN: usize = 28;
+const EPOCH_LEN: usize = 8;
+const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LEN + STAKE_KEY_LEN; // 64 bytes
+const BATCH_SIZE: usize = 10_000;
+
+/// Encode: epoch + pool_id + stake_key
+fn encode_key(epoch: u64, pool_id: &KeyHash, stake_key: &KeyHash) -> Vec<u8> {
+    let mut key = Vec::with_capacity(TOTAL_KEY_LEN);
+    key.extend_from_slice(&epoch.to_be_bytes());
+    key.extend_from_slice(pool_id);
+    key.extend_from_slice(stake_key);
+    key
+}
+
+/// Encode: epoch + pool_id (for prefix queries)
+fn encode_epoch_pool_prefix(epoch: u64, pool_id: &KeyHash) -> Vec<u8> {
+    let mut prefix = Vec::with_capacity(EPOCH_LEN + POOL_ID_LEN);
+    prefix.extend_from_slice(&epoch.to_be_bytes());
+    prefix.extend_from_slice(pool_id);
+    prefix
+}
+
+/// Decode key to extract pool_id and stake_key
+fn decode_key(key: &[u8]) -> (KeyHash, KeyHash) {
+    let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LEN].to_vec();
+    let stake_key = key[EPOCH_LEN + POOL_ID_LEN..].to_vec();
+    (pool_id, stake_key)
+}
+
+pub struct SPDDStore {
+    keyspace: Keyspace,
+    /// Single partition for all SPDD data
+    /// Key format: epoch(8 bytes) + pool_id + stake_key
+    /// Value: amount(8 bytes)
+    spdd: fjall::PartitionHandle,
+}
+
+impl SPDDStore {
+    pub fn load(path: impl AsRef<std::path::Path>) -> fjall::Result<Self> {
+        let keyspace = Config::new(path).open()?;
+        let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
+
+        Ok(Self { keyspace, spdd })
+    }
+
+    /// Drop the partition and recreate it (fastest way to clear)
+    #[allow(dead_code)]
+    pub fn reset(&mut self) -> fjall::Result<()> {
+        if let Ok(spdd) = self.keyspace.open_partition("spdd", PartitionCreateOptions::default()) {
+            self.keyspace.delete_partition(self.spdd.clone())?;
+            self.spdd = spdd;
+        }
+        Ok(())
+    }
+
+    /// Store SPDD state for an epoch
+    pub fn store_spdd(
+        &self,
+        epoch: u64,
+        spdd_state: HashMap<KeyHash, Vec<(KeyHash, u64)>>,
+    ) -> fjall::Result<()> {
+        let mut batch = self.keyspace.batch();
+        let mut count = 0;
+
+        for (pool_id, delegations) in spdd_state {
+            for (stake_key, amount) in delegations {
+                let key = encode_key(epoch, &pool_id, &stake_key);
+                let value = amount.to_be_bytes();
+                batch.insert(&self.spdd, key, value);
+
+                count += 1;
+                if count >= BATCH_SIZE {
+                    batch.commit()?;
+                    batch = self.keyspace.batch();
+                    count = 0;
+                }
+            }
+        }
+
+        // Commit remaining entries
+        if count > 0 {
+            batch.commit()?;
+        }
+
+        Ok(())
+    }
+
+    /// Query all data for an epoch
+    /// Returns: Vec<(PoolId, StakeKey, ActiveStakeAmount)>
+    pub fn query_by_epoch(&self, epoch: u64) -> fjall::Result<Vec<(KeyHash, KeyHash, u64)>> {
+        let prefix = epoch.to_be_bytes();
+        let mut result = Vec::new();
+
+        for item in self.spdd.prefix(prefix) {
+            let (key, value) = item?;
+            let (pool_id, stake_key) = decode_key(&key);
+            let amount = u64::from_be_bytes(value.as_ref().try_into().unwrap());
+            result.push((pool_id, stake_key, amount));
+        }
+
+        Ok(result)
+    }
+
+    /// Query by epoch and pool_id
+    /// Returns: Vec<(StakeKey, ActiveStakeAmount)>
+    pub fn query_by_epoch_and_pool(
+        &self,
+        epoch: u64,
+        pool_id: &KeyHash,
+    ) -> fjall::Result<Vec<(KeyHash, u64)>> {
+        let prefix = encode_epoch_pool_prefix(epoch, pool_id);
+        let mut result = Vec::new();
+
+        for item in self.spdd.prefix(prefix) {
+            let (key, value) = item?;
+            let stake_key = key[EPOCH_LEN + POOL_ID_LEN..].to_vec();
+            let amount = u64::from_be_bytes(value.as_ref().try_into().unwrap());
+            result.push((stake_key, amount));
+        }
+
+        Ok(result)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const DB_PATH: &str = "spdd_db";
+
+    #[test]
+    fn test_spdd_store_load() {
+        let spdd_store = SPDDStore::load(std::path::Path::new(DB_PATH));
+        assert!(spdd_store.is_ok());
+    }
+
+    #[test]
+    fn test_store_spdd_state() {
+        let spdd_store =
+            SPDDStore::load(std::path::Path::new(DB_PATH)).expect("Failed to load SPDD store");
+        let mut spdd_state: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
+        spdd_state.insert(
+            vec![0x01; 28],
+            vec![(vec![0x10; 28], 100), (vec![0x11; 28], 150)],
+        );
+        spdd_state.insert(
+            vec![0x02; 28],
+            vec![(vec![0x20; 28], 200), (vec![0x21; 28], 250)],
+        );
+        spdd_store.store_spdd(1, spdd_state).expect("Failed to store SPDD state");
+        let result = spdd_store.query_by_epoch(1).expect("Failed to query SPDD state");
+        assert_eq!(result.len(), 4);
+        let result = spdd_store
+            .query_by_epoch_and_pool(1, &vec![0x01; 28])
+            .expect("Failed to query SPDD state");
+        assert_eq!(result.len(), 2);
+        let result = spdd_store
+            .query_by_epoch_and_pool(1, &vec![0x02; 28])
+            .expect("Failed to query SPDD state");
+        assert_eq!(result.len(), 2);
+    }
+
+    #[test]
+    fn test_spdd_store_reset() {
+        let mut spdd_store =
+            SPDDStore::load(std::path::Path::new(DB_PATH)).expect("Failed to load SPDD store");
+        spdd_store.reset().expect("Failed to reset SPDD store");
+    }
+}
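A note on the key layout above: because the epoch is encoded with `to_be_bytes()` as the leading key bytes, fjall's lexicographic key ordering matches numeric epoch ordering, which is what makes the epoch and epoch+pool prefix scans correct. A standalone sketch of that property (not part of the patch; the pool/stake bytes are hypothetical placeholders):

fn main() {
    // Big-endian epoch bytes sort byte-wise in the same order as the
    // numbers themselves; little-endian would interleave epochs.
    let epochs = [256u64, 1, 10, 2];
    let mut keys: Vec<Vec<u8>> = epochs
        .iter()
        .map(|e| {
            let mut k = e.to_be_bytes().to_vec();
            k.extend_from_slice(&[0x01; 28]); // placeholder pool id
            k.extend_from_slice(&[0x10; 28]); // placeholder stake key
            k
        })
        .collect();
    keys.sort(); // lexicographic, as in the LSM-tree
    let recovered: Vec<u64> =
        keys.iter().map(|k| u64::from_be_bytes(k[..8].try_into().unwrap())).collect();
    assert_eq!(recovered, vec![1, 2, 10, 256]); // numeric order preserved
}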
diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs
index d6e4d8d2..524ccf62 100644
--- a/modules/accounts_state/src/state.rs
+++ b/modules/accounts_state/src/state.rs
@@ -435,6 +435,11 @@ impl State {
         stake_addresses.generate_spdd()
     }

+    pub fn dump_spdd_state(&self) -> HashMap<KeyHash, Vec<(KeyHash, u64)>> {
+        let stake_addresses = self.stake_addresses.lock().unwrap();
+        stake_addresses.dump_spdd_state()
+    }
+
     /// Derive the DRep Delegation Distribution (SPDD) - the total amount
     /// delegated to each DRep, including the special "abstain" and "no confidence" dreps.
     pub fn generate_drdd(&self) -> DRepDelegationDistribution {

diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs
index 3fe85659..8bf82c8f 100644
--- a/modules/rest_blockfrost/src/handlers/epochs.rs
+++ b/modules/rest_blockfrost/src/handlers/epochs.rs
@@ -1,6 +1,8 @@
 use crate::{
     handlers_config::HandlersConfig,
-    types::{EpochActivityRest, ProtocolParamsRest},
+    types::{
+        EpochActivityRest, ProtocolParamsRest, SPDDByEpochAndPoolItemRest, SPDDByEpochItemRest,
+    },
 };
 use acropolis_common::{
     messages::{Message, RESTResponse, StateQuery, StateQueryResponse},
@@ -11,6 +13,7 @@ use acropolis_common::{
         spdd::{SPDDStateQuery, SPDDStateQueryResponse},
         utils::query_state,
     },
+    serialization::Bech32WithHrp,
 };
 use anyhow::{anyhow, Result};
 use caryatid_sdk::Context;
@@ -271,19 +274,178 @@ pub async fn handle_epoch_previous_blockfrost(
 }

 pub async fn handle_epoch_total_stakes_blockfrost(
-    _context: Arc<Context<Message>>,
-    _params: Vec<String>,
-    _handlers_config: Arc<HandlersConfig>,
+    context: Arc<Context<Message>>,
+    params: Vec<String>,
+    handlers_config: Arc<HandlersConfig>,
 ) -> Result<RESTResponse> {
-    Ok(RESTResponse::with_text(501, "Not implemented"))
+    if params.len() != 1 {
+        return Ok(RESTResponse::with_text(
+            400,
+            "Expected one parameter: an epoch number",
+        ));
+    }
+    let param = &params[0];
+
+    let epoch_number = match param.parse::<u64>() {
+        Ok(num) => num,
+        Err(_) => {
+            return Ok(RESTResponse::with_text(
+                400,
+                "Invalid epoch number parameter",
+            ));
+        }
+    };
+
+    // Query latest epoch from epochs-state
+    let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(
+        EpochsStateQuery::GetLatestEpoch,
+    )));
+    let latest_epoch = query_state(
+        &context,
+        &handlers_config.epochs_query_topic,
+        latest_epoch_msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Epochs(
+                EpochsStateQueryResponse::LatestEpoch(res),
+            )) => Ok(res.epoch.epoch),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving latest epoch"
+            )),
+        },
+    )
+    .await?;
+
+    if epoch_number > latest_epoch {
+        return Ok(RESTResponse::with_text(404, "Epoch not found"));
+    }
+
+    // Query SPDD by epoch from accounts-state
+    let msg = Arc::new(Message::StateQuery(StateQuery::Accounts(
+        AccountsStateQuery::GetSPDDByEpoch {
+            epoch: epoch_number,
+        },
+    )));
+    let spdd = query_state(
+        &context,
+        &handlers_config.accounts_query_topic,
+        msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::SPDDByEpoch(res),
+            )) => Ok(res),
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::Error(e),
+            )) => Err(anyhow::anyhow!(
+                "Internal server error while retrieving SPDD by epoch: {e}"
+            )),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving SPDD by epoch"
+            )),
+        },
+    )
+    .await?;
+    let spdd_response =
+        spdd.into_iter().map(|item| SPDDByEpochItemRest::from(item)).collect::<Vec<_>>();
+
+    match serde_json::to_string_pretty(&spdd_response) {
+        Ok(json) => Ok(RESTResponse::with_json(200, &json)),
+        Err(e) => Ok(RESTResponse::with_text(
+            500,
+            &format!("Failed to serialize SPDD by epoch: {e}"),
+        )),
+    }
 }

 pub async fn handle_epoch_pool_stakes_blockfrost(
-    _context: Arc<Context<Message>>,
-    _params: Vec<String>,
-    _handlers_config: Arc<HandlersConfig>,
+    context: Arc<Context<Message>>,
+    params: Vec<String>,
+    handlers_config: Arc<HandlersConfig>,
 ) -> Result<RESTResponse> {
-    Ok(RESTResponse::with_text(501, "Not implemented"))
+    if params.len() != 2 {
+        return Ok(RESTResponse::with_text(
+            400,
+            "Expected two parameters: an epoch number and a pool ID",
+        ));
+    }
+    let param = &params[0];
+    let pool_id = &params[1];
+
+    let epoch_number = match param.parse::<u64>() {
+        Ok(num) => num,
+        Err(_) => {
+            return Ok(RESTResponse::with_text(
+                400,
+                "Invalid epoch number parameter",
+            ));
+        }
+    };
+
+    let Ok(pool_id) = Vec::<u8>::from_bech32_with_hrp(pool_id, "pool") else {
+        return Ok(RESTResponse::with_text(
+            400,
+            &format!("Invalid Bech32 stake pool ID: {pool_id}"),
+        ));
+    };
+
+    // Query latest epoch from epochs-state
+    let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(
+        EpochsStateQuery::GetLatestEpoch,
+    )));
+    let latest_epoch = query_state(
+        &context,
+        &handlers_config.epochs_query_topic,
+        latest_epoch_msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Epochs(
+                EpochsStateQueryResponse::LatestEpoch(res),
+            )) => Ok(res.epoch.epoch),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving latest epoch"
+            )),
+        },
+    )
+    .await?;
+
+    if epoch_number > latest_epoch {
+        return Ok(RESTResponse::with_text(404, "Epoch not found"));
+    }
+
+    // Query SPDD by epoch and pool from accounts-state
+    let msg = Arc::new(Message::StateQuery(StateQuery::Accounts(
+        AccountsStateQuery::GetSPDDByEpochAndPool {
+            epoch: epoch_number,
+            pool_id,
+        },
+    )));
+    let spdd = query_state(
+        &context,
+        &handlers_config.accounts_query_topic,
+        msg,
+        |message| match message {
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::SPDDByEpochAndPool(res),
+            )) => Ok(res),
+            Message::StateQueryResponse(StateQueryResponse::Accounts(
+                AccountsStateQueryResponse::Error(e),
+            )) => Err(anyhow::anyhow!(
+                "Internal server error while retrieving SPDD by epoch and pool: {e}"
+            )),
+            _ => Err(anyhow::anyhow!(
+                "Unexpected message type while retrieving SPDD by epoch and pool"
+            )),
+        },
+    )
+    .await?;
+    let spdd_response =
+        spdd.into_iter().map(|item| SPDDByEpochAndPoolItemRest::from(item)).collect::<Vec<_>>();
+
+    match serde_json::to_string_pretty(&spdd_response) {
+        Ok(json) => Ok(RESTResponse::with_json(200, &json)),
+        Err(e) => Ok(RESTResponse::with_text(
+            500,
+            &format!("Failed to serialize SPDD by epoch and pool: {e}"),
+        )),
+    }
 }

 pub async fn handle_epoch_total_blocks_blockfrost(

diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs
index 6730eaf7..ef40fde8 100644
--- a/modules/rest_blockfrost/src/types.rs
+++ b/modules/rest_blockfrost/src/types.rs
@@ -4,6 +4,7 @@ use acropolis_common::{
     protocol_params::{Nonce, NonceVariant, ProtocolParams},
     queries::governance::DRepActionUpdate,
     rest_helper::ToCheckedF64,
+    serialization::{DisplayFromBech32, PoolPrefix, StakePrefix},
     AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset,
     PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote,
 };
@@ -50,6 +51,47 @@
     }
 }

+// REST response structure for /epochs/{number}/stakes
+#[serde_as]
+#[derive(Serialize)]
+pub struct SPDDByEpochItemRest {
+    #[serde_as(as = "DisplayFromBech32<StakePrefix>")]
+    stake_address: KeyHash,
+    #[serde_as(as = "DisplayFromBech32<PoolPrefix>")]
+    pool_id: KeyHash,
+    #[serde_as(as = "DisplayFromStr")]
+    amount: u64,
+}
+
+impl From<(KeyHash, KeyHash, u64)> for SPDDByEpochItemRest {
+    fn from((pool_id, stake_address, amount): (KeyHash, KeyHash, u64)) -> Self {
+        Self {
+            pool_id,
+            stake_address,
+            amount,
+        }
+    }
+}
+
+// REST response structure for /epochs/{number}/stakes/{pool_id}
+#[serde_as]
+#[derive(Serialize)]
+pub struct SPDDByEpochAndPoolItemRest {
+    #[serde_as(as = "DisplayFromBech32<StakePrefix>")]
+    stake_address: KeyHash,
+    #[serde_as(as = "DisplayFromStr")]
+    amount: u64,
+}
+
+impl From<(KeyHash, u64)> for SPDDByEpochAndPoolItemRest {
+    fn from((stake_address, amount): (KeyHash, u64)) -> Self {
+        Self {
+            stake_address,
+            amount,
+        }
+    }
+}
+
 // REST response structure for /governance/dreps
 #[derive(Serialize)]
 pub struct DRepsListREST {

diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore
index e35753a2..4c2d2d58 100644
--- a/processes/omnibus/.gitignore
+++ b/processes/omnibus/.gitignore
@@ -2,3 +2,6 @@ downloads
 sled-immutable-utxos
 fjall-immutable-utxos
 cache
+
+# DB files
+*_db
\ No newline at end of file

diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml
index 12b9954c..d64573c4 100644
--- a/processes/omnibus/omnibus.toml
+++ b/processes/omnibus/omnibus.toml
@@ -82,6 +82,9 @@ write-full-cache = "false"
 store-history = false

 [module.accounts-state]
+# Store SPDD history
+# Enable /epochs/{number}/stakes, /epochs/{number}/stakes/{pool_id} endpoints
+store-spdd-history = true

 [module.assets-state]
 # Enables /assets endpoint

From 14cdb9cde3744c6092fa7ca78244b181b36339d6 Mon Sep 17 00:00:00 2001
From: Golddy Dev
Date: Mon, 13 Oct 2025 20:12:37 +0200
Subject: [PATCH 02/13] fix: stake address string to represent mainnet's stake
 key hash

---
 .../rest_blockfrost/src/handlers/epochs.rs    | 37 +++++++++++++++++--
 modules/rest_blockfrost/src/types.rs          | 34 ++++-------------
 2 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs
index 8bf82c8f..068a6e7a 100644
--- a/modules/rest_blockfrost/src/handlers/epochs.rs
+++ b/modules/rest_blockfrost/src/handlers/epochs.rs
@@ -14,6 +14,7 @@ use acropolis_common::{
         utils::query_state,
     },
     serialization::Bech32WithHrp,
+    AddressNetwork, StakeAddress, StakeAddressPayload,
 };
 use anyhow::{anyhow, Result};
 use caryatid_sdk::Context;
@@ -344,8 +345,22 @@
     )
     .await?;
-    let spdd_response =
-        spdd.into_iter().map(|item| SPDDByEpochItemRest::from(item)).collect::<Vec<_>>();
+    let spdd_response = spdd
+        .into_iter()
+        .map(|(pool_id, stake_key_hash, amount)| {
+            let stake_address = StakeAddress {
+                network: AddressNetwork::Main,
+                payload: StakeAddressPayload::StakeKeyHash(stake_key_hash),
+            }
+            .to_string()
+            .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?;
+
+            Ok(SPDDByEpochItemRest {
+                pool_id,
+                stake_address,
+                amount,
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;

     match serde_json::to_string_pretty(&spdd_response) {
         Ok(json) => Ok(RESTResponse::with_json(200, &json)),
@@ -436,8 +451,22 @@
     )
     .await?;
-    let spdd_response =
-        spdd.into_iter().map(|item| SPDDByEpochAndPoolItemRest::from(item)).collect::<Vec<_>>();
+    let spdd_response = spdd
+        .into_iter()
+        .map(|(stake_key_hash, amount)| {
+            let stake_address = StakeAddress {
+                network: AddressNetwork::Main,
+                payload: StakeAddressPayload::StakeKeyHash(stake_key_hash),
+            }
+            .to_string()
+            .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?;
+
+            Ok(SPDDByEpochAndPoolItemRest {
+                stake_address,
+                amount,
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;

     match serde_json::to_string_pretty(&spdd_response) {
         Ok(json) => Ok(RESTResponse::with_json(200, &json)),

diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs
index ef40fde8..e7dd37ca 100644
--- a/modules/rest_blockfrost/src/types.rs
+++ b/modules/rest_blockfrost/src/types.rs
@@ -4,10 +4,11 @@ use acropolis_common::{
     protocol_params::{Nonce, NonceVariant, ProtocolParams},
     queries::governance::DRepActionUpdate,
     rest_helper::ToCheckedF64,
-    serialization::{DisplayFromBech32, PoolPrefix, StakePrefix},
+    serialization::{DisplayFromBech32, PoolPrefix},
     AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset,
     PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote,
 };
+use anyhow::Result;
 use num_traits::ToPrimitive;
 use rust_decimal::Decimal;
 use serde::Serialize;
@@ -55,41 +56,20 @@
 #[serde_as]
 #[derive(Serialize)]
 pub struct SPDDByEpochItemRest {
-    #[serde_as(as = "DisplayFromBech32<StakePrefix>")]
-    stake_address: KeyHash,
+    pub stake_address: String,
     #[serde_as(as = "DisplayFromBech32<PoolPrefix>")]
-    pool_id: KeyHash,
+    pub pool_id: KeyHash,
     #[serde_as(as = "DisplayFromStr")]
-    amount: u64,
-}
-
-impl From<(KeyHash, KeyHash, u64)> for SPDDByEpochItemRest {
-    fn from((pool_id, stake_address, amount): (KeyHash, KeyHash, u64)) -> Self {
-        Self {
-            pool_id,
-            stake_address,
-            amount,
-        }
-    }
+    pub amount: u64,
 }

 // REST response structure for /epochs/{number}/stakes/{pool_id}
 #[serde_as]
 #[derive(Serialize)]
 pub struct SPDDByEpochAndPoolItemRest {
-    #[serde_as(as = "DisplayFromBech32<StakePrefix>")]
-    stake_address: KeyHash,
+    pub stake_address: String,
     #[serde_as(as = "DisplayFromStr")]
-    amount: u64,
-}
-
-impl From<(KeyHash, u64)> for SPDDByEpochAndPoolItemRest {
-    fn from((stake_address, amount): (KeyHash, u64)) -> Self {
-        Self {
-            stake_address,
-            amount,
-        }
-    }
+    pub amount: u64,
 }

 // REST response structure for /governance/dreps

From dde93cf02c4bdc0675d0e2be45b290a2d089663e Mon Sep 17 00:00:00 2001
From: Golddy Dev
Date: Mon, 13 Oct 2025 20:24:26 +0200
Subject: [PATCH 03/13] fix: test cases

---
 modules/accounts_state/.gitignore             |  2 +-
 modules/accounts_state/src/accounts_state.rs  |  8 ++---
 .../src/spo_distribution_store.rs             | 35 ++++++-------------
 3 files changed, 15 insertions(+), 30 deletions(-)

diff --git a/modules/accounts_state/.gitignore b/modules/accounts_state/.gitignore
index 9dec22b5..fffd97c5 100644
--- a/modules/accounts_state/.gitignore
+++ b/modules/accounts_state/.gitignore
@@ -1 +1 @@
-spdd-db/
\ No newline at end of file
+*_db
\ No newline at end of file

diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs
index a432d9fb..b336d584 100644
--- a/modules/accounts_state/src/accounts_state.rs
+++ b/modules/accounts_state/src/accounts_state.rs
@@ -442,11 +442,11 @@ impl AccountsState {
         let history_tick = history.clone();

         // Create spdd history
-        // Return Err if failed to load SPDD store
+        // Return Err if failed to create SPDD store
         let spdd_store = if store_spdd_history {
-            Some(Arc::new(Mutex::new(SPDDStore::load(
-                std::path::Path::new("spdd_db"),
-            )?)))
+            Some(Arc::new(Mutex::new(SPDDStore::new(std::path::Path::new(
+                "spdd_db",
+            ))?)))
         } else {
             None
         };

diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs
index b4d5e91c..bd544af8 100644
--- a/modules/accounts_state/src/spo_distribution_store.rs
+++ b/modules/accounts_state/src/spo_distribution_store.rs
@@ -42,23 +42,20 @@ pub struct SPDDStore {
 }

 impl SPDDStore {
-    pub fn load(path: impl AsRef<std::path::Path>) -> fjall::Result<Self> {
+    pub fn new(path: impl AsRef<std::path::Path>) -> fjall::Result<Self> {
+        let path = path.as_ref();
+
+        // Delete existing data
+        if path.exists() {
+            std::fs::remove_dir_all(path).map_err(|e| fjall::Error::Io(e))?;
+        }
+
         let keyspace = Config::new(path).open()?;
         let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;

         Ok(Self { keyspace, spdd })
     }

-    /// Drop the partition and recreate it (fastest way to clear)
-    #[allow(dead_code)]
-    pub fn reset(&mut self) -> fjall::Result<()> {
-        if let Ok(spdd) = self.keyspace.open_partition("spdd", PartitionCreateOptions::default()) {
-            self.keyspace.delete_partition(self.spdd.clone())?;
-            self.spdd = spdd;
-        }
-        Ok(())
-    }
-
     /// Store SPDD state for an epoch
     pub fn store_spdd(
         &self,
@@ -134,16 +131,10 @@ mod tests {

     const DB_PATH: &str = "spdd_db";

-    #[test]
-    fn test_spdd_store_load() {
-        let spdd_store = SPDDStore::load(std::path::Path::new(DB_PATH));
-        assert!(spdd_store.is_ok());
-    }
-
     #[test]
     fn test_store_spdd_state() {
         let spdd_store =
-            SPDDStore::load(std::path::Path::new(DB_PATH)).expect("Failed to load SPDD store");
+            SPDDStore::new(std::path::Path::new(DB_PATH)).expect("Failed to create SPDD store");
         let mut spdd_state: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
         spdd_state.insert(
             vec![0x01; 28],
             vec![(vec![0x10; 28], 100), (vec![0x11; 28], 150)],
         );
         spdd_state.insert(
             vec![0x02; 28],
             vec![(vec![0x20; 28], 200), (vec![0x21; 28], 250)],
         );
         spdd_store.store_spdd(1, spdd_state).expect("Failed to store SPDD state");
+
         let result = spdd_store.query_by_epoch(1).expect("Failed to query SPDD state");
         assert_eq!(result.len(), 4);
         let result = spdd_store
             .query_by_epoch_and_pool(1, &vec![0x01; 28])
             .expect("Failed to query SPDD state");
         assert_eq!(result.len(), 2);
         let result = spdd_store
             .query_by_epoch_and_pool(1, &vec![0x02; 28])
             .expect("Failed to query SPDD state");
         assert_eq!(result.len(), 2);
     }
-
-    #[test]
-    fn test_spdd_store_reset() {
-        let mut spdd_store =
-            SPDDStore::load(std::path::Path::new(DB_PATH)).expect("Failed to load SPDD store");
-        spdd_store.reset().expect("Failed to reset SPDD store");
-    }
 }

From 668f821b60c5711aefe6de95cf9a014dbc036ede Mon Sep 17 00:00:00 2001
From: Golddy Dev
Date: Mon, 13 Oct 2025 20:25:09 +0200
Subject: [PATCH 04/13] fix: omnibus.toml

---
 processes/omnibus/omnibus.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml
index d64573c4..cfc144b9 100644
--- a/processes/omnibus/omnibus.toml
+++ b/processes/omnibus/omnibus.toml
@@ -84,7 +84,7 @@ store-history = false
 [module.accounts-state]
 # Store SPDD history
 # Enable /epochs/{number}/stakes, /epochs/{number}/stakes/{pool_id} endpoints
-store-spdd-history = true
+store-spdd-history = false

 [module.assets-state]
 # Enables /assets endpoint

From 369a71edaa8b422a79f8a1407a133650d19ff083 Mon Sep 17 00:00:00 2001
From: Golddy Dev
Date: Tue, 14 Oct 2025 09:14:04 +0200
Subject: [PATCH 05/13] refactor: add SPDD database path configuration and
 update SPDDStore initialization

- Removed existing data deletion logic from SPDDStore initialization.
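For context, the new setting follows the module's existing `(key, default)` constant convention. A minimal sketch of the lookup pattern (assuming the `config` crate's `get_bool`/`get_string`, which this module already uses; the free function is illustrative, not an excerpt):

const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false);
const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db");

// Resolve the two settings, falling back to the defaults when unset.
fn resolve(config: &config::Config) -> (bool, String) {
    let store_spdd_history = config
        .get_bool(DEFAULT_STORE_SPDD_HISTORY.0)
        .unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1);
    let spdd_db_path = config
        .get_string(DEFAULT_SPDD_DB_PATH.0)
        .unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string());
    (store_spdd_history, spdd_db_path)
}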
--- modules/accounts_state/src/accounts_state.rs | 14 +++++++++++--- .../accounts_state/src/spo_distribution_store.rs | 5 ----- processes/omnibus/omnibus.toml | 2 ++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index b336d584..a876fc74 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -48,6 +48,7 @@ const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false); +const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db"); /// Accounts State module #[module( @@ -427,6 +428,13 @@ impl AccountsState { config.get_bool(DEFAULT_STORE_SPDD_HISTORY.0).unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1); info!("Store SPDD history: {}", store_spdd_history); + let spdd_db_path = config + .get_string(DEFAULT_SPDD_DB_PATH.0) + .unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string()); + if store_spdd_history { + info!("SPDD database path: {}", spdd_db_path); + } + // Query topics let accounts_query_topic = config .get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0) @@ -444,9 +452,9 @@ impl AccountsState { // Create spdd history // Return Err if failed to create SPDD store let spdd_store = if store_spdd_history { - Some(Arc::new(Mutex::new(SPDDStore::new(std::path::Path::new( - "spdd_db", - ))?))) + Some(Arc::new(Mutex::new(SPDDStore::new( + std::path::Path::new(&spdd_db_path), + )?))) } else { None }; diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index bd544af8..21e4ba26 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -45,11 +45,6 @@ impl SPDDStore { pub fn new(path: impl AsRef) -> fjall::Result { let path = path.as_ref(); - // Delete existing data - if path.exists() { - std::fs::remove_dir_all(path).map_err(|e| fjall::Error::Io(e))?; - } - let keyspace = Config::new(path).open()?; let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?; diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index cfc144b9..47509ef6 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -85,6 +85,8 @@ store-history = false # Store SPDD history # Enable /epochs/{number}/stakes, /epochs/{number}/stakes/{pool_id} endpoints store-spdd-history = false +# SPDD database path (only used when store-spdd-history is enabled) +spdd-db-path = "./spdd_db" [module.assets-state] # Enables /assets endpoint From 785dab776df898160e4c54e9eba2ea972329e824 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 09:52:27 +0200 Subject: [PATCH 06/13] refactor: convert spdd path to absolute --- modules/accounts_state/src/accounts_state.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 56985bc0..7763a77c 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -430,6 +430,16 @@ impl AccountsState { let spdd_db_path = config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string()); + + // Convert to absolute path if relative + let spdd_db_path = if std::path::Path::new(&spdd_db_path).is_absolute() { + 
spdd_db_path + } else { + let current_dir = std::env::current_dir() + .map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?; + current_dir.join(&spdd_db_path).to_string_lossy().to_string() + }; + if store_spdd_history { info!("SPDD database path: {}", spdd_db_path); } From 148f883ba01c063c3e65738c2035886babe5bf45 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 10:02:12 +0200 Subject: [PATCH 07/13] feat: add network name query to parameters state - Introduced query to enum. - Use network from instead of hard-coded, in epochs spdd endpoints --- common/src/queries/parameters.rs | 3 + .../parameters_state/src/parameters_state.rs | 5 ++ .../rest_blockfrost/src/handlers/epochs.rs | 68 ++++++++++++++++++- 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/common/src/queries/parameters.rs b/common/src/queries/parameters.rs index 6abcc111..03e26408 100644 --- a/common/src/queries/parameters.rs +++ b/common/src/queries/parameters.rs @@ -7,12 +7,15 @@ pub const DEFAULT_PARAMETERS_QUERY_TOPIC: (&str, &str) = pub enum ParametersStateQuery { GetLatestEpochParameters, GetEpochParameters { epoch_number: u64 }, + GetNetworkName, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ParametersStateQueryResponse { LatestEpochParameters(ProtocolParams), EpochParameters(ProtocolParams), + NetworkName(String), + NotFound, Error(String), } diff --git a/modules/parameters_state/src/parameters_state.rs b/modules/parameters_state/src/parameters_state.rs index 18b1a2d4..187295ec 100644 --- a/modules/parameters_state/src/parameters_state.rs +++ b/modules/parameters_state/src/parameters_state.rs @@ -196,6 +196,11 @@ impl ParametersState { } } } + ParametersStateQuery::GetNetworkName => { + ParametersStateQueryResponse::NetworkName( + lock.get_current_state().network_name.clone(), + ) + } }; Arc::new(Message::StateQueryResponse(StateQueryResponse::Parameters( response, diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index f6c37d2f..6204a9cd 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -256,6 +256,10 @@ pub async fn handle_epoch_params_blockfrost( "Protocol parameters not found for requested epoch", )), ParametersStateQueryResponse::Error(msg) => Ok(RESTResponse::with_text(400, &msg)), + _ => Ok(RESTResponse::with_text( + 500, + "Unexpected message type while retrieving parameters", + )), } } @@ -443,6 +447,36 @@ pub async fn handle_epoch_total_stakes_blockfrost( return Ok(RESTResponse::with_text(404, "Epoch not found")); } + // Query current network from parameters-state + let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( + ParametersStateQuery::GetNetworkName, + ))); + let current_network = query_state( + &context, + &handlers_config.parameters_query_topic, + current_network_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Parameters( + ParametersStateQueryResponse::NetworkName(network), + )) => Ok(network), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving current network" + )), + }, + ) + .await?; + + let network = match current_network.as_str() { + "mainnet" => AddressNetwork::Main, + "testnet" => AddressNetwork::Test, + unknown => { + return Ok(RESTResponse::with_text( + 404, + format!("Unknown network: {unknown}").as_str(), + )) + } + }; + // Query SPDD by epoch from accounts-state let msg = 
Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetSPDDByEpoch { @@ -472,7 +506,7 @@ pub async fn handle_epoch_total_stakes_blockfrost( .into_iter() .map(|(pool_id, stake_key_hash, amount)| { let stake_address = StakeAddress { - network: AddressNetwork::Main, + network: network.clone(), payload: StakeAddressPayload::StakeKeyHash(stake_key_hash), } .to_string() @@ -548,6 +582,36 @@ pub async fn handle_epoch_pool_stakes_blockfrost( return Ok(RESTResponse::with_text(404, "Epoch not found")); } + // Query current network from parameters-state + let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( + ParametersStateQuery::GetNetworkName, + ))); + let current_network = query_state( + &context, + &handlers_config.parameters_query_topic, + current_network_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Parameters( + ParametersStateQueryResponse::NetworkName(network), + )) => Ok(network), + _ => Err(anyhow::anyhow!( + "Unexpected message type while retrieving current network" + )), + }, + ) + .await?; + + let network = match current_network.as_str() { + "mainnet" => AddressNetwork::Main, + "testnet" => AddressNetwork::Test, + unknown => { + return Ok(RESTResponse::with_text( + 404, + format!("Unknown network: {unknown}").as_str(), + )) + } + }; + // Query SPDD by epoch and pool from accounts-state let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetSPDDByEpochAndPool { @@ -578,7 +642,7 @@ pub async fn handle_epoch_pool_stakes_blockfrost( .into_iter() .map(|(stake_key_hash, amount)| { let stake_address = StakeAddress { - network: AddressNetwork::Main, + network: network.clone(), payload: StakeAddressPayload::StakeKeyHash(stake_key_hash), } .to_string() From 42eacb696badbf30c36356b0ba9fd6a46648cb39 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 10:06:56 +0200 Subject: [PATCH 08/13] fix: return 500 for unknown network --- modules/rest_blockfrost/src/handlers/epochs.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index 6204a9cd..ddb7b003 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -471,8 +471,9 @@ pub async fn handle_epoch_total_stakes_blockfrost( "testnet" => AddressNetwork::Test, unknown => { return Ok(RESTResponse::with_text( - 404, - format!("Unknown network: {unknown}").as_str(), + 500, + format!("Internal server error while retrieving current network: {unknown}") + .as_str(), )) } }; @@ -606,8 +607,9 @@ pub async fn handle_epoch_pool_stakes_blockfrost( "testnet" => AddressNetwork::Test, unknown => { return Ok(RESTResponse::with_text( - 404, - format!("Unknown network: {unknown}").as_str(), + 500, + format!("Internal server error while retrieving current network: {unknown}") + .as_str(), )) } }; From 799399b17b1479f93accc2a1bfb9f3c95c62f437 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 10:32:10 +0200 Subject: [PATCH 09/13] feat: add SPDD retention epochs configuration and pruning logic - Introduced a new configuration option for SPDD retention epochs in accounts state. - Updated SPDDStore to support retention logic, allowing automatic pruning of old epochs. - Enhanced logging to provide information on retention settings and pruning actions. - Added tests to verify the pruning functionality for stored epochs. 
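The retention rule introduced here keeps a sliding window of the most recent epochs: after storing epoch e with retention n, epochs [e - n + 1, e] survive and everything older is pruned. A standalone check of that window arithmetic (mirrors the logic in the diff below, not an excerpt):

fn keep_from(epoch: u64, retention: u64) -> u64 {
    // First epoch that survives pruning; 0 while the window is not yet full.
    if epoch >= retention {
        epoch - retention + 1
    } else {
        0
    }
}

fn main() {
    assert_eq!(keep_from(3, 2), 2); // storing epoch 3 keeps {2, 3}
    assert_eq!(keep_from(73, 73), 1); // a full window keeps 1..=73
    assert_eq!(keep_from(1, 5), 0); // young chain: nothing pruned yet
}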
--- modules/accounts_state/src/accounts_state.rs | 23 ++++- .../src/spo_distribution_store.rs | 94 ++++++++++++++++++- processes/omnibus/omnibus.toml | 3 + 3 files changed, 112 insertions(+), 8 deletions(-) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 7763a77c..fde7ac2e 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -49,6 +49,7 @@ const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false); const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db"); +const DEFAULT_SPDD_RETENTION_EPOCHS: &str = "spdd-retention-epochs"; /// Accounts State module #[module( @@ -440,8 +441,23 @@ impl AccountsState { current_dir.join(&spdd_db_path).to_string_lossy().to_string() }; + // Get retention epochs configuration (None = unlimited) + let spdd_retention_epochs = config + .get_int(DEFAULT_SPDD_RETENTION_EPOCHS) + .ok() + .and_then(|v| if v > 0 { Some(v as u64) } else { None }); + info!("SPDD retention epochs: {:?}", spdd_retention_epochs); + if store_spdd_history { info!("SPDD database path: {}", spdd_db_path); + match spdd_retention_epochs { + Some(epochs) => info!( + "SPDD retention: {} epochs (~{} GB max)", + epochs, + (epochs as f64 * 0.12).ceil() + ), + None => info!("SPDD retention: unlimited (no automatic pruning)"), + } } // Query topics @@ -461,9 +477,10 @@ impl AccountsState { // Create spdd history // Return Err if failed to create SPDD store let spdd_store = if store_spdd_history { - Some(Arc::new(Mutex::new(SPDDStore::new(std::path::Path::new( - &spdd_db_path, - ))?))) + Some(Arc::new(Mutex::new(SPDDStore::new( + std::path::Path::new(&spdd_db_path), + spdd_retention_epochs, + )?))) } else { None }; diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 21e4ba26..ed5905d8 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -2,11 +2,16 @@ use std::collections::HashMap; use acropolis_common::KeyHash; use fjall::{Config, Keyspace, PartitionCreateOptions}; +use tracing::info; const POOL_ID_LEN: usize = 28; const STAKE_KEY_LEN: usize = 28; const EPOCH_LEN: usize = 8; const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LEN + STAKE_KEY_LEN; // 64 bytes + +// Batch size balances commit overhead vs memory usage +// ~720KB per batch (72 bytes × 10,000) +// ~130 commits for typical epoch (~1.3M delegations) const BATCH_SIZE: usize = 10_000; /// Encode: epoch + pool_id + stake_key @@ -39,19 +44,28 @@ pub struct SPDDStore { /// Key format: epoch(8 bytes) + pool_id + stake_key /// Value: amount(8 bytes) spdd: fjall::PartitionHandle, + /// Maximum number of epochs to retain (None = unlimited) + retention_epochs: Option, } impl SPDDStore { - pub fn new(path: impl AsRef) -> fjall::Result { + pub fn new( + path: impl AsRef, + retention_epochs: Option, + ) -> fjall::Result { let path = path.as_ref(); let keyspace = Config::new(path).open()?; let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?; - Ok(Self { keyspace, spdd }) + Ok(Self { + keyspace, + spdd, + retention_epochs, + }) } - /// Store SPDD state for an epoch + /// Store SPDD state for an epoch and prune old epochs if needed pub fn store_spdd( &self, epoch: u64, @@ -80,6 +94,49 @@ impl SPDDStore { batch.commit()?; } + info!("Stored {} SPDD records 
for epoch {}", count, epoch); + + // Prune old epochs if retention is configured + // Keep the last N epochs, delete everything older + if let Some(retention) = self.retention_epochs { + if epoch >= retention { + // Keep epochs [epoch - retention + 1, epoch] + // Delete everything before (epoch - retention + 1) + let keep_from_epoch = epoch - retention + 1; + self.prune_epochs_before(keep_from_epoch)?; + } + } + + Ok(()) + } + + /// Prune all SPDD data for epochs before the specified epoch + pub fn prune_epochs_before(&self, before_epoch: u64) -> fjall::Result<()> { + let mut batch = self.keyspace.batch(); + let mut deleted_count = 0; + + // Iterate through all epochs less than before_epoch + for epoch in 0..before_epoch { + let prefix = epoch.to_be_bytes(); + + for item in self.spdd.prefix(prefix) { + let (key, _) = item?; + batch.remove(&self.spdd, key); + + deleted_count += 1; + if deleted_count >= BATCH_SIZE { + batch.commit()?; + batch = self.keyspace.batch(); + deleted_count = 0; + } + } + } + + // Commit remaining deletions + if deleted_count > 0 { + batch.commit()?; + } + Ok(()) } @@ -128,8 +185,8 @@ mod tests { #[test] fn test_store_spdd_state() { - let spdd_store = - SPDDStore::new(std::path::Path::new(DB_PATH)).expect("Failed to create SPDD store"); + let spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None) + .expect("Failed to create SPDD store"); let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( vec![0x01; 28], @@ -152,4 +209,31 @@ mod tests { .expect("Failed to query SPDD state"); assert_eq!(result.len(), 2); } + + #[test] + fn test_prune_old_epochs() { + let spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2)) + .expect("Failed to create SPDD store"); + + // Store data for epochs 1, 2, 3 + for epoch in 1..=3 { + let mut spdd_state: HashMap> = HashMap::new(); + spdd_state.insert( + vec![epoch as u8; 28], + vec![(vec![0x10; 28], epoch * 100), (vec![0x11; 28], epoch * 150)], + ); + spdd_store.store_spdd(epoch, spdd_state).expect("Failed to store SPDD state"); + } + + // After storing epoch 3 with retention=2, epoch 1 should be pruned + let result = spdd_store.query_by_epoch(1); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 0, "Epoch 1 should be pruned"); + + let result = spdd_store.query_by_epoch(2).expect("Failed to query epoch 2"); + assert_eq!(result.len(), 2, "Epoch 2 should still exist"); + + let result = spdd_store.query_by_epoch(3).expect("Failed to query epoch 3"); + assert_eq!(result.len(), 2, "Epoch 3 should still exist"); + } } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index dc8b7b23..9f9f6bc4 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -88,6 +88,9 @@ store-history = false store-spdd-history = false # SPDD database path (only used when store-spdd-history is enabled) spdd-db-path = "./spdd_db" +# Number of epochs to retain in SPDD history +# Example: 73 +# spdd-retention-epochs = none [module.assets-state] # Enables /assets endpoint From 054e46e7ffef5b68342a4c7d629128ed476829bf Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 10:51:11 +0200 Subject: [PATCH 10/13] refactor: enhance SPDD store query logic and update epoch handling - Updated SPDDStore to track the latest stored epoch and added logic to check if an epoch is stored based on retention settings. - Modified AccountsState to utilize the new epoch checking logic when querying SPDD data. 
- Adjusted omnibus.toml to enable SPDD history storage and set retention epochs for better management of stored data. --- modules/accounts_state/src/accounts_state.rs | 45 +++++++++++++------ .../src/spo_distribution_store.rs | 34 +++++++++++--- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index fde7ac2e..548c21cc 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -193,7 +193,7 @@ impl AccountsState { // if we store spdd history let spdd_state = state.dump_spdd_state(); - if let Some(spdd_store) = spdd_store_guard { + if let Some(mut spdd_store) = spdd_store_guard { // active stakes taken at beginning of epoch i is for epoch + 1 if let Err(e) = spdd_store.store_spdd(block_info.epoch + 1, spdd_state) @@ -608,13 +608,21 @@ impl AccountsState { } AccountsStateQuery::GetSPDDByEpoch { epoch } => match spdd_store_guard { - Some(spdd_store) => { - let result = spdd_store.query_by_epoch(*epoch); - match result { - Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result), - Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + Some(spdd_store) => match spdd_store.is_epoch_stored(*epoch) { + Some(true) => { + let result = spdd_store.query_by_epoch(*epoch); + match result { + Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result), + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } } - } + Some(false) => { + AccountsStateQueryResponse::Error("Epoch is too old".to_string()) + } + None => AccountsStateQueryResponse::Error( + "SPDD store is not started".to_string(), + ), + }, None => AccountsStateQueryResponse::Error( "SPDD store is not enabled".to_string(), ), @@ -622,15 +630,24 @@ impl AccountsState { AccountsStateQuery::GetSPDDByEpochAndPool { epoch, pool_id } => { match spdd_store_guard { - Some(spdd_store) => { - let result = spdd_store.query_by_epoch_and_pool(*epoch, pool_id); - match result { - Ok(result) => { - AccountsStateQueryResponse::SPDDByEpochAndPool(result) + Some(spdd_store) => match spdd_store.is_epoch_stored(*epoch) { + Some(true) => { + let result = + spdd_store.query_by_epoch_and_pool(*epoch, pool_id); + match result { + Ok(result) => { + AccountsStateQueryResponse::SPDDByEpochAndPool(result) + } + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } - Err(e) => AccountsStateQueryResponse::Error(e.to_string()), } - } + Some(false) => AccountsStateQueryResponse::Error( + "Epoch is too old".to_string(), + ), + None => AccountsStateQueryResponse::Error( + "SPDD store is not started".to_string(), + ), + }, None => AccountsStateQueryResponse::Error( "SPDD store is not enabled".to_string(), ), diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index ed5905d8..12288f77 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use acropolis_common::KeyHash; use fjall::{Config, Keyspace, PartitionCreateOptions}; -use tracing::info; const POOL_ID_LEN: usize = 28; const STAKE_KEY_LEN: usize = 28; @@ -46,6 +45,8 @@ pub struct SPDDStore { spdd: fjall::PartitionHandle, /// Maximum number of epochs to retain (None = unlimited) retention_epochs: Option, + /// Latest epoch stored + latest_epoch: Option, } impl SPDDStore { @@ -62,12 +63,13 @@ impl SPDDStore { keyspace, spdd, retention_epochs, + latest_epoch: 
None, }) } /// Store SPDD state for an epoch and prune old epochs if needed pub fn store_spdd( - &self, + &mut self, epoch: u64, spdd_state: HashMap>, ) -> fjall::Result<()> { @@ -94,7 +96,7 @@ impl SPDDStore { batch.commit()?; } - info!("Stored {} SPDD records for epoch {}", count, epoch); + self.latest_epoch = Some(epoch); // Prune old epochs if retention is configured // Keep the last N epochs, delete everything older @@ -140,6 +142,24 @@ impl SPDDStore { Ok(()) } + pub fn is_epoch_stored(&self, epoch: u64) -> Option { + let Some(latest_epoch) = self.latest_epoch else { + return None; + }; + let min_epoch = match self.retention_epochs { + Some(retention) => { + if latest_epoch > retention { + latest_epoch - retention + 1 + } else { + 0 + } + } + None => 0, + }; + + Some(epoch >= min_epoch && epoch <= latest_epoch) + } + /// Query all data for an epoch /// Returns: Vec<(PoolId, StakeKey, ActiveStakeAmount)> pub fn query_by_epoch(&self, epoch: u64) -> fjall::Result> { @@ -185,7 +205,7 @@ mod tests { #[test] fn test_store_spdd_state() { - let spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None) + let mut spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None) .expect("Failed to create SPDD store"); let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( @@ -212,7 +232,7 @@ mod tests { #[test] fn test_prune_old_epochs() { - let spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2)) + let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2)) .expect("Failed to create SPDD store"); // Store data for epochs 1, 2, 3 @@ -226,6 +246,10 @@ mod tests { } // After storing epoch 3 with retention=2, epoch 1 should be pruned + assert!(!spdd_store.is_epoch_stored(1).unwrap()); + assert!(spdd_store.is_epoch_stored(2).unwrap()); + assert!(spdd_store.is_epoch_stored(3).unwrap()); + let result = spdd_store.query_by_epoch(1); assert!(result.is_ok()); assert_eq!(result.unwrap().len(), 0, "Epoch 1 should be pruned"); From d86b55483fcd1c28c2b398e25bb48bbae0f5522b Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 11:04:33 +0200 Subject: [PATCH 11/13] chore: update comnibus.toml --- processes/omnibus/omnibus.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 9f9f6bc4..d470697c 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -90,7 +90,7 @@ store-spdd-history = false spdd-db-path = "./spdd_db" # Number of epochs to retain in SPDD history # Example: 73 -# spdd-retention-epochs = none +spdd-retention-epochs = none [module.assets-state] # Enables /assets endpoint From 4df779bd756349a71844fe0431a616079b7d8716 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 14 Oct 2025 11:26:58 +0200 Subject: [PATCH 12/13] fix: while prune, iterate in reverse order --- modules/accounts_state/src/spo_distribution_store.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 12288f77..36145ee9 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -117,12 +117,14 @@ impl SPDDStore { let mut batch = self.keyspace.batch(); let mut deleted_count = 0; - // Iterate through all epochs less than before_epoch - for epoch in 0..before_epoch { + // Iterate reverse through all epochs less than before_epoch + for 
---
 modules/accounts_state/src/spo_distribution_store.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs
index 12288f77..36145ee9 100644
--- a/modules/accounts_state/src/spo_distribution_store.rs
+++ b/modules/accounts_state/src/spo_distribution_store.rs
@@ -117,12 +117,14 @@ impl SPDDStore {
         let mut batch = self.keyspace.batch();
         let mut deleted_count = 0;
 
-        // Iterate through all epochs less than before_epoch
-        for epoch in 0..before_epoch {
+        // Iterate in reverse through all epochs less than before_epoch
+        for epoch in (0..before_epoch).rev() {
             let prefix = epoch.to_be_bytes();
+            let mut has_data = false;
 
             for item in self.spdd.prefix(prefix) {
                 let (key, _) = item?;
+                has_data = true;
                 batch.remove(&self.spdd, key);
 
                 deleted_count += 1;
@@ -132,6 +134,10 @@ impl SPDDStore {
                     deleted_count = 0;
                 }
             }
+
+            if !has_data {
+                break;
+            }
         }
 
         // Commit remaining deletions

From 397c0db0f7bc755c4cebadc648f7a6ee51729f2f Mon Sep 17 00:00:00 2001
From: Golddy Dev
Date: Wed, 15 Oct 2025 10:26:54 +0200
Subject: [PATCH 13/13] refactor: SPDDStore for improved epoch handling

- Refactored SPDDStore to streamline epoch data management, including
  methods for checking epoch completion and removing epoch data.
- Updated query methods to use the new PoolId type and improved error
  handling for incomplete epochs.
- Enhanced test cases to reflect changes in data structures and ensure
  correctness of epoch operations.
- Introduced a new PoolId type in common/src/types.rs for better clarity.
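
Illustrative call flow for the reworked store (sketch only; the path, epoch
number and retention value below are made up):

    let mut store = SPDDStore::load(Path::new("./spdd_db"), Some(73))?;
    // spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>>
    store.store_spdd(500, spdd_state)?;   // idempotent; marks epoch complete
    let by_pool = store.query_by_epoch_and_pool(500, &pool_id)?;
    assert!(store.query_by_epoch(499).is_err()); // incomplete or pruned epoch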
---
 common/src/types.rs                          |   2 +
 modules/accounts_state/src/accounts_state.rs |  46 +---
 .../src/spo_distribution_store.rs            | 216 ++++++++++--------
 3 files changed, 129 insertions(+), 135 deletions(-)

diff --git a/common/src/types.rs b/common/src/types.rs
index 898ca34a..1a74f072 100644
--- a/common/src/types.rs
+++ b/common/src/types.rs
@@ -318,6 +318,8 @@ impl Default for UTXODelta {
 /// Key hash used for pool IDs etc.
 pub type KeyHash = Vec<u8>;
 
+pub type PoolId = Vec<u8>;
+
 /// Script identifier
 pub type ScriptHash = KeyHash;
 
diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs
index fea92bf5..32fccd43 100644
--- a/modules/accounts_state/src/accounts_state.rs
+++ b/modules/accounts_state/src/accounts_state.rs
@@ -475,7 +475,7 @@ impl AccountsState {
             verifier.read_rewards(&verify_rewards_files);
         }
 
-        // Create history
+        // History
         let history = Arc::new(Mutex::new(StateHistory::<State>::new(
             "AccountsState",
             StateHistoryStore::default_block_store(),
         )));
         let history_query = history.clone();
         let history_tick = history.clone();
 
-        // Create spdd history
-        // Return Err if failed to create SPDD store
+        // Spdd store
         let spdd_store = if store_spdd_history {
-            Some(Arc::new(Mutex::new(SPDDStore::new(
+            Some(Arc::new(Mutex::new(SPDDStore::load(
                 std::path::Path::new(&spdd_db_path),
                 spdd_retention_epochs,
             )?)))
@@ -617,20 +616,9 @@ impl AccountsState {
                         }
 
                         AccountsStateQuery::GetSPDDByEpoch { epoch } => match spdd_store_guard {
-                            Some(spdd_store) => match spdd_store.is_epoch_stored(*epoch) {
-                                Some(true) => {
-                                    let result = spdd_store.query_by_epoch(*epoch);
-                                    match result {
-                                        Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result),
-                                        Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
-                                    }
-                                }
-                                Some(false) => {
-                                    AccountsStateQueryResponse::Error("Epoch is too old".to_string())
-                                }
-                                None => AccountsStateQueryResponse::Error(
-                                    "SPDD store is not started".to_string(),
-                                ),
+                            Some(spdd_store) => match spdd_store.query_by_epoch(*epoch) {
+                                Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result),
+                                Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
                             },
                             None => AccountsStateQueryResponse::Error(
                                 "SPDD store is not enabled".to_string(),
@@ -639,24 +627,14 @@ impl AccountsState {
 
                         AccountsStateQuery::GetSPDDByEpochAndPool { epoch, pool_id } => {
                             match spdd_store_guard {
-                                Some(spdd_store) => match spdd_store.is_epoch_stored(*epoch) {
-                                    Some(true) => {
-                                        let result =
-                                            spdd_store.query_by_epoch_and_pool(*epoch, pool_id);
-                                        match result {
-                                            Ok(result) => {
-                                                AccountsStateQueryResponse::SPDDByEpochAndPool(result)
-                                            }
-                                            Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
+                                Some(spdd_store) => {
+                                    match spdd_store.query_by_epoch_and_pool(*epoch, pool_id) {
+                                        Ok(result) => {
+                                            AccountsStateQueryResponse::SPDDByEpochAndPool(result)
                                         }
+                                        Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
                                     }
-                                    Some(false) => AccountsStateQueryResponse::Error(
-                                        "Epoch is too old".to_string(),
-                                    ),
-                                    None => AccountsStateQueryResponse::Error(
-                                        "SPDD store is not started".to_string(),
-                                    ),
-                                },
+                                }
                                 None => AccountsStateQueryResponse::Error(
                                     "SPDD store is not enabled".to_string(),
                                 ),
diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs
index 36145ee9..0dcde887 100644
--- a/modules/accounts_state/src/spo_distribution_store.rs
+++ b/modules/accounts_state/src/spo_distribution_store.rs
@@ -1,20 +1,20 @@
 use std::collections::HashMap;
 
-use acropolis_common::KeyHash;
+use acropolis_common::{AddrKeyhash, PoolId};
+use anyhow::Result;
 use fjall::{Config, Keyspace, PartitionCreateOptions};
 
 const POOL_ID_LEN: usize = 28;
 const STAKE_KEY_LEN: usize = 28;
 const EPOCH_LEN: usize = 8;
-const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LEN + STAKE_KEY_LEN; // 64 bytes
+const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LEN + STAKE_KEY_LEN;
 
 // Batch size balances commit overhead vs memory usage
 // ~720KB per batch (72 bytes × 10,000)
 // ~130 commits for typical epoch (~1.3M delegations)
 const BATCH_SIZE: usize = 10_000;
 
-/// Encode: epoch + pool_id + stake_key
-fn encode_key(epoch: u64, pool_id: &KeyHash, stake_key: &KeyHash) -> Vec<u8> {
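+// Key layout (worked example): epoch 500 -> epoch.to_be_bytes() =
+// [0x00,0x00,0x00,0x00,0x00,0x00,0x01,0xF4], then the 28-byte pool id, then
+// the 28-byte stake key: 64 bytes total. Big-endian epochs keep keys sorted,
+// which is what makes the epoch and epoch+pool prefix scans below work.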
+fn encode_key(epoch: u64, pool_id: &PoolId, stake_key: &AddrKeyhash) -> Vec<u8> {
     let mut key = Vec::with_capacity(TOTAL_KEY_LEN);
     key.extend_from_slice(&epoch.to_be_bytes());
     key.extend_from_slice(pool_id);
@@ -22,60 +22,99 @@ fn encode_key(epoch: u64, pool_id: &KeyHash, stake_key: &KeyHash) -> Vec<u8> {
     key
 }
 
-/// Encode: epoch + pool_id (for prefix queries)
-fn encode_epoch_pool_prefix(epoch: u64, pool_id: &KeyHash) -> Vec<u8> {
+fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec<u8> {
     let mut prefix = Vec::with_capacity(EPOCH_LEN + POOL_ID_LEN);
     prefix.extend_from_slice(&epoch.to_be_bytes());
     prefix.extend_from_slice(pool_id);
     prefix
 }
 
-/// Decode key to extract pool_id and stake_key
-fn decode_key(key: &[u8]) -> (KeyHash, KeyHash) {
+fn decode_key(key: &[u8]) -> Result<(u64, PoolId, AddrKeyhash)> {
+    let epoch = u64::from_be_bytes(key[..EPOCH_LEN].try_into()?);
     let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LEN].to_vec();
     let stake_key = key[EPOCH_LEN + POOL_ID_LEN..].to_vec();
-    (pool_id, stake_key)
+    Ok((epoch, pool_id, stake_key))
+}
+
+/// Encode epoch completion marker key
+fn encode_epoch_marker(epoch: u64) -> Vec<u8> {
+    epoch.to_be_bytes().to_vec()
 }
 
 pub struct SPDDStore {
     keyspace: Keyspace,
-    /// Single partition for all SPDD data
+    /// Partition for all SPDD data
     /// Key format: epoch(8 bytes) + pool_id + stake_key
     /// Value: amount(8 bytes)
     spdd: fjall::PartitionHandle,
+    /// Partition for epoch completion markers
+    /// Key format: epoch(8 bytes)
+    /// Value: "complete"
+    epoch_markers: fjall::PartitionHandle,
     /// Maximum number of epochs to retain (None = unlimited)
     retention_epochs: Option<u64>,
-    /// Latest epoch stored
-    latest_epoch: Option<u64>,
 }
 
 impl SPDDStore {
+    #[allow(dead_code)]
     pub fn new(
         path: impl AsRef<std::path::Path>,
         retention_epochs: Option<u64>,
     ) -> fjall::Result<Self> {
         let path = path.as_ref();
+        if path.exists() {
+            std::fs::remove_dir_all(path)?;
+        }
+
+        let keyspace = Config::new(path).open()?;
+        let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
+        let epoch_markers =
+            keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?;
+
+        Ok(Self {
+            keyspace,
+            spdd,
+            epoch_markers,
+            retention_epochs,
+        })
+    }
+
+    pub fn load(
+        path: impl AsRef<std::path::Path>,
+        retention_epochs: Option<u64>,
+    ) -> fjall::Result<Self> {
+        let path = path.as_ref();
         let keyspace = Config::new(path).open()?;
         let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
+        let epoch_markers =
+            keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?;
 
         Ok(Self {
             keyspace,
             spdd,
+            epoch_markers,
             retention_epochs,
-            latest_epoch: None,
         })
     }
 
-    /// Store SPDD state for an epoch and prune old epochs if needed
+    pub fn is_epoch_complete(&self, epoch: u64) -> fjall::Result<bool> {
+        let marker_key = encode_epoch_marker(epoch);
+        Ok(matches!(self.epoch_markers.get(&marker_key)?, Some(value) if value.eq(b"complete")))
+    }
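+
+    /// Store the SPDD for `epoch`, idempotently: a completed epoch is skipped,
+    /// a partially written one is wiped via `remove_epoch_data` and rewritten;
+    /// the completion marker is only written once every batch has committed.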
     pub fn store_spdd(
         &mut self,
         epoch: u64,
-        spdd_state: HashMap<KeyHash, Vec<(KeyHash, u64)>>,
+        spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>>,
     ) -> fjall::Result<()> {
+        if self.is_epoch_complete(epoch)? {
+            return Ok(());
+        }
+        self.remove_epoch_data(epoch)?;
+
         let mut batch = self.keyspace.batch();
         let mut count = 0;
-
         for (pool_id, delegations) in spdd_state {
             for (stake_key, amount) in delegations {
                 let key = encode_key(epoch, &pool_id, &stake_key);
@@ -90,20 +129,16 @@ impl SPDDStore {
             }
         }
-
-        // Commit remaining entries
         if count > 0 {
             batch.commit()?;
         }
 
-        self.latest_epoch = Some(epoch);
+        // Mark epoch as complete (single key operation)
+        let marker_key = encode_epoch_marker(epoch);
+        self.epoch_markers.insert(marker_key, b"complete")?;
 
-        // Prune old epochs if retention is configured
-        // Keep the last N epochs, delete everything older
         if let Some(retention) = self.retention_epochs {
             if epoch >= retention {
-                // Keep epochs [epoch - retention + 1, epoch]
-                // Delete everything before (epoch - retention + 1)
                 let keep_from_epoch = epoch - retention + 1;
                 self.prune_epochs_before(keep_from_epoch)?;
             }
@@ -112,93 +147,82 @@ impl SPDDStore {
         Ok(())
     }
 
-    /// Prune all SPDD data for epochs before the specified epoch
-    pub fn prune_epochs_before(&self, before_epoch: u64) -> fjall::Result<()> {
+    pub fn remove_epoch_data(&self, epoch: u64) -> fjall::Result<u64> {
+        // Remove epoch marker first - if process fails midway, epoch will be marked incomplete
+        let marker_key = encode_epoch_marker(epoch);
+        self.epoch_markers.remove(marker_key)?;
+
+        let prefix = epoch.to_be_bytes();
         let mut batch = self.keyspace.batch();
         let mut deleted_count = 0;
+        let mut total_deleted_count: u64 = 0;
 
-        // Iterate in reverse through all epochs less than before_epoch
-        for epoch in (0..before_epoch).rev() {
-            let prefix = epoch.to_be_bytes();
-            let mut has_data = false;
-
-            for item in self.spdd.prefix(prefix) {
-                let (key, _) = item?;
-                has_data = true;
-                batch.remove(&self.spdd, key);
-
-                deleted_count += 1;
-                if deleted_count >= BATCH_SIZE {
-                    batch.commit()?;
-                    batch = self.keyspace.batch();
-                    deleted_count = 0;
-                }
-            }
-
-            if !has_data {
-                break;
+        for item in self.spdd.prefix(prefix) {
+            let (key, _) = item?;
+            batch.remove(&self.spdd, key);
+            total_deleted_count += 1;
+
+            deleted_count += 1;
+            if deleted_count >= BATCH_SIZE {
+                batch.commit()?;
+                batch = self.keyspace.batch();
+                deleted_count = 0;
             }
         }
 
-        // Commit remaining deletions
         if deleted_count > 0 {
             batch.commit()?;
         }
 
-        Ok(())
+        Ok(total_deleted_count)
     }
 
-    pub fn is_epoch_stored(&self, epoch: u64) -> Option<bool> {
-        let Some(latest_epoch) = self.latest_epoch else {
-            return None;
-        };
-        let min_epoch = match self.retention_epochs {
-            Some(retention) => {
-                if latest_epoch > retention {
-                    latest_epoch - retention + 1
-                } else {
-                    0
-                }
-            }
-            None => 0,
-        };
+    pub fn prune_epochs_before(&self, before_epoch: u64) -> fjall::Result<u64> {
+        let mut deleted_epochs: u64 = 0;
 
-        Some(epoch >= min_epoch && epoch <= latest_epoch)
+        for epoch in (0..before_epoch).rev() {
+            let deleted_count = self.remove_epoch_data(epoch)?;
+            if deleted_count == 0 {
+                break;
+            }
+            deleted_epochs += 1;
+        }
+        Ok(deleted_epochs)
     }
 
-    /// Query all data for an epoch
-    /// Returns: Vec<(PoolId, StakeKey, ActiveStakeAmount)>
-    pub fn query_by_epoch(&self, epoch: u64) -> fjall::Result<Vec<(KeyHash, KeyHash, u64)>> {
+    pub fn query_by_epoch(&self, epoch: u64) -> Result<Vec<(PoolId, AddrKeyhash, u64)>> {
+        if !self.is_epoch_complete(epoch)? {
+            return Err(anyhow::anyhow!("Epoch SPDD Data is not complete"));
+        }
+
         let prefix = epoch.to_be_bytes();
         let mut result = Vec::new();
-
         for item in self.spdd.prefix(prefix) {
             let (key, value) = item?;
-            let (pool_id, stake_key) = decode_key(&key);
-            let amount = u64::from_be_bytes(value.as_ref().try_into().unwrap());
+            let (_, pool_id, stake_key) = decode_key(&key)?;
+            let amount = u64::from_be_bytes(value.as_ref().try_into()?);
             result.push((pool_id, stake_key, amount));
         }
-
         Ok(result)
     }
 
-    /// Query by epoch and pool_id
-    /// Returns: Vec<(StakeKey, ActiveStakeAmount)>
     pub fn query_by_epoch_and_pool(
         &self,
         epoch: u64,
-        pool_id: &KeyHash,
-    ) -> fjall::Result<Vec<(KeyHash, u64)>> {
+        pool_id: &PoolId,
+    ) -> Result<Vec<(AddrKeyhash, u64)>> {
+        if !self.is_epoch_complete(epoch)? {
+            return Err(anyhow::anyhow!("Epoch SPDD Data is not complete"));
+        }
+
         let prefix = encode_epoch_pool_prefix(epoch, pool_id);
         let mut result = Vec::new();
-
         for item in self.spdd.prefix(prefix) {
             let (key, value) = item?;
-            let stake_key = key[EPOCH_LEN + POOL_ID_LEN..].to_vec();
-            let amount = u64::from_be_bytes(value.as_ref().try_into().unwrap());
+            let (_, _, stake_key) = decode_key(&key)?;
+            let amount = u64::from_be_bytes(value.as_ref().try_into()?);
             result.push((stake_key, amount));
         }
-
         Ok(result)
     }
 }
@@ -213,7 +237,7 @@ mod tests {
     fn test_store_spdd_state() {
         let mut spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None)
             .expect("Failed to create SPDD store");
-        let mut spdd_state: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
+        let mut spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>> = HashMap::new();
         spdd_state.insert(
             vec![0x01; 28],
             vec![(vec![0x10; 28], 100), (vec![0x11; 28], 150)],
         );
         spdd_state.insert(
             vec![0x02; 28],
             vec![(vec![0x20; 28], 200), (vec![0x21; 28], 250)],
         );
-        spdd_store.store_spdd(1, spdd_state).expect("Failed to store SPDD state");
+        assert!(spdd_store.store_spdd(1, spdd_state).is_ok());
 
-        let result = spdd_store.query_by_epoch(1).expect("Failed to query SPDD state");
+        let result = spdd_store.query_by_epoch(1).unwrap();
         assert_eq!(result.len(), 4);
-        let result = spdd_store
-            .query_by_epoch_and_pool(1, &vec![0x01; 28])
-            .expect("Failed to query SPDD state");
+        let result = spdd_store.query_by_epoch_and_pool(1, &vec![0x01; 28]).unwrap();
         assert_eq!(result.len(), 2);
-        let result = spdd_store
-            .query_by_epoch_and_pool(1, &vec![0x02; 28])
-            .expect("Failed to query SPDD state");
+        let result = spdd_store.query_by_epoch_and_pool(1, &vec![0x02; 28]).unwrap();
         assert_eq!(result.len(), 2);
     }
 
@@ -241,9 +261,8 @@ mod tests {
         let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2))
             .expect("Failed to create SPDD store");
 
-        // Store data for epochs 1, 2, 3
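+        // Store epochs 1..=3: with retention = 2, storing epoch 3 prunes epoch 1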
         for epoch in 1..=3 {
-            let mut spdd_state: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
+            let mut spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>> = HashMap::new();
             spdd_state.insert(
                 vec![epoch as u8; 28],
                 vec![(vec![0x10; 28], epoch * 100), (vec![0x11; 28], epoch * 150)],
             );
             spdd_store.store_spdd(epoch, spdd_state).expect("Failed to store SPDD state");
         }
 
-        // After storing epoch 3 with retention=2, epoch 1 should be pruned
-        assert!(!spdd_store.is_epoch_stored(1).unwrap());
-        assert!(spdd_store.is_epoch_stored(2).unwrap());
-        assert!(spdd_store.is_epoch_stored(3).unwrap());
-
-        let result = spdd_store.query_by_epoch(1);
-        assert!(result.is_ok());
-        assert_eq!(result.unwrap().len(), 0, "Epoch 1 should be pruned");
+        assert!(!spdd_store.is_epoch_complete(1).unwrap());
+        assert!(spdd_store.is_epoch_complete(2).unwrap());
+        assert!(spdd_store.is_epoch_complete(3).unwrap());
 
-        let result = spdd_store.query_by_epoch(2).expect("Failed to query epoch 2");
-        assert_eq!(result.len(), 2, "Epoch 2 should still exist");
-
-        let result = spdd_store.query_by_epoch(3).expect("Failed to query epoch 3");
-        assert_eq!(result.len(), 2, "Epoch 3 should still exist");
+        assert!(spdd_store.query_by_epoch(1).is_err());
+        let result = spdd_store.query_by_epoch(2).unwrap();
+        assert_eq!(result.len(), 2);
+        let result = spdd_store.query_by_epoch(3).unwrap();
+        assert_eq!(result.len(), 2);
     }
 }