From 74d1d1bef379eb6476fce602b72cd4562b91d11e Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 23 Oct 2025 16:30:36 +0000 Subject: [PATCH 1/4] fix: read extra address delta message on startup and add gitignore for /db Signed-off-by: William Hankins --- modules/historical_accounts_state/.gitignore | 1 + .../historical_accounts_state/src/historical_accounts_state.rs | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 modules/historical_accounts_state/.gitignore diff --git a/modules/historical_accounts_state/.gitignore b/modules/historical_accounts_state/.gitignore new file mode 100644 index 00000000..0078d059 --- /dev/null +++ b/modules/historical_accounts_state/.gitignore @@ -0,0 +1 @@ +/db \ No newline at end of file diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 2cbb85ee..1338d3de 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -61,6 +61,8 @@ impl HistoricalAccountsState { ) -> Result<()> { let _ = params_subscription.read().await?; info!("Consumed initial genesis params from params_subscription"); + let _ = address_deltas_subscription.read().await?; + info!("Consumed initial address deltas from address_deltas_subscription"); // Background task to persist epochs sequentially const MAX_PENDING_PERSISTS: usize = 1; From 22a87440486c4d827432273211f628291cd632a0 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 23 Oct 2025 16:57:53 +0000 Subject: [PATCH 2/4] fix: double locking state causing process to hang Signed-off-by: William Hankins --- .../src/historical_accounts_state.rs | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 1338d3de..d8cca8b0 100644 --- 
a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -80,9 +80,6 @@ impl HistoricalAccountsState { }); // Main loop of synchronised messages loop { - // Get a mutable state - let mut state = state_mutex.lock().await; - // Create all per-block message futures upfront before processing messages sequentially let certs_message_f = certs_subscription.read(); let address_deltas_message_f = address_deltas_subscription.read(); @@ -95,6 +92,7 @@ impl HistoricalAccountsState { let new_epoch = match certs_message.as_ref() { Message::Cardano((block_info, _)) => { // Handle rollbacks on this topic only + let mut state = state_mutex.lock().await; if block_info.status == BlockStatus::RolledBack { state.volatile.rollback_before(block_info.number); state.volatile.next_block(); @@ -133,6 +131,7 @@ impl HistoricalAccountsState { ); async { Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; state .handle_rewards(rewards_msg) .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) @@ -155,6 +154,7 @@ impl HistoricalAccountsState { ); async { Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; state .handle_tx_certificates(tx_certs_msg) .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) @@ -177,6 +177,7 @@ impl HistoricalAccountsState { ); async { Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; state .handle_withdrawals(withdrawals_msg) .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) @@ -199,6 +200,7 @@ impl HistoricalAccountsState { ); async { Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; state .handle_address_deltas(deltas_msg) .inspect_err(|e| error!("AddressDeltas handling error: {e:#}")) @@ -207,17 +209,22 @@ impl HistoricalAccountsState { .instrument(span) .await; - let should_prune = 
state.ready_to_prune(block_info); + let (should_prune, imm, cfg, epoch) = { + let state = state_mutex.lock().await; + ( + state.ready_to_prune(&block_info), + state.immutable.clone(), + state.config.clone(), + block_info.epoch, + ) + }; + if should_prune { - state.prune_volatile().await; - if let Err(e) = persist_tx - .send(( - block_info.epoch as u32, - state.immutable.clone(), - state.config.clone(), - )) - .await { + let mut state = state_mutex.lock().await; + state.prune_volatile().await; + } + if let Err(e) = persist_tx.send((epoch as u32, imm, cfg)).await { panic!("persistence worker crashed: {e}"); } } From 74701028dbff8d8d2f145816f25b3803eb817911 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 23 Oct 2025 18:10:30 +0000 Subject: [PATCH 3/4] fix: remove async locks on state Signed-off-by: William Hankins --- .../src/historical_accounts_state.rs | 137 ++++++++---------- 1 file changed, 63 insertions(+), 74 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index d8cca8b0..56c810db 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -13,7 +13,7 @@ use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; use config::Config; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; -use tracing::{error, info, info_span, Instrument}; +use tracing::{error, info, info_span}; mod state; use state::State; @@ -106,9 +106,9 @@ impl HistoricalAccountsState { // Read from epoch-boundary messages only when it's a new epoch if new_epoch { - let (_, message) = params_subscription.read().await?; + let (_, params_msg) = params_subscription.read().await?; if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = - message.as_ref() + params_msg.as_ref() { Self::check_sync(&current_block, &block_info); let mut state = 
state_mutex.lock().await; @@ -118,30 +118,18 @@ impl HistoricalAccountsState { } } - // Handle rewards - let (_, message) = rewards_subscription.read().await?; - match message.as_ref() { - Message::Cardano(( - block_info, - CardanoMessage::StakeRewardDeltas(rewards_msg), - )) => { - let span = info_span!( - "historical_account_state.handle_reward_deltas", - block = block_info.number - ); - async { - Self::check_sync(&current_block, &block_info); - let mut state = state_mutex.lock().await; - state - .handle_rewards(rewards_msg) - .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) - .ok(); - } - .instrument(span) - .await; - } - - _ => error!("Unexpected message type: {message:?}"), + let (_, rewards_msg) = rewards_subscription.read().await?; + if let Message::Cardano(( + block_info, + CardanoMessage::StakeRewardDeltas(rewards_msg), + )) = rewards_msg.as_ref() + { + Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; + state + .handle_rewards(rewards_msg) + .inspect_err(|e| error!("Reward deltas handling error: {e:#}")) + .ok(); } } @@ -152,16 +140,14 @@ impl HistoricalAccountsState { "historical_account_state.handle_certs", block = block_info.number ); - async { - Self::check_sync(&current_block, &block_info); - let mut state = state_mutex.lock().await; - state - .handle_tx_certificates(tx_certs_msg) - .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) - .ok(); - } - .instrument(span) - .await; + let _entered = span.enter(); + + Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; + state + .handle_tx_certificates(tx_certs_msg) + .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) + .ok(); } _ => error!("Unexpected message type: {certs_message:?}"), @@ -175,16 +161,14 @@ impl HistoricalAccountsState { "historical_account_state.handle_withdrawals", block = block_info.number ); - async { - Self::check_sync(&current_block, &block_info); - let mut state = state_mutex.lock().await; - 
state - .handle_withdrawals(withdrawals_msg) - .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) - .ok(); - } - .instrument(span) - .await; + let _entered = span.enter(); + + Self::check_sync(&current_block, &block_info); + let mut state = state_mutex.lock().await; + state + .handle_withdrawals(withdrawals_msg) + .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) + .ok(); } _ => error!("Unexpected message type: {message:?}"), @@ -198,44 +182,49 @@ impl HistoricalAccountsState { "historical_account_state.handle_address_deltas", block = block_info.number ); - async { - Self::check_sync(&current_block, &block_info); + let _entered = span.enter(); + + Self::check_sync(&current_block, &block_info); + { let mut state = state_mutex.lock().await; state .handle_address_deltas(deltas_msg) .inspect_err(|e| error!("AddressDeltas handling error: {e:#}")) .ok(); } - .instrument(span) - .await; - - let (should_prune, imm, cfg, epoch) = { - let state = state_mutex.lock().await; - ( - state.ready_to_prune(&block_info), - state.immutable.clone(), - state.config.clone(), - block_info.epoch, - ) - }; - - if should_prune { - { - let mut state = state_mutex.lock().await; - state.prune_volatile().await; - } - if let Err(e) = persist_tx.send((epoch as u32, imm, cfg)).await { - panic!("persistence worker crashed: {e}"); - } - } + } + + _ => error!("Unexpected message type: {message:?}"), + } + // Prune volatile and persist if needed + if let Some(current_block) = current_block { + let (should_prune, immutable, cfg) = { + let state = state_mutex.lock().await; + ( + state.ready_to_prune(&current_block), + state.immutable.clone(), + state.config.clone(), + ) + }; + + if should_prune { { - let mut state = state_mutex.lock().await; - state.volatile.next_block(); + let mut state: tokio::sync::MutexGuard<'_, State> = + state_mutex.lock().await; + state.prune_volatile().await; + } + if let Err(e) = + persist_tx.send((current_block.epoch as u32, immutable, cfg)).await + { + panic!("persistence 
worker crashed: {e}"); } } + } - _ => error!("Unexpected message type: {message:?}"), + { + let mut state = state_mutex.lock().await; + state.volatile.next_block(); } } } From 661d745e756348117d768a737bd134d46f166279 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 23 Oct 2025 18:20:07 +0000 Subject: [PATCH 4/4] fix: move immutable store clone after prune_volatile for readability Signed-off-by: William Hankins --- .../src/historical_accounts_state.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 56c810db..1a8557de 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -199,23 +199,20 @@ impl HistoricalAccountsState { // Prune volatile and persist if needed if let Some(current_block) = current_block { - let (should_prune, immutable, cfg) = { + let should_prune = { let state = state_mutex.lock().await; - ( - state.ready_to_prune(&current_block), - state.immutable.clone(), - state.config.clone(), - ) + state.ready_to_prune(&current_block) }; if should_prune { - { + let (store, cfg) = { let mut state: tokio::sync::MutexGuard<'_, State> = state_mutex.lock().await; state.prune_volatile().await; - } - if let Err(e) = - persist_tx.send((current_block.epoch as u32, immutable, cfg)).await + (state.immutable.clone(), state.config.clone()) + }; + + if let Err(e) = persist_tx.send((current_block.epoch as u32, store, cfg)).await { panic!("persistence worker crashed: {e}"); }