Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests-on-push-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
--package acropolis_module_mithril_snapshot_fetcher \
--package acropolis_module_snapshot_bootstrapper \
--package acropolis_module_spdd_state \
--package acropolis_module_spo_state \
--package acropolis_module_stake_delta_filter \
--package acropolis_module_tx_submitter \
--package acropolis_module_upstream_chain_fetcher \
Expand Down
10 changes: 4 additions & 6 deletions modules/spo_state/src/epochs_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ impl EpochsHistoryState {
pool_operators: &Vec<KeyHash>,
epoch: u64,
) -> Option<Vec<u64>> {
let Some(epochs_history) = self.epochs_history.as_ref() else {
return None;
};
let epochs_history = self.epochs_history.as_ref()?;

let mut active_stakes = Vec::<u64>::new();
for pool_operator in pool_operators {
Expand Down Expand Up @@ -169,15 +167,15 @@ impl EpochsHistoryState {
&self,
_block: &BlockInfo,
epoch_activity_message: &EpochActivityMessage,
spos: &Vec<(KeyHash, usize)>,
spos: &[(KeyHash, usize)],
) {
let Some(epochs_history) = self.epochs_history.as_ref() else {
return;
};
let EpochActivityMessage { epoch, .. } = epoch_activity_message;

spos.iter().for_each(|(spo, amount)| {
Self::update_epochs_history_with(epochs_history, &spo, *epoch, |epoch_state| {
Self::update_epochs_history_with(epochs_history, spo, *epoch, |epoch_state| {
epoch_state.blocks_minted = Some(*amount as u64);
});
})
Expand All @@ -189,7 +187,7 @@ impl EpochsHistoryState {
epoch: u64,
update_fn: impl FnOnce(&mut EpochState),
) {
let mut epochs = epochs_history.entry(spo.clone()).or_insert_with(BTreeMap::new);
let mut epochs = epochs_history.entry(spo.clone()).or_default();
let epoch_state = epochs.entry(epoch).or_insert_with(|| EpochState::new(epoch));
update_fn(epoch_state);
}
Expand Down
17 changes: 7 additions & 10 deletions modules/spo_state/src/historical_spo_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,26 @@ impl HistoricalSPOState {

pub fn add_pool_registration(&mut self, reg: &PoolRegistration) -> Option<bool> {
// update registration if enabled
self.registration.as_mut().and_then(|registration| {
self.registration.as_mut().map(|registration| {
*registration = reg.clone();
Some(true)
true
})
}

pub fn add_pool_updates(&mut self, update: PoolUpdateEvent) -> Option<bool> {
// update updates if enabled
self.updates.as_mut().and_then(|updates| {
self.updates.as_mut().map(|updates| {
updates.push(update);
Some(true)
true
})
}

pub fn add_delegator(&mut self, delegator: &StakeAddress) -> Option<bool> {
self.delegators
.as_mut()
.and_then(|delegators| Some(delegators.insert(delegator.clone()).is_some()))
self.delegators.as_mut().map(|delegators| delegators.insert(delegator.clone()).is_some())
}

pub fn remove_delegator(&mut self, delegator: &StakeAddress) -> Option<bool> {
self.delegators.as_mut().and_then(|delegators| Some(delegators.remove(delegator).is_some()))
self.delegators.as_mut().map(|delegators| delegators.remove(delegator).is_some())
}

pub fn get_all_blocks(&self) -> Option<Vec<u64>> {
Expand All @@ -72,9 +70,8 @@ impl HistoricalSPOState {
}

pub fn add_block(&mut self, epoch: u64, block_number: u64) -> Option<()> {
self.blocks.as_mut().and_then(|blocks| {
self.blocks.as_mut().map(|blocks| {
blocks.entry(epoch).or_insert_with(Vector::new).push_back(block_number);
Some(())
})
}
}
4 changes: 2 additions & 2 deletions modules/spo_state/src/retired_pools_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ impl RetiredPoolsHistoryState {
/// Handle Retired SPOs
/// Update retired_pools_history with deregistrations
///
pub fn handle_deregistrations(&self, block: &BlockInfo, retired_spos: &Vec<KeyHash>) {
pub fn handle_deregistrations(&self, block: &BlockInfo, retired_spos: &[KeyHash]) {
let Some(retired_pools_history) = self.retired_pools_history.as_ref() else {
return;
};

retired_pools_history.insert(block.epoch, retired_spos.clone());
retired_pools_history.insert(block.epoch, retired_spos.to_vec());
}
}

Expand Down
54 changes: 26 additions & 28 deletions modules/spo_state/src/spo_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct SPOState;

impl SPOState {
/// Main async run loop
#[allow(clippy::too_many_arguments)]
async fn run(
history: Arc<Mutex<StateHistory<State>>>,
epochs_history: EpochsHistoryState,
Expand All @@ -99,11 +100,8 @@ impl SPOState {
// Get the stake address deltas from the genesis bootstrap, which we know
// don't contain any stake, plus an extra parameter state (!unexplained)
// !TODO this seems overly specific to our startup process
match stake_deltas_subscription.as_mut() {
Some(sub) => {
let _ = sub.read().await?;
}
None => {}
if let Some(sub) = stake_deltas_subscription.as_mut() {
let _ = sub.read().await?;
}

// Main loop of synchronised messages
Expand Down Expand Up @@ -164,7 +162,7 @@ impl SPOState {
match MultiEraHeader::decode(variant, None, &block_msg.header) {
Ok(header) => {
if let Some(vrf_vkey) = header.vrf_vkey() {
state.handle_mint(&block_info, vrf_vkey);
state.handle_mint(block_info, vrf_vkey);
}
}

Expand All @@ -181,7 +179,7 @@ impl SPOState {
Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_certs_msg))) => {
let span = info_span!("spo_state.handle_certs", block = block_info.number);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
let maybe_message = state
.handle_tx_certs(block_info, tx_certs_msg)
.inspect_err(|e| error!("TxCerts Messages handling error: {e}"))
Expand Down Expand Up @@ -227,7 +225,7 @@ impl SPOState {
{
let span = info_span!("spo_state.handle_spdd", block = block_info.number);
span.in_scope(|| {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
// update epochs_history
epochs_history.handle_spdd(block_info, spdd_message);
});
Expand All @@ -244,7 +242,7 @@ impl SPOState {
let span =
info_span!("spo_state.handle_spo_rewards", block = block_info.number);
span.in_scope(|| {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
// update epochs_history
epochs_history.handle_spo_rewards(block_info, spo_rewards_message);
});
Expand All @@ -264,7 +262,7 @@ impl SPOState {
block = block_info.number
);
span.in_scope(|| {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
// update epochs_history
state
.handle_stake_reward_deltas(block_info, stake_reward_deltas_message)
Expand All @@ -284,7 +282,7 @@ impl SPOState {
let span =
info_span!("spo_state.handle_epoch_activity", block = block_info.number);
span.in_scope(|| {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
// update epochs_history
let spos: Vec<(KeyHash, usize)> = epoch_activity_message
.spo_blocks
Expand All @@ -311,7 +309,7 @@ impl SPOState {
let span =
info_span!("spo_state.handle_withdrawals", block = block_info.number);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_withdrawals(withdrawals_msg)
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
Expand All @@ -336,7 +334,7 @@ impl SPOState {
let span =
info_span!("spo_state.handle_stake_deltas", block = block_info.number);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_stake_deltas(deltas_msg)
.inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}"))
Expand All @@ -361,7 +359,7 @@ impl SPOState {
let span =
info_span!("spo_state.handle_governance", block = block_info.number);
span.in_scope(|| {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_governance(&governance_msg.voting_procedures)
.inspect_err(|e| error!("Governance handling error: {e:#}"))
Expand Down Expand Up @@ -574,7 +572,7 @@ impl SPOState {
PoolsStateQuery::GetPoolHistory { pool_id } => {
if epochs_history.is_enabled() {
let history =
epochs_history.get_pool_history(pool_id).unwrap_or(Vec::new());
epochs_history.get_pool_history(pool_id).unwrap_or_default();
PoolsStateQueryResponse::PoolHistory(history)
} else {
PoolsStateQueryResponse::Error(
Expand Down Expand Up @@ -633,22 +631,23 @@ impl SPOState {
}

PoolsStateQuery::GetPoolTotalBlocksMinted { pool_id } => {
PoolsStateQueryResponse::PoolTotalBlocksMinted(state.get_total_blocks_minted_by_pool(&pool_id))
PoolsStateQueryResponse::PoolTotalBlocksMinted(state.get_total_blocks_minted_by_pool(pool_id))
}

PoolsStateQuery::GetBlocksByPool { pool_id } => {
state
.is_historical_blocks_enabled()
.then(|| PoolsStateQueryResponse::BlocksByPool(state.get_blocks_by_pool(pool_id).unwrap_or_default()))
.unwrap_or(PoolsStateQueryResponse::Error("Blocks are not enabled".into()))
if state.is_historical_blocks_enabled() {
PoolsStateQueryResponse::BlocksByPool(state.get_blocks_by_pool(pool_id).unwrap_or_default())
} else {
PoolsStateQueryResponse::Error("Blocks are not enabled".into())
}
}

PoolsStateQuery::GetBlocksByPoolAndEpoch { pool_id, epoch } => {
state
.is_historical_blocks_enabled()
.then(|| PoolsStateQueryResponse::BlocksByPoolAndEpoch(state.get_blocks_by_pool_and_epoch(pool_id, *epoch)
.unwrap_or_default()))
.unwrap_or(PoolsStateQueryResponse::Error("Blocks are not enabled".into()))
if state.is_historical_blocks_enabled() {
PoolsStateQueryResponse::BlocksByPoolAndEpoch(state.get_blocks_by_pool_and_epoch(pool_id, *epoch).unwrap_or_default())
} else {
PoolsStateQueryResponse::Error("Blocks are not enabled".into())
}
}

PoolsStateQuery::GetPoolUpdates { pool_id } => {
Expand Down Expand Up @@ -701,9 +700,8 @@ impl SPOState {
block_height,
})) => {
info!("inspecting state at block height {}", block_height);
let maybe_spo_state = guard
.get_by_index_reverse(*block_height)
.map(|state| LedgerSPOState::from(state));
let maybe_spo_state =
guard.get_by_index_reverse(*block_height).map(LedgerSPOState::from);

if let Some(spo_state) = maybe_spo_state {
context_snapshot
Expand Down
Loading