Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/historical_accounts_state/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/db
129 changes: 62 additions & 67 deletions modules/historical_accounts_state/src/historical_accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use caryatid_sdk::{message_bus::Subscription, module, Context, Module};
use config::Config;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tracing::{error, info, info_span, Instrument};
use tracing::{error, info, info_span};

mod state;
use state::State;
Expand Down Expand Up @@ -61,6 +61,8 @@ impl HistoricalAccountsState {
) -> Result<()> {
let _ = params_subscription.read().await?;
info!("Consumed initial genesis params from params_subscription");
let _ = address_deltas_subscription.read().await?;
info!("Consumed initial address deltas from address_deltas_subscription");

// Background task to persist epochs sequentially
const MAX_PENDING_PERSISTS: usize = 1;
Expand All @@ -78,9 +80,6 @@ impl HistoricalAccountsState {
});
// Main loop of synchronised messages
loop {
// Get a mutable state
let mut state = state_mutex.lock().await;

// Create all per-block message futures upfront before processing messages sequentially
let certs_message_f = certs_subscription.read();
let address_deltas_message_f = address_deltas_subscription.read();
Expand All @@ -93,6 +92,7 @@ impl HistoricalAccountsState {
let new_epoch = match certs_message.as_ref() {
Message::Cardano((block_info, _)) => {
// Handle rollbacks on this topic only
let mut state = state_mutex.lock().await;
if block_info.status == BlockStatus::RolledBack {
state.volatile.rollback_before(block_info.number);
state.volatile.next_block();
Expand All @@ -106,9 +106,9 @@ impl HistoricalAccountsState {

// Read from epoch-boundary messages only when it's a new epoch
if new_epoch {
let (_, message) = params_subscription.read().await?;
let (_, params_msg) = params_subscription.read().await?;
if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) =
message.as_ref()
params_msg.as_ref()
{
Self::check_sync(&current_block, &block_info);
let mut state = state_mutex.lock().await;
Expand All @@ -118,29 +118,18 @@ impl HistoricalAccountsState {
}
}

// Handle rewards
let (_, message) = rewards_subscription.read().await?;
match message.as_ref() {
Message::Cardano((
block_info,
CardanoMessage::StakeRewardDeltas(rewards_msg),
)) => {
let span = info_span!(
"historical_account_state.handle_reward_deltas",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_rewards(rewards_msg)
.inspect_err(|e| error!("Reward deltas handling error: {e:#}"))
.ok();
}
.instrument(span)
.await;
}

_ => error!("Unexpected message type: {message:?}"),
let (_, rewards_msg) = rewards_subscription.read().await?;
if let Message::Cardano((
block_info,
CardanoMessage::StakeRewardDeltas(rewards_msg),
)) = rewards_msg.as_ref()
{
Self::check_sync(&current_block, &block_info);
let mut state = state_mutex.lock().await;
state
.handle_rewards(rewards_msg)
.inspect_err(|e| error!("Reward deltas handling error: {e:#}"))
.ok();
}
}

Expand All @@ -151,15 +140,14 @@ impl HistoricalAccountsState {
"historical_account_state.handle_certs",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_tx_certificates(tx_certs_msg)
.inspect_err(|e| error!("TxCertificates handling error: {e:#}"))
.ok();
}
.instrument(span)
.await;
let _entered = span.enter();

Self::check_sync(&current_block, &block_info);
let mut state = state_mutex.lock().await;
state
.handle_tx_certificates(tx_certs_msg)
.inspect_err(|e| error!("TxCertificates handling error: {e:#}"))
.ok();
}

_ => error!("Unexpected message type: {certs_message:?}"),
Expand All @@ -173,15 +161,14 @@ impl HistoricalAccountsState {
"historical_account_state.handle_withdrawals",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state
.handle_withdrawals(withdrawals_msg)
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
.ok();
}
.instrument(span)
.await;
let _entered = span.enter();

Self::check_sync(&current_block, &block_info);
let mut state = state_mutex.lock().await;
state
.handle_withdrawals(withdrawals_msg)
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
.ok();
}

_ => error!("Unexpected message type: {message:?}"),
Expand All @@ -195,38 +182,46 @@ impl HistoricalAccountsState {
"historical_account_state.handle_address_deltas",
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
let _entered = span.enter();

Self::check_sync(&current_block, &block_info);
{
let mut state = state_mutex.lock().await;
state
.handle_address_deltas(deltas_msg)
.inspect_err(|e| error!("AddressDeltas handling error: {e:#}"))
.ok();
}
.instrument(span)
.await;
}

let should_prune = state.ready_to_prune(block_info);
if should_prune {
_ => error!("Unexpected message type: {message:?}"),
}

// Prune volatile and persist if needed
if let Some(current_block) = current_block {
let should_prune = {
let state = state_mutex.lock().await;
state.ready_to_prune(&current_block)
};

if should_prune {
let (store, cfg) = {
let mut state: tokio::sync::MutexGuard<'_, State> =
state_mutex.lock().await;
state.prune_volatile().await;
if let Err(e) = persist_tx
.send((
block_info.epoch as u32,
state.immutable.clone(),
state.config.clone(),
))
.await
{
panic!("persistence worker crashed: {e}");
}
}
(state.immutable.clone(), state.config.clone())
};

if let Err(e) = persist_tx.send((current_block.epoch as u32, store, cfg)).await
{
let mut state = state_mutex.lock().await;
state.volatile.next_block();
panic!("persistence worker crashed: {e}");
}
}
}

_ => error!("Unexpected message type: {message:?}"),
{
let mut state = state_mutex.lock().await;
state.volatile.next_block();
}
}
}
Expand Down