diff --git a/Cargo.lock b/Cargo.lock index c051dd80..bd584c53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,9 +48,10 @@ dependencies = [ "rayon", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "sha2", "tempfile", + "thiserror 2.0.17", "tokio", "tracing", ] @@ -74,7 +75,7 @@ dependencies = [ "rayon", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -146,6 +147,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_consensus" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "futures", + "pallas 0.33.0", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_drdd_state" version = "0.1.0" @@ -173,7 +188,7 @@ dependencies = [ "hex", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -225,7 +240,7 @@ dependencies = [ "hex", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -282,7 +297,7 @@ dependencies = [ "reqwest 0.11.27", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -306,7 +321,7 @@ dependencies = [ "serde", "serde_cbor", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -357,7 +372,7 @@ dependencies = [ "rayon", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -377,7 +392,7 @@ dependencies = [ "serde", "serde_json", "serde_json_any_key", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", ] @@ -447,6 +462,7 @@ dependencies = [ "acropolis_module_assets_state", "acropolis_module_block_unpacker", "acropolis_module_chain_store", + "acropolis_module_consensus", "acropolis_module_drdd_state", "acropolis_module_drep_state", "acropolis_module_epochs_state", @@ -513,7 +529,7 @@ dependencies = [ "config", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "tokio", "tracing", "tracing-subscriber", @@ -1531,9 +1547,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.49" +version = "4.5.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4512b90fa68d3a9932cea5184017c5d200f5921df706d45e853537dea51508f" +checksum = "0c2cfd7bf8a6017ddaa4e32ffe7403d547790db06bd171c1c53926faab501623" dependencies = [ "clap_builder", "clap_derive", @@ -1541,9 +1557,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.49" +version = "4.5.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0025e98baa12e766c67ba13ff4695a887a1eba19569aad00a472546795bd6730" +checksum = "0a4c05b9e80c5ccd3a7ef080ad7b6ba7d6fc00a985b8b157197075677c82c7a0" dependencies = [ "anstream", "anstyle", @@ -3173,9 +3189,9 @@ dependencies = [ [[package]] name = "is_terminal_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" [[package]] name = "itertools" @@ -3421,9 +3437,9 @@ checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" [[package]] name = "memmap2" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7" +checksum = "744133e4a0e0a658e1374cf3bf8e415c4052a15a111acd372764c55b4177d490" dependencies = [ "libc", ] @@ -3592,7 +3608,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", "sha2", "slog", "strum", @@ -3789,9 +3805,9 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "once_cell_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "opaque-debug" @@ -4100,7 +4116,7 @@ dependencies = [ "pallas-primitives 0.32.1", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", ] [[package]] @@ -4118,7 +4134,7 @@ dependencies = [ "pallas-primitives 0.33.0", "serde", "serde_json", - "serde_with 3.15.0", + "serde_with 3.15.1", ] [[package]] @@ -5328,9 +5344,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.33" +version = "0.23.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751e04a496ca00bb97a5e043158d23d66b5aabf2e1d5aa2a0aaebb1aafe6f82c" +checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" dependencies = [ "log", "once_cell", @@ -5523,9 +5539,9 @@ dependencies = [ [[package]] name = "self_cell" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749" +checksum = "16c2f82143577edb4921b71ede051dac62ca3c16084e918bf7b40c96ae10eb33" [[package]] name = "semver" @@ -5668,9 +5684,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.15.0" +version = "3.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6093cd8c01b25262b84927e0f7151692158fab02d961e04c979d3903eba7ecc5" +checksum = "aa66c845eee442168b2c8134fec70ac50dc20e760769c8ba0ad1319ca1959b04" dependencies = [ "base64 0.22.1", "chrono", @@ -5681,7 +5697,7 @@ dependencies = [ "schemars 1.0.4", "serde_core", "serde_json", - "serde_with_macros 3.15.0", + "serde_with_macros 3.15.1", "time", ] @@ -5699,9 +5715,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.15.0" +version = "3.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7e6c180db0816026a61afa1cff5344fb7ebded7e4d3062772179f2501481c27" +checksum = "b91a903660542fced4e99881aa481bdbaec1634568ee02e0b8bd57c64cb38955" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -6569,9 +6585,9 @@ checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" [[package]] name = "unicode-ident" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +checksum = "462eeb75aeb73aea900253ce739c8e18a67423fadf006037cd3ff27e82748a06" [[package]] name = "unicode-normalization" diff --git a/Cargo.toml b/Cargo.toml index 5dbc8b16..9efddb72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "modules/accounts_state", # Tracks stake and reward accounts "modules/assets_state", # Tracks native asset mints and burns "modules/historical_accounts_state", # Tracks historical account information + "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs # Process builds diff --git a/common/Cargo.toml b/common/Cargo.toml index 165ca78e..572dd415 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -42,6 +42,7 @@ imbl = { workspace = true } dashmap = { workspace = true } rayon = "1.11.0" cryptoxide = "0.5.1" +thiserror = "2.0.17" sha2 = "0.10.8" caryatid_process.workspace = true config.workspace = true diff --git a/common/src/lib.rs b/common/src/lib.rs index c2c58283..abf55551 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -21,6 +21,7 @@ pub mod snapshot; pub mod stake_addresses; pub mod state_history; pub mod types; +pub mod validation; // Flattened re-exports pub use self::address::*; diff --git a/common/src/messages.rs b/common/src/messages.rs index 23dc362c..24ecfafc 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -27,23 +27,20 @@ use crate::queries::{ use crate::byte_array::*; use crate::types::*; +use crate::validation::ValidationStatus; // Caryatid core messages which we re-export pub use caryatid_module_clock::messages::ClockTickMessage; pub use caryatid_module_rest_server::messages::{GetRESTResponse, RESTRequest, RESTResponse}; -/// Block header message +/// Raw block data message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlockHeaderMessage { - /// Raw Data - pub raw: Vec, -} +pub struct RawBlockMessage { + /// Header raw data + pub header: Vec, -/// Block body message -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlockBodyMessage { - /// Raw Data - pub raw: Vec, + /// Body raw data + pub body: Vec, } /// Snapshot completion message @@ -281,8 +278,8 @@ pub struct SPOStateMessage { /// Cardano message enum #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum CardanoMessage { - BlockHeader(BlockHeaderMessage), // Block header available - BlockBody(BlockBodyMessage), // Block body available + BlockAvailable(RawBlockMessage), // Block body available + BlockValidation(ValidationStatus), // Result of a block validation SnapshotComplete, // Mithril snapshot loaded ReceivedTxs(RawTxsMessage), // Transaction available GenesisComplete(GenesisCompleteMessage), // Genesis UTXOs done + genesis params diff --git a/common/src/validation.rs b/common/src/validation.rs new file mode 100644 index 00000000..1501af67 --- /dev/null +++ b/common/src/validation.rs @@ -0,0 +1,29 @@ +//! Validation results for Acropolis consensus + +// We don't use these types in the acropolis_common crate itself +#![allow(dead_code)] + +use thiserror::Error; + +/// Validation error +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Error)] +pub enum ValidationError { + #[error("VRF failure")] + BadVRF, + + #[error("KES failure")] + BadKES, + + #[error("Doubly spent UTXO: {0}")] + DoubleSpendUTXO(String), +} + +/// Validation status +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum ValidationStatus { + /// All good + Go, + + /// Error + NoGo(ValidationError), +} diff --git a/modules/block_unpacker/README.md b/modules/block_unpacker/README.md index aad00a87..bad3e4c1 100644 --- a/modules/block_unpacker/README.md +++ b/modules/block_unpacker/README.md @@ -12,15 +12,15 @@ everything except the section header can be left out. [module.block-unpacker] # Message topics -subscribe-topic = "cardano.block.body" +subscribe-topic = "cardano.block.proposed" publish-topic = "cardano.txs" ``` ## Messages -The block unpacker subscribes for BlockBodyMessages on -`cardano.block.body` (see the [Upstream Chain +The block unpacker subscribes for RawBlockMessages on +`cardano.block.proposed` (see the [Upstream Chain Fetcher](../upstream_chain_fetcher) module for details). It unpacks this into transactions, which it publishes as a single RawTxsMessage on `cardano.txs`, containing the block information and an ordered vector of diff --git a/modules/block_unpacker/src/block_unpacker.rs b/modules/block_unpacker/src/block_unpacker.rs index f3b51a5a..098d67b4 100644 --- a/modules/block_unpacker/src/block_unpacker.rs +++ b/modules/block_unpacker/src/block_unpacker.rs @@ -9,7 +9,7 @@ use pallas::ledger::traverse::MultiEraBlock; use std::sync::Arc; use tracing::{debug, error, info, info_span, Instrument}; -const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.body"; +const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.block.proposed"; const DEFAULT_PUBLISH_TOPIC: &str = "cardano.txs"; /// Block unpacker module @@ -42,9 +42,9 @@ impl BlockUnpacker { return; }; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::BlockBody(body_msg))) => { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // Parse the body - match MultiEraBlock::decode(&body_msg.raw) { + match MultiEraBlock::decode(&block_msg.body) { Ok(block) => { let span = info_span!("block_unpacker", block = block_info.number); diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index d2cfd5e7..73326732 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -120,11 +120,11 @@ impl ChainStore { } fn handle_new_block(store: &Arc, message: &Message) -> Result<()> { - let Message::Cardano((info, CardanoMessage::BlockBody(body))) = message else { + let Message::Cardano((info, CardanoMessage::BlockAvailable(raw_block))) = message else { bail!("Unexpected message type: {message:?}"); }; - store.insert_block(info, &body.raw) + store.insert_block(info, &raw_block.body) } fn handle_blocks_query( diff --git a/modules/consensus/Cargo.toml b/modules/consensus/Cargo.toml new file mode 100644 index 00000000..2cd73ac6 --- /dev/null +++ b/modules/consensus/Cargo.toml @@ -0,0 +1,24 @@ +# Acropolis consensus module + +[package] +name = "acropolis_module_consensus" +version = "0.1.0" +edition = "2021" +authors = ["Paul Clark "] +description = "Consensus Caryatid module for Acropolis" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +pallas = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +futures = "0.3.31" + +[lib] +path = "src/consensus.rs" diff --git a/modules/consensus/README.md b/modules/consensus/README.md new file mode 100644 index 00000000..5ec65fd5 --- /dev/null +++ b/modules/consensus/README.md @@ -0,0 +1,60 @@ +# Consensus module + +The consensus module takes proposed blocks from a (later, multiple) upstream +source and decides which chain to favour, passing on blocks on the favoured chain +to other validation and storage modules downstream + +## Configuration + +The following is the default configuration - these are the default +topics so they can be left out if they are OK. The validators *must* +be configured - if empty, no validation is performed + +```toml +[module.consensus] + +# Message topics +subscribe-blocks-topic = "cardano.block.available" +publish-blocks-topic = "cardano.block.proposed" + +# Validation result topics +validators = [ + "cardano.validation.vrf", + "cardano.validation.kes", + "cardano.validation.utxo" + ... +] + +``` + +## Validation + +The consensus module passes on blocks it receives from upstream (currently only a +single source) and sends them out as 'proposed' blocks for validation. It then listens +on all of the `validators` topics for BlockValidation messages, which give a Go / NoGo +for the block. The model is a NASA flight control desk, and like there, a single NoGo +is enough to stop the block. + +At the moment the module simply logs the validation failure. Once it is actually operating +consensus across multiple sources, it will use this and the length of chain to choose the best +chain. + +## Messages + +The consensus module subscribes for RawBlockMessages on +`cardano.block.available`. It uses the consensus rules to +decide which of multiple chains to favour, and sends candidate +blocks on `cardano.block.proposed` to request validation and storage. + +Both input and output are RawBlockMessages: + +```rust +pub struct RawBlockMessage { + /// Header raw data + pub header: Vec, + + /// Body raw data + pub body: Vec, +} +``` + diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs new file mode 100644 index 00000000..8b5f1445 --- /dev/null +++ b/modules/consensus/src/consensus.rs @@ -0,0 +1,144 @@ +//! Acropolis consensus module for Caryatid +//! Maintains a favoured chain based on offered options from multiple sources + +use acropolis_common::{ + messages::{CardanoMessage, Message}, + validation::ValidationStatus, +}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module}; +use config::Config; +use futures::future::try_join_all; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::timeout; +use tracing::{error, info, info_span, Instrument}; + +const DEFAULT_SUBSCRIBE_BLOCKS_TOPIC: &str = "cardano.block.available"; +const DEFAULT_PUBLISH_BLOCKS_TOPIC: &str = "cardano.block.proposed"; +const DEFAULT_VALIDATION_TIMEOUT: i64 = 60; // seconds + +/// Consensus module +/// Parameterised by the outer message enum used on the bus +#[module( + message_type(Message), + name = "consensus", + description = "Consensus algorithm" +)] +pub struct Consensus; + +impl Consensus { + /// Main init function + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // Subscribe for block messages + // Get configuration + let subscribe_blocks_topic = config + .get_string("subscribe-blocks-topic") + .unwrap_or(DEFAULT_SUBSCRIBE_BLOCKS_TOPIC.to_string()); + info!("Creating blocks subscriber on '{subscribe_blocks_topic}'"); + + let publish_blocks_topic = config + .get_string("publish-blocks-topic") + .unwrap_or(DEFAULT_PUBLISH_BLOCKS_TOPIC.to_string()); + info!("Publishing blocks on '{publish_blocks_topic}'"); + + let validator_topics: Vec = + config.get::>("validators").unwrap_or_default(); + for topic in &validator_topics { + info!("Validator: {topic}"); + } + + let validation_timeout = Duration::from_secs( + config.get_int("validation-timeout").unwrap_or(DEFAULT_VALIDATION_TIMEOUT) as u64, + ); + info!("Validation timeout {validation_timeout:?}"); + + // Subscribe for incoming blocks + let mut subscription = context.subscribe(&subscribe_blocks_topic).await?; + + // Subscribe all the validators + let mut validator_subscriptions: Vec<_> = + try_join_all(validator_topics.iter().map(|topic| context.subscribe(topic))).await?; + + context.clone().run(async move { + loop { + let Ok((_, message)) = subscription.read().await else { + error!("Block message read failed"); + return; + }; + match message.as_ref() { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(_block_msg))) => { + let span = info_span!("consensus", block = block_info.number); + + async { + // TODO Actually decide on favoured chain! + + // Send to all validators and state modules + context + .message_bus + .publish(&publish_blocks_topic, message.clone()) + .await + .unwrap_or_else(|e| error!("Failed to publish: {e}")); + + // Read validation responses from all validators in parallel + // and check they are all positive, with a safety timeout + let all_say_go = match timeout( + validation_timeout, + try_join_all(validator_subscriptions.iter_mut().map(|s| s.read())), + ) + .await + { + Ok(Ok(results)) => { + results.iter().fold(true, |all_ok, (_topic, msg)| { + match msg.as_ref() { + Message::Cardano(( + block_info, + CardanoMessage::BlockValidation(status), + )) => match status { + ValidationStatus::Go => all_ok && true, + ValidationStatus::NoGo(err) => { + error!( + block = block_info.number, + ?err, + "Validation failure" + ); + false + } + }, + + _ => { + error!( + "Unexpected validation message type: {msg:?}" + ); + false + } + } + }) + } + Ok(Err(e)) => { + error!("Failed to read validations: {e}"); + false + } + Err(_) => { + error!("Timeout waiting for validation responses"); + false + } + }; + + if !all_say_go { + error!(block = block_info.number, "Validation rejected block"); + // TODO Consequences: rollback, blacklist source + } + } + .instrument(span) + .await; + } + + _ => error!("Unexpected message type: {message:?}"), + } + } + }); + + Ok(()) + } +} diff --git a/modules/epochs_state/README.md b/modules/epochs_state/README.md index 27083dd7..2f26616c 100644 --- a/modules/epochs_state/README.md +++ b/modules/epochs_state/README.md @@ -17,7 +17,7 @@ everything except the section header can be left out. [module.epochs-state] # Message topics -subscribe-headers-topic = "cardano.block.headers" +subscribe-blocks-topic = "cardano.block.proposed" subscribe-fees-topic = "cardano.fees" publish-topic = "cardano.epoch.activity" @@ -25,9 +25,8 @@ publish-topic = "cardano.epoch.activity" ## Messages -The epochs state subscribes for BlockHeaderMessages on -`cardano.block.header` (see the [Upstream Chain -Fetcher](../upstream_chain_fetcher) module for details). +The epochs state subscribes for RawBlockMessages on +`cardano.block.proposed` (see the [Consensus](../consensus) module for details). TODO subscription for fees diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index f57a57ea..82770e9d 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -33,8 +33,8 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( "bootstrapped-subscribe-topic", "cardano.sequence.bootstrapped", ); -const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC: (&str, &str) = - ("block-header-subscribe-topic", "cardano.block.header"); +const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = + ("blocks-subscribe-topic", "cardano.block.proposed"); const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC: (&str, &str) = ("block-txs-subscribe-topic", "cardano.block.txs"); const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( @@ -59,7 +59,7 @@ impl EpochsState { history: Arc>>, epochs_history: EpochsHistoryState, mut bootstrapped_subscription: Box>, - mut headers_subscription: Box>, + mut blocks_subscription: Box>, mut block_txs_subscription: Box>, mut protocol_parameters_subscription: Box>, mut epoch_activity_publisher: EpochActivityPublisher, @@ -81,13 +81,13 @@ impl EpochsState { let mut current_block: Option = None; // Read both topics in parallel - let headers_message_f = headers_subscription.read(); + let blocks_message_f = blocks_subscription.read(); let block_txs_message_f = block_txs_subscription.read(); - // Handle headers first - let (_, message) = headers_message_f.await?; + // Handle blocks first + let (_, message) = blocks_message_f.await?; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::BlockHeader(header_msg))) => { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here if block_info.status == BlockStatus::RolledBack { state = history.lock().await.get_rolled_back_state(block_info.number); @@ -120,7 +120,7 @@ impl EpochsState { let span = info_span!("epochs_state.decode_header", block = block_info.number); let mut header = None; span.in_scope(|| { - header = match MultiEraHeader::decode(variant, None, &header_msg.raw) { + header = match MultiEraHeader::decode(variant, None, &block_msg.header) { Ok(header) => Some(header), Err(e) => { error!("Can't decode header {}: {e}", block_info.slot); @@ -199,10 +199,10 @@ impl EpochsState { .unwrap_or(DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'"); - let block_headers_subscribe_topic = config - .get_string(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating subscriber for headers on '{block_headers_subscribe_topic}'"); + let blocks_subscribe_topic = config + .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber for blocks on '{blocks_subscribe_topic}'"); let block_txs_subscribe_topic = config .get_string(DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC.0) @@ -242,7 +242,7 @@ impl EpochsState { // Subscribe let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; - let headers_subscription = context.subscribe(&block_headers_subscribe_topic).await?; + let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; let protocol_parameters_subscription = context.subscribe(&protocol_parameters_subscribe_topic).await?; let block_txs_subscription = context.subscribe(&block_txs_subscribe_topic).await?; @@ -344,7 +344,7 @@ impl EpochsState { history, epochs_history, bootstrapped_subscription, - headers_subscription, + blocks_subscription, block_txs_subscription, protocol_parameters_subscription, epoch_activity_publisher, diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 0b9fabad..97061df4 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -3,7 +3,7 @@ use acropolis_common::{ genesis_values::GenesisValues, - messages::{BlockBodyMessage, BlockHeaderMessage, CardanoMessage, Message}, + messages::{CardanoMessage, Message, RawBlockMessage}, BlockHash, BlockInfo, BlockStatus, Era, }; use anyhow::{anyhow, bail, Result}; @@ -24,15 +24,14 @@ use std::path::Path; use std::sync::Arc; use std::thread::sleep; use std::time::Duration as SystemDuration; -use tokio::{join, sync::Mutex}; +use tokio::sync::Mutex; use tracing::{debug, error, info, info_span, Instrument}; mod pause; use pause::PauseType; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.bootstrapped"; -const DEFAULT_HEADER_TOPIC: &str = "cardano.block.header"; -const DEFAULT_BODY_TOPIC: &str = "cardano.block.body"; +const DEFAULT_BLOCK_TOPIC: &str = "cardano.block.available"; const DEFAULT_COMPLETION_TOPIC: &str = "cardano.snapshot.complete"; const DEFAULT_AGGREGATOR_URL: &str = @@ -238,11 +237,14 @@ impl MithrilSnapshotFetcher { config: Arc, genesis: GenesisValues, ) -> Result<()> { - let header_topic = - config.get_string("header-topic").unwrap_or(DEFAULT_HEADER_TOPIC.to_string()); - let body_topic = config.get_string("body-topic").unwrap_or(DEFAULT_BODY_TOPIC.to_string()); + let block_topic = + config.get_string("block-topic").unwrap_or(DEFAULT_BLOCK_TOPIC.to_string()); + info!("Publishing blocks on '{block_topic}'"); + let completion_topic = config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + info!("Publishing completion on '{completion_topic}'"); + let directory = config.get_string("directory").unwrap_or(DEFAULT_DIRECTORY.to_string()); let mut pause_constraint = PauseType::from_config(&config, DEFAULT_PAUSE).unwrap_or(PauseType::NoPause); @@ -345,33 +347,22 @@ impl MithrilSnapshotFetcher { } } - // Send the block header message - let header = block.header(); - let header_message = BlockHeaderMessage { - raw: header.cbor().to_vec(), + // Send the block message + let message = RawBlockMessage { + header: block.header().cbor().to_vec(), + body: raw_block, }; - let header_message_enum = Message::Cardano(( - block_info.clone(), - CardanoMessage::BlockHeader(header_message), - )); - let header_future = context - .message_bus - .publish(&header_topic, Arc::new(header_message_enum)); - - // Send the block body message - let body_message = BlockBodyMessage { raw: raw_block }; - - let body_message_enum = Message::Cardano(( + let message_enum = Message::Cardano(( block_info.clone(), - CardanoMessage::BlockBody(body_message), + CardanoMessage::BlockAvailable(message), )); - let body_future = - context.message_bus.publish(&body_topic, Arc::new(body_message_enum)); - let (header_result, body_result) = join!(header_future, body_future); - header_result.unwrap_or_else(|e| error!("Failed to publish header: {e}")); - body_result.unwrap_or_else(|e| error!("Failed to publish body: {e}")); + context + .message_bus + .publish(&block_topic, Arc::new(message_enum)) + .await + .unwrap_or_else(|e| error!("Failed to publish block message: {e}")); last_block_info = Some(block_info); Ok::<(), anyhow::Error>(()) diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index b345c0c2..4a874566 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -46,8 +46,8 @@ const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) = ("withdrawals-subscribe-topic", "cardano.withdrawals"); const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = ("governance-subscribe-topic", "cardano.governance"); -const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC: (&str, &str) = - ("block-header-subscribe-topic", "cardano.block.header"); +const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = + ("blocks-subscribe-topic", "cardano.block.proposed"); const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = @@ -85,7 +85,7 @@ impl SPOState { store_config: &StoreConfig, // subscribers mut certificates_subscription: Box>, - mut block_headers_subscription: Box>, + mut blocks_subscription: Box>, mut withdrawals_subscription: Option>>, mut governance_subscription: Option>>, mut epoch_activity_subscription: Box>, @@ -114,7 +114,7 @@ impl SPOState { // read per-block topics in parallel let certs_message_f = certificates_subscription.read(); - let block_headers_message_f = block_headers_subscription.read(); + let blocks_message_f = blocks_subscription.read(); let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read()); let governance_message_f = governance_subscription.as_mut().map(|s| s.read()); let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); @@ -139,11 +139,11 @@ impl SPOState { } }; - // handle Block Headers (handle_mint) before handle_tx_certs + // handle blocks (handle_mint) before handle_tx_certs // in case of epoch boundary - let (_, block_headers_message) = block_headers_message_f.await?; - match block_headers_message.as_ref() { - Message::Cardano((block_info, CardanoMessage::BlockHeader(header_msg))) => { + let (_, block_message) = blocks_message_f.await?; + match block_message.as_ref() { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { let span = info_span!("spo_state.handle_block_header", block = block_info.number); @@ -161,7 +161,7 @@ impl SPOState { // Parse the header - note we ignore the subtag because EBBs // are suppressed upstream - match MultiEraHeader::decode(variant, None, &header_msg.raw) { + match MultiEraHeader::decode(variant, None, &block_msg.header) { Ok(header) => { if let Some(vrf_vkey) = header.vrf_vkey() { state.handle_mint(&block_info, vrf_vkey); @@ -173,7 +173,7 @@ impl SPOState { }); } - _ => error!("Unexpected message type: {block_headers_message:?}"), + _ => error!("Unexpected message type: {block_message:?}"), } // handle tx certificates @@ -439,10 +439,10 @@ impl SPOState { .unwrap_or(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating SPO rewards subscriber on '{spo_rewards_subscribe_topic}'"); - let block_headers_subscribe_topic = config - .get_string(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating block headers subscriber on '{block_headers_subscribe_topic}'"); + let blocks_subscribe_topic = config + .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating blocks subscriber on '{blocks_subscribe_topic}'"); let stake_reward_deltas_subscribe_topic = config .get_string(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.0) @@ -725,7 +725,7 @@ impl SPOState { // Subscriptions let certificates_subscription = context.subscribe(&certificates_subscribe_topic).await?; - let block_headers_subscription = context.subscribe(&block_headers_subscribe_topic).await?; + let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; let epoch_activity_subscription = context.subscribe(&epoch_activity_subscribe_topic).await?; let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?; @@ -771,7 +771,7 @@ impl SPOState { retired_pools_history, &store_config, certificates_subscription, - block_headers_subscription, + blocks_subscription, withdrawals_subscription, governance_subscription, epoch_activity_subscription, diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index 555df36f..038d8497 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -1,10 +1,7 @@ //! Acropolis Miniprotocols module for Caryatid //! Multi-connection, block body fetching part of the client (in separate thread). -use acropolis_common::{ - messages::{BlockBodyMessage, BlockHeaderMessage}, - BlockHash, BlockInfo, BlockStatus, Era, -}; +use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; use anyhow::{bail, Result}; use crossbeam::channel::{Receiver, TryRecvError}; use pallas::{ @@ -55,13 +52,13 @@ impl BodyFetcher { } } - async fn fetch_block(&mut self, point: Point) -> Result>> { + async fn fetch_block(&mut self, point: Point) -> Result>>> { // Fetch the block body debug!("Requesting single block {point:?}"); let body = self.peer.blockfetch().fetch_single(point.clone()).await; match body { - Ok(body) => Ok(Success(Arc::new(BlockBodyMessage { raw: body }))), + Ok(body) => Ok(Success(Arc::new(body))), Err(blockfetch::ClientError::Plexer(e)) => { error!("Can't fetch block at {point:?}: {e}, will try to restart"); Ok(NetworkError) @@ -139,16 +136,18 @@ impl BodyFetcher { // reconstruct a Point from the header because the one we get // in the RollForward is the *tip*, not the next read point let fetch_point = Point::Specific(block_info.slot, block_info.hash.to_vec()); - let msg_body = match self.fetch_block(fetch_point).await? { + let raw_body = match self.fetch_block(fetch_point).await? { Success(body) => body, NetworkError => return Ok(NetworkError), }; - let msg_hdr = Arc::new(BlockHeaderMessage { raw: h.cbor }); + let message = Arc::new(RawBlockMessage { + header: h.cbor, + body: raw_body.to_vec(), + }); let record = UpstreamCacheRecord { id: block_info.clone(), - hdr: msg_hdr.clone(), - body: msg_body.clone(), + message: message.clone(), }; Ok(Success(record)) diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/modules/upstream_chain_fetcher/src/upstream_cache.rs index 9281a358..6012cc70 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/modules/upstream_chain_fetcher/src/upstream_cache.rs @@ -1,7 +1,4 @@ -use acropolis_common::{ - messages::{BlockBodyMessage, BlockHeaderMessage}, - BlockInfo, -}; +use acropolis_common::{messages::RawBlockMessage, BlockInfo}; use anyhow::{anyhow, bail, Result}; use std::{ fs::File, @@ -13,8 +10,7 @@ use std::{ #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct UpstreamCacheRecord { pub id: BlockInfo, - pub hdr: Arc, - pub body: Arc, + pub message: Arc, } pub trait Storage { @@ -170,10 +166,7 @@ impl Storage for FileStorage { #[cfg(test)] mod test { use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; - use acropolis_common::{ - messages::{BlockBodyMessage, BlockHeaderMessage}, - BlockHash, BlockInfo, BlockStatus, Era, - }; + use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; use anyhow::Result; use std::{collections::HashMap, sync::Arc}; @@ -194,11 +187,9 @@ mod test { fn ucr(n: u64, hdr: usize, body: usize) -> UpstreamCacheRecord { UpstreamCacheRecord { id: blk(n), - hdr: Arc::new(BlockHeaderMessage { - raw: vec![hdr as u8], - }), - body: Arc::new(BlockBodyMessage { - raw: vec![body as u8], + message: Arc::new(RawBlockMessage { + header: vec![hdr as u8], + body: vec![body as u8], }), } } @@ -259,8 +250,8 @@ mod test { for n in 0..11 { let record = cache.read_record()?.unwrap(); assert_eq!(record.id.number, perm[n]); - assert_eq!(record.hdr.raw, vec![n as u8]); - assert_eq!(record.body.raw, vec![(n + 100) as u8]); + assert_eq!(record.message.header, vec![n as u8]); + assert_eq!(record.message.body, vec![(n + 100) as u8]); cache.next_record()?; } diff --git a/modules/upstream_chain_fetcher/src/utils.rs b/modules/upstream_chain_fetcher/src/utils.rs index dd3ee50b..5c538014 100644 --- a/modules/upstream_chain_fetcher/src/utils.rs +++ b/modules/upstream_chain_fetcher/src/utils.rs @@ -10,8 +10,7 @@ use serde::Deserialize; use std::sync::Arc; use tracing::{error, info}; -const DEFAULT_HEADER_TOPIC: (&str, &str) = ("header-topic", "cardano.block.header"); -const DEFAULT_BODY_TOPIC: (&str, &str) = ("body-topic", "cardano.block.body"); +const DEFAULT_BLOCK_TOPIC: (&str, &str) = ("block-topic", "cardano.block.available"); const DEFAULT_SNAPSHOT_COMPLETION_TOPIC: (&str, &str) = ("snapshot-completion-topic", "cardano.snapshot.complete"); const DEFAULT_GENESIS_COMPLETION_TOPIC: (&str, &str) = @@ -42,8 +41,7 @@ pub enum SyncPoint { pub struct FetcherConfig { pub context: Arc>, - pub header_topic: String, - pub body_topic: String, + pub block_topic: String, pub sync_point: SyncPoint, pub snapshot_completion_topic: String, pub genesis_completion_topic: String, @@ -100,8 +98,7 @@ impl FetcherConfig { pub fn new(context: Arc>, config: Arc) -> Result { Ok(Self { context, - header_topic: Self::conf(&config, DEFAULT_HEADER_TOPIC), - body_topic: Self::conf(&config, DEFAULT_BODY_TOPIC), + block_topic: Self::conf(&config, DEFAULT_BLOCK_TOPIC), snapshot_completion_topic: Self::conf(&config, DEFAULT_SNAPSHOT_COMPLETION_TOPIC), genesis_completion_topic: Self::conf(&config, DEFAULT_GENESIS_COMPLETION_TOPIC), sync_point: Self::conf_enum::(&config, DEFAULT_SYNC_POINT)?, @@ -124,18 +121,12 @@ impl FetcherConfig { } pub async fn publish_message(cfg: Arc, record: &UpstreamCacheRecord) -> Result<()> { - let header_msg = Arc::new(Message::Cardano(( + let message = Arc::new(Message::Cardano(( record.id.clone(), - CardanoMessage::BlockHeader((*record.hdr).clone()), + CardanoMessage::BlockAvailable((*record.message).clone()), ))); - let body_msg = Arc::new(Message::Cardano(( - record.id.clone(), - CardanoMessage::BlockBody((*record.body).clone()), - ))); - - cfg.context.message_bus.publish(&cfg.header_topic, header_msg).await?; - cfg.context.message_bus.publish(&cfg.body_topic, body_msg).await + cfg.context.message_bus.publish(&cfg.block_topic, message).await } pub async fn peer_connect(cfg: Arc, role: &str) -> Result> { diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 99537c09..7814b5c8 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -28,6 +28,7 @@ acropolis_module_drdd_state = { path = "../../modules/drdd_state" } acropolis_module_assets_state = { path = "../../modules/assets_state" } acropolis_module_chain_store = { path = "../../modules/chain_store" } acropolis_module_address_state = { path = "../../modules/address_state" } +acropolis_module_consensus = { path = "../../modules/consensus" } acropolis_module_historical_accounts_state = { path = "../../modules/historical_accounts_state" } caryatid_process = { workspace = true } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index c83c9e37..20bccd9a 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -15,6 +15,15 @@ sync-point = "snapshot" node-address = "backbone.cardano.iog.io:3001" magic-number = 764824073 +[module.consensus] +# List of validation result topics to listen on +validators = [ +# Add these once they are actually being sent +# "cardano.validation.vrf", +# "cardano.validation.kes", +# "cardano.validation.utxo" +] + [module.block-unpacker] [module.rest-blockfrost] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index 54d91ea5..74735165 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -14,6 +14,7 @@ use acropolis_module_address_state::AddressState; use acropolis_module_assets_state::AssetsState; use acropolis_module_block_unpacker::BlockUnpacker; use acropolis_module_chain_store::ChainStore; +use acropolis_module_consensus::Consensus; use acropolis_module_drdd_state::DRDDState; use acropolis_module_drep_state::DRepState; use acropolis_module_epochs_state::EpochsState; @@ -117,6 +118,7 @@ pub async fn main() -> Result<()> { BlockfrostREST::register(&mut process); SPDDState::register(&mut process); DRDDState::register(&mut process); + Consensus::register(&mut process); ChainStore::register(&mut process); Clock::::register(&mut process);