diff --git a/Cargo.lock b/Cargo.lock index d237f4ca..022148b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,6 +302,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_peer_network_interface" +version = "0.2.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "crossbeam", + "pallas 0.33.0", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_rest_blockfrost" version = "0.1.0" @@ -486,6 +502,7 @@ dependencies = [ "acropolis_module_historical_accounts_state", "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", + "acropolis_module_peer_network_interface", "acropolis_module_rest_blockfrost", "acropolis_module_spdd_state", "acropolis_module_spo_state", diff --git a/Cargo.toml b/Cargo.toml index 035c1edb..86b8639e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher "modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot "modules/upstream_chain_fetcher", # Upstream chain fetcher + "modules/peer_network_interface", # Multi-peer network interface "modules/block_unpacker", # Block to transaction unpacker "modules/tx_unpacker", # Tx to UTXO unpacker "modules/utxo_state", # UTXO state diff --git a/common/src/genesis_values.rs b/common/src/genesis_values.rs index 33781b81..bc339142 100644 --- a/common/src/genesis_values.rs +++ b/common/src/genesis_values.rs @@ -6,6 +6,7 @@ const MAINNET_SHELLEY_GENESIS_HASH: &str = "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81"; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] pub struct GenesisValues { pub byron_timestamp: u64, pub shelley_epoch: u64, diff --git a/common/src/lib.rs b/common/src/lib.rs index 0ca5a58a..c5da58cf 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -21,6 +21,7 @@ pub mod snapshot; pub mod stake_addresses; pub mod state_history; pub mod types; +pub mod upstream_cache; pub mod validation; // Flattened re-exports diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/common/src/upstream_cache.rs similarity index 86% rename from modules/upstream_chain_fetcher/src/upstream_cache.rs rename to common/src/upstream_cache.rs index 29e42f3f..c8cae2a3 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/common/src/upstream_cache.rs @@ -1,9 +1,9 @@ -use acropolis_common::{messages::RawBlockMessage, BlockInfo}; -use anyhow::{anyhow, bail, Result}; +use crate::{messages::RawBlockMessage, BlockInfo}; +use anyhow::{anyhow, bail, Context, Result}; use std::{ fs::File, io::{BufReader, Write}, - path::Path, + path::{Path, PathBuf}, sync::Arc, }; @@ -19,26 +19,26 @@ pub trait Storage { } pub struct FileStorage { - path: String, + path: PathBuf, } impl FileStorage { - pub fn new(path: &str) -> Self { - Self { - path: path.to_string(), - } + pub fn new>(path: P) -> Result { + let path = path.as_ref().to_path_buf(); + std::fs::create_dir_all(&path)?; + Ok(Self { path }) } - fn get_file_name(&self, chunk_no: usize) -> String { - format!("{}/chunk-{chunk_no}.json", self.path) + fn get_file_name(&self, chunk_no: usize) -> PathBuf { + self.path.join(format!("chunk-{chunk_no}.json")) } } pub type UpstreamCache = UpstreamCacheImpl; impl UpstreamCache { - pub fn new(path: &str) -> Self { - UpstreamCache::new_impl(FileStorage::new(path)) + pub fn new>(path: P) -> Result { + Ok(UpstreamCache::new_impl(FileStorage::new(path)?)) } } @@ -124,7 +124,9 @@ impl UpstreamCacheImpl { pub fn write_record(&mut self, record: &UpstreamCacheRecord) -> Result<()> { self.chunk_cached.push(record.clone()); - self.storage.write_chunk(self.current_chunk, &self.chunk_cached)?; + self.storage + .write_chunk(self.current_chunk, &self.chunk_cached) + .context("could not write cache record")?; self.current_record += 1; if self.current_record >= self.density { @@ -139,26 +141,25 @@ impl UpstreamCacheImpl { impl Storage for FileStorage { fn read_chunk(&mut self, chunk_no: usize) -> Result> { - let name = self.get_file_name(chunk_no); - let path = Path::new(&name); + let path = self.get_file_name(chunk_no); if !path.try_exists()? { return Ok(vec![]); } - let file = File::open(&name)?; + let file = File::open(&path)?; let reader = BufReader::new(file); - match serde_json::from_reader::, Vec>(reader) - { - Ok(res) => Ok(res.clone()), - Err(err) => Err(anyhow!( - "Error reading upstream cache chunk JSON from {name}: '{err}'" - )), - } + serde_json::from_reader(reader).with_context(|| { + format!( + "Error reading upstream cache chunk JSON from {}", + path.display() + ) + }) } fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> { - let mut file = File::create(self.get_file_name(chunk_no))?; - file.write_all(serde_json::to_string(data)?.as_bytes())?; + let mut file = + File::create(self.get_file_name(chunk_no)).context("could not write chunk")?; + file.write_all(&serde_json::to_vec(data)?)?; Ok(()) } } @@ -166,7 +167,7 @@ impl Storage for FileStorage { #[cfg(test)] mod test { use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; - use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; + use crate::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; use anyhow::Result; use std::{collections::HashMap, sync::Arc}; diff --git a/modules/genesis_bootstrapper/build.rs b/modules/genesis_bootstrapper/build.rs index b4fac344..15376a94 100644 --- a/modules/genesis_bootstrapper/build.rs +++ b/modules/genesis_bootstrapper/build.rs @@ -62,6 +62,16 @@ async fn main() -> Result<()> { "https://book.world.dev.cardano.org/environments/mainnet/shelley-genesis.json", "mainnet-shelley-genesis.json", ), + download( + &client, + "https://book.world.dev.cardano.org/environments/preview/byron-genesis.json", + "preview-byron-genesis.json", + ), + download( + &client, + "https://book.world.dev.cardano.org/environments/preview/shelley-genesis.json", + "preview-shelley-genesis.json", + ), download( &client, "https://raw.githubusercontent.com/Hornan7/SanchoNet-Tutorials/refs/heads/main/genesis/byron-genesis.json", diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index 58997e20..1f076fc9 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -32,6 +32,9 @@ const DEFAULT_NETWORK_NAME: &str = "mainnet"; const MAINNET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-byron-genesis.json"); const MAINNET_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-shelley-genesis.json"); const MAINNET_SHELLEY_START_EPOCH: u64 = 208; +const PREVIEW_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/preview-byron-genesis.json"); +const PREVIEW_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/preview-shelley-genesis.json"); +const PREVIEW_SHELLEY_START_EPOCH: u64 = 0; const SANCHONET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/sanchonet-byron-genesis.json"); const SANCHONET_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/sanchonet-shelley-genesis.json"); @@ -101,6 +104,11 @@ impl GenesisBootstrapper { MAINNET_SHELLEY_GENESIS, MAINNET_SHELLEY_START_EPOCH, ), + "preview" => ( + PREVIEW_BYRON_GENESIS, + PREVIEW_SHELLEY_GENESIS, + PREVIEW_SHELLEY_START_EPOCH, + ), "sanchonet" => ( SANCHONET_BYRON_GENESIS, SANCHONET_SHELLEY_GENESIS, diff --git a/modules/peer_network_interface/Cargo.toml b/modules/peer_network_interface/Cargo.toml new file mode 100644 index 00000000..c15ee1b2 --- /dev/null +++ b/modules/peer_network_interface/Cargo.toml @@ -0,0 +1,26 @@ +# Acropolis upstream chain fetcher module + +[package] +name = "acropolis_module_peer_network_interface" +version = "0.2.0" +edition = "2024" +authors = ["Simon Gellis "] +description = "Multiplexed chain fetcher Caryatid module for Acropolis" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +crossbeam = "0.8.4" +pallas = { workspace = true } +serde = { workspace = true, features = ["rc"] } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/peer_network_interface.rs" diff --git a/modules/peer_network_interface/NOTES.md b/modules/peer_network_interface/NOTES.md new file mode 100644 index 00000000..5d084cbe --- /dev/null +++ b/modules/peer_network_interface/NOTES.md @@ -0,0 +1,18 @@ +# Architecture + +This module uses an event-queue-based architecture. A `NetworkManager` is responsible for creating a set of `PeerConnection`s and sending commands to them. Each `PeerConnection` maintains a connection to a single peer; it responds to commands from the `NetworkManager`, and emits events to an event queue. The `NetworkManager` reads from that queue to decide which chain to follow. When blocks from the preferred chain have been fetched, it publishes those blocks to the message bus. + +This module requests the body for every block announced by any chain, from the first chain which announced it. When it has the body for the next block announced, it will publish it to the message bus. + +```mermaid +graph LR + EQ[Event Queue]-->NM[NetworkManager] + subgraph Peers + P1[PeerConnection 1] + P2[PeerConnection 2] + P3[PeerConnection 3] + end + NM -->|RequestBlock
FindIntersect| P1 & P2 & P3 + Peers -->|ChainSync
BlockFetched
Disconnect|EQ + NM -->|BlockAvailable| MB[Message Bus] +``` \ No newline at end of file diff --git a/modules/peer_network_interface/README.md b/modules/peer_network_interface/README.md new file mode 100644 index 00000000..5e488841 --- /dev/null +++ b/modules/peer_network_interface/README.md @@ -0,0 +1,18 @@ +# Peer network interface module + +The peer network interface module uses the ChainSync and BlockFetch protocols to fetch blocks from one of several upstream sources. It chooses one peer to treat as the "preferred" chain to follow, but will gracefully switch which peer it follows during network issues. + +It can either run independently, either from the origin or current tip, or +be triggered by a Mithril snapshot event (the default) where it starts from +where the snapshot left off, and follows the chain from there. + +Rollbacks are handled by signalling in the block data - it is downstream +subscribers' responsibility to deal with the effects of this. + +## Configuration + +See [./config.default.toml](./config.default.toml) for the available configuration options and their default values. + +## Messages + +This module publishes "raw block messages" to the configured `block-topic`. Each message includes the raw bytes composing the header and body of a block. The module follows the head of one chain at any given time, though that chain may switch during runtime. If that chain reports a rollback (or if this module switches to a different chain), the next message it emits will be the new head of the chain and have the status `RolledBack`. diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml new file mode 100644 index 00000000..c48d02dc --- /dev/null +++ b/modules/peer_network_interface/config.default.toml @@ -0,0 +1,24 @@ +# The topic to publish blocks on +block-topic = "cardano.block.available" +# The topic to wait for when sync-point is "snapshot" +snapshot-completion-topic = "cardano.snapshot.complete" +# The topic to wait for when listening for genesis values from another module +genesis-completion-topic = "cardano.sequence.bootstrapped" + +# Upstream node connections +node-addresses = [ + "backbone.cardano.iog.io:3001", + "backbone.mainnet.cardanofoundation.org:3001", + "backbone.mainnet.emurgornd.com:3001", +] +# The network magic for the chain to connect to +magic-number = 764824073 + +# The initial point to start syncing from. Options: +# - "origin": sync from the very start of the chain +# - "tip": sync from the very end of the chain +# - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache. +# - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot. +sync-point = "snapshot" +# The cache dir to use when sync-point is "cache" +cache-dir = "upstream-cache" \ No newline at end of file diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs new file mode 100644 index 00000000..3ec72e09 --- /dev/null +++ b/modules/peer_network_interface/src/chain_state.rs @@ -0,0 +1,500 @@ +use std::collections::{BTreeMap, VecDeque}; + +use acropolis_common::{BlockHash, params::SECURITY_PARAMETER_K}; +use pallas::network::miniprotocols::Point; + +use crate::{connection::Header, network::PeerId}; + +#[derive(Debug)] +struct BlockData { + header: Header, + announced_by: Vec, + body: Option>, +} + +#[derive(Debug, Default)] +struct SlotBlockData { + blocks: Vec, +} +impl SlotBlockData { + fn track_announcement(&mut self, id: PeerId, header: Header) { + if let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == header.hash) { + block.announced_by.push(id); + } else { + self.blocks.push(BlockData { + header, + announced_by: vec![id], + body: None, + }); + } + } + + fn track_rollback(&mut self, id: PeerId) -> bool { + self.blocks.retain_mut(|block| { + block.announced_by.retain(|p| *p != id); + !block.announced_by.is_empty() + }); + !self.blocks.is_empty() + } + + fn was_hash_announced(&self, id: PeerId, hash: BlockHash) -> bool { + self.blocks.iter().any(|b| b.header.hash == hash && b.announced_by.contains(&id)) + } + + fn find_announced_hash(&self, id: PeerId) -> Option { + self.blocks.iter().find_map(|b| { + if b.announced_by.contains(&id) { + Some(b.header.hash) + } else { + None + } + }) + } + + fn announcers(&self, hash: BlockHash) -> Vec { + match self.blocks.iter().find(|b| b.header.hash == hash) { + Some(b) => b.announced_by.clone(), + None => vec![], + } + } + + fn track_body(&mut self, hash: BlockHash, body: Vec) { + let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == hash) else { + return; + }; + if block.body.is_none() { + block.body = Some(body); + } + } + + fn body(&self, hash: BlockHash) -> Option<(&Header, &[u8])> { + for block in &self.blocks { + if block.header.hash != hash { + continue; + } + return Some((&block.header, block.body.as_ref()?)); + } + None + } +} + +#[derive(Debug, Default)] +pub struct ChainState { + pub preferred_upstream: Option, + blocks: BTreeMap, + published_blocks: VecDeque<(u64, BlockHash)>, + unpublished_blocks: VecDeque<(u64, BlockHash)>, + rolled_back: bool, +} + +impl ChainState { + pub fn new() -> Self { + Self::default() + } + + pub fn handle_roll_forward(&mut self, id: PeerId, header: Header) -> Vec { + let is_preferred = self.preferred_upstream == Some(id); + let slot = header.slot; + let hash = header.hash; + let slot_blocks = self.blocks.entry(header.slot).or_default(); + slot_blocks.track_announcement(id, header); + if is_preferred { + self.unpublished_blocks.push_back((slot, hash)); + } + self.block_announcers(slot, hash) + } + + pub fn handle_roll_backward(&mut self, id: PeerId, point: Point) { + let is_preferred = self.preferred_upstream == Some(id); + match point { + Point::Origin => { + self.blocks.retain(|_, b| b.track_rollback(id)); + if is_preferred { + if !self.published_blocks.is_empty() { + self.rolled_back = true; + } + self.published_blocks.clear(); + self.unpublished_blocks.clear(); + } + } + Point::Specific(slot, _) => { + self.blocks.retain(|s, b| *s <= slot || b.track_rollback(id)); + if is_preferred { + while let Some((s, _)) = self.unpublished_blocks.back() { + if *s > slot { + self.unpublished_blocks.pop_back(); + } else { + break; + } + } + while let Some((s, _)) = self.published_blocks.back() { + if *s > slot { + self.rolled_back = true; + self.published_blocks.pop_back(); + } else { + break; + } + } + } + } + } + } + + pub fn handle_body_fetched(&mut self, slot: u64, hash: BlockHash, body: Vec) { + let Some(slot_blocks) = self.blocks.get_mut(&slot) else { + return; + }; + slot_blocks.track_body(hash, body); + } + + pub fn handle_new_preferred_upstream(&mut self, id: PeerId) { + if self.preferred_upstream == Some(id) { + return; + } + self.preferred_upstream = Some(id); + + // If there are any blocks queued to be published which our preferred upstream never announced, + // unqueue them now. + while let Some((slot, hash)) = self.unpublished_blocks.back() { + let Some(slot_blocks) = self.blocks.get(slot) else { + break; + }; + if !slot_blocks.was_hash_announced(id, *hash) { + self.unpublished_blocks.pop_back(); + } else { + break; + } + } + + // If we published any blocks which our preferred upstream never announced, + // we'll have to publish that we rolled them back + while let Some((slot, hash)) = self.published_blocks.back() { + let Some(slot_blocks) = self.blocks.get(slot) else { + break; + }; + if !slot_blocks.was_hash_announced(id, *hash) { + self.rolled_back = true; + self.published_blocks.pop_back(); + } else { + break; + } + } + + // If this other chain has announced blocks which we haven't published yet, + // queue them to be published as soon as we have their bodies + let head_slot = self.published_blocks.back().map(|(s, _)| *s); + if let Some(slot) = head_slot { + for (slot, blocks) in self.blocks.range(slot + 1..) { + if let Some(hash) = blocks.find_announced_hash(id) { + self.unpublished_blocks.push_back((*slot, hash)); + } + } + } + } + + pub fn next_unpublished_block(&self) -> Option<(&Header, &[u8], bool)> { + let (slot, hash) = self.unpublished_blocks.front()?; + let slot_blocks = self.blocks.get(slot)?; + let (header, body) = slot_blocks.body(*hash)?; + Some((header, body, self.rolled_back)) + } + + pub fn handle_block_published(&mut self) { + if let Some(published) = self.unpublished_blocks.pop_front() { + self.published_blocks.push_back(published); + self.rolled_back = false; + while self.published_blocks.len() > SECURITY_PARAMETER_K as usize { + let Some((slot, _)) = self.published_blocks.pop_back() else { + break; + }; + self.blocks.remove(&slot); + } + } + } + + pub fn choose_points_for_find_intersect(&self) -> Vec { + let mut iterator = self.published_blocks.iter().rev(); + let mut result = vec![]; + + // send the 5 most recent points + for _ in 0..5 { + if let Some((slot, hash)) = iterator.next() { + result.push(Point::Specific(*slot, hash.to_vec())); + } + } + + // then 5 more points, spaced out by 10 block heights each + let mut iterator = iterator.step_by(10); + for _ in 0..5 { + if let Some((slot, hash)) = iterator.next() { + result.push(Point::Specific(*slot, hash.to_vec())); + } + } + + // then 5 more points, spaced out by a total of 100 block heights each + // (in case of an implausibly long rollback) + let mut iterator = iterator.step_by(10); + for _ in 0..5 { + if let Some((slot, hash)) = iterator.next() { + result.push(Point::Specific(*slot, hash.to_vec())); + } + } + + result + } + + pub fn block_announcers(&self, slot: u64, hash: BlockHash) -> Vec { + match self.blocks.get(&slot) { + Some(slot_blocks) => slot_blocks.announcers(hash), + None => vec![], + } + } +} + +#[cfg(test)] +mod tests { + use acropolis_common::Era; + use pallas::crypto::hash::Hasher; + + use super::*; + + fn make_block(slot: u64, desc: &str) -> (Header, Vec) { + let mut hasher = Hasher::<256>::new(); + hasher.input(&slot.to_le_bytes()); + hasher.input(desc.as_bytes()); + let hash = BlockHash::new(*hasher.finalize()); + let header = Header { + hash, + slot, + number: slot, + bytes: desc.as_bytes().to_vec(), + era: Era::Conway, + }; + let body = desc.as_bytes().to_vec(); + (header, body) + } + + #[test] + fn should_work_in_happy_path() { + let mut state = ChainState::new(); + let peer = PeerId(0); + state.handle_new_preferred_upstream(peer); + + let (header, body) = make_block(0, "first block"); + + // simulate a roll forward from our peer + let announced = state.handle_roll_forward(peer, header.clone()); + assert_eq!(announced, vec![peer]); + + // we don't have any new blocks to report yet + assert_eq!(state.next_unpublished_block(), None); + + // report that our peer returned the body + state.handle_body_fetched(header.slot, header.hash, body.clone()); + + // NOW we have a new block to report + assert_eq!( + state.next_unpublished_block(), + Some((&header, body.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } + + #[test] + fn should_handle_blocks_fetched_out_of_order() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2, b2) = make_block(1, "second block"); + + // simulate a roll forward + state.handle_roll_forward(p1, h1.clone()); + state.handle_roll_forward(p1, h2.clone()); + + // we don't have any new blocks to report yet + assert_eq!(state.next_unpublished_block(), None); + + // report that our peer returned the SECOND body first. + state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); + + // without the first block, we can't use that yet. + assert_eq!(state.next_unpublished_block(), None); + + // but once it reports the first body... + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + + // NOW we have TWO new blocks to report + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!( + state.next_unpublished_block(), + Some((&h2, b2.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } + + #[test] + fn should_handle_rollback() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2, b2) = make_block(1, "second block pre-rollback"); + let (h3, b3) = make_block(1, "second block post-rollback"); + let (h4, b4) = make_block(1, "third block post-rollback"); + + // publish the first block + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); + state.handle_block_published(); + + // publish the second block + assert_eq!(state.handle_roll_forward(p1, h2.clone()), vec![p1]); + state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h2, b2.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + + // now, roll the chain back to the first block + state.handle_roll_backward(p1, Point::Specific(h1.slot, h1.hash.to_vec())); + assert_eq!(state.next_unpublished_block(), None); + + // and when we advance to the new second block, the system should report it as a rollback + assert_eq!(state.handle_roll_forward(p1, h3.clone()), vec![p1]); + state.handle_body_fetched(h3.slot, h3.hash, b3.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h3, b3.as_slice(), true)) + ); + state.handle_block_published(); + + // and the new third block should not be a rollback + assert_eq!(state.handle_roll_forward(p1, h4.clone()), vec![p1]); + state.handle_body_fetched(h4.slot, h4.hash, b4.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h4, b4.as_slice(), false)) + ); + state.handle_block_published(); + } + + #[test] + fn should_not_report_rollback_for_unpublished_portion_of_chain() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2, b2) = make_block(1, "second block pre-rollback"); + let (h3, b3) = make_block(1, "second block post-rollback"); + + // publish the first block + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); + state.handle_block_published(); + + // roll forward to the second block, but pretend the body is taking a while to download + assert_eq!(state.handle_roll_forward(p1, h2.clone()), vec![p1]); + + // oops, we just received a rollback + state.handle_roll_backward(p1, Point::Specific(h1.slot, h1.hash.to_vec())); + + // and THEN we got the second body + state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); + + // don't publish the old second block, since it isn't part of the chain + assert_eq!(state.next_unpublished_block(), None); + + // and when we advance to the new second block, the system should not report it as a rollback + assert_eq!(state.handle_roll_forward(p1, h3.clone()), vec![p1]); + state.handle_body_fetched(h3.slot, h3.hash, b3.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h3, b3.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } + + #[test] + fn should_gracefully_handle_switching_chains() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + let p2 = PeerId(1); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (p1h2, p1b2) = make_block(1, "our preferred upstream's second block"); + let (p1h3, p1b3) = make_block(2, "our preferred upstream's third block"); + let (p2h2, p2b2) = make_block(1, "another upstream's second block"); + let (p2h3, p2b3) = make_block(2, "another upstream's third block"); + + // publish three blocks on our current chain + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); + state.handle_block_published(); + + assert_eq!(state.handle_roll_forward(p1, p1h2.clone()), vec![p1]); + state.handle_body_fetched(p1h2.slot, p1h2.hash, p1b2.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&p1h2, p1b2.as_slice(), false)) + ); + state.handle_block_published(); + + assert_eq!(state.handle_roll_forward(p1, p1h3.clone()), vec![p1]); + state.handle_body_fetched(p1h3.slot, p1h3.hash, p1b3.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&p1h3, p1b3.as_slice(), false)) + ); + state.handle_block_published(); + + // that other chain forked + assert_eq!(state.handle_roll_forward(p2, h1.clone()), vec![p1, p2]); + assert_eq!(state.handle_roll_forward(p2, p2h2.clone()), vec![p2]); + state.handle_body_fetched(p2h2.slot, p2h2.hash, p2b2.clone()); + assert_eq!(state.handle_roll_forward(p2, p2h3.clone()), vec![p2]); + state.handle_body_fetched(p2h3.slot, p2h3.hash, p2b3.clone()); + assert_eq!(state.next_unpublished_block(), None); + + // and then we decided to switch to it + state.handle_new_preferred_upstream(p2); + + // now we should publish two blocks, and the first should be marked as "rollback" + assert_eq!( + state.next_unpublished_block(), + Some((&p2h2, p2b2.as_slice(), true)) + ); + state.handle_block_published(); + assert_eq!( + state.next_unpublished_block(), + Some((&p2h3, p2b3.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } +} diff --git a/modules/peer_network_interface/src/configuration.rs b/modules/peer_network_interface/src/configuration.rs new file mode 100644 index 00000000..9f02ebb0 --- /dev/null +++ b/modules/peer_network_interface/src/configuration.rs @@ -0,0 +1,41 @@ +use std::path::PathBuf; + +use acropolis_common::genesis_values::GenesisValues; +use anyhow::Result; +use config::Config; + +#[derive(Clone, Debug, serde::Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum SyncPoint { + Origin, + Tip, + Cache, + Snapshot, +} + +#[derive(serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct InterfaceConfig { + pub block_topic: String, + pub sync_point: SyncPoint, + pub snapshot_completion_topic: String, + pub genesis_completion_topic: String, + pub node_addresses: Vec, + pub magic_number: u64, + pub cache_dir: PathBuf, + #[serde(flatten)] + pub genesis_values: Option, +} + +impl InterfaceConfig { + pub fn try_load(config: &Config) -> Result { + let full_config = Config::builder() + .add_source(config::File::from_str( + include_str!("../config.default.toml"), + config::FileFormat::Toml, + )) + .add_source(config.clone()) + .build()?; + Ok(full_config.try_deserialize()?) + } +} diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs new file mode 100644 index 00000000..ec4e38c8 --- /dev/null +++ b/modules/peer_network_interface/src/connection.rs @@ -0,0 +1,236 @@ +use std::time::Duration; + +use acropolis_common::{BlockHash, Era}; +use anyhow::{Result, bail}; +pub use pallas::network::miniprotocols::Point; +use pallas::{ + ledger::traverse::MultiEraHeader, + network::{ + facades::PeerClient, + miniprotocols::{blockfetch, chainsync}, + }, +}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tracing::error; + +use crate::network::PeerMessageSender; + +pub struct PeerConnection { + pub address: String, + chainsync: mpsc::UnboundedSender, + blockfetch: mpsc::UnboundedSender, +} + +impl PeerConnection { + pub fn new(address: String, magic: u64, sender: PeerMessageSender, delay: Duration) -> Self { + let worker = PeerConnectionWorker { + address: address.clone(), + magic, + sender, + }; + let (chainsync_tx, chainsync_rx) = mpsc::unbounded_channel(); + let (blockfetch_tx, blockfetch_rx) = mpsc::unbounded_channel(); + tokio::spawn(async move { + tokio::time::sleep(delay).await; + worker.run(chainsync_rx, blockfetch_rx).await; + }); + Self { + address, + chainsync: chainsync_tx, + blockfetch: blockfetch_tx, + } + } + + pub async fn find_tip(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.chainsync.send(ChainsyncCommand::FindTip(tx))?; + Ok(rx.await?) + } + + pub fn find_intersect(&self, points: Vec) -> Result<()> { + self.chainsync.send(ChainsyncCommand::FindIntersect(points))?; + Ok(()) + } + + pub fn request_block(&self, hash: BlockHash, slot: u64) -> Result<()> { + self.blockfetch.send(BlockfetchCommand::Fetch(hash, slot))?; + Ok(()) + } +} + +#[derive(Debug)] +pub enum PeerEvent { + ChainSync(PeerChainSyncEvent), + BlockFetched(BlockFetched), + Disconnected, +} + +#[derive(Debug)] +pub enum PeerChainSyncEvent { + RollForward(Header), + RollBackward(Point), + IntersectNotFound(Point), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Header { + pub hash: BlockHash, + pub slot: u64, + pub number: u64, + pub bytes: Vec, + pub era: Era, +} + +#[derive(Debug)] +pub struct BlockFetched { + pub slot: u64, + pub hash: BlockHash, + pub body: Vec, +} + +struct PeerConnectionWorker { + address: String, + magic: u64, + sender: PeerMessageSender, +} + +impl PeerConnectionWorker { + async fn run( + mut self, + chainsync: mpsc::UnboundedReceiver, + blockfetch: mpsc::UnboundedReceiver, + ) { + if let Err(err) = self.do_run(chainsync, blockfetch).await { + error!(peer = self.address, "{err:#}"); + } + let _ = self.sender.write(PeerEvent::Disconnected).await; + } + + async fn do_run( + &mut self, + chainsync: mpsc::UnboundedReceiver, + blockfetch: mpsc::UnboundedReceiver, + ) -> Result<()> { + let client = PeerClient::connect(self.address.clone(), self.magic).await?; + select! { + res = self.run_chainsync(client.chainsync, chainsync) => res, + res = self.run_blockfetch(client.blockfetch, blockfetch) => res, + } + } + + async fn run_chainsync( + &self, + mut client: chainsync::N2NClient, + mut commands: mpsc::UnboundedReceiver, + ) -> Result<()> { + let mut reached = None; + loop { + select! { + msg = client.request_or_await_next(), if reached.is_some() => { + if let Some(parsed) = self.parse_chainsync_message(msg?)? { + reached = Some(parsed.point); + self.sender.write(PeerEvent::ChainSync(parsed.event)).await?; + } + } + cmd = commands.recv() => { + let Some(cmd) = cmd else { + bail!("parent process has disconnected"); + }; + if !client.has_agency() { + // To run find_intersect, we must have agency. + // If we don't, it's because we requested the next response already. + // There's no way to cancel that request, so just wait for it to finish. + client.recv_while_must_reply().await?; + }; + match cmd { + ChainsyncCommand::FindIntersect(points) => { + let (point, tip) = client.find_intersect(points).await?; + reached = point; + if reached.is_none() { + self.sender.write(PeerEvent::ChainSync(PeerChainSyncEvent::IntersectNotFound(tip.0))).await?; + } + } + ChainsyncCommand::FindTip(done) => { + let points = reached.as_slice().to_vec(); + let (_, tip) = client.find_intersect(points).await?; + if done.send(tip.0).is_err() { + bail!("parent process has disconnected"); + } + } + } + } + } + } + } + + async fn run_blockfetch( + &self, + mut client: blockfetch::Client, + mut commands: mpsc::UnboundedReceiver, + ) -> Result<()> { + while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await { + let point = Point::Specific(slot, hash.to_vec()); + let body = client.fetch_single(point).await?; + self.sender.write(PeerEvent::BlockFetched(BlockFetched { slot, hash, body })).await?; + } + bail!("parent process has disconnected"); + } + + fn parse_chainsync_message( + &self, + msg: chainsync::NextResponse, + ) -> Result> { + match msg { + chainsync::NextResponse::RollForward(header, _) => { + let Some(parsed) = self.parse_header(header)? else { + return Ok(None); + }; + let point = Point::Specific(parsed.slot, parsed.hash.to_vec()); + Ok(Some(ParsedChainsyncMessage { + point, + event: PeerChainSyncEvent::RollForward(parsed), + })) + } + chainsync::NextResponse::RollBackward(point, _) => Ok(Some(ParsedChainsyncMessage { + point: point.clone(), + event: PeerChainSyncEvent::RollBackward(point), + })), + chainsync::NextResponse::Await => Ok(None), + } + } + + fn parse_header(&self, header: chainsync::HeaderContent) -> Result> { + let hdr_tag = header.byron_prefix.map(|p| p.0); + let hdr_variant = header.variant; + let hdr = MultiEraHeader::decode(hdr_variant, hdr_tag, &header.cbor)?; + if hdr.as_eb().is_some() { + // skip EpochBoundary blocks + return Ok(None); + } + let era = Era::try_from(hdr_variant)?; + Ok(Some(Header { + hash: BlockHash::new(*hdr.hash()), + slot: hdr.slot(), + number: hdr.number(), + bytes: header.cbor, + era, + })) + } +} + +enum ChainsyncCommand { + FindIntersect(Vec), + FindTip(oneshot::Sender), +} + +struct ParsedChainsyncMessage { + point: Point, + event: PeerChainSyncEvent, +} + +enum BlockfetchCommand { + Fetch(BlockHash, u64), +} diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs new file mode 100644 index 00000000..ff7f494a --- /dev/null +++ b/modules/peer_network_interface/src/network.rs @@ -0,0 +1,253 @@ +use std::{collections::BTreeMap, time::Duration}; + +use crate::{ + BlockSink, + chain_state::ChainState, + connection::{PeerChainSyncEvent, PeerConnection, PeerEvent}, +}; +use acropolis_common::BlockHash; +use anyhow::{Context as _, Result, bail}; +use pallas::network::miniprotocols::Point; +use tokio::sync::mpsc; +use tracing::{info, warn}; + +struct PeerData { + conn: PeerConnection, + reqs: Vec<(BlockHash, u64)>, +} +impl PeerData { + fn new(conn: PeerConnection) -> Self { + Self { conn, reqs: vec![] } + } + + fn find_intersect(&self, points: Vec) { + if let Err(error) = self.conn.find_intersect(points) { + warn!("could not sync {}: {error:#}", self.conn.address); + } + } + + fn request_block(&mut self, hash: BlockHash, slot: u64) -> bool { + if self.reqs.contains(&(hash, slot)) { + return true; + } + if let Err(error) = self.conn.request_block(hash, slot) { + warn!( + "could not request block from {}: {error:#}", + self.conn.address + ); + return false; + } + self.reqs.push((hash, slot)); + true + } + + fn ack_block(&mut self, hash: BlockHash) { + self.reqs.retain(|(h, _)| *h != hash); + } +} + +pub struct NetworkManager { + network_magic: u64, + next_id: u64, + peers: BTreeMap, + chain: ChainState, + events: mpsc::Receiver, + events_sender: mpsc::Sender, + block_sink: BlockSink, + published_blocks: u64, +} + +impl NetworkManager { + pub fn new( + network_magic: u64, + events: mpsc::Receiver, + events_sender: mpsc::Sender, + block_sink: BlockSink, + ) -> Self { + Self { + network_magic, + next_id: 0, + peers: BTreeMap::new(), + chain: ChainState::new(), + events, + events_sender, + block_sink, + published_blocks: 0, + } + } + + pub async fn run(mut self) -> Result<()> { + while let Some(event) = self.events.recv().await { + match event { + NetworkEvent::PeerUpdate { peer, event } => { + self.handle_peer_update(peer, event); + if true { + self.publish_blocks().await?; + } + } + } + } + bail!("event sink closed") + } + + pub fn handle_new_connection(&mut self, address: String, delay: Duration) { + let id = PeerId(self.next_id); + self.next_id += 1; + let sender = PeerMessageSender { + sink: self.events_sender.clone(), + id, + }; + let conn = PeerConnection::new(address, self.network_magic, sender, delay); + let peer = PeerData::new(conn); + let points = self.chain.choose_points_for_find_intersect(); + if !points.is_empty() { + peer.find_intersect(points); + } + self.peers.insert(id, peer); + if self.chain.preferred_upstream.is_none() { + self.set_preferred_upstream(id); + } + } + + pub async fn sync_to_tip(&mut self) -> Result<()> { + loop { + let Some(upstream) = self.chain.preferred_upstream else { + bail!("no peers"); + }; + let Some(peer) = self.peers.get(&upstream) else { + bail!("preferred upstream not found"); + }; + match peer.conn.find_tip().await { + Ok(point) => { + self.sync_to_point(point); + return Ok(()); + } + Err(e) => { + warn!("could not fetch tip from {}: {e:#}", peer.conn.address); + self.handle_disconnect(upstream); + } + } + } + } + + pub fn sync_to_point(&mut self, point: Point) { + for peer in self.peers.values() { + peer.find_intersect(vec![point.clone()]); + } + } + + // Implementation note: this method is deliberately synchronous/non-blocking. + // The task which handles network events should only block when waiting for new messages, + // or when publishing messages to other modules. This avoids deadlock; if our event queue + // is full and this method is blocked on writing to it, the queue can never drain. + fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) { + match event { + PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => { + let slot = header.slot; + let hash = header.hash; + let request_body_from = self.chain.handle_roll_forward(peer, header); + if !request_body_from.is_empty() { + // Request the block from the first peer which announced it + self.request_block(slot, hash, request_body_from); + } + } + PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => { + self.chain.handle_roll_backward(peer, point); + } + PeerEvent::ChainSync(PeerChainSyncEvent::IntersectNotFound(tip)) => { + // We called find_intersect on a peer, and it didn't recognize any of the points we passed. + // That peer must either be behind us or on a different fork; either way, that chain should sync from its own tip + if let Some(peer) = self.peers.get(&peer) { + peer.find_intersect(vec![tip]); + } + } + PeerEvent::BlockFetched(fetched) => { + for peer in self.peers.values_mut() { + peer.ack_block(fetched.hash); + } + self.chain.handle_body_fetched(fetched.slot, fetched.hash, fetched.body); + } + PeerEvent::Disconnected => { + self.handle_disconnect(peer); + } + } + } + + fn handle_disconnect(&mut self, id: PeerId) { + let Some(peer) = self.peers.remove(&id) else { + return; + }; + warn!("disconnected from {}", peer.conn.address); + let was_preferred = self.chain.preferred_upstream.is_some_and(|i| i == id); + if was_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { + self.set_preferred_upstream(new_preferred); + } + if self.peers.is_empty() { + warn!("no upstream peers!"); + } + for (requested_hash, requested_slot) in peer.reqs { + let announcers = self.chain.block_announcers(requested_slot, requested_hash); + self.request_block(requested_slot, requested_hash, announcers); + } + + let address = peer.conn.address.clone(); + self.handle_new_connection(address, Duration::from_secs(5)); + } + + fn request_block(&mut self, slot: u64, hash: BlockHash, announcers: Vec) { + for announcer in announcers { + let Some(peer) = self.peers.get_mut(&announcer) else { + continue; + }; + if peer.request_block(hash, slot) { + break; // only fetch from one + } else { + self.handle_disconnect(announcer); + } + } + } + + fn set_preferred_upstream(&mut self, id: PeerId) { + let Some(peer) = self.peers.get(&id) else { + warn!("setting preferred upstream to unrecognized node {id:?}"); + return; + }; + info!("setting preferred upstream to {}", peer.conn.address); + self.chain.handle_new_preferred_upstream(id); + } + + async fn publish_blocks(&mut self) -> Result<()> { + while let Some((header, body, rolled_back)) = self.chain.next_unpublished_block() { + self.block_sink.announce(header, body, rolled_back).await?; + self.published_blocks += 1; + if self.published_blocks.is_multiple_of(100) { + info!("Published block {}", header.number); + } + self.chain.handle_block_published(); + } + Ok(()) + } +} + +pub enum NetworkEvent { + PeerUpdate { peer: PeerId, event: PeerEvent }, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct PeerId(pub(crate) u64); + +pub struct PeerMessageSender { + id: PeerId, + sink: mpsc::Sender, +} +impl PeerMessageSender { + pub async fn write(&self, event: PeerEvent) -> Result<()> { + self.sink + .send(NetworkEvent::PeerUpdate { + peer: self.id, + event, + }) + .await + .context("network manager has shut down") + } +} diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs new file mode 100644 index 00000000..5bc39af1 --- /dev/null +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -0,0 +1,214 @@ +mod chain_state; +mod configuration; +mod connection; +mod network; + +use acropolis_common::{ + BlockInfo, BlockStatus, + genesis_values::GenesisValues, + messages::{CardanoMessage, Message, RawBlockMessage}, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, +}; +use anyhow::{Result, bail}; +use caryatid_sdk::{Context, Module, Subscription, module}; +use config::Config; +use pallas::network::miniprotocols::Point; +use tokio::sync::mpsc; +use tracing::{error, warn}; + +use std::{path::Path, sync::Arc, time::Duration}; + +use crate::{ + configuration::{InterfaceConfig, SyncPoint}, + connection::Header, + network::NetworkManager, +}; + +#[module( + message_type(Message), + name = "peer-network-interface", + description = "Mini-protocol chain fetcher from several upstream nodes" +)] +pub struct PeerNetworkInterface; + +impl PeerNetworkInterface { + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let cfg = InterfaceConfig::try_load(&config)?; + let genesis_complete = if cfg.genesis_values.is_none() { + Some(context.subscribe(&cfg.genesis_completion_topic).await?) + } else { + None + }; + let snapshot_complete = match cfg.sync_point { + SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?), + _ => None, + }; + let (events_sender, events) = mpsc::channel(1024); + + context.clone().run(async move { + let genesis_values = if let Some(mut sub) = genesis_complete { + Self::wait_genesis_completion(&mut sub) + .await + .expect("could not fetch genesis values") + } else { + cfg.genesis_values.expect("genesis values not found") + }; + + let mut upstream_cache = None; + let mut cache_sync_point = Point::Origin; + if cfg.sync_point == SyncPoint::Cache { + match Self::init_cache(&cfg.cache_dir, &cfg.block_topic, &context).await { + Ok((cache, sync_point)) => { + upstream_cache = Some(cache); + cache_sync_point = sync_point; + } + Err(e) => { + warn!("could not initialize upstream cache: {e:#}"); + } + } + } + + let sink = BlockSink { + context, + topic: cfg.block_topic, + genesis_values, + upstream_cache, + }; + + let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); + for address in cfg.node_addresses { + manager.handle_new_connection(address, Duration::ZERO); + } + + match cfg.sync_point { + SyncPoint::Origin => manager.sync_to_point(Point::Origin), + SyncPoint::Tip => { + if let Err(error) = manager.sync_to_tip().await { + warn!("could not sync to tip: {error:#}"); + return; + } + } + SyncPoint::Cache => manager.sync_to_point(cache_sync_point), + SyncPoint::Snapshot => { + let mut subscription = + snapshot_complete.expect("Snapshot topic subscription missing"); + match Self::wait_snapshot_completion(&mut subscription).await { + Ok(point) => manager.sync_to_point(point), + Err(error) => { + warn!("snapshot restoration never completed: {error:#}"); + return; + } + } + } + } + + if let Err(err) = manager.run().await { + error!("chain sync failed: {err:#}"); + } + }); + + Ok(()) + } + + async fn init_cache( + cache_dir: &Path, + block_topic: &str, + context: &Context, + ) -> Result<(UpstreamCache, Point)> { + let mut cache = UpstreamCache::new(cache_dir)?; + let mut cache_sync_point = None; + cache.start_reading()?; + while let Some(record) = cache.read_record()? { + cache_sync_point = Some((record.id.slot, record.id.hash)); + let message = Arc::new(Message::Cardano(( + record.id, + CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)), + ))); + context.message_bus.publish(block_topic, message).await?; + } + let sync_point = match cache_sync_point { + None => Point::Origin, + Some((slot, hash)) => Point::Specific(slot, hash.to_vec()), + }; + Ok((cache, sync_point)) + } + + async fn wait_genesis_completion( + subscription: &mut Box>, + ) -> Result { + let (_, message) = subscription.read().await?; + match message.as_ref() { + Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => { + Ok(complete.values.clone()) + } + msg => bail!("Unexpected message in genesis completion topic: {msg:?}"), + } + } + + async fn wait_snapshot_completion( + subscription: &mut Box>, + ) -> Result { + let (_, message) = subscription.read().await?; + match message.as_ref() { + Message::Cardano((block, CardanoMessage::SnapshotComplete)) => { + Ok(Point::Specific(block.slot, block.hash.to_vec())) + } + msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"), + } + } +} + +struct BlockSink { + context: Arc>, + topic: String, + genesis_values: GenesisValues, + upstream_cache: Option, +} +impl BlockSink { + pub async fn announce( + &mut self, + header: &Header, + body: &[u8], + rolled_back: bool, + ) -> Result<()> { + let info = self.make_block_info(header, rolled_back); + let raw_block = RawBlockMessage { + header: header.bytes.clone(), + body: body.to_vec(), + }; + if let Some(cache) = self.upstream_cache.as_mut() { + let record = UpstreamCacheRecord { + id: info.clone(), + message: Arc::new(raw_block.clone()), + }; + cache.write_record(&record)?; + } + let message = Arc::new(Message::Cardano(( + info, + CardanoMessage::BlockAvailable(raw_block), + ))); + self.context.publish(&self.topic, message).await + } + + fn make_block_info(&self, header: &Header, rolled_back: bool) -> BlockInfo { + let slot = header.slot; + let (epoch, epoch_slot) = self.genesis_values.slot_to_epoch(slot); + let new_epoch = slot == self.genesis_values.epoch_to_first_slot(epoch); + let timestamp = self.genesis_values.slot_to_timestamp(slot); + BlockInfo { + status: if rolled_back { + BlockStatus::RolledBack + } else { + BlockStatus::Volatile + }, + slot, + number: header.number, + hash: header.hash, + epoch, + epoch_slot, + new_epoch, + timestamp, + era: header.era, + } + } +} diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index 50c0631e..6b453b3c 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -1,7 +1,11 @@ //! Acropolis Miniprotocols module for Caryatid //! Multi-connection, block body fetching part of the client (in separate thread). -use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; +use acropolis_common::{ + messages::RawBlockMessage, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, + BlockHash, BlockInfo, BlockStatus, Era, +}; use anyhow::{bail, Result}; use crossbeam::channel::{Receiver, TryRecvError}; use pallas::{ @@ -15,7 +19,6 @@ use std::{sync::Arc, time::Duration}; use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error, info}; -use crate::upstream_cache::{UpstreamCache, UpstreamCacheRecord}; use crate::{ utils, utils::{ diff --git a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs index 71322189..6cbdca43 100644 --- a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs @@ -4,6 +4,7 @@ use acropolis_common::{ genesis_values::GenesisValues, messages::{CardanoMessage, Message}, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, BlockInfo, }; use anyhow::{anyhow, bail, Result}; @@ -24,12 +25,10 @@ use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error, info}; mod body_fetcher; -mod upstream_cache; mod utils; use crate::utils::FetchResult; use body_fetcher::BodyFetcher; -use upstream_cache::{UpstreamCache, UpstreamCacheRecord}; use utils::{FetcherConfig, SyncPoint}; const MAX_BODY_FETCHER_CHANNEL_LENGTH: usize = 100; @@ -228,7 +227,7 @@ impl UpstreamChainFetcher { Self::sync_to_point(cfg, None, Point::Origin).await?; } SyncPoint::Cache => { - let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir); + let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir)?; let point = match Self::read_cache(cfg.clone(), &mut upstream_cache).await? { None => Point::Origin, Some(blk) => Point::Specific(blk.slot, blk.hash.to_vec()), diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore index 1fa2d34e..ce63bc1e 100644 --- a/processes/omnibus/.gitignore +++ b/processes/omnibus/.gitignore @@ -3,6 +3,7 @@ sled-immutable-utxos fjall-blocks fjall-immutable-utxos cache +upstream-cache # DB files *_db diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 7814b5c8..4af31037 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -12,6 +12,7 @@ acropolis_common = { path = "../../common" } acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } acropolis_module_mithril_snapshot_fetcher = { path = "../../modules/mithril_snapshot_fetcher" } acropolis_module_upstream_chain_fetcher = { path = "../../modules/upstream_chain_fetcher" } +acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } acropolis_module_tx_unpacker = { path = "../../modules/tx_unpacker" } acropolis_module_utxo_state = { path = "../../modules/utxo_state" } diff --git a/processes/omnibus/omnibus-local.toml b/processes/omnibus/omnibus-local.toml new file mode 100644 index 00000000..d90b9371 --- /dev/null +++ b/processes/omnibus/omnibus-local.toml @@ -0,0 +1,102 @@ +# Top-level configuration for Acropolis omnibus process + +[module.genesis-bootstrapper] +network-name = "preview" # "sanchonet", "preview", "mainnet" + +#[module.mithril-snapshot-fetcher] +#Turned off with SanchoNet +#aggregator-url = "https://aggregator.release-mainnet.api.mithril.network/aggregator" +#genesis-key = "5b3139312c36362c3134302c3138352c3133382c31312c3233372c3230372c3235302c3134342c32372c322c3138382c33302c31322c38312c3135352c3230342c31302c3137392c37352c32332c3133382c3139362c3231372c352c31342c32302c35372c37392c33392c3137365d" +#download = false + +[module.rest-blockfrost] + +[module.peer-network-interface] +sync-point = "origin" #"cache" # "origin", "tip", "snapshot" +node-addresses = [ + "localhost:3001", + "localhost:3002", + "localhost:3003", +] +magic-number = 2 + +[module.block-unpacker] + +[module.tx-unpacker] +publish-utxo-deltas-topic = "cardano.utxo.deltas" +publish-withdrawals-topic = "cardano.withdrawals" +publish-certificates-topic = "cardano.certificates" +publish-governance-topic = "cardano.governance" +publish-block-txs-topic = "cardano.block.txs" + +[module.utxo-state] +store = "memory" # "memory", "dashmap", "fjall", "fjall-async", "sled", "sled-async", "fake" +address-delta-topic = "cardano.address.delta" + +[module.spo-state] +# Enables /pools/{pool_id}/history endpoint, enables to query active_stakes +store-epochs-history = false +# Enable /pools/retired +store-retired-pools = false +# Enables /pools/{pool_id} endpoint +store-registration = false +# # Enables /pools/{pool_id}/updates endpoint +store-updates = false +# Enables /pools/{pool_id}/delegators endpoint (Requires store-stake-addresses to be enabled) +store-delegators = false +# Enables /pools/{pool_id}/votes endpoint +store-votes = false +# Store stake_addresses +store-stake-addresses = false + +[module.drep-state] + +[module.governance-state] + +[module.parameters-state] +store-history = false +network-name = "sanchonet" # "sanchonet", "mainnet" + +[module.stake-delta-filter] +cache-mode = "write" # "predefined", "read", "write", "write-if-absent" +write-full-cache = "false" + +[module.epochs-state] +# Enables /epochs/{number} endpoint (for historical epochs) +store-history = false +# Enables /pools/{pool_id}/blocks endpoint +store-block-hashes = false + + +[module.accounts-state] + +[module.clock] + +[module.rest-server] +address = "127.0.0.1" +port = 4340 + +[module.spy] +# Enable for message spying +#topic = "cardano.drep.state" + +[startup] +topic = "cardano.sequence.start" + +[message-bus.external] +class = "rabbit-mq" +url = "amqp://127.0.0.1:5672/%2f" +exchange = "caryatid" + +[message-bus.internal] +class = "in-memory" +workers = 50 +dispatch-queue-size = 1000 +worker-queue-size = 100 +bulk-block-capacity = 50 +bulk-resume-capacity = 75 + +# Message routing +[[message-router.route]] # Everything is internal only +pattern = "#" +bus = "internal" diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index ab2d4ca2..0a8d7f83 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -22,6 +22,7 @@ use acropolis_module_governance_state::GovernanceState; use acropolis_module_historical_accounts_state::HistoricalAccountsState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; +use acropolis_module_peer_network_interface::PeerNetworkInterface; use acropolis_module_rest_blockfrost::BlockfrostREST; use acropolis_module_spdd_state::SPDDState; use acropolis_module_spo_state::SPOState; @@ -102,6 +103,7 @@ pub async fn main() -> Result<()> { MithrilSnapshotFetcher::register(&mut process); UpstreamChainFetcher::register(&mut process); BlockUnpacker::register(&mut process); + PeerNetworkInterface::register(&mut process); TxUnpacker::register(&mut process); UTXOState::register(&mut process); SPOState::register(&mut process);