diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 6e40f3ac..aef0082b 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -41,6 +41,7 @@ jobs: --package acropolis_module_mithril_snapshot_fetcher \ --package acropolis_module_snapshot_bootstrapper \ --package acropolis_module_spdd_state \ + --package acropolis_module_spo_state \ --package acropolis_module_stake_delta_filter \ --package acropolis_module_tx_submitter \ --package acropolis_module_upstream_chain_fetcher \ diff --git a/modules/spo_state/src/epochs_history.rs b/modules/spo_state/src/epochs_history.rs index b2135d3c..6ad7ed51 100644 --- a/modules/spo_state/src/epochs_history.rs +++ b/modules/spo_state/src/epochs_history.rs @@ -103,9 +103,7 @@ impl EpochsHistoryState { pool_operators: &Vec, epoch: u64, ) -> Option> { - let Some(epochs_history) = self.epochs_history.as_ref() else { - return None; - }; + let epochs_history = self.epochs_history.as_ref()?; let mut active_stakes = Vec::::new(); for pool_operator in pool_operators { @@ -169,7 +167,7 @@ impl EpochsHistoryState { &self, _block: &BlockInfo, epoch_activity_message: &EpochActivityMessage, - spos: &Vec<(KeyHash, usize)>, + spos: &[(KeyHash, usize)], ) { let Some(epochs_history) = self.epochs_history.as_ref() else { return; @@ -177,7 +175,7 @@ impl EpochsHistoryState { let EpochActivityMessage { epoch, .. } = epoch_activity_message; spos.iter().for_each(|(spo, amount)| { - Self::update_epochs_history_with(epochs_history, &spo, *epoch, |epoch_state| { + Self::update_epochs_history_with(epochs_history, spo, *epoch, |epoch_state| { epoch_state.blocks_minted = Some(*amount as u64); }); }) @@ -189,7 +187,7 @@ impl EpochsHistoryState { epoch: u64, update_fn: impl FnOnce(&mut EpochState), ) { - let mut epochs = epochs_history.entry(spo.clone()).or_insert_with(BTreeMap::new); + let mut epochs = epochs_history.entry(spo.clone()).or_default(); let epoch_state = epochs.entry(epoch).or_insert_with(|| EpochState::new(epoch)); update_fn(epoch_state); } diff --git a/modules/spo_state/src/historical_spo_state.rs b/modules/spo_state/src/historical_spo_state.rs index afc129ed..a1d866b3 100644 --- a/modules/spo_state/src/historical_spo_state.rs +++ b/modules/spo_state/src/historical_spo_state.rs @@ -37,28 +37,26 @@ impl HistoricalSPOState { pub fn add_pool_registration(&mut self, reg: &PoolRegistration) -> Option { // update registration if enabled - self.registration.as_mut().and_then(|registration| { + self.registration.as_mut().map(|registration| { *registration = reg.clone(); - Some(true) + true }) } pub fn add_pool_updates(&mut self, update: PoolUpdateEvent) -> Option { // update updates if enabled - self.updates.as_mut().and_then(|updates| { + self.updates.as_mut().map(|updates| { updates.push(update); - Some(true) + true }) } pub fn add_delegator(&mut self, delegator: &StakeAddress) -> Option { - self.delegators - .as_mut() - .and_then(|delegators| Some(delegators.insert(delegator.clone()).is_some())) + self.delegators.as_mut().map(|delegators| delegators.insert(delegator.clone()).is_some()) } pub fn remove_delegator(&mut self, delegator: &StakeAddress) -> Option { - self.delegators.as_mut().and_then(|delegators| Some(delegators.remove(delegator).is_some())) + self.delegators.as_mut().map(|delegators| delegators.remove(delegator).is_some()) } pub fn get_all_blocks(&self) -> Option> { @@ -72,9 +70,8 @@ impl HistoricalSPOState { } pub fn add_block(&mut self, epoch: u64, block_number: u64) -> Option<()> { - self.blocks.as_mut().and_then(|blocks| { + self.blocks.as_mut().map(|blocks| { blocks.entry(epoch).or_insert_with(Vector::new).push_back(block_number); - Some(()) }) } } diff --git a/modules/spo_state/src/retired_pools_history.rs b/modules/spo_state/src/retired_pools_history.rs index d770ce4a..00fe423b 100644 --- a/modules/spo_state/src/retired_pools_history.rs +++ b/modules/spo_state/src/retired_pools_history.rs @@ -54,12 +54,12 @@ impl RetiredPoolsHistoryState { /// Handle Retired SPOs /// Update retired_pools_history with deregistrations /// - pub fn handle_deregistrations(&self, block: &BlockInfo, retired_spos: &Vec) { + pub fn handle_deregistrations(&self, block: &BlockInfo, retired_spos: &[KeyHash]) { let Some(retired_pools_history) = self.retired_pools_history.as_ref() else { return; }; - retired_pools_history.insert(block.epoch, retired_spos.clone()); + retired_pools_history.insert(block.epoch, retired_spos.to_vec()); } } diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 4a874566..c44628e4 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -78,6 +78,7 @@ pub struct SPOState; impl SPOState { /// Main async run loop + #[allow(clippy::too_many_arguments)] async fn run( history: Arc>>, epochs_history: EpochsHistoryState, @@ -99,11 +100,8 @@ impl SPOState { // Get the stake address deltas from the genesis bootstrap, which we know // don't contain any stake, plus an extra parameter state (!unexplained) // !TODO this seems overly specific to our startup process - match stake_deltas_subscription.as_mut() { - Some(sub) => { - let _ = sub.read().await?; - } - None => {} + if let Some(sub) = stake_deltas_subscription.as_mut() { + let _ = sub.read().await?; } // Main loop of synchronised messages @@ -164,7 +162,7 @@ impl SPOState { match MultiEraHeader::decode(variant, None, &block_msg.header) { Ok(header) => { if let Some(vrf_vkey) = header.vrf_vkey() { - state.handle_mint(&block_info, vrf_vkey); + state.handle_mint(block_info, vrf_vkey); } } @@ -181,7 +179,7 @@ impl SPOState { Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_certs_msg))) => { let span = info_span!("spo_state.handle_certs", block = block_info.number); async { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); let maybe_message = state .handle_tx_certs(block_info, tx_certs_msg) .inspect_err(|e| error!("TxCerts Messages handling error: {e}")) @@ -227,7 +225,7 @@ impl SPOState { { let span = info_span!("spo_state.handle_spdd", block = block_info.number); span.in_scope(|| { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); // update epochs_history epochs_history.handle_spdd(block_info, spdd_message); }); @@ -244,7 +242,7 @@ impl SPOState { let span = info_span!("spo_state.handle_spo_rewards", block = block_info.number); span.in_scope(|| { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); // update epochs_history epochs_history.handle_spo_rewards(block_info, spo_rewards_message); }); @@ -264,7 +262,7 @@ impl SPOState { block = block_info.number ); span.in_scope(|| { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); // update epochs_history state .handle_stake_reward_deltas(block_info, stake_reward_deltas_message) @@ -284,7 +282,7 @@ impl SPOState { let span = info_span!("spo_state.handle_epoch_activity", block = block_info.number); span.in_scope(|| { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); // update epochs_history let spos: Vec<(KeyHash, usize)> = epoch_activity_message .spo_blocks @@ -311,7 +309,7 @@ impl SPOState { let span = info_span!("spo_state.handle_withdrawals", block = block_info.number); async { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); state .handle_withdrawals(withdrawals_msg) .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) @@ -336,7 +334,7 @@ impl SPOState { let span = info_span!("spo_state.handle_stake_deltas", block = block_info.number); async { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); state .handle_stake_deltas(deltas_msg) .inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}")) @@ -361,7 +359,7 @@ impl SPOState { let span = info_span!("spo_state.handle_governance", block = block_info.number); span.in_scope(|| { - Self::check_sync(¤t_block, &block_info); + Self::check_sync(¤t_block, block_info); state .handle_governance(&governance_msg.voting_procedures) .inspect_err(|e| error!("Governance handling error: {e:#}")) @@ -574,7 +572,7 @@ impl SPOState { PoolsStateQuery::GetPoolHistory { pool_id } => { if epochs_history.is_enabled() { let history = - epochs_history.get_pool_history(pool_id).unwrap_or(Vec::new()); + epochs_history.get_pool_history(pool_id).unwrap_or_default(); PoolsStateQueryResponse::PoolHistory(history) } else { PoolsStateQueryResponse::Error( @@ -633,22 +631,23 @@ impl SPOState { } PoolsStateQuery::GetPoolTotalBlocksMinted { pool_id } => { - PoolsStateQueryResponse::PoolTotalBlocksMinted(state.get_total_blocks_minted_by_pool(&pool_id)) + PoolsStateQueryResponse::PoolTotalBlocksMinted(state.get_total_blocks_minted_by_pool(pool_id)) } PoolsStateQuery::GetBlocksByPool { pool_id } => { - state - .is_historical_blocks_enabled() - .then(|| PoolsStateQueryResponse::BlocksByPool(state.get_blocks_by_pool(pool_id).unwrap_or_default())) - .unwrap_or(PoolsStateQueryResponse::Error("Blocks are not enabled".into())) + if state.is_historical_blocks_enabled() { + PoolsStateQueryResponse::BlocksByPool(state.get_blocks_by_pool(pool_id).unwrap_or_default()) + } else { + PoolsStateQueryResponse::Error("Blocks are not enabled".into()) + } } PoolsStateQuery::GetBlocksByPoolAndEpoch { pool_id, epoch } => { - state - .is_historical_blocks_enabled() - .then(|| PoolsStateQueryResponse::BlocksByPoolAndEpoch(state.get_blocks_by_pool_and_epoch(pool_id, *epoch) - .unwrap_or_default())) - .unwrap_or(PoolsStateQueryResponse::Error("Blocks are not enabled".into())) + if state.is_historical_blocks_enabled() { + PoolsStateQueryResponse::BlocksByPoolAndEpoch(state.get_blocks_by_pool_and_epoch(pool_id, *epoch).unwrap_or_default()) + } else { + PoolsStateQueryResponse::Error("Blocks are not enabled".into()) + } } PoolsStateQuery::GetPoolUpdates { pool_id } => { @@ -701,9 +700,8 @@ impl SPOState { block_height, })) => { info!("inspecting state at block height {}", block_height); - let maybe_spo_state = guard - .get_by_index_reverse(*block_height) - .map(|state| LedgerSPOState::from(state)); + let maybe_spo_state = + guard.get_by_index_reverse(*block_height).map(LedgerSPOState::from); if let Some(spo_state) = maybe_spo_state { context_snapshot diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index b1ab81dc..84a4f566 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -129,13 +129,12 @@ impl From<&State> for SPOState { retiring: state .pending_deregistrations .iter() - .map(|(epoch, key_hashes)| { + .flat_map(|(epoch, key_hashes)| { key_hashes .iter() .map(|key_hash| (key_hash.clone(), *epoch)) .collect::, u64)>>() }) - .flatten() .collect(), } } @@ -148,7 +147,7 @@ impl State { } /// Get total blocks minted by pools - pub fn get_total_blocks_minted_by_pools(&self, pools_operators: &Vec) -> Vec { + pub fn get_total_blocks_minted_by_pools(&self, pools_operators: &[KeyHash]) -> Vec { pools_operators .iter() .map(|pool_operator| *self.total_blocks_minted.get(pool_operator).unwrap_or(&0)) @@ -172,27 +171,19 @@ impl State { /// Get pool metadata pub fn get_pool_metadata(&self, pool_id: &KeyHash) -> Option { - self.spos.get(pool_id).map(|p| p.pool_metadata.clone()).flatten() + self.spos.get(pool_id).and_then(|p| p.pool_metadata.clone()) } /// Get Pool Delegators pub fn get_pool_delegators(&self, pool_operator: &KeyHash) -> Option> { - let Some(stake_addresses) = self.stake_addresses.as_ref() else { - return None; - }; - let Some(historical_spos) = self.historical_spos.as_ref() else { - return None; - }; + let stake_addresses = self.stake_addresses.as_ref()?; + let historical_spos = self.historical_spos.as_ref()?; let stake_addresses = stake_addresses.lock().unwrap(); let delegators = historical_spos .get(pool_operator) - .map(|s| s.delegators.clone()) - .flatten() - .map(|s| s.into_iter().collect::>()); - let Some(delegators) = delegators else { - return None; - }; + .and_then(|s| s.delegators.clone()) + .map(|s| s.into_iter().collect::>())?; let delegators_map = stake_addresses.get_accounts_balances_map(&delegators); delegators_map.map(|map| map.into_iter().collect()) @@ -202,19 +193,13 @@ impl State { /// Return Vector of block heights /// Return None when store_blocks not enabled pub fn get_blocks_by_pool(&self, pool_id: &KeyHash) -> Option> { - let Some(historical_spos) = self.historical_spos.as_ref() else { - return None; - }; - historical_spos.get(pool_id).and_then(|s| s.get_all_blocks()) + self.historical_spos.as_ref()?.get(pool_id).and_then(|s| s.get_all_blocks()) } /// Get Blocks by Pool and Epoch /// Return None when store_blocks not enabled pub fn get_blocks_by_pool_and_epoch(&self, pool_id: &KeyHash, epoch: u64) -> Option> { - let Some(historical_spos) = self.historical_spos.as_ref() else { - return None; - }; - historical_spos.get(pool_id).and_then(|s| s.get_blocks_by_epoch(epoch)) + self.historical_spos.as_ref()?.get(pool_id).and_then(|s| s.get_blocks_by_epoch(epoch)) } /// Get Pool Updates @@ -367,10 +352,8 @@ impl State { .entry(reg.operator.clone()) .or_insert_with(|| HistoricalSPOState::new(&self.store_config)); historical_spo.add_pool_registration(reg); - historical_spo.add_pool_updates(PoolUpdateEvent::register_event( - tx_identifier.clone(), - *cert_index, - )); + historical_spo + .add_pool_updates(PoolUpdateEvent::register_event(*tx_identifier, *cert_index)); } } @@ -446,10 +429,9 @@ impl State { return; }; let mut stake_addresses = stake_addresses.lock().unwrap(); - let old_spo = - stake_addresses.get(&stake_address).map(|s| s.delegated_spo.clone()).flatten(); + let old_spo = stake_addresses.get(stake_address).and_then(|s| s.delegated_spo.clone()); - if stake_addresses.deregister_stake_address(&stake_address) { + if stake_addresses.deregister_stake_address(stake_address) { // update historical_spos if let Some(historical_spos) = self.historical_spos.as_mut() { if let Some(old_spo) = old_spo.as_ref() { @@ -477,17 +459,16 @@ impl State { return; }; let mut stake_addresses = stake_addresses.lock().unwrap(); - let old_spo = - stake_addresses.get(&stake_address).map(|s| s.delegated_spo.clone()).flatten(); + let old_spo = stake_addresses.get(stake_address).and_then(|s| s.delegated_spo.clone()); - if stake_addresses.record_stake_delegation(&stake_address, spo) { + if stake_addresses.record_stake_delegation(stake_address, spo) { // update historical_spos if let Some(historical_spos) = self.historical_spos.as_mut() { // Remove old delegator if let Some(old_spo) = old_spo.as_ref() { match historical_spos.get_mut(old_spo) { Some(historical_spo) => { - if let Some(removed) = historical_spo.remove_delegator(&stake_address) { + if let Some(removed) = historical_spo.remove_delegator(stake_address) { if !removed { error!( "Historical SPO state for {} does not contain delegator {}", @@ -507,7 +488,7 @@ impl State { let historical_spo = historical_spos .entry(spo.clone()) .or_insert_with(|| HistoricalSPOState::new(&self.store_config)); - if let Some(added) = historical_spo.add_delegator(&stake_address) { + if let Some(added) = historical_spo.add_delegator(stake_address) { if !added { error!( "Historical SPO state for {} already contains delegator {}", @@ -539,7 +520,7 @@ impl State { TxCertificate::PoolRegistration(reg) => { self.handle_pool_registration( block, - ®, + reg, &tx_cert.tx_identifier, &tx_cert.cert_index, ); @@ -547,7 +528,7 @@ impl State { TxCertificate::PoolRetirement(ret) => { self.handle_pool_retirement( block, - &ret, + ret, &tx_cert.tx_identifier, &tx_cert.cert_index, ); @@ -555,10 +536,10 @@ impl State { // for stake addresses TxCertificate::StakeRegistration(stake_address) => { - self.register_stake_address(&stake_address); + self.register_stake_address(stake_address); } TxCertificate::StakeDeregistration(stake_address) => { - self.deregister_stake_address(&stake_address); + self.deregister_stake_address(stake_address); } TxCertificate::Registration(reg) => { self.register_stake_address(®.stake_address); @@ -615,9 +596,9 @@ impl State { .or_insert_with(|| HistoricalSPOState::new(&self.store_config)); if let Some(votes) = historical_spo.votes.as_mut() { - for (_, vp) in &single_votes.voting_procedures { + for vp in single_votes.voting_procedures.values() { votes.push(VoteRecord { - tx_hash: tx_hash.clone(), + tx_hash: *tx_hash, vote_index: vp.vote_index, vote: vp.vote.clone(), }); diff --git a/modules/spo_state/src/store_config.rs b/modules/spo_state/src/store_config.rs index a89c15d6..5dab80c2 100644 --- a/modules/spo_state/src/store_config.rs +++ b/modules/spo_state/src/store_config.rs @@ -25,6 +25,7 @@ pub struct StoreConfig { } impl StoreConfig { + #[allow(clippy::too_many_arguments)] pub fn new( store_epochs_history: bool, store_retired_pools: bool,