From a6a9fdbd35be7580edc32e761769a16f6e6ce7c5 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 11:53:23 +0100 Subject: [PATCH 01/17] Combine block body and header messages into one Rationale: Within memory there is no extra cost to sending the full block body in the two cases (EpochState, SPOState) which subscribe only to headers, and having separate header messages complicates the forthcoming insertion of consensus into the block flow. The only case where this would add overhead would be if either EpochState and/or SPOState were remote while all the other consumers of block bodies were still internal. This seems very unlikely... --- common/src/messages.rs | 19 +++----- modules/block_unpacker/src/block_unpacker.rs | 6 +-- modules/epochs_state/src/epochs_state.rs | 28 ++++++------ .../src/mithril_snapshot_fetcher.rs | 43 ++++++------------- modules/spo_state/src/spo_state.rs | 32 +++++++------- .../src/body_fetcher.rs | 16 ++++--- .../src/upstream_cache.rs | 19 ++++---- modules/upstream_chain_fetcher/src/utils.rs | 21 +++------ 8 files changed, 77 insertions(+), 107 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index e345e488..101830fe 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -31,18 +31,14 @@ use crate::types::*; pub use caryatid_module_clock::messages::ClockTickMessage; pub use caryatid_module_rest_server::messages::{GetRESTResponse, RESTRequest, RESTResponse}; -/// Block header message +/// Raw block data message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlockHeaderMessage { - /// Raw Data - pub raw: Vec, -} +pub struct RawBlockMessage { + /// Header raw data + pub header: Vec, -/// Block body message -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlockBodyMessage { - /// Raw Data - pub raw: Vec, + /// Body raw data + pub body: Vec, } /// Snapshot completion message @@ -280,8 +276,7 @@ pub struct SPOStateMessage { /// Cardano message enum #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum CardanoMessage { - BlockHeader(BlockHeaderMessage), // Block header available - BlockBody(BlockBodyMessage), // Block body available + BlockAvailable(RawBlockMessage), // Block body available SnapshotComplete, // Mithril snapshot loaded ReceivedTxs(RawTxsMessage), // Transaction available GenesisComplete(GenesisCompleteMessage), // Genesis UTXOs done + genesis params diff --git a/modules/block_unpacker/src/block_unpacker.rs b/modules/block_unpacker/src/block_unpacker.rs index f3b51a5a..2b9f3bd3 100644 --- a/modules/block_unpacker/src/block_unpacker.rs +++ b/modules/block_unpacker/src/block_unpacker.rs @@ -9,7 +9,7 @@ use pallas::ledger::traverse::MultiEraBlock; use std::sync::Arc; use tracing::{debug, error, info, info_span, Instrument}; -const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.body"; +const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.available"; const DEFAULT_PUBLISH_TOPIC: &str = "cardano.txs"; /// Block unpacker module @@ -42,9 +42,9 @@ impl BlockUnpacker { return; }; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::BlockBody(body_msg))) => { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // Parse the body - match MultiEraBlock::decode(&body_msg.raw) { + match MultiEraBlock::decode(&block_msg.body) { Ok(block) => { let span = info_span!("block_unpacker", block = block_info.number); diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index f57a57ea..1ef0c29d 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -33,8 +33,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( "bootstrapped-subscribe-topic", "cardano.sequence.bootstrapped", ); -const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC: (&str, &str) = - ("block-header-subscribe-topic", "cardano.block.header"); +const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = + ("blocks-subscribe-topic", "cardano.block.available"); const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) = ("block-txs-subscribe-topic", "cardano.block.txs"); const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( @@ -59,7 +59,7 @@ impl EpochsState { history: Arc>>, epochs_history: EpochsHistoryState, mut bootstrapped_subscription: Box>, - mut headers_subscription: Box>, + mut blocks_subscription: Box>, mut block_txs_subscription: Box>, mut protocol_parameters_subscription: Box>, mut epoch_activity_publisher: EpochActivityPublisher, @@ -81,13 +81,13 @@ impl EpochsState { let mut current_block: Option = None; // Read both topics in parallel - let headers_message_f = headers_subscription.read(); + let blocks_message_f = blocks_subscription.read(); let block_txs_message_f = block_txs_subscription.read(); - // Handle headers first - let (_, message) = headers_message_f.await?; + // Handle blocks first + let (_, message) = blocks_message_f.await?; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::BlockHeader(header_msg))) => { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here if block_info.status == BlockStatus::RolledBack { state = history.lock().await.get_rolled_back_state(block_info.number); @@ -120,7 +120,7 @@ impl EpochsState { let span = info_span!("epochs_state.decode_header", block = block_info.number); let mut header = None; span.in_scope(|| { - header = match MultiEraHeader::decode(variant, None, &header_msg.raw) { + header = match MultiEraHeader::decode(variant, None, &block_msg.header) { Ok(header) => Some(header), Err(e) => { error!("Can't decode header {}: {e}", block_info.slot); @@ -199,10 +199,10 @@ impl EpochsState { .unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'"); - let block_headers_subscribe_topic = config - .get_string(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating subscriber for headers on '{block_headers_subscribe_topic}'"); + let blocks_subscribe_topic = config + .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber for blocks on '{blocks_subscribe_topic}'"); let block_txs_subscribe_topic = config .get_string(DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC.0) @@ -242,7 +242,7 @@ impl EpochsState { // Subscribe let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; - let headers_subscription = context.subscribe(&block_headers_subscribe_topic).await?; + let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; let protocol_parameters_subscription = context.subscribe(&protocol_parameters_subscribe_topic).await?; let block_txs_subscription = context.subscribe(&block_txs_subscribe_topic).await?; @@ -344,7 +344,7 @@ impl EpochsState { history, epochs_history, bootstrapped_subscription, - headers_subscription, + blocks_subscription, block_txs_subscription, protocol_parameters_subscription, epoch_activity_publisher, diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 20804f2f..34c5e408 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -3,7 +3,7 @@ use acropolis_common::{ genesis_values::GenesisValues, - messages::{BlockBodyMessage, BlockHeaderMessage, CardanoMessage, Message}, + messages::{RawBlockMessage, CardanoMessage, Message}, BlockInfo, BlockStatus, Era, }; use anyhow::{anyhow, bail, Result}; @@ -24,15 +24,14 @@ use std::path::Path; use std::sync::Arc; use std::thread::sleep; use std::time::Duration as SystemDuration; -use tokio::{join, sync::Mutex}; +use tokio::sync::Mutex; use tracing::{debug, error, info, info_span, Instrument}; mod pause; use pause::PauseType; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped"; -const DEFAULT_HEADER_TOPIC: &str = "cardano.block.header"; -const DEFAULT_BODY_TOPIC: &str = "cardano.block.body"; +const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available"; const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete"; const DEFAULT_AGGREGATOR_URL: &str = @@ -238,9 +237,8 @@ impl MithrilSnapshotFetcher { config: Arc, genesis: GenesisValues, ) -> Result<()> { - let header_topic = - config.get_string("header-topic").unwrap_or(DEFAULT_HEADER_TOPIC.to_string()); - let body_topic = config.get_string("body-topic").unwrap_or(DEFAULT_BODY_TOPIC.to_string()); + let block_topic = + config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string()); let completion_topic = config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string()); @@ -345,33 +343,20 @@ impl MithrilSnapshotFetcher { } } - // Send the block header message - let header = block.header(); - let header_message = BlockHeaderMessage { - raw: header.cbor().to_vec(), + // Send the block message + let message = RawBlockMessage { + header: block.header().cbor().to_vec(), + body: raw_block, }; - let header_message_enum = Message::Cardano(( + let message_enum = Message::Cardano(( block_info.clone(), - CardanoMessage::BlockHeader(header_message), + CardanoMessage::BlockAvailable(message), )); - let header_future = context - .message_bus - .publish(&header_topic, Arc::new(header_message_enum)); - // Send the block body message - let body_message = BlockBodyMessage { raw: raw_block }; - - let body_message_enum = Message::Cardano(( - block_info.clone(), - CardanoMessage::BlockBody(body_message), - )); - let body_future = - context.message_bus.publish(&body_topic, Arc::new(body_message_enum)); - - let (header_result, body_result) = join!(header_future, body_future); - header_result.unwrap_or_else(|e| error!("Failed to publish header: {e}")); - body_result.unwrap_or_else(|e| error!("Failed to publish body: {e}")); + context.message_bus.publish(&block_topic, Arc::new(message_enum)) + .await + .unwrap_or_else(|e| error!("Failed to publish block message: {e}")); last_block_info = Some(block_info); Ok::<(), anyhow::Error>(()) diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index b345c0c2..7e0ac96c 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -46,8 +46,8 @@ const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) = ("withdrawals-subscribe-topic", "cardano.withdrawals"); const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = ("governance-subscribe-topic", "cardano.governance"); -const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC: (&str, &str) = - ("block-header-subscribe-topic", "cardano.block.header"); +const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = + ("blocks-subscribe-topic", "cardano.block.available"); const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = @@ -85,7 +85,7 @@ impl SPOState { store_config: &StoreConfig, // subscribers mut certificates_subscription: Box>, - mut block_headers_subscription: Box>, + mut blocks_subscription: Box>, mut withdrawals_subscription: Option>>, mut governance_subscription: Option>>, mut epoch_activity_subscription: Box>, @@ -114,7 +114,7 @@ impl SPOState { // read per-block topics in parallel let certs_message_f = certificates_subscription.read(); - let block_headers_message_f = block_headers_subscription.read(); + let blocks_message_f = blocks_subscription.read(); let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read()); let governance_message_f = governance_subscription.as_mut().map(|s| s.read()); let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); @@ -139,11 +139,11 @@ impl SPOState { } }; - // handle Block Headers (handle_mint) before handle_tx_certs + // handle blocks (handle_mint) before handle_tx_certs // in case of epoch boundary - let (_, block_headers_message) = block_headers_message_f.await?; - match block_headers_message.as_ref() { - Message::Cardano((block_info, CardanoMessage::BlockHeader(header_msg))) => { + let (_, block_message) = blocks_message_f.await?; + match block_message.as_ref() { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { let span = info_span!("spo_state.handle_block_header", block = block_info.number); @@ -161,7 +161,7 @@ impl SPOState { // Parse the header - note we ignore the subtag because EBBs // are suppressed upstream - match MultiEraHeader::decode(variant, None, &header_msg.raw) { + match MultiEraHeader::decode(variant, None, &block_msg.header) { Ok(header) => { if let Some(vrf_vkey) = header.vrf_vkey() { state.handle_mint(&block_info, vrf_vkey); @@ -173,7 +173,7 @@ impl SPOState { }); } - _ => error!("Unexpected message type: {block_headers_message:?}"), + _ => error!("Unexpected message type: {block_message:?}"), } // handle tx certificates @@ -439,10 +439,10 @@ impl SPOState { .unwrap_or(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating SPO rewards subscriber on '{spo_rewards_subscribe_topic}'"); - let block_headers_subscribe_topic = config - .get_string(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating block headers subscriber on '{block_headers_subscribe_topic}'"); + let blocks_subscribe_topic = config + .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating blocks subscriber on '{blocks_subscribe_topic}'"); let stake_reward_deltas_subscribe_topic = config .get_string(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.0) @@ -725,7 +725,7 @@ impl SPOState { // Subscriptions let certificates_subscription = context.subscribe(&certificates_subscribe_topic).await?; - let block_headers_subscription = context.subscribe(&block_headers_subscribe_topic).await?; + let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; let epoch_activity_subscription = context.subscribe(&epoch_activity_subscribe_topic).await?; let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?; @@ -771,7 +771,7 @@ impl SPOState { retired_pools_history, &store_config, certificates_subscription, - block_headers_subscription, + blocks_subscription, withdrawals_subscription, governance_subscription, epoch_activity_subscription, diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index afa2a5ee..bdf8cd5e 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -2,7 +2,7 @@ //! Multi-connection, block body fetching part of the client (in separate thread). use acropolis_common::{ - messages::{BlockBodyMessage, BlockHeaderMessage}, + messages::RawBlockMessage, BlockInfo, BlockStatus, Era, }; use anyhow::{bail, Result}; @@ -55,13 +55,13 @@ impl BodyFetcher { } } - async fn fetch_block(&mut self, point: Point) -> Result>> { + async fn fetch_block(&mut self, point: Point) -> Result>>> { // Fetch the block body debug!("Requesting single block {point:?}"); let body = self.peer.blockfetch().fetch_single(point.clone()).await; match body { - Ok(body) => Ok(Success(Arc::new(BlockBodyMessage { raw: body }))), + Ok(body) => Ok(Success(Arc::new(body))), Err(blockfetch::ClientError::Plexer(e)) => { error!("Can't fetch block at {point:?}: {e}, will try to restart"); Ok(NetworkError) @@ -139,16 +139,18 @@ impl BodyFetcher { // reconstruct a Point from the header because the one we get // in the RollForward is the *tip*, not the next read point let fetch_point = Point::Specific(block_info.slot, block_info.hash.to_vec()); - let msg_body = match self.fetch_block(fetch_point).await? { + let raw_body = match self.fetch_block(fetch_point).await? { Success(body) => body, NetworkError => return Ok(NetworkError), }; - let msg_hdr = Arc::new(BlockHeaderMessage { raw: h.cbor }); + let message = Arc::new(RawBlockMessage { + header: h.cbor, + body: raw_body.to_vec(), + }); let record = UpstreamCacheRecord { id: block_info.clone(), - hdr: msg_hdr.clone(), - body: msg_body.clone(), + message: message.clone(), }; Ok(Success(record)) diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/modules/upstream_chain_fetcher/src/upstream_cache.rs index 9281a358..86c39eed 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/modules/upstream_chain_fetcher/src/upstream_cache.rs @@ -1,5 +1,5 @@ use acropolis_common::{ - messages::{BlockBodyMessage, BlockHeaderMessage}, + messages::RawBlockMessage, BlockInfo, }; use anyhow::{anyhow, bail, Result}; @@ -13,8 +13,7 @@ use std::{ #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct UpstreamCacheRecord { pub id: BlockInfo, - pub hdr: Arc, - pub body: Arc, + pub message: Arc, } pub trait Storage { @@ -171,7 +170,7 @@ impl Storage for FileStorage { mod test { use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; use acropolis_common::{ - messages::{BlockBodyMessage, BlockHeaderMessage}, + messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era, }; use anyhow::Result; @@ -194,11 +193,9 @@ mod test { fn ucr(n: u64, hdr: usize, body: usize) -> UpstreamCacheRecord { UpstreamCacheRecord { id: blk(n), - hdr: Arc::new(BlockHeaderMessage { - raw: vec![hdr as u8], - }), - body: Arc::new(BlockBodyMessage { - raw: vec![body as u8], + message: Arc::new(RawBlockMessage { + header: vec![hdr as u8], + body: vec![body as u8], }), } } @@ -259,8 +256,8 @@ mod test { for n in 0..11 { let record = cache.read_record()?.unwrap(); assert_eq!(record.id.number, perm[n]); - assert_eq!(record.hdr.raw, vec![n as u8]); - assert_eq!(record.body.raw, vec![(n + 100) as u8]); + assert_eq!(record.message.header, vec![n as u8]); + assert_eq!(record.message.body, vec![(n + 100) as u8]); cache.next_record()?; } diff --git a/modules/upstream_chain_fetcher/src/utils.rs b/modules/upstream_chain_fetcher/src/utils.rs index dd3ee50b..5c538014 100644 --- a/modules/upstream_chain_fetcher/src/utils.rs +++ b/modules/upstream_chain_fetcher/src/utils.rs @@ -10,8 +10,7 @@ use serde::Deserialize; use std::sync::Arc; use tracing::{error, info}; -const DEFAULT_HEADER_TOPIC: (&str, &str) = ("header-topic", "cardano.block.header"); -const DEFAULT_BODY_TOPIC: (&str, &str) = ("body-topic", "cardano.block.body"); +const DEFAULT_BLOCK_TOPIC: (&str, &str) = ("block-topic", "cardano.block.available"); const DEFAULT_SNAPSHOT_COMPLETION_TOPIC: (&str, &str) = ("snapshot-completion-topic", "cardano.snapshot.complete"); const DEFAULT_GENESIS_COMPLETION_TOPIC: (&str, &str) = @@ -42,8 +41,7 @@ pub enum SyncPoint { pub struct FetcherConfig { pub context: Arc>, - pub header_topic: String, - pub body_topic: String, + pub block_topic: String, pub sync_point: SyncPoint, pub snapshot_completion_topic: String, pub genesis_completion_topic: String, @@ -100,8 +98,7 @@ impl FetcherConfig { pub fn new(context: Arc>, config: Arc) -> Result { Ok(Self { context, - header_topic: Self::conf(&config, DEFAULT_HEADER_TOPIC), - body_topic: Self::conf(&config, DEFAULT_BODY_TOPIC), + block_topic: Self::conf(&config, DEFAULT_BLOCK_TOPIC), snapshot_completion_topic: Self::conf(&config, DEFAULT_SNAPSHOT_COMPLETION_TOPIC), genesis_completion_topic: Self::conf(&config, DEFAULT_GENESIS_COMPLETION_TOPIC), sync_point: Self::conf_enum::(&config, DEFAULT_SYNC_POINT)?, @@ -124,18 +121,12 @@ impl FetcherConfig { } pub async fn publish_message(cfg: Arc, record: &UpstreamCacheRecord) -> Result<()> { - let header_msg = Arc::new(Message::Cardano(( + let message = Arc::new(Message::Cardano(( record.id.clone(), - CardanoMessage::BlockHeader((*record.hdr).clone()), + CardanoMessage::BlockAvailable((*record.message).clone()), ))); - let body_msg = Arc::new(Message::Cardano(( - record.id.clone(), - CardanoMessage::BlockBody((*record.body).clone()), - ))); - - cfg.context.message_bus.publish(&cfg.header_topic, header_msg).await?; - cfg.context.message_bus.publish(&cfg.body_topic, body_msg).await + cfg.context.message_bus.publish(&cfg.block_topic, message).await } pub async fn peer_connect(cfg: Arc, role: &str) -> Result> { From 73101f060b082d96aacf3bc10605f0eb3b487d60 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 11:58:46 +0100 Subject: [PATCH 02/17] Emergency fix of omnibus.toml --- processes/omnibus/omnibus.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 7ffa0669..5fdc2f0e 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -90,7 +90,7 @@ store-spdd-history = false spdd-db-path = "./spdd_db" # Number of epochs to retain in SPDD history # Example: 73 -spdd-retention-epochs = none +spdd-retention-epochs = "none" # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" From afcc78a7e79f35ec22b563ca7d1176f7785f1890 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 14:31:53 +0100 Subject: [PATCH 03/17] New Consensus module, interposed in block flow, but no-op Consensus listens on cardano.block.available, and resends as cardano.block.proposed. Downstreams BlockUnpacker, EpochsState, SPOState now subscribe for cardano.block.proposed. Side-effect: Removed info-level filter from omnibus main to allow debug output again - needs investigation what it was for (ajw) --- Cargo.lock | 14 ++++ Cargo.toml | 1 + modules/block_unpacker/README.md | 6 +- modules/block_unpacker/src/block_unpacker.rs | 2 +- modules/consensus/Cargo.toml | 23 +++++++ modules/consensus/README.md | 39 +++++++++++ modules/consensus/src/consensus.rs | 70 ++++++++++++++++++++ modules/epochs_state/README.md | 7 +- modules/epochs_state/src/epochs_state.rs | 2 +- modules/spo_state/src/spo_state.rs | 2 +- processes/omnibus/Cargo.toml | 1 + processes/omnibus/omnibus.toml | 2 + processes/omnibus/src/main.rs | 9 ++- 13 files changed, 166 insertions(+), 12 deletions(-) create mode 100644 modules/consensus/Cargo.toml create mode 100644 modules/consensus/README.md create mode 100644 modules/consensus/src/consensus.rs diff --git a/Cargo.lock b/Cargo.lock index ab4e2f65..670d844c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,6 +109,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_consensus" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "pallas", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_drdd_state" version = "0.1.0" @@ -391,6 +404,7 @@ dependencies = [ "acropolis_module_address_state", "acropolis_module_assets_state", "acropolis_module_block_unpacker", + "acropolis_module_consensus", "acropolis_module_drdd_state", "acropolis_module_drep_state", "acropolis_module_epochs_state", diff --git a/Cargo.toml b/Cargo.toml index f0533b62..5317c1b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "modules/epochs_state", # Tracks fees and blocks minted and epochs history "modules/accounts_state", # Tracks stake and reward accounts "modules/assets_state", # Tracks native asset mints and burns + "modules/consensus", # Chooses favoured chain across multiple options # Process builds "processes/omnibus", # All-inclusive omnibus process diff --git a/modules/block_unpacker/README.md b/modules/block_unpacker/README.md index aad00a87..bad3e4c1 100644 --- a/modules/block_unpacker/README.md +++ b/modules/block_unpacker/README.md @@ -12,15 +12,15 @@ everything except the section header can be left out. [module.block-unpacker] # Message topics -subscribe-topic = "cardano.block.body" +subscribe-topic = "cardano.block.proposed" publish-topic = "cardano.txs" ``` ## Messages -The block unpacker subscribes for BlockBodyMessages on -`cardano.block.body` (see the [Upstream Chain +The block unpacker subscribes for RawBlockMessages on +`cardano.block.proposed` (see the [Upstream Chain Fetcher](../upstream_chain_fetcher) module for details). It unpacks this into transactions, which it publishes as a single RawTxsMessage on `cardano.txs`, containing the block information and an ordered vector of diff --git a/modules/block_unpacker/src/block_unpacker.rs b/modules/block_unpacker/src/block_unpacker.rs index 2b9f3bd3..098d67b4 100644 --- a/modules/block_unpacker/src/block_unpacker.rs +++ b/modules/block_unpacker/src/block_unpacker.rs @@ -9,7 +9,7 @@ use pallas::ledger::traverse::MultiEraBlock; use std::sync::Arc; use tracing::{debug, error, info, info_span, Instrument}; -const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.available"; +const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.proposed"; const DEFAULT_PUBLISH_TOPIC: &str = "cardano.txs"; /// Block unpacker module diff --git a/modules/consensus/Cargo.toml b/modules/consensus/Cargo.toml new file mode 100644 index 00000000..94583c8f --- /dev/null +++ b/modules/consensus/Cargo.toml @@ -0,0 +1,23 @@ +# Acropolis consensus module + +[package] +name = "acropolis_module_consensus" +version = "0.1.0" +edition = "2021" +authors = ["Paul Clark "] +description = "Consensus Caryatid module for Acropolis" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +pallas = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/consensus.rs" diff --git a/modules/consensus/README.md b/modules/consensus/README.md new file mode 100644 index 00000000..ec239329 --- /dev/null +++ b/modules/consensus/README.md @@ -0,0 +1,39 @@ +# Consensus module + +The consensus module takes proposed blocks from (optionally multiple) upstream +sources and decides which chain to favour, passing on blocks on the favoured chain +to other validation and storage modules downstream + +## Configuration + +The following is the default configuration - if the defaults are OK, +everything except the section header can be left out. + +```toml +[module.consensus] + +# Message topics +subscribe-blocks-topic = "cardano.block.available" +publish-blocks-topic = "cardano.block.proposed" + +``` + +## Messages + +The consensus module subscribes for RawBlockMessages on +`cardano.block.available`. It uses the consensus rules to +decide which of multiple chains to favour, and sends candidate +blocks on `cardano.block.proposed` to request validation and storage. + +Both input and output are RawBlockMessages: + +```rust +pub struct RawBlockMessage { + /// Header raw data + pub header: Vec, + + /// Body raw data + pub body: Vec, +} +``` + diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs new file mode 100644 index 00000000..0239f0ad --- /dev/null +++ b/modules/consensus/src/consensus.rs @@ -0,0 +1,70 @@ +//! Acropolis consensus module for Caryatid +//! Maintains a favoured chain based on offered options from multiple sources + +use acropolis_common::messages::{CardanoMessage, Message}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module}; +use config::Config; +use std::sync::Arc; +use tracing::{error, info, info_span, Instrument}; + +const DEFAULT_SUBSCRIBE_BLOCKS_TOPIC: &str = "cardano.block.available"; +const DEFAULT_PUBLISH_BLOCKS_TOPIC: &str = "cardano.block.proposed"; + +/// Consensus module +/// Parameterised by the outer message enum used on the bus +#[module( + message_type(Message), + name = "consensus", + description = "Consensus algorithm" +)] +pub struct Consensus; + +impl Consensus { + /// Main init function + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + + // Subscribe for block messages + // Get configuration + let subscribe_blocks_topic = config.get_string("subscribe-blocks-topic") + .unwrap_or(DEFAULT_SUBSCRIBE_BLOCKS_TOPIC.to_string()); + info!("Creating blocks subscriber on '{subscribe_blocks_topic}'"); + + let publish_blocks_topic = config.get_string("publish-blocks topic") + .unwrap_or(DEFAULT_PUBLISH_BLOCKS_TOPIC.to_string()); + info!("Publishing blocks on '{publish_blocks_topic}'"); + + let mut subscription = context.subscribe(&subscribe_blocks_topic).await?; + + // TODO Subscribe for validation errors + // TODO Reject and rollback blocks if validation fails + + context.clone().run(async move { + loop { + let Ok((_, message)) = subscription.read().await else { + return; + }; + match message.as_ref() { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(_block_msg))) => { + let span = info_span!("consensus", block = block_info.number); + + async { + // TODO Actually decide on favoured chain! + context + .message_bus + .publish(&publish_blocks_topic, message.clone()) + .await + .unwrap_or_else(|e| error!("Failed to publish: {e}")); + } + .instrument(span) + .await; + } + + _ => error!("Unexpected message type: {message:?}"), + } + } + }); + + Ok(()) + } +} diff --git a/modules/epochs_state/README.md b/modules/epochs_state/README.md index 27083dd7..2f26616c 100644 --- a/modules/epochs_state/README.md +++ b/modules/epochs_state/README.md @@ -17,7 +17,7 @@ everything except the section header can be left out. [module.epochs-state] # Message topics -subscribe-headers-topic = "cardano.block.headers" +subscribe-blocks-topic = "cardano.block.proposed" subscribe-fees-topic = "cardano.fees" publish-topic = "cardano.epoch.activity" @@ -25,9 +25,8 @@ publish-topic = "cardano.epoch.activity" ## Messages -The epochs state subscribes for BlockHeaderMessages on -`cardano.block.header` (see the [Upstream Chain -Fetcher](../upstream_chain_fetcher) module for details). +The epochs state subscribes for RawBlockMessages on +`cardano.block.proposed` (see the [Consensus](../consensus) module for details). TODO subscription for fees diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index 1ef0c29d..82770e9d 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -34,7 +34,7 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( "cardano.sequence.bootstrapped", ); const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = - ("blocks-subscribe-topic", "cardano.block.available"); + ("blocks-subscribe-topic", "cardano.block.proposed"); const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) = ("block-txs-subscribe-topic", "cardano.block.txs"); const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 7e0ac96c..4a874566 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -47,7 +47,7 @@ const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) = const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = ("governance-subscribe-topic", "cardano.governance"); const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = - ("blocks-subscribe-topic", "cardano.block.available"); + ("blocks-subscribe-topic", "cardano.block.proposed"); const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 16a6f940..31a992d4 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -27,6 +27,7 @@ acropolis_module_spdd_state = { path = "../../modules/spdd_state" } acropolis_module_drdd_state = { path = "../../modules/drdd_state" } acropolis_module_assets_state = { path = "../../modules/assets_state" } acropolis_module_address_state = { path = "../../modules/address_state" } +acropolis_module_consensus = { path = "../../modules/consensus" } caryatid_process = { workspace = true } caryatid_sdk = { workspace = true } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 5fdc2f0e..f1df01d9 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -15,6 +15,8 @@ sync-point = "snapshot" node-address = "backbone.cardano.iog.io:3001" magic-number = 764824073 +[module.consensus] + [module.block-unpacker] [module.rest-blockfrost] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index e4b856ee..afba0066 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -13,6 +13,7 @@ use acropolis_module_accounts_state::AccountsState; use acropolis_module_address_state::AddressState; use acropolis_module_assets_state::AssetsState; use acropolis_module_block_unpacker::BlockUnpacker; +use acropolis_module_consensus::Consensus; use acropolis_module_drdd_state::DRDDState; use acropolis_module_drep_state::DRepState; use acropolis_module_epochs_state::EpochsState; @@ -50,8 +51,11 @@ static GLOBAL: Jemalloc = Jemalloc; pub async fn main() -> Result<()> { // Standard logging using RUST_LOG for log levels default to INFO for events only let fmt_layer = fmt::layer() - .with_filter(EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into())) - .with_filter(filter::filter_fn(|meta| meta.is_event())); + .with_filter(EnvFilter::from_default_env()); + + // TODO disabled this filter because it prevents debugging - investigate + //.add_directive(filter::LevelFilter::INFO.into())) + // .with_filter(filter::filter_fn(|meta| meta.is_event())); // Only turn on tracing if some OTEL environment variables exist if std::env::vars().any(|(name, _)| name.starts_with("OTEL_")) { @@ -105,6 +109,7 @@ pub async fn main() -> Result<()> { BlockfrostREST::register(&mut process); SPDDState::register(&mut process); DRDDState::register(&mut process); + Consensus::register(&mut process); Clock::::register(&mut process); RESTServer::::register(&mut process); From 639b133c9aeac810ef70eb01ce5dd333ffd39e8e Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 14:49:44 +0100 Subject: [PATCH 04/17] Add skeleton validation result --- Cargo.lock | 37 +++++++++++++++++----------------- common/Cargo.toml | 1 + common/src/lib.rs | 1 + common/src/validation.rs | 43 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 64 insertions(+), 18 deletions(-) create mode 100644 common/src/validation.rs diff --git a/Cargo.lock b/Cargo.lock index 670d844c..b0657443 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,6 +33,7 @@ dependencies = [ "serde", "serde_json", "serde_with 3.14.1", + "thiserror 2.0.17", "tokio", "tracing", ] @@ -662,7 +663,7 @@ dependencies = [ "nom 7.1.3", "num-traits", "rusticata-macros", - "thiserror 2.0.16", + "thiserror 2.0.17", "time", ] @@ -3401,7 +3402,7 @@ dependencies = [ "serde_json", "sha2", "slog", - "thiserror 2.0.16", + "thiserror 2.0.17", "tokio", "walkdir", ] @@ -3429,7 +3430,7 @@ dependencies = [ "slog", "strum", "tar", - "thiserror 2.0.16", + "thiserror 2.0.17", "tokio", "uuid", "zstd", @@ -3468,7 +3469,7 @@ dependencies = [ "sha2", "slog", "strum", - "thiserror 2.0.16", + "thiserror 2.0.17", "tokio", "typetag", "walkdir", @@ -3491,7 +3492,7 @@ dependencies = [ "rayon", "rug", "serde", - "thiserror 2.0.16", + "thiserror 2.0.17", ] [[package]] @@ -3728,7 +3729,7 @@ dependencies = [ "futures-sink", "js-sys", "pin-project-lite", - "thiserror 2.0.16", + "thiserror 2.0.17", "tracing", ] @@ -3758,7 +3759,7 @@ dependencies = [ "opentelemetry_sdk", "prost", "reqwest 0.12.23", - "thiserror 2.0.16", + "thiserror 2.0.17", "tokio", "tonic 0.13.1", "tracing", @@ -3800,7 +3801,7 @@ dependencies = [ "percent-encoding", "rand 0.9.2", "serde_json", - "thiserror 2.0.16", + "thiserror 2.0.17", "tokio", "tokio-stream", ] @@ -3833,7 +3834,7 @@ dependencies = [ "rc2", "sha1", "sha2", - "thiserror 2.0.16", + "thiserror 2.0.17", "x509-parser", ] @@ -4198,7 +4199,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21e0a3a33733faeaf8651dfee72dd0f388f0c8e5ad496a3478fa5a922f49cfa8" dependencies = [ "memchr", - "thiserror 2.0.16", + "thiserror 2.0.17", "ucd-trie", ] @@ -4524,7 +4525,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2 0.6.0", - "thiserror 2.0.16", + "thiserror 2.0.17", "tokio", "tracing", "web-time", @@ -4545,7 +4546,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.16", + "thiserror 2.0.17", "tinyvec", "tracing", "web-time", @@ -5746,11 +5747,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.16" +version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl 2.0.16", + "thiserror-impl 2.0.17", ] [[package]] @@ -5766,9 +5767,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.16" +version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", @@ -6965,7 +6966,7 @@ dependencies = [ "nom 7.1.3", "oid-registry", "rusticata-macros", - "thiserror 2.0.16", + "thiserror 2.0.17", "time", ] diff --git a/common/Cargo.toml b/common/Cargo.toml index 3cb24e0f..b5d3f101 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -39,6 +39,7 @@ dashmap = { workspace = true } rayon = "1.11.0" cryptoxide = "0.5.1" blake2 = "0.10.6" +thiserror = "2.0.17" [lib] crate-type = ["rlib"] diff --git a/common/src/lib.rs b/common/src/lib.rs index cc564b42..9399514a 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -17,6 +17,7 @@ pub mod serialization; pub mod stake_addresses; pub mod state_history; pub mod types; +pub mod validation; // Flattened re-exports pub use self::address::*; diff --git a/common/src/validation.rs b/common/src/validation.rs new file mode 100644 index 00000000..b352d50b --- /dev/null +++ b/common/src/validation.rs @@ -0,0 +1,43 @@ +//! Validation results for Acropolis consensus + +// We don't use these types in the acropolis_common crate itself +#![allow(dead_code)] + +use thiserror::Error; + +/// Validation error +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Error)] +pub enum ValidationError { + #[error("VRF failure")] + BadVRF, + + #[error("KES failure")] + BadKES, + + #[error("Doubly spent UTXO: {0}")] + DoubleSpendUTXO(String), +} + +/// Validation status +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum ValidationStatus { + + // All good + Go, + + // Error + Error(ValidationError), +} + +/// Result of validation of a block +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ValidationResult { + + // Block this applies to (safety check) + pub block_number: u64, + + // Status + pub status: ValidationStatus, +} + + From e7252943183cccb4c061ef6502d86130f953b467 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 14:52:47 +0100 Subject: [PATCH 05/17] Use correct NASA terminology! --- common/src/validation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/validation.rs b/common/src/validation.rs index b352d50b..bd3916dd 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -26,7 +26,7 @@ pub enum ValidationStatus { Go, // Error - Error(ValidationError), + NoGo(ValidationError), } /// Result of validation of a block From 2cd5191b5d97388a5a69b120d058b6343a51458b Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 17:24:33 +0100 Subject: [PATCH 06/17] No/NoGo validation in consensus Reads BlockValidation messages from all configured validators, fails the block if any say NoGo. All must respond for it to continue. Note: Validation failure is only logged so far, we don't have multiple upstreams to run consensus on. Note: There are no configured topics yet because no validators yet issue messages! --- Cargo.lock | 1 + common/src/messages.rs | 2 + common/src/validation.rs | 13 ------ modules/consensus/Cargo.toml | 1 + modules/consensus/README.md | 29 ++++++++++++-- modules/consensus/src/consensus.rs | 63 ++++++++++++++++++++++++++++-- processes/omnibus/omnibus.toml | 7 ++++ 7 files changed, 96 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b0657443..9d46a86d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", + "futures", "pallas", "tokio", "tracing", diff --git a/common/src/messages.rs b/common/src/messages.rs index 101830fe..1c12c52c 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -26,6 +26,7 @@ use crate::queries::{ }; use crate::types::*; +use crate::validation::ValidationStatus; // Caryatid core messages which we re-export pub use caryatid_module_clock::messages::ClockTickMessage; @@ -277,6 +278,7 @@ pub struct SPOStateMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum CardanoMessage { BlockAvailable(RawBlockMessage), // Block body available + BlockValidation(ValidationStatus), // Result of a block validation SnapshotComplete, // Mithril snapshot loaded ReceivedTxs(RawTxsMessage), // Transaction available GenesisComplete(GenesisCompleteMessage), // Genesis UTXOs done + genesis params diff --git a/common/src/validation.rs b/common/src/validation.rs index bd3916dd..240968ca 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -28,16 +28,3 @@ pub enum ValidationStatus { // Error NoGo(ValidationError), } - -/// Result of validation of a block -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct ValidationResult { - - // Block this applies to (safety check) - pub block_number: u64, - - // Status - pub status: ValidationStatus, -} - - diff --git a/modules/consensus/Cargo.toml b/modules/consensus/Cargo.toml index 94583c8f..2cd73ac6 100644 --- a/modules/consensus/Cargo.toml +++ b/modules/consensus/Cargo.toml @@ -18,6 +18,7 @@ config = { workspace = true } pallas = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +futures = "0.3.31" [lib] path = "src/consensus.rs" diff --git a/modules/consensus/README.md b/modules/consensus/README.md index ec239329..7b540401 100644 --- a/modules/consensus/README.md +++ b/modules/consensus/README.md @@ -1,13 +1,14 @@ # Consensus module -The consensus module takes proposed blocks from (optionally multiple) upstream -sources and decides which chain to favour, passing on blocks on the favoured chain +The consensus module takes proposed blocks from a (later, multiple) upstream +source and decides which chain to favour, passing on blocks on the favoured chain to other validation and storage modules downstream ## Configuration -The following is the default configuration - if the defaults are OK, -everything except the section header can be left out. +The following is the default configuration - these are the default +topics so they can be left out if they are OK. The validators *must* +be configured - if empty, no validation is performed ```toml [module.consensus] @@ -16,8 +17,28 @@ everything except the section header can be left out. subscribe-blocks-topic = "cardano.block.available" publish-blocks-topic = "cardano.block.proposed" +# Validation result topics +validators = [ + "cardano.validation.vrf", + "cardano.validation.kes", + "cardano.validation.utxo" + ... +] + ``` +## Validation + +The consensus module passes on blocks it receives from upstream (currently only a +single source) and sends them out as 'proposed' blocks for validation. It then listens +on all of the `validators` topics for BlockValidation messages, which give a Go / NoGo +for the block. The model is a NASA flight control desk, and like thre, a single NoGo +is enough to stop the block. + +At the moment the module simply logs the validation failure. Once it is actually operating +consensus across multiple sources, it will use this and the length of chain to choose the best +chain. + ## Messages The consensus module subscribes for RawBlockMessages on diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs index 0239f0ad..5fa21ea6 100644 --- a/modules/consensus/src/consensus.rs +++ b/modules/consensus/src/consensus.rs @@ -1,12 +1,16 @@ //! Acropolis consensus module for Caryatid //! Maintains a favoured chain based on offered options from multiple sources -use acropolis_common::messages::{CardanoMessage, Message}; +use acropolis_common::{ + messages::{CardanoMessage, Message}, + validation::ValidationStatus +}; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; use std::sync::Arc; use tracing::{error, info, info_span, Instrument}; +use futures::future::try_join_all; const DEFAULT_SUBSCRIBE_BLOCKS_TOPIC: &str = "cardano.block.available"; const DEFAULT_PUBLISH_BLOCKS_TOPIC: &str = "cardano.block.proposed"; @@ -34,10 +38,19 @@ impl Consensus { .unwrap_or(DEFAULT_PUBLISH_BLOCKS_TOPIC.to_string()); info!("Publishing blocks on '{publish_blocks_topic}'"); + let validator_topics: Vec = config.get::>("validators") + .unwrap_or_default(); + for topic in &validator_topics { + info!("Validator: {topic}"); + } + + // Subscribe for incoming blocks let mut subscription = context.subscribe(&subscribe_blocks_topic).await?; - // TODO Subscribe for validation errors - // TODO Reject and rollback blocks if validation fails + // Subscribe all the validators + let mut validator_subscriptions: Vec<_> = try_join_all( + validator_topics.iter().map(|topic| context.subscribe(topic)) + ).await?; context.clone().run(async move { loop { @@ -50,11 +63,55 @@ impl Consensus { async { // TODO Actually decide on favoured chain! + + // Send to downstreams to validate context .message_bus .publish(&publish_blocks_topic, message.clone()) .await .unwrap_or_else(|e| error!("Failed to publish: {e}")); + + // Read validation responses from all validators in parallel + let results: Vec<_> = try_join_all( + validator_subscriptions + .iter_mut() + .map(|s| s.read())) + .await + .unwrap_or_else(|e| { + error!("Failed to read validations: {e}"); + vec![] + }); + + // All must be positive! + let all_say_go = results + .iter() + .fold(true, |all_ok, (_topic, msg)| { + match msg.as_ref() { + Message::Cardano( + (block_info, + CardanoMessage::BlockValidation(status))) => { + match status { + ValidationStatus::Go => all_ok && true, + ValidationStatus::NoGo(err) => { + error!(block = block_info.number, + ?err, + "Validation failure"); + false + } + } + } + + _ => { + error!("Unexpected validation message type: {msg:?}"); + false + } + } + }); + + if !all_say_go { + error!(block = block_info.number, "Validation rejected block"); + // TODO Consequences: rollback, blacklist source + } } .instrument(span) .await; diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index f1df01d9..8cba0ffb 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -16,6 +16,13 @@ node-address = "backbone.cardano.iog.io:3001" magic-number = 764824073 [module.consensus] +# List of validation result topics to listen on +validators = [ +# Add these once they are actually being sent +# "cardano.validation.vrf", +# "cardano.validation.kes", +# "cardano.validation.utxo" +] [module.block-unpacker] From 850cd19f383e1944343d14023f5e2c84935e2213 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 17:40:56 +0100 Subject: [PATCH 07/17] cargo fmt --- common/src/validation.rs | 1 - modules/consensus/src/consensus.rs | 79 +++++++++---------- .../src/mithril_snapshot_fetcher.rs | 6 +- .../src/body_fetcher.rs | 5 +- .../src/upstream_cache.rs | 10 +-- processes/omnibus/src/main.rs | 3 +- 6 files changed, 46 insertions(+), 58 deletions(-) diff --git a/common/src/validation.rs b/common/src/validation.rs index 240968ca..4933d153 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -21,7 +21,6 @@ pub enum ValidationError { /// Validation status #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ValidationStatus { - // All good Go, diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs index 5fa21ea6..e2655318 100644 --- a/modules/consensus/src/consensus.rs +++ b/modules/consensus/src/consensus.rs @@ -3,14 +3,14 @@ use acropolis_common::{ messages::{CardanoMessage, Message}, - validation::ValidationStatus + validation::ValidationStatus, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; +use futures::future::try_join_all; use std::sync::Arc; use tracing::{error, info, info_span, Instrument}; -use futures::future::try_join_all; const DEFAULT_SUBSCRIBE_BLOCKS_TOPIC: &str = "cardano.block.available"; const DEFAULT_PUBLISH_BLOCKS_TOPIC: &str = "cardano.block.proposed"; @@ -27,19 +27,20 @@ pub struct Consensus; impl Consensus { /// Main init function pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - // Subscribe for block messages // Get configuration - let subscribe_blocks_topic = config.get_string("subscribe-blocks-topic") + let subscribe_blocks_topic = config + .get_string("subscribe-blocks-topic") .unwrap_or(DEFAULT_SUBSCRIBE_BLOCKS_TOPIC.to_string()); info!("Creating blocks subscriber on '{subscribe_blocks_topic}'"); - let publish_blocks_topic = config.get_string("publish-blocks topic") + let publish_blocks_topic = config + .get_string("publish-blocks topic") .unwrap_or(DEFAULT_PUBLISH_BLOCKS_TOPIC.to_string()); info!("Publishing blocks on '{publish_blocks_topic}'"); - let validator_topics: Vec = config.get::>("validators") - .unwrap_or_default(); + let validator_topics: Vec = + config.get::>("validators").unwrap_or_default(); for topic in &validator_topics { info!("Validator: {topic}"); } @@ -48,9 +49,8 @@ impl Consensus { let mut subscription = context.subscribe(&subscribe_blocks_topic).await?; // Subscribe all the validators - let mut validator_subscriptions: Vec<_> = try_join_all( - validator_topics.iter().map(|topic| context.subscribe(topic)) - ).await?; + let mut validator_subscriptions: Vec<_> = + try_join_all(validator_topics.iter().map(|topic| context.subscribe(topic))).await?; context.clone().run(async move { loop { @@ -72,41 +72,38 @@ impl Consensus { .unwrap_or_else(|e| error!("Failed to publish: {e}")); // Read validation responses from all validators in parallel - let results: Vec<_> = try_join_all( - validator_subscriptions - .iter_mut() - .map(|s| s.read())) - .await - .unwrap_or_else(|e| { - error!("Failed to read validations: {e}"); - vec![] - }); + let results: Vec<_> = + try_join_all(validator_subscriptions.iter_mut().map(|s| s.read())) + .await + .unwrap_or_else(|e| { + error!("Failed to read validations: {e}"); + vec![] + }); // All must be positive! - let all_say_go = results - .iter() - .fold(true, |all_ok, (_topic, msg)| { - match msg.as_ref() { - Message::Cardano( - (block_info, - CardanoMessage::BlockValidation(status))) => { - match status { - ValidationStatus::Go => all_ok && true, - ValidationStatus::NoGo(err) => { - error!(block = block_info.number, - ?err, - "Validation failure"); - false - } - } - } - - _ => { - error!("Unexpected validation message type: {msg:?}"); + let all_say_go = results.iter().fold(true, |all_ok, (_topic, msg)| { + match msg.as_ref() { + Message::Cardano(( + block_info, + CardanoMessage::BlockValidation(status), + )) => match status { + ValidationStatus::Go => all_ok && true, + ValidationStatus::NoGo(err) => { + error!( + block = block_info.number, + ?err, + "Validation failure" + ); false } + }, + + _ => { + error!("Unexpected validation message type: {msg:?}"); + false } - }); + } + }); if !all_say_go { error!(block = block_info.number, "Validation rejected block"); @@ -114,7 +111,7 @@ impl Consensus { } } .instrument(span) - .await; + .await; } _ => error!("Unexpected message type: {message:?}"), diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 34c5e408..1b26f7c4 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -3,7 +3,7 @@ use acropolis_common::{ genesis_values::GenesisValues, - messages::{RawBlockMessage, CardanoMessage, Message}, + messages::{CardanoMessage, Message, RawBlockMessage}, BlockInfo, BlockStatus, Era, }; use anyhow::{anyhow, bail, Result}; @@ -354,7 +354,9 @@ impl MithrilSnapshotFetcher { CardanoMessage::BlockAvailable(message), )); - context.message_bus.publish(&block_topic, Arc::new(message_enum)) + context + .message_bus + .publish(&block_topic, Arc::new(message_enum)) .await .unwrap_or_else(|e| error!("Failed to publish block message: {e}")); diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index bdf8cd5e..3fce888c 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -1,10 +1,7 @@ //! Acropolis Miniprotocols module for Caryatid //! Multi-connection, block body fetching part of the client (in separate thread). -use acropolis_common::{ - messages::RawBlockMessage, - BlockInfo, BlockStatus, Era, -}; +use acropolis_common::{messages::RawBlockMessage, BlockInfo, BlockStatus, Era}; use anyhow::{bail, Result}; use crossbeam::channel::{Receiver, TryRecvError}; use pallas::{ diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/modules/upstream_chain_fetcher/src/upstream_cache.rs index 86c39eed..6012cc70 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/modules/upstream_chain_fetcher/src/upstream_cache.rs @@ -1,7 +1,4 @@ -use acropolis_common::{ - messages::RawBlockMessage, - BlockInfo, -}; +use acropolis_common::{messages::RawBlockMessage, BlockInfo}; use anyhow::{anyhow, bail, Result}; use std::{ fs::File, @@ -169,10 +166,7 @@ impl Storage for FileStorage { #[cfg(test)] mod test { use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; - use acropolis_common::{ - messages::RawBlockMessage, - BlockHash, BlockInfo, BlockStatus, Era, - }; + use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; use anyhow::Result; use std::{collections::HashMap, sync::Arc}; diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index afba0066..ca5ab52b 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -50,8 +50,7 @@ static GLOBAL: Jemalloc = Jemalloc; #[tokio::main] pub async fn main() -> Result<()> { // Standard logging using RUST_LOG for log levels default to INFO for events only - let fmt_layer = fmt::layer() - .with_filter(EnvFilter::from_default_env()); + let fmt_layer = fmt::layer().with_filter(EnvFilter::from_default_env()); // TODO disabled this filter because it prevents debugging - investigate //.add_directive(filter::LevelFilter::INFO.into())) From 561e57d263a7292b8dea857bbced5984a0dcc0b2 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Tue, 21 Oct 2025 18:16:00 +0100 Subject: [PATCH 08/17] Typo --- modules/consensus/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/consensus/README.md b/modules/consensus/README.md index 7b540401..5ec65fd5 100644 --- a/modules/consensus/README.md +++ b/modules/consensus/README.md @@ -32,7 +32,7 @@ validators = [ The consensus module passes on blocks it receives from upstream (currently only a single source) and sends them out as 'proposed' blocks for validation. It then listens on all of the `validators` topics for BlockValidation messages, which give a Go / NoGo -for the block. The model is a NASA flight control desk, and like thre, a single NoGo +for the block. The model is a NASA flight control desk, and like there, a single NoGo is enough to stop the block. At the moment the module simply logs the validation failure. Once it is actually operating From 404bffcab11021c6687c089339156ee6623a78b8 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 22 Oct 2025 11:41:59 +0100 Subject: [PATCH 09/17] Fix typo in config property Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- modules/consensus/src/consensus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs index e2655318..bbf67b4b 100644 --- a/modules/consensus/src/consensus.rs +++ b/modules/consensus/src/consensus.rs @@ -35,7 +35,7 @@ impl Consensus { info!("Creating blocks subscriber on '{subscribe_blocks_topic}'"); let publish_blocks_topic = config - .get_string("publish-blocks topic") + .get_string("publish-blocks-topic") .unwrap_or(DEFAULT_PUBLISH_BLOCKS_TOPIC.to_string()); info!("Publishing blocks on '{publish_blocks_topic}'"); From b6dbe762e2081cbc8454f3c6f073124c93a0b5a3 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 22 Oct 2025 11:42:28 +0100 Subject: [PATCH 10/17] Comment /// fix Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- common/src/validation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/validation.rs b/common/src/validation.rs index 4933d153..8d7349b1 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -21,7 +21,7 @@ pub enum ValidationError { /// Validation status #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ValidationStatus { - // All good + /// All good Go, // Error From 99d9514a940cb6d41b9deee0b165ff0fa8931133 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 22 Oct 2025 11:42:49 +0100 Subject: [PATCH 11/17] Comment /// fix Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- common/src/validation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/validation.rs b/common/src/validation.rs index 8d7349b1..1501af67 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -24,6 +24,6 @@ pub enum ValidationStatus { /// All good Go, - // Error + /// Error NoGo(ValidationError), } From 6d37a744b937bbe30974349fd399aa3c61d5d0b3 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 22 Oct 2025 11:49:02 +0100 Subject: [PATCH 12/17] Fix confusing comment re downstreams --- modules/consensus/src/consensus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs index bbf67b4b..3cf5aec9 100644 --- a/modules/consensus/src/consensus.rs +++ b/modules/consensus/src/consensus.rs @@ -64,7 +64,7 @@ impl Consensus { async { // TODO Actually decide on favoured chain! - // Send to downstreams to validate + // Send to all validators and state modules context .message_bus .publish(&publish_blocks_topic, message.clone()) From d236fea552e794f1b9542912e0b861fd1742a25e Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 22 Oct 2025 11:49:16 +0100 Subject: [PATCH 13/17] Set spdd-retention-epochs to 0 by default --- processes/omnibus/omnibus.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 8cba0ffb..23d816d3 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -99,7 +99,7 @@ store-spdd-history = false spdd-db-path = "./spdd_db" # Number of epochs to retain in SPDD history # Example: 73 -spdd-retention-epochs = "none" +spdd-retention-epochs = 0 # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" From 0d7697e12698e46b5d0f6cfb01ced37525f28391 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 22 Oct 2025 14:22:16 +0100 Subject: [PATCH 14/17] cargo fmt --- modules/upstream_chain_fetcher/src/body_fetcher.rs | 5 +---- processes/omnibus/src/main.rs | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index 058e9cb8..038d8497 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -1,10 +1,7 @@ //! Acropolis Miniprotocols module for Caryatid //! Multi-connection, block body fetching part of the client (in separate thread). -use acropolis_common::{ - messages::RawBlockMessage, - BlockHash, BlockInfo, BlockStatus, Era, -}; +use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; use anyhow::{bail, Result}; use crossbeam::channel::{Receiver, TryRecvError}; use pallas::{ diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index c2f907c3..bfd4e0a9 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -13,8 +13,8 @@ use acropolis_module_accounts_state::AccountsState; use acropolis_module_address_state::AddressState; use acropolis_module_assets_state::AssetsState; use acropolis_module_block_unpacker::BlockUnpacker; -use acropolis_module_consensus::Consensus; use acropolis_module_chain_store::ChainStore; +use acropolis_module_consensus::Consensus; use acropolis_module_drdd_state::DRDDState; use acropolis_module_drep_state::DRepState; use acropolis_module_epochs_state::EpochsState; From e045a64bcb549e53943a1347bc0de31d0fb62bc0 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Thu, 23 Oct 2025 13:23:16 +0100 Subject: [PATCH 15/17] Fix publication topic for Mithril fetcher, log it Routes blocks to consensus --- .../src/mithril_snapshot_fetcher.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 2b00904e..97061df4 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -31,7 +31,7 @@ mod pause; use pause::PauseType; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped"; -const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.proposed"; +const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available"; const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete"; const DEFAULT_AGGREGATOR_URL: &str = @@ -239,8 +239,12 @@ impl MithrilSnapshotFetcher { ) -> Result<()> { let block_topic = config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string()); + info!("Publishing blocks on '{block_topic}'"); + let completion_topic = config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + info!("Publishing completion on '{completion_topic}'"); + let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string()); let mut pause_constraint = PauseType::from_config(&config, DEFAULT_PAUSE).unwrap_or(PauseType::NoPause); From 0e380768c738b566ca6231516663916cbe1ae95b Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Thu, 23 Oct 2025 13:23:54 +0100 Subject: [PATCH 16/17] Add timeout to consensus validation reads Note: Triggers a Caryatid bug where we don't handle timeouts on the future which produces a panic in select_all() - should be fine once this is fixed (and this is a Black Swan safety catch anyway) --- modules/consensus/src/consensus.rs | 80 +++++++++++++++++++----------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs index 3cf5aec9..8b5f1445 100644 --- a/modules/consensus/src/consensus.rs +++ b/modules/consensus/src/consensus.rs @@ -10,10 +10,13 @@ use caryatid_sdk::{module, Context, Module}; use config::Config; use futures::future::try_join_all; use std::sync::Arc; +use std::time::Duration; +use tokio::time::timeout; use tracing::{error, info, info_span, Instrument}; const DEFAULT_SUBSCRIBE_BLOCKS_TOPIC: &str = "cardano.block.available"; const DEFAULT_PUBLISH_BLOCKS_TOPIC: &str = "cardano.block.proposed"; +const DEFAULT_VALIDATION_TIMEOUT: i64 = 60; // seconds /// Consensus module /// Parameterised by the outer message enum used on the bus @@ -45,6 +48,11 @@ impl Consensus { info!("Validator: {topic}"); } + let validation_timeout = Duration::from_secs( + config.get_int("validation-timeout").unwrap_or(DEFAULT_VALIDATION_TIMEOUT) as u64, + ); + info!("Validation timeout {validation_timeout:?}"); + // Subscribe for incoming blocks let mut subscription = context.subscribe(&subscribe_blocks_topic).await?; @@ -55,6 +63,7 @@ impl Consensus { context.clone().run(async move { loop { let Ok((_, message)) = subscription.read().await else { + error!("Block message read failed"); return; }; match message.as_ref() { @@ -72,38 +81,49 @@ impl Consensus { .unwrap_or_else(|e| error!("Failed to publish: {e}")); // Read validation responses from all validators in parallel - let results: Vec<_> = - try_join_all(validator_subscriptions.iter_mut().map(|s| s.read())) - .await - .unwrap_or_else(|e| { - error!("Failed to read validations: {e}"); - vec![] - }); - - // All must be positive! - let all_say_go = results.iter().fold(true, |all_ok, (_topic, msg)| { - match msg.as_ref() { - Message::Cardano(( - block_info, - CardanoMessage::BlockValidation(status), - )) => match status { - ValidationStatus::Go => all_ok && true, - ValidationStatus::NoGo(err) => { - error!( - block = block_info.number, - ?err, - "Validation failure" - ); - false + // and check they are all positive, with a safety timeout + let all_say_go = match timeout( + validation_timeout, + try_join_all(validator_subscriptions.iter_mut().map(|s| s.read())), + ) + .await + { + Ok(Ok(results)) => { + results.iter().fold(true, |all_ok, (_topic, msg)| { + match msg.as_ref() { + Message::Cardano(( + block_info, + CardanoMessage::BlockValidation(status), + )) => match status { + ValidationStatus::Go => all_ok && true, + ValidationStatus::NoGo(err) => { + error!( + block = block_info.number, + ?err, + "Validation failure" + ); + false + } + }, + + _ => { + error!( + "Unexpected validation message type: {msg:?}" + ); + false + } } - }, - - _ => { - error!("Unexpected validation message type: {msg:?}"); - false - } + }) + } + Ok(Err(e)) => { + error!("Failed to read validations: {e}"); + false + } + Err(_) => { + error!("Timeout waiting for validation responses"); + false } - }); + }; if !all_say_go { error!(block = block_info.number, "Validation rejected block"); From 231f0cad63fb42a946ecc23a8991a9739d8815b2 Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Thu, 23 Oct 2025 13:29:55 +0100 Subject: [PATCH 17/17] Restore fmt filtering Still needs fixing to allow DEBUG but left for future PR --- processes/omnibus/src/main.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index c5382f51..74735165 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -61,11 +61,9 @@ pub async fn main() -> Result<()> { let args = ::parse(); // Standard logging using RUST_LOG for log levels default to INFO for events only - let fmt_layer = fmt::layer().with_filter(EnvFilter::from_default_env()); - - // TODO disabled this filter because it prevents debugging - investigate - //.add_directive(filter::LevelFilter::INFO.into())) - // .with_filter(filter::filter_fn(|meta| meta.is_event())); + let fmt_layer = fmt::layer() + .with_filter(EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into())) + .with_filter(filter::filter_fn(|meta| meta.is_event())); // Only turn on tracing if some OTEL environment variables exist if std::env::vars().any(|(name, _)| name.starts_with("OTEL_")) {