diff --git a/modules/historical_accounts_state/.gitignore b/modules/historical_accounts_state/.gitignore new file mode 100644 index 00000000..0078d059 --- /dev/null +++ b/modules/historical_accounts_state/.gitignore @@ -0,0 +1 @@ +/db \ No newline at end of file diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 2cbb85ee..1a8557de 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -13,7 +13,7 @@ use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; use config::Config; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; -use tracing::{error, info, info_span, Instrument}; +use tracing::{error, info, info_span}; mod state; use state::State; @@ -61,6 +61,8 @@ impl HistoricalAccountsState { ) -> Result<()> { let _ = params_subscription.read().await?; info!("Consumed initial genesis params from params_subscription"); + let _ = address_deltas_subscription.read().await?; + info!("Consumed initial address deltas from address_deltas_subscription"); // Background task to persist epochs sequentially const MAX_PENDING_PERSISTS: usize = 1; @@ -78,9 +80,6 @@ impl HistoricalAccountsState { }); // Main loop of synchronised messages loop { - // Get a mutable state - let mut state = state_mutex.lock().await; - // Create all per-block message futures upfront before processing messages sequentially let certs_message_f = certs_subscription.read(); let address_deltas_message_f = address_deltas_subscription.read(); @@ -93,6 +92,7 @@ impl HistoricalAccountsState { let new_epoch = match certs_message.as_ref() { Message::Cardano((block_info, _)) => { // Handle rollbacks on this topic only + let mut state = state_mutex.lock().await; if block_info.status == BlockStatus::RolledBack { state.volatile.rollback_before(block_info.number); state.volatile.next_block(); @@ -106,9 +106,9 @@ impl HistoricalAccountsState { // Read from epoch-boundary messages only when it's a new epoch if new_epoch { - let (_, message) = params_subscription.read().await?; + let (_, params_msg) = params_subscription.read().await?; if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = - message.as_ref() + params_msg.as_ref() { Self::check_sync(¤t_block, &block_info); let mut state = state_mutex.lock().await; @@ -118,29 +118,18 @@ impl HistoricalAccountsState { } } - // Handle rewards - let (_, message) = rewards_subscription.read().await?; - match message.as_ref() { - Message::Cardano(( - block_info, - CardanoMessage::StakeRewardDeltas(rewards_msg), - )) => { - let span = info_span!( - "historical_account_state.handle_reward_deltas", - block = block_info.number - ); - async { - Self::check_sync(¤t_block, &block_info); - state - .handle_rewards(rewards_msg) - .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) - .ok(); - } - .instrument(span) - .await; - } - - _ => error!("Unexpected message type: {message:?}"), + let (_, rewards_msg) = rewards_subscription.read().await?; + if let Message::Cardano(( + block_info, + CardanoMessage::StakeRewardDeltas(rewards_msg), + )) = rewards_msg.as_ref() + { + Self::check_sync(¤t_block, &block_info); + let mut state = state_mutex.lock().await; + state + .handle_rewards(rewards_msg) + .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) + .ok(); } } @@ -151,15 +140,14 @@ impl HistoricalAccountsState { "historical_account_state.handle_certs", block = block_info.number ); - async { - Self::check_sync(¤t_block, &block_info); - state - .handle_tx_certificates(tx_certs_msg) - .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) - .ok(); - } - .instrument(span) - .await; + let _entered = span.enter(); + + Self::check_sync(¤t_block, &block_info); + let mut state = state_mutex.lock().await; + state + .handle_tx_certificates(tx_certs_msg) + .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) + .ok(); } _ => error!("Unexpected message type: {certs_message:?}"), @@ -173,15 +161,14 @@ impl HistoricalAccountsState { "historical_account_state.handle_withdrawals", block = block_info.number ); - async { - Self::check_sync(¤t_block, &block_info); - state - .handle_withdrawals(withdrawals_msg) - .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) - .ok(); - } - .instrument(span) - .await; + let _entered = span.enter(); + + Self::check_sync(¤t_block, &block_info); + let mut state = state_mutex.lock().await; + state + .handle_withdrawals(withdrawals_msg) + .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) + .ok(); } _ => error!("Unexpected message type: {message:?}"), @@ -195,38 +182,46 @@ impl HistoricalAccountsState { "historical_account_state.handle_address_deltas", block = block_info.number ); - async { - Self::check_sync(¤t_block, &block_info); + let _entered = span.enter(); + + Self::check_sync(¤t_block, &block_info); + { + let mut state = state_mutex.lock().await; state .handle_address_deltas(deltas_msg) .inspect_err(|e| error!("AddressDeltas handling error: {e:#}")) .ok(); } - .instrument(span) - .await; + } - let should_prune = state.ready_to_prune(block_info); - if should_prune { + _ => error!("Unexpected message type: {message:?}"), + } + + // Prune volatile and persist if needed + if let Some(current_block) = current_block { + let should_prune = { + let state = state_mutex.lock().await; + state.ready_to_prune(¤t_block) + }; + + if should_prune { + let (store, cfg) = { + let mut state: tokio::sync::MutexGuard<'_, State> = + state_mutex.lock().await; state.prune_volatile().await; - if let Err(e) = persist_tx - .send(( - block_info.epoch as u32, - state.immutable.clone(), - state.config.clone(), - )) - .await - { - panic!("persistence worker crashed: {e}"); - } - } + (state.immutable.clone(), state.config.clone()) + }; + if let Err(e) = persist_tx.send((current_block.epoch as u32, store, cfg)).await { - let mut state = state_mutex.lock().await; - state.volatile.next_block(); + panic!("persistence worker crashed: {e}"); } } + } - _ => error!("Unexpected message type: {message:?}"), + { + let mut state = state_mutex.lock().await; + state.volatile.next_block(); } } }