Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests-on-push-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
--package acropolis_codec \
--package acropolis_module_assets_state \
--package acropolis_module_block_unpacker \
--package acropolis_module_chain_store \
--package acropolis_module_consensus \
--package acropolis_module_drdd_state \
--package acropolis_module_snapshot_bootstrapper \
Expand Down
84 changes: 37 additions & 47 deletions modules/chain_store/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ impl ChainStore {
BlocksStateQueryResponse::Error("Invalid message for blocks-state".into()),
)));
};
let Some(state) = query_history.lock().await.current().map(|s| s.clone()) else {
let Some(state) = query_history.lock().await.current().cloned() else {
return Arc::new(Message::StateQueryResponse(StateQueryResponse::Blocks(
BlocksStateQueryResponse::Error("unitialised state".to_string()),
)));
};
let res = Self::handle_blocks_query(&query_store, &state, &query)
let res = Self::handle_blocks_query(&query_store, &state, query)
.unwrap_or_else(|err| BlocksStateQueryResponse::Error(err.to_string()));
Arc::new(Message::StateQueryResponse(StateQueryResponse::Blocks(res)))
}
Expand All @@ -95,23 +95,20 @@ impl ChainStore {
error!("Could not insert block: {err}");
}

match message.as_ref() {
Message::Cardano((block_info, _)) => {
if block_info.new_epoch {
let Ok((_, message)) = params_message.await else {
return;
};
let mut history = history.lock().await;
let mut state = history.get_current_state();
if !Self::handle_new_params(&mut state, message).is_ok() {
return;
};
history.commit(block_info.number, state);
// Have the next params message ready for the next epoch
params_message = params_subscription.read();
}
if let Message::Cardano((block_info, _)) = message.as_ref() {
if block_info.new_epoch {
let Ok((_, message)) = params_message.await else {
return;
};
let mut history = history.lock().await;
let mut state = history.get_current_state();
if Self::handle_new_params(&mut state, message).is_err() {
return;
};
history.commit(block_info.number, state);
// Have the next params message ready for the next epoch
params_message = params_subscription.read();
}
_ => (),
}
}
});
Expand All @@ -137,7 +134,7 @@ impl ChainStore {
let Some(block) = store.get_latest_block()? else {
return Ok(BlocksStateQueryResponse::NotFound);
};
let info = Self::to_block_info(block, store, &state, true)?;
let info = Self::to_block_info(block, store, state, true)?;
Ok(BlocksStateQueryResponse::LatestBlock(info))
}
BlocksStateQuery::GetLatestBlockTransactions { limit, skip, order } => {
Expand All @@ -158,21 +155,21 @@ impl ChainStore {
let Some(block) = Self::get_block_by_key(store, block_key)? else {
return Ok(BlocksStateQueryResponse::NotFound);
};
let info = Self::to_block_info(block, store, &state, false)?;
let info = Self::to_block_info(block, store, state, false)?;
Ok(BlocksStateQueryResponse::BlockInfo(info))
}
BlocksStateQuery::GetBlockBySlot { slot } => {
let Some(block) = store.get_block_by_slot(*slot)? else {
return Ok(BlocksStateQueryResponse::NotFound);
};
let info = Self::to_block_info(block, store, &state, false)?;
let info = Self::to_block_info(block, store, state, false)?;
Ok(BlocksStateQueryResponse::BlockBySlot(info))
}
BlocksStateQuery::GetBlockByEpochSlot { epoch, slot } => {
let Some(block) = store.get_block_by_epoch_slot(*epoch, *slot)? else {
return Ok(BlocksStateQueryResponse::NotFound);
};
let info = Self::to_block_info(block, store, &state, false)?;
let info = Self::to_block_info(block, store, state, false)?;
Ok(BlocksStateQueryResponse::BlockByEpochSlot(info))
}
BlocksStateQuery::GetNextBlocks {
Expand All @@ -185,7 +182,7 @@ impl ChainStore {
blocks: vec![],
}));
}
let Some(block) = Self::get_block_by_key(store, &block_key)? else {
let Some(block) = Self::get_block_by_key(store, block_key)? else {
return Ok(BlocksStateQueryResponse::NotFound);
};
let number = match block_key {
Expand All @@ -195,7 +192,7 @@ impl ChainStore {
let min_number = number + 1 + skip;
let max_number = min_number + limit - 1;
let blocks = store.get_blocks_by_number_range(min_number, max_number)?;
let info = Self::to_block_info_bulk(blocks, store, &state, false)?;
let info = Self::to_block_info_bulk(blocks, store, state, false)?;
Ok(BlocksStateQueryResponse::NextBlocks(NextBlocks {
blocks: info,
}))
Expand All @@ -210,7 +207,7 @@ impl ChainStore {
blocks: vec![],
}));
}
let Some(block) = Self::get_block_by_key(store, &block_key)? else {
let Some(block) = Self::get_block_by_key(store, block_key)? else {
return Ok(BlocksStateQueryResponse::NotFound);
};
let number = match block_key {
Expand All @@ -224,7 +221,7 @@ impl ChainStore {
};
let min_number = max_number.saturating_sub(limit - 1);
let blocks = store.get_blocks_by_number_range(min_number, max_number)?;
let info = Self::to_block_info_bulk(blocks, store, &state, false)?;
let info = Self::to_block_info_bulk(blocks, store, state, false)?;
Ok(BlocksStateQueryResponse::PreviousBlocks(PreviousBlocks {
blocks: info,
}))
Expand Down Expand Up @@ -325,7 +322,7 @@ impl ChainStore {
is_latest: bool,
) -> Result<BlockInfo> {
let blocks = vec![block];
let mut info = Self::to_block_info_bulk(blocks, store, &state, is_latest)?;
let mut info = Self::to_block_info_bulk(blocks, store, state, is_latest)?;
Ok(info.remove(0))
}

Expand Down Expand Up @@ -491,17 +488,13 @@ impl ChainStore {
for tx in decoded.txs() {
let hash = TxHash(*tx.hash());
for output in tx.outputs() {
match output.address() {
Ok(pallas_address) => match map_parameters::map_address(&pallas_address) {
Ok(address) => {
addresses
.entry(BechOrdAddress(address))
.or_insert_with(Vec::new)
.push(hash.clone());
}
_ => (),
},
_ => (),
if let Ok(pallas_address) = output.address() {
if let Ok(address) = map_parameters::map_address(&pallas_address) {
addresses
.entry(BechOrdAddress(address))
.or_insert_with(Vec::new)
.push(hash);
}
}
}
}
Expand All @@ -518,16 +511,13 @@ impl ChainStore {
}

fn handle_new_params(state: &mut State, message: Arc<Message>) -> Result<()> {
match message.as_ref() {
Message::Cardano((_, CardanoMessage::ProtocolParams(params))) => {
if let Some(byron) = &params.params.byron {
state.byron_heavy_delegates = byron.heavy_delegation.clone();
}
if let Some(shelley) = &params.params.shelley {
state.shelley_genesis_delegates = shelley.gen_delegs.clone();
}
if let Message::Cardano((_, CardanoMessage::ProtocolParams(params))) = message.as_ref() {
if let Some(byron) = &params.params.byron {
state.byron_heavy_delegates = byron.heavy_delegation.clone();
}
if let Some(shelley) = &params.params.shelley {
state.shelley_genesis_delegates = shelley.gen_delegs.clone();
}
_ => (),
}
Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions modules/chain_store/src/stores/fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,21 @@ impl FjallBlockStore {
minicbor::encode(raw, &mut bytes).expect("infallible");
bytes
};
batch.insert(&self.blocks, &*info.hash, encoded);
batch.insert(&self.blocks, *info.hash, encoded);
batch.insert(
&self.block_hashes_by_slot,
info.slot.to_be_bytes(),
&*info.hash,
*info.hash,
);
batch.insert(
&self.block_hashes_by_number,
info.number.to_be_bytes(),
&*info.hash,
*info.hash,
);
batch.insert(
&self.block_hashes_by_epoch_slot,
epoch_slot_key(info.epoch, info.epoch_slot),
&*info.hash,
*info.hash,
);
}

Expand Down