From 7390aad07b180e8f8c4871e1f9ab082549d404f0 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 3 Nov 2025 14:47:04 -0500 Subject: [PATCH 01/15] feat: implementation of PeerNetworkInterface --- Cargo.lock | 16 ++ Cargo.toml | 1 + common/src/genesis_values.rs | 1 + modules/peer_network_interface/Cargo.toml | 26 ++ .../config.default.toml | 11 + .../src/configuration.rs | 42 +++ .../peer_network_interface/src/connection.rs | 233 ++++++++++++++++ modules/peer_network_interface/src/network.rs | 259 ++++++++++++++++++ .../src/peer_network_interface.rs | 144 ++++++++++ 9 files changed, 733 insertions(+) create mode 100644 modules/peer_network_interface/Cargo.toml create mode 100644 modules/peer_network_interface/config.default.toml create mode 100644 modules/peer_network_interface/src/configuration.rs create mode 100644 modules/peer_network_interface/src/connection.rs create mode 100644 modules/peer_network_interface/src/network.rs create mode 100644 modules/peer_network_interface/src/peer_network_interface.rs diff --git a/Cargo.lock b/Cargo.lock index d237f4ca..1aa318d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,6 +302,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_peer_network_interface" +version = "0.2.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "crossbeam", + "pallas 0.33.0", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_rest_blockfrost" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 035c1edb..86b8639e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher "modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot "modules/upstream_chain_fetcher", # Upstream chain fetcher + "modules/peer_network_interface", # Multi-peer network interface "modules/block_unpacker", # Block to transaction unpacker "modules/tx_unpacker", # Tx to UTXO 
unpacker "modules/utxo_state", # UTXO state diff --git a/common/src/genesis_values.rs b/common/src/genesis_values.rs index 33781b81..bc339142 100644 --- a/common/src/genesis_values.rs +++ b/common/src/genesis_values.rs @@ -6,6 +6,7 @@ const MAINNET_SHELLEY_GENESIS_HASH: &str = "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81"; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] pub struct GenesisValues { pub byron_timestamp: u64, pub shelley_epoch: u64, diff --git a/modules/peer_network_interface/Cargo.toml b/modules/peer_network_interface/Cargo.toml new file mode 100644 index 00000000..f9a0804a --- /dev/null +++ b/modules/peer_network_interface/Cargo.toml @@ -0,0 +1,26 @@ +# Acropolis upstream chain fetcher module + +[package] +name = "acropolis_module_peer_network_interface" +version = "0.2.0" +edition = "2024" +authors = ["Paul Clark "] +description = "Multiplexed chain fetcher Caryatid module for Acropolis" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +crossbeam = "0.8.4" +pallas = { workspace = true } +serde = { workspace = true, features = ["rc"] } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/peer_network_interface.rs" diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml new file mode 100644 index 00000000..da216c60 --- /dev/null +++ b/modules/peer_network_interface/config.default.toml @@ -0,0 +1,11 @@ +block-topic = "cardano.block.available" +snapshot-completion-topic = "cardano.snapshot.complete" +genesis-completion-topic = "cardano.sequence.bootstrapped" + +node-addresses = [ + "backbone.cardano.iog.io:3001", +] +magic-number = 764824073 + +sync-point = "origin" +cache-dir = "upstream-cache" \ No newline at end of 
file diff --git a/modules/peer_network_interface/src/configuration.rs b/modules/peer_network_interface/src/configuration.rs new file mode 100644 index 00000000..8aca18a7 --- /dev/null +++ b/modules/peer_network_interface/src/configuration.rs @@ -0,0 +1,42 @@ +use std::path::PathBuf; + +use acropolis_common::genesis_values::GenesisValues; +use anyhow::Result; +use config::Config; + +#[derive(Clone, Debug, serde::Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum SyncPoint { + Origin, + Tip, + Cache, + Snapshot, +} + +#[derive(serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct InterfaceConfig { + pub block_topic: String, + pub sync_point: SyncPoint, + pub snapshot_completion_topic: String, + pub genesis_completion_topic: String, + pub node_addresses: Vec, + pub magic_number: u64, + #[expect(unused)] + pub cache_dir: PathBuf, + #[serde(flatten)] + pub genesis_values: Option, +} + +impl InterfaceConfig { + pub fn try_load(config: &Config) -> Result { + let full_config = Config::builder() + .add_source(config::File::from_str( + include_str!("../config.default.toml"), + config::FileFormat::Toml, + )) + .add_source(config.clone()) + .build()?; + Ok(full_config.try_deserialize()?) 
+ } +} diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs new file mode 100644 index 00000000..d38ad595 --- /dev/null +++ b/modules/peer_network_interface/src/connection.rs @@ -0,0 +1,233 @@ +use std::time::Duration; + +use acropolis_common::{BlockHash, Era}; +use anyhow::{Result, bail}; +pub use pallas::network::miniprotocols::Point; +use pallas::{ + ledger::traverse::MultiEraHeader, + network::{ + facades::PeerClient, + miniprotocols::{blockfetch, chainsync}, + }, +}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tracing::error; + +use crate::network::PeerMessageSender; + +pub struct PeerConnection { + pub address: String, + chainsync: mpsc::Sender, + blockfetch: mpsc::Sender, +} + +impl PeerConnection { + pub fn new(address: String, magic: u64, sender: PeerMessageSender, delay: Duration) -> Self { + let worker = PeerConnectionWorker { + address: address.clone(), + magic, + sender, + }; + let (chainsync_tx, chainsync_rx) = mpsc::channel(16); + let (blockfetch_tx, blockfetch_rx) = mpsc::channel(16); + tokio::spawn(async move { + tokio::time::sleep(delay).await; + worker.run(chainsync_rx, blockfetch_rx).await; + }); + Self { + address, + chainsync: chainsync_tx, + blockfetch: blockfetch_tx, + } + } + + pub async fn find_tip(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.chainsync.send(ChainsyncCommand::FindTip(tx)).await?; + Ok(rx.await?) 
+ } + + pub async fn find_intersect(&self, points: Vec) -> Result<()> { + self.chainsync.send(ChainsyncCommand::FindIntersect(points)).await?; + Ok(()) + } + + pub async fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> { + self.blockfetch.send(BlockfetchCommand::Fetch(hash, height)).await?; + Ok(()) + } +} + +pub enum PeerEvent { + ChainSync(PeerChainSyncEvent), + BlockFetched(BlockFetched), + Disconnected, +} + +pub enum PeerChainSyncEvent { + RollForward(Header), + RollBackward(Point), +} + +#[derive(Clone)] +pub struct Header { + pub hash: BlockHash, + pub slot: u64, + pub number: u64, + pub bytes: Vec, + pub era: Era, +} + +pub struct BlockFetched { + pub hash: BlockHash, + pub body: Vec, +} + +struct PeerConnectionWorker { + address: String, + magic: u64, + sender: PeerMessageSender, +} + +impl PeerConnectionWorker { + async fn run( + mut self, + chainsync: mpsc::Receiver, + blockfetch: mpsc::Receiver, + ) { + if let Err(err) = self.do_run(chainsync, blockfetch).await { + error!(peer = self.address, "{err:#}"); + } + let _ = self.sender.write(PeerEvent::Disconnected).await; + } + + async fn do_run( + &mut self, + chainsync: mpsc::Receiver, + blockfetch: mpsc::Receiver, + ) -> Result<()> { + let client = PeerClient::connect(self.address.clone(), self.magic).await?; + select! { + res = self.run_chainsync(client.chainsync, chainsync) => res, + res = self.run_blockfetch(client.blockfetch, blockfetch) => res, + } + } + + async fn run_chainsync( + &self, + mut client: chainsync::N2NClient, + mut commands: mpsc::Receiver, + ) -> Result<()> { + let mut reached = None; + loop { + select! { + msg = client.request_or_await_next(), if reached.is_some() => { + if let Some(parsed) = self.parse_chainsync_message(msg?)? 
{ + reached = Some(parsed.point); + self.sender.write(PeerEvent::ChainSync(parsed.event)).await?; + } + } + cmd = commands.recv() => { + let Some(cmd) = cmd else { + bail!("parent process has disconnected"); + }; + match cmd { + ChainsyncCommand::FindIntersect(points) => { + let (point, _) = client.find_intersect(points).await?; + reached = point; + } + ChainsyncCommand::FindTip(done) => { + let points = reached.as_slice().to_vec(); + let (_, tip) = client.find_intersect(points).await?; + if done.send(tip.0).is_err() { + bail!("parent process has disconnected"); + } + } + } + } + } + } + } + + async fn run_blockfetch( + &self, + mut client: blockfetch::Client, + mut commands: mpsc::Receiver, + ) -> Result<()> { + while let Some(BlockfetchCommand::Fetch(hash, height)) = commands.recv().await { + let point = Point::Specific(height, hash.to_vec()); + let body = client.fetch_single(point).await?; + self.sender.write(PeerEvent::BlockFetched(BlockFetched { hash, body })).await?; + } + bail!("parent process has disconnected"); + } + + fn parse_chainsync_message( + &self, + msg: chainsync::NextResponse, + ) -> Result> { + match msg { + chainsync::NextResponse::RollForward(header, _) => { + let Some(parsed) = self.parse_header(header)? 
else { + return Ok(None); + }; + let point = Point::Specific(parsed.number, parsed.hash.to_vec()); + Ok(Some(ParsedChainsyncMessage { + point, + event: PeerChainSyncEvent::RollForward(parsed), + })) + } + chainsync::NextResponse::RollBackward(point, _) => Ok(Some(ParsedChainsyncMessage { + point: point.clone(), + event: PeerChainSyncEvent::RollBackward(point), + })), + chainsync::NextResponse::Await => Ok(None), + } + } + + fn parse_header(&self, header: chainsync::HeaderContent) -> Result> { + let hdr_tag = header.byron_prefix.map(|p| p.0); + let hdr_variant = header.variant; + let hdr = MultiEraHeader::decode(hdr_variant, hdr_tag, &header.cbor)?; + let era = match hdr { + MultiEraHeader::EpochBoundary(_) => return Ok(None), + MultiEraHeader::Byron(_) => Era::Byron, + MultiEraHeader::ShelleyCompatible(_) => match hdr_variant { + 1 => Era::Shelley, + 2 => Era::Allegra, + 3 => Era::Mary, + 4 => Era::Alonzo, + x => bail!("Impossible header variant {x} for ShelleyCompatible (TPraos)"), + }, + MultiEraHeader::BabbageCompatible(_) => match hdr_variant { + 5 => Era::Babbage, + 6 => Era::Conway, + x => bail!("Impossible header variant {x} for BabbageCompatible (Praos)"), + }, + }; + Ok(Some(Header { + hash: BlockHash::new(*hdr.hash()), + slot: hdr.slot(), + number: hdr.number(), + bytes: header.cbor, + era, + })) + } +} + +enum ChainsyncCommand { + FindIntersect(Vec), + FindTip(oneshot::Sender), +} + +struct ParsedChainsyncMessage { + point: Point, + event: PeerChainSyncEvent, +} + +enum BlockfetchCommand { + Fetch(BlockHash, u64), +} diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs new file mode 100644 index 00000000..230ebd19 --- /dev/null +++ b/modules/peer_network_interface/src/network.rs @@ -0,0 +1,259 @@ +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; + +use crate::{ + BlockSink, + connection::{Header, PeerChainSyncEvent, PeerConnection, PeerEvent}, +}; +use acropolis_common::BlockHash; 
+use anyhow::{Context as _, Result, bail}; +use pallas::network::miniprotocols::Point; +use tokio::sync::mpsc; +use tracing::warn; + +pub struct NetworkManager { + network_magic: u64, + next_id: u64, + peers: HashMap, + preferred_upstream: Option, + blocks_to_fetch: VecDeque
, + blocks: HashMap, + head: Option, + rolled_back: bool, + events: mpsc::Receiver, + events_sender: mpsc::Sender, + block_sink: BlockSink, +} + +impl NetworkManager { + pub fn new( + network_magic: u64, + events: mpsc::Receiver, + events_sender: mpsc::Sender, + block_sink: BlockSink, + ) -> Self { + Self { + network_magic, + next_id: 0, + peers: HashMap::new(), + preferred_upstream: None, + blocks_to_fetch: VecDeque::new(), + blocks: HashMap::new(), + head: None, + rolled_back: false, + events, + events_sender, + block_sink, + } + } + + pub async fn run(mut self) -> Result<()> { + while let Some(event) = self.events.recv().await { + match event { + NetworkEvent::NewConnection { address, delay } => { + self.handle_new_connection(address, delay).await + } + NetworkEvent::PeerUpdate { peer, event } => { + self.handle_peer_update(peer, event).await? + } + } + } + bail!("event sink closed") + } + + pub async fn handle_new_connection(&mut self, address: String, delay: Duration) { + let id = PeerId(self.next_id); + self.next_id += 1; + let sender = PeerMessageSender { + sink: self.events_sender.clone(), + id, + }; + let conn = PeerConnection::new(address, self.network_magic, sender, delay); + if self.preferred_upstream.is_none() { + self.peers.insert(id, conn); + self.set_preferred_upstream(id).await; + } else { + if let Some(head) = self.head.clone() + && let Err(error) = conn.find_intersect(vec![head]).await + { + warn!("could not sync {}: {error}", conn.address); + } + self.peers.insert(id, conn); + } + } + + pub async fn sync_to_tip(&mut self) -> Result<()> { + loop { + let Some(upstream) = self.preferred_upstream else { + bail!("no peers"); + }; + let Some(conn) = self.peers.get(&upstream) else { + bail!("preferred upstream not found"); + }; + match conn.find_tip().await { + Ok(point) => { + self.sync_to_point(point).await; + return Ok(()); + } + Err(e) => { + warn!("could not fetch tip from {}: {e}", conn.address); + self.handle_disconnect(upstream).await?; + } + } 
+ } + } + + pub async fn sync_to_point(&mut self, point: Point) { + for conn in self.peers.values() { + if let Err(error) = conn.find_intersect(vec![point.clone()]).await { + warn!("could not sync {}: {error}", conn.address); + } + } + } + + async fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result<()> { + let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); + match event { + PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => { + let id = header.hash; + let status = + self.blocks.entry(id).or_insert(BlockStatus::Announced(header, vec![])); + match status { + BlockStatus::Announced(header, peers) => { + peers.push(peer); + if is_preferred { + self.blocks_to_fetch.push_back(header.clone()); + // Request the block from the first peer which announced it + for announcer in peers.clone() { + let Some(peer) = self.peers.get(&announcer) else { + continue; + }; + if let Err(e) = peer.request_block(header.hash, header.number).await + { + warn!("could not request block from {}: {e}", peer.address); + self.handle_disconnect(announcer).await? + } + break; // only fetch from one + } + } + } + BlockStatus::Fetched(_) => { + if is_preferred { + // Chainsync has requested a block which we've already fetched, + // so we might be able to publish one or more. 
+ self.publish_blocks().await?; + } + } + } + } + PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => { + if is_preferred { + match point { + Point::Origin => { + self.blocks_to_fetch.clear(); + self.rolled_back = true; + } + Point::Specific(number, _) => { + let mut already_sent = true; + while let Some(newest) = self.blocks_to_fetch.back() { + if newest.number == number { + already_sent = false; + break; + } else { + self.blocks_to_fetch.pop_back(); + } + } + if already_sent { + self.rolled_back = true; + } + } + } + } + } + PeerEvent::BlockFetched(fetched) => { + let Some(block) = self.blocks.get_mut(&fetched.hash) else { + return Ok(()); + }; + block.set_body(&fetched.body); + self.publish_blocks().await?; + } + PeerEvent::Disconnected => self.handle_disconnect(peer).await?, + } + Ok(()) + } + + async fn handle_disconnect(&mut self, peer: PeerId) -> Result<()> { + let Some(conn) = self.peers.remove(&peer) else { + return Ok(()); + }; + let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); + if is_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { + self.set_preferred_upstream(new_preferred).await; + } + self.events_sender + .send(NetworkEvent::NewConnection { + address: conn.address, + delay: Duration::from_secs(5), + }) + .await?; + Ok(()) + } + + async fn set_preferred_upstream(&mut self, peer: PeerId) { + self.preferred_upstream = Some(peer); + if let Some(head) = self.head.clone() { + self.sync_to_point(head).await; + } + } + + async fn publish_blocks(&mut self) -> Result<()> { + while let Some(header) = self.blocks_to_fetch.front() { + let Some(BlockStatus::Fetched(body)) = self.blocks.get(&header.hash) else { + break; + }; + self.block_sink.announce(header, body, self.rolled_back).await?; + self.head = Some(Point::Specific(header.number, header.hash.to_vec())); + self.rolled_back = false; + self.blocks_to_fetch.pop_front(); + } + Ok(()) + } +} + +pub enum NetworkEvent { + NewConnection { address: 
String, delay: Duration }, + PeerUpdate { peer: PeerId, event: PeerEvent }, +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct PeerId(u64); + +pub struct PeerMessageSender { + id: PeerId, + sink: mpsc::Sender, +} +impl PeerMessageSender { + pub async fn write(&self, event: PeerEvent) -> Result<()> { + self.sink + .send(NetworkEvent::PeerUpdate { + peer: self.id, + event, + }) + .await + .context("network manager has shut down") + } +} + +enum BlockStatus { + Announced(Header, Vec), + Fetched(Vec), +} +impl BlockStatus { + fn set_body(&mut self, body: &[u8]) { + if let Self::Announced(_, _) = self { + *self = Self::Fetched(body.to_vec()); + } + } +} diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs new file mode 100644 index 00000000..d9bd0b9c --- /dev/null +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -0,0 +1,144 @@ +mod configuration; +mod connection; +mod network; + +use acropolis_common::{ + BlockInfo, BlockStatus, + genesis_values::GenesisValues, + messages::{CardanoMessage, Message, RawBlockMessage}, +}; +use anyhow::{Result, bail}; +use caryatid_sdk::{Context, Module, Subscription, module}; +use config::Config; +use pallas::network::miniprotocols::Point; +use tokio::sync::mpsc; + +use std::{sync::Arc, time::Duration}; + +use crate::{ + configuration::{InterfaceConfig, SyncPoint}, + connection::Header, + network::NetworkManager, +}; + +#[module( + message_type(Message), + name = "peer-network-interface", + description = "Mini-protocol chain fetcher from several upstream nodes" +)] +pub struct PeerNetworkInterface; + +impl PeerNetworkInterface { + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let cfg = InterfaceConfig::try_load(&config)?; + let genesis_complete = if cfg.genesis_values.is_none() { + Some(context.subscribe(&cfg.genesis_completion_topic).await?) 
+ } else { + None + }; + let snapshot_complete = match cfg.sync_point { + SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?), + _ => None, + }; + let (events_sender, events) = mpsc::channel(1024); + + context.clone().run(async move { + let genesis_values = if let Some(mut sub) = genesis_complete { + Self::wait_genesis_completion(&mut sub) + .await + .expect("could not fetch genesis values") + } else { + cfg.genesis_values.expect("genesis values not found") + }; + let sink = BlockSink { + context, + topic: cfg.block_topic, + genesis_values, + }; + + let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); + for address in cfg.node_addresses { + manager.handle_new_connection(address, Duration::ZERO).await; + } + + match cfg.sync_point { + SyncPoint::Origin => manager.sync_to_point(Point::Origin).await, + SyncPoint::Tip => manager.sync_to_tip().await?, + SyncPoint::Cache => unimplemented!(), + SyncPoint::Snapshot => { + let mut subscription = + snapshot_complete.expect("Snapshot topic subscription missing"); + let point = Self::wait_snapshot_completion(&mut subscription).await?; + manager.sync_to_point(point).await; + } + } + + manager.run().await + }); + + Ok(()) + } + + async fn wait_genesis_completion( + subscription: &mut Box>, + ) -> Result { + let (_, message) = subscription.read().await?; + match message.as_ref() { + Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => { + Ok(complete.values.clone()) + } + msg => bail!("Unexpected message in genesis completion topic: {msg:?}"), + } + } + + async fn wait_snapshot_completion( + subscription: &mut Box>, + ) -> Result { + let (_, message) = subscription.read().await?; + match message.as_ref() { + Message::Cardano((block, CardanoMessage::SnapshotComplete)) => { + Ok(Point::Specific(block.number, block.hash.to_vec())) + } + msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"), + } + } +} + +struct BlockSink { + context: 
Arc>, + topic: String, + genesis_values: GenesisValues, +} +impl BlockSink { + pub async fn announce(&self, header: &Header, body: &[u8], rolled_back: bool) -> Result<()> { + let info = self.make_block_info(header, rolled_back); + let available = CardanoMessage::BlockAvailable(RawBlockMessage { + header: header.bytes.clone(), + body: body.to_vec(), + }); + let message = Arc::new(Message::Cardano((info, available))); + self.context.publish(&self.topic, message).await + } + + fn make_block_info(&self, header: &Header, rolled_back: bool) -> BlockInfo { + let slot = header.slot; + let (epoch, epoch_slot) = self.genesis_values.slot_to_epoch(slot); + let new_epoch = slot == self.genesis_values.epoch_to_first_slot(epoch); + let timestamp = self.genesis_values.slot_to_timestamp(slot); + BlockInfo { + status: if rolled_back { + BlockStatus::RolledBack + } else { + BlockStatus::Volatile + }, + slot, + number: header.number, + hash: header.hash, + epoch, + epoch_slot, + new_epoch, + timestamp, + era: header.era, + } + } +} From 22813110288b01ef61a2d1fbf46cf889cb9ef6c9 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 3 Nov 2025 15:24:09 -0500 Subject: [PATCH 02/15] fix: blockfetch uses slots --- .../peer_network_interface/src/connection.rs | 11 +++++++---- modules/peer_network_interface/src/network.rs | 18 ++++++++++++------ .../src/peer_network_interface.rs | 2 +- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs index d38ad595..d50c02e5 100644 --- a/modules/peer_network_interface/src/connection.rs +++ b/modules/peer_network_interface/src/connection.rs @@ -61,18 +61,20 @@ impl PeerConnection { } } +#[derive(Debug)] pub enum PeerEvent { ChainSync(PeerChainSyncEvent), BlockFetched(BlockFetched), Disconnected, } +#[derive(Debug)] pub enum PeerChainSyncEvent { RollForward(Header), RollBackward(Point), } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct 
Header { pub hash: BlockHash, pub slot: u64, @@ -81,6 +83,7 @@ pub struct Header { pub era: Era, } +#[derive(Debug)] pub struct BlockFetched { pub hash: BlockHash, pub body: Vec, @@ -157,8 +160,8 @@ impl PeerConnectionWorker { mut client: blockfetch::Client, mut commands: mpsc::Receiver, ) -> Result<()> { - while let Some(BlockfetchCommand::Fetch(hash, height)) = commands.recv().await { - let point = Point::Specific(height, hash.to_vec()); + while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await { + let point = Point::Specific(slot, hash.to_vec()); let body = client.fetch_single(point).await?; self.sender.write(PeerEvent::BlockFetched(BlockFetched { hash, body })).await?; } @@ -174,7 +177,7 @@ impl PeerConnectionWorker { let Some(parsed) = self.parse_header(header)? else { return Ok(None); }; - let point = Point::Specific(parsed.number, parsed.hash.to_vec()); + let point = Point::Specific(parsed.slot, parsed.hash.to_vec()); Ok(Some(ParsedChainsyncMessage { point, event: PeerChainSyncEvent::RollForward(parsed), diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 230ebd19..6694102b 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -11,7 +11,7 @@ use acropolis_common::BlockHash; use anyhow::{Context as _, Result, bail}; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; -use tracing::warn; +use tracing::{info, warn}; pub struct NetworkManager { network_magic: u64, @@ -25,6 +25,7 @@ pub struct NetworkManager { events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, + published_blocks: u64, } impl NetworkManager { @@ -46,6 +47,7 @@ impl NetworkManager { events, events_sender, block_sink, + published_blocks: 0, } } @@ -130,7 +132,7 @@ impl NetworkManager { let Some(peer) = self.peers.get(&announcer) else { continue; }; - if let Err(e) = peer.request_block(header.hash, header.number).await + 
if let Err(e) = peer.request_block(header.hash, header.slot).await { warn!("could not request block from {}: {e}", peer.address); self.handle_disconnect(announcer).await? @@ -155,10 +157,10 @@ impl NetworkManager { self.blocks_to_fetch.clear(); self.rolled_back = true; } - Point::Specific(number, _) => { + Point::Specific(slot, _) => { let mut already_sent = true; while let Some(newest) = self.blocks_to_fetch.back() { - if newest.number == number { + if newest.slot == slot { already_sent = false; break; } else { @@ -214,7 +216,11 @@ impl NetworkManager { break; }; self.block_sink.announce(header, body, self.rolled_back).await?; - self.head = Some(Point::Specific(header.number, header.hash.to_vec())); + self.published_blocks += 1; + if self.published_blocks.is_multiple_of(100) { + info!("Published block {}", header.number); + } + self.head = Some(Point::Specific(header.slot, header.hash.to_vec())); self.rolled_back = false; self.blocks_to_fetch.pop_front(); } @@ -227,7 +233,7 @@ pub enum NetworkEvent { PeerUpdate { peer: PeerId, event: PeerEvent }, } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct PeerId(u64); pub struct PeerMessageSender { diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index d9bd0b9c..f3b5f9d9 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -97,7 +97,7 @@ impl PeerNetworkInterface { let (_, message) = subscription.read().await?; match message.as_ref() { Message::Cardano((block, CardanoMessage::SnapshotComplete)) => { - Ok(Point::Specific(block.number, block.hash.to_vec())) + Ok(Point::Specific(block.slot, block.hash.to_vec())) } msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"), } From 2d1c5b1fda2ab28ebfc3771fdf2922cc6271bef0 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 3 
Nov 2025 16:48:21 -0500 Subject: [PATCH 03/15] feat: test PeerNetworkInterface in omnibus --- Cargo.lock | 1 + modules/genesis_bootstrapper/build.rs | 10 ++ .../src/genesis_bootstrapper.rs | 8 ++ processes/omnibus/Cargo.toml | 1 + processes/omnibus/omnibus-local.toml | 102 ++++++++++++++++++ processes/omnibus/src/main.rs | 2 + 6 files changed, 124 insertions(+) create mode 100644 processes/omnibus/omnibus-local.toml diff --git a/Cargo.lock b/Cargo.lock index 1aa318d9..022148b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -502,6 +502,7 @@ dependencies = [ "acropolis_module_historical_accounts_state", "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", + "acropolis_module_peer_network_interface", "acropolis_module_rest_blockfrost", "acropolis_module_spdd_state", "acropolis_module_spo_state", diff --git a/modules/genesis_bootstrapper/build.rs b/modules/genesis_bootstrapper/build.rs index b4fac344..15376a94 100644 --- a/modules/genesis_bootstrapper/build.rs +++ b/modules/genesis_bootstrapper/build.rs @@ -62,6 +62,16 @@ async fn main() -> Result<()> { "https://book.world.dev.cardano.org/environments/mainnet/shelley-genesis.json", "mainnet-shelley-genesis.json", ), + download( + &client, + "https://book.world.dev.cardano.org/environments/preview/byron-genesis.json", + "preview-byron-genesis.json", + ), + download( + &client, + "https://book.world.dev.cardano.org/environments/preview/shelley-genesis.json", + "preview-shelley-genesis.json", + ), download( &client, "https://raw.githubusercontent.com/Hornan7/SanchoNet-Tutorials/refs/heads/main/genesis/byron-genesis.json", diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index 58997e20..1f076fc9 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -32,6 +32,9 @@ const DEFAULT_NETWORK_NAME: &str = "mainnet"; const MAINNET_BYRON_GENESIS: 
&[u8] = include_bytes!("../downloads/mainnet-byron-genesis.json"); const MAINNET_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-shelley-genesis.json"); const MAINNET_SHELLEY_START_EPOCH: u64 = 208; +const PREVIEW_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/preview-byron-genesis.json"); +const PREVIEW_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/preview-shelley-genesis.json"); +const PREVIEW_SHELLEY_START_EPOCH: u64 = 0; const SANCHONET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/sanchonet-byron-genesis.json"); const SANCHONET_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/sanchonet-shelley-genesis.json"); @@ -101,6 +104,11 @@ impl GenesisBootstrapper { MAINNET_SHELLEY_GENESIS, MAINNET_SHELLEY_START_EPOCH, ), + "preview" => ( + PREVIEW_BYRON_GENESIS, + PREVIEW_SHELLEY_GENESIS, + PREVIEW_SHELLEY_START_EPOCH, + ), "sanchonet" => ( SANCHONET_BYRON_GENESIS, SANCHONET_SHELLEY_GENESIS, diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 7814b5c8..4af31037 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -12,6 +12,7 @@ acropolis_common = { path = "../../common" } acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } acropolis_module_mithril_snapshot_fetcher = { path = "../../modules/mithril_snapshot_fetcher" } acropolis_module_upstream_chain_fetcher = { path = "../../modules/upstream_chain_fetcher" } +acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } acropolis_module_tx_unpacker = { path = "../../modules/tx_unpacker" } acropolis_module_utxo_state = { path = "../../modules/utxo_state" } diff --git a/processes/omnibus/omnibus-local.toml b/processes/omnibus/omnibus-local.toml new file mode 100644 index 00000000..488e7ead --- /dev/null +++ b/processes/omnibus/omnibus-local.toml @@ -0,0 +1,102 @@ +# Top-level configuration 
for Acropolis omnibus process + +[module.genesis-bootstrapper] +network-name = "preview" # "sanchonet", "mainnet" + +#[module.mithril-snapshot-fetcher] +#Turned off with SanchoNet +#aggregator-url = "https://aggregator.release-mainnet.api.mithril.network/aggregator" +#genesis-key = "5b3139312c36362c3134302c3138352c3133382c31312c3233372c3230372c3235302c3134342c32372c322c3138382c33302c31322c38312c3135352c3230342c31302c3137392c37352c32332c3133382c3139362c3231372c352c31342c32302c35372c37392c33392c3137365d" +#download = false + +[module.rest-blockfrost] + +[module.peer-network-interface] +sync-point = "origin" #"cache" # "origin", "tip", "snapshot" +node-addresses = [ + "localhost:3001", + "localhost:3002", + "localhost:3003", +] +magic-number = 2 + +[module.block-unpacker] + +[module.tx-unpacker] +publish-utxo-deltas-topic = "cardano.utxo.deltas" +publish-withdrawals-topic = "cardano.withdrawals" +publish-certificates-topic = "cardano.certificates" +publish-governance-topic = "cardano.governance" +publish-block-txs-topic = "cardano.block.txs" + +[module.utxo-state] +store = "memory" # "memory", "dashmap", "fjall", "fjall-async", "sled", "sled-async", "fake" +address-delta-topic = "cardano.address.delta" + +[module.spo-state] +# Enables /pools/{pool_id}/history endpoint, enables to query active_stakes +store-epochs-history = false +# Enable /pools/retired +store-retired-pools = false +# Enables /pools/{pool_id} endpoint +store-registration = false +# # Enables /pools/{pool_id}/updates endpoint +store-updates = false +# Enables /pools/{pool_id}/delegators endpoint (Requires store-stake-addresses to be enabled) +store-delegators = false +# Enables /pools/{pool_id}/votes endpoint +store-votes = false +# Store stake_addresses +store-stake-addresses = false + +[module.drep-state] + +[module.governance-state] + +[module.parameters-state] +store-history = false +network-name = "sanchonet" # "sanchonet", "mainnet" + +[module.stake-delta-filter] +cache-mode = "write" # 
"predefined", "read", "write", "write-if-absent" +write-full-cache = "false" + +[module.epochs-state] +# Enables /epochs/{number} endpoint (for historical epochs) +store-history = false +# Enables /pools/{pool_id}/blocks endpoint +store-block-hashes = false + + +[module.accounts-state] + +[module.clock] + +[module.rest-server] +address = "127.0.0.1" +port = 4340 + +[module.spy] +# Enable for message spying +#topic = "cardano.drep.state" + +[startup] +topic = "cardano.sequence.start" + +[message-bus.external] +class = "rabbit-mq" +url = "amqp://127.0.0.1:5672/%2f" +exchange = "caryatid" + +[message-bus.internal] +class = "in-memory" +workers = 50 +dispatch-queue-size = 1000 +worker-queue-size = 100 +bulk-block-capacity = 50 +bulk-resume-capacity = 75 + +# Message routing +[[message-router.route]] # Everything is internal only +pattern = "#" +bus = "internal" diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index ab2d4ca2..0a8d7f83 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -22,6 +22,7 @@ use acropolis_module_governance_state::GovernanceState; use acropolis_module_historical_accounts_state::HistoricalAccountsState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; +use acropolis_module_peer_network_interface::PeerNetworkInterface; use acropolis_module_rest_blockfrost::BlockfrostREST; use acropolis_module_spdd_state::SPDDState; use acropolis_module_spo_state::SPOState; @@ -102,6 +103,7 @@ pub async fn main() -> Result<()> { MithrilSnapshotFetcher::register(&mut process); UpstreamChainFetcher::register(&mut process); BlockUnpacker::register(&mut process); + PeerNetworkInterface::register(&mut process); TxUnpacker::register(&mut process); UTXOState::register(&mut process); SPOState::register(&mut process); From 41ee5ded3efa602f0ffda2379b6ab6aa01f00631 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 3 Nov 2025 17:15:37 -0500 
Subject: [PATCH 04/15] fix: gracefully handle dropped connections --- .../peer_network_interface/src/connection.rs | 36 ++++---- modules/peer_network_interface/src/network.rs | 86 ++++++++++--------- .../src/peer_network_interface.rs | 6 +- 3 files changed, 70 insertions(+), 58 deletions(-) diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs index d50c02e5..75d41a7d 100644 --- a/modules/peer_network_interface/src/connection.rs +++ b/modules/peer_network_interface/src/connection.rs @@ -20,8 +20,8 @@ use crate::network::PeerMessageSender; pub struct PeerConnection { pub address: String, - chainsync: mpsc::Sender, - blockfetch: mpsc::Sender, + chainsync: mpsc::UnboundedSender, + blockfetch: mpsc::UnboundedSender, } impl PeerConnection { @@ -31,8 +31,8 @@ impl PeerConnection { magic, sender, }; - let (chainsync_tx, chainsync_rx) = mpsc::channel(16); - let (blockfetch_tx, blockfetch_rx) = mpsc::channel(16); + let (chainsync_tx, chainsync_rx) = mpsc::unbounded_channel(); + let (blockfetch_tx, blockfetch_rx) = mpsc::unbounded_channel(); tokio::spawn(async move { tokio::time::sleep(delay).await; worker.run(chainsync_rx, blockfetch_rx).await; @@ -46,17 +46,17 @@ impl PeerConnection { pub async fn find_tip(&self) -> Result { let (tx, rx) = oneshot::channel(); - self.chainsync.send(ChainsyncCommand::FindTip(tx)).await?; + self.chainsync.send(ChainsyncCommand::FindTip(tx))?; Ok(rx.await?) 
} - pub async fn find_intersect(&self, points: Vec) -> Result<()> { - self.chainsync.send(ChainsyncCommand::FindIntersect(points)).await?; + pub fn find_intersect(&self, points: Vec) -> Result<()> { + self.chainsync.send(ChainsyncCommand::FindIntersect(points))?; Ok(()) } - pub async fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> { - self.blockfetch.send(BlockfetchCommand::Fetch(hash, height)).await?; + pub fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> { + self.blockfetch.send(BlockfetchCommand::Fetch(hash, height))?; Ok(()) } } @@ -98,8 +98,8 @@ struct PeerConnectionWorker { impl PeerConnectionWorker { async fn run( mut self, - chainsync: mpsc::Receiver, - blockfetch: mpsc::Receiver, + chainsync: mpsc::UnboundedReceiver, + blockfetch: mpsc::UnboundedReceiver, ) { if let Err(err) = self.do_run(chainsync, blockfetch).await { error!(peer = self.address, "{err:#}"); @@ -109,8 +109,8 @@ impl PeerConnectionWorker { async fn do_run( &mut self, - chainsync: mpsc::Receiver, - blockfetch: mpsc::Receiver, + chainsync: mpsc::UnboundedReceiver, + blockfetch: mpsc::UnboundedReceiver, ) -> Result<()> { let client = PeerClient::connect(self.address.clone(), self.magic).await?; select! { @@ -122,7 +122,7 @@ impl PeerConnectionWorker { async fn run_chainsync( &self, mut client: chainsync::N2NClient, - mut commands: mpsc::Receiver, + mut commands: mpsc::UnboundedReceiver, ) -> Result<()> { let mut reached = None; loop { @@ -137,6 +137,12 @@ impl PeerConnectionWorker { let Some(cmd) = cmd else { bail!("parent process has disconnected"); }; + if !client.has_agency() { + // To run find_intersect, we must have agency. + // If we don't, it's because we requested the next response already. + // There's no way to cancel that request, so just wait for it to finish. 
+ client.recv_while_must_reply().await?; + }; match cmd { ChainsyncCommand::FindIntersect(points) => { let (point, _) = client.find_intersect(points).await?; @@ -158,7 +164,7 @@ impl PeerConnectionWorker { async fn run_blockfetch( &self, mut client: blockfetch::Client, - mut commands: mpsc::Receiver, + mut commands: mpsc::UnboundedReceiver, ) -> Result<()> { while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await { let point = Point::Specific(slot, hash.to_vec()); diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 6694102b..e0a2a90c 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque}, time::Duration, }; @@ -16,7 +16,7 @@ use tracing::{info, warn}; pub struct NetworkManager { network_magic: u64, next_id: u64, - peers: HashMap, + peers: BTreeMap, preferred_upstream: Option, blocks_to_fetch: VecDeque
, blocks: HashMap, @@ -38,7 +38,7 @@ impl NetworkManager { Self { network_magic, next_id: 0, - peers: HashMap::new(), + peers: BTreeMap::new(), preferred_upstream: None, blocks_to_fetch: VecDeque::new(), blocks: HashMap::new(), @@ -54,18 +54,18 @@ impl NetworkManager { pub async fn run(mut self) -> Result<()> { while let Some(event) = self.events.recv().await { match event { - NetworkEvent::NewConnection { address, delay } => { - self.handle_new_connection(address, delay).await - } NetworkEvent::PeerUpdate { peer, event } => { - self.handle_peer_update(peer, event).await? + let maybe_publish_blocks = self.handle_peer_update(peer, event)?; + if maybe_publish_blocks { + self.publish_blocks().await?; + } } } } bail!("event sink closed") } - pub async fn handle_new_connection(&mut self, address: String, delay: Duration) { + pub fn handle_new_connection(&mut self, address: String, delay: Duration) { let id = PeerId(self.next_id); self.next_id += 1; let sender = PeerMessageSender { @@ -75,10 +75,10 @@ impl NetworkManager { let conn = PeerConnection::new(address, self.network_magic, sender, delay); if self.preferred_upstream.is_none() { self.peers.insert(id, conn); - self.set_preferred_upstream(id).await; + self.set_preferred_upstream(id); } else { if let Some(head) = self.head.clone() - && let Err(error) = conn.find_intersect(vec![head]).await + && let Err(error) = conn.find_intersect(vec![head]) { warn!("could not sync {}: {error}", conn.address); } @@ -96,26 +96,26 @@ impl NetworkManager { }; match conn.find_tip().await { Ok(point) => { - self.sync_to_point(point).await; + self.sync_to_point(point); return Ok(()); } Err(e) => { warn!("could not fetch tip from {}: {e}", conn.address); - self.handle_disconnect(upstream).await?; + self.handle_disconnect(upstream); } } } } - pub async fn sync_to_point(&mut self, point: Point) { + pub fn sync_to_point(&mut self, point: Point) { for conn in self.peers.values() { - if let Err(error) = 
conn.find_intersect(vec![point.clone()]).await { + if let Err(error) = conn.find_intersect(vec![point.clone()]) { warn!("could not sync {}: {error}", conn.address); } } } - async fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result<()> { + fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result { let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); match event { PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => { @@ -132,21 +132,20 @@ impl NetworkManager { let Some(peer) = self.peers.get(&announcer) else { continue; }; - if let Err(e) = peer.request_block(header.hash, header.slot).await + if let Err(e) = peer.request_block(header.hash, header.slot) { warn!("could not request block from {}: {e}", peer.address); - self.handle_disconnect(announcer).await? + self.handle_disconnect(announcer); } break; // only fetch from one } } + Ok(false) } BlockStatus::Fetched(_) => { - if is_preferred { - // Chainsync has requested a block which we've already fetched, - // so we might be able to publish one or more. - self.publish_blocks().await?; - } + // If chainsync has requested a block which we've already fetched, + // we might be able to publish one or more. 
+ Ok(is_preferred) } } } @@ -173,40 +172,48 @@ impl NetworkManager { } } } + Ok(false) } PeerEvent::BlockFetched(fetched) => { let Some(block) = self.blocks.get_mut(&fetched.hash) else { - return Ok(()); + return Ok(false); }; block.set_body(&fetched.body); - self.publish_blocks().await?; + Ok(true) + } + PeerEvent::Disconnected => { + self.handle_disconnect(peer); + Ok(false) } - PeerEvent::Disconnected => self.handle_disconnect(peer).await?, } - Ok(()) } - async fn handle_disconnect(&mut self, peer: PeerId) -> Result<()> { + fn handle_disconnect(&mut self, peer: PeerId) { let Some(conn) = self.peers.remove(&peer) else { - return Ok(()); + return; }; + warn!("disconnected from {}", conn.address); let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); if is_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { - self.set_preferred_upstream(new_preferred).await; + self.set_preferred_upstream(new_preferred); } - self.events_sender - .send(NetworkEvent::NewConnection { - address: conn.address, - delay: Duration::from_secs(5), - }) - .await?; - Ok(()) + if self.peers.is_empty() { + warn!("no upstream peers!"); + } + let address = conn.address.clone(); + drop(conn); + self.handle_new_connection(address, Duration::from_secs(5)); } - async fn set_preferred_upstream(&mut self, peer: PeerId) { + fn set_preferred_upstream(&mut self, peer: PeerId) { + if let Some(conn) = self.peers.get(&peer) { + info!("setting preferred upstream to {}", conn.address); + } else { + warn!("setting preferred upstream to unrecognized node {peer:?}"); + } self.preferred_upstream = Some(peer); if let Some(head) = self.head.clone() { - self.sync_to_point(head).await; + self.sync_to_point(head); } } @@ -229,11 +236,10 @@ impl NetworkManager { } pub enum NetworkEvent { - NewConnection { address: String, delay: Duration }, PeerUpdate { peer: PeerId, event: PeerEvent }, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, 
Eq, PartialOrd, Ord, Hash)] pub struct PeerId(u64); pub struct PeerMessageSender { diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index f3b5f9d9..3e970313 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -58,18 +58,18 @@ impl PeerNetworkInterface { let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); for address in cfg.node_addresses { - manager.handle_new_connection(address, Duration::ZERO).await; + manager.handle_new_connection(address, Duration::ZERO); } match cfg.sync_point { - SyncPoint::Origin => manager.sync_to_point(Point::Origin).await, + SyncPoint::Origin => manager.sync_to_point(Point::Origin), SyncPoint::Tip => manager.sync_to_tip().await?, SyncPoint::Cache => unimplemented!(), SyncPoint::Snapshot => { let mut subscription = snapshot_complete.expect("Snapshot topic subscription missing"); let point = Self::wait_snapshot_completion(&mut subscription).await?; - manager.sync_to_point(point).await; + manager.sync_to_point(point); } } From e0852834c95025a7f0a0178065670ad6bddfdb46 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 11:08:47 -0500 Subject: [PATCH 05/15] fix: handle rollbacks when switching chains --- modules/peer_network_interface/src/network.rs | 148 ++++++++++++------ .../src/peer_network_interface.rs | 3 +- 2 files changed, 106 insertions(+), 45 deletions(-) diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index e0a2a90c..5a5675c0 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -15,12 +15,13 @@ use tracing::{info, warn}; pub struct NetworkManager { network_magic: u64, + security_param: u64, next_id: u64, peers: BTreeMap, preferred_upstream: Option, blocks_to_fetch: VecDeque
, blocks: HashMap, - head: Option, + chain_prefix: VecDeque, rolled_back: bool, events: mpsc::Receiver, events_sender: mpsc::Sender, @@ -31,18 +32,20 @@ pub struct NetworkManager { impl NetworkManager { pub fn new( network_magic: u64, + security_param: u64, events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, ) -> Self { Self { network_magic, + security_param, next_id: 0, peers: BTreeMap::new(), preferred_upstream: None, blocks_to_fetch: VecDeque::new(), blocks: HashMap::new(), - head: None, + chain_prefix: VecDeque::new(), rolled_back: false, events, events_sender, @@ -73,16 +76,17 @@ impl NetworkManager { id, }; let conn = PeerConnection::new(address, self.network_magic, sender, delay); - if self.preferred_upstream.is_none() { - self.peers.insert(id, conn); - self.set_preferred_upstream(id); - } else { - if let Some(head) = self.head.clone() - && let Err(error) = conn.find_intersect(vec![head]) + if self.preferred_upstream.is_some() { + let points = self.choose_points_for_find_intersect(); + if !points.is_empty() + && let Err(error) = conn.find_intersect(points) { - warn!("could not sync {}: {error}", conn.address); + warn!("could not sync {}: {error:#}", conn.address); } self.peers.insert(id, conn); + } else { + self.peers.insert(id, conn); + self.set_preferred_upstream(id); } } @@ -91,16 +95,16 @@ impl NetworkManager { let Some(upstream) = self.preferred_upstream else { bail!("no peers"); }; - let Some(conn) = self.peers.get(&upstream) else { + let Some(peer) = self.peers.get(&upstream) else { bail!("preferred upstream not found"); }; - match conn.find_tip().await { + match peer.find_tip().await { Ok(point) => { self.sync_to_point(point); return Ok(()); } Err(e) => { - warn!("could not fetch tip from {}: {e}", conn.address); + warn!("could not fetch tip from {}: {e:#}", peer.address); self.handle_disconnect(upstream); } } @@ -108,13 +112,18 @@ impl NetworkManager { } pub fn sync_to_point(&mut self, point: Point) { - for conn in 
self.peers.values() { - if let Err(error) = conn.find_intersect(vec![point.clone()]) { - warn!("could not sync {}: {error}", conn.address); + for peer in self.peers.values() { + if let Err(error) = peer.find_intersect(vec![point.clone()]) { + warn!("could not sync {}: {error:#}", peer.address); } } } + // Implementation note: this method is deliberately synchronous/non-blocking. + // The task which handles network events should only block when waiting for new messages, + // or when publishing messages to other modules. This avoids deadlock; if our event queue + // is full and this method is blocked on writing to it, the queue can never drain. + // Returns true if we might have new events to publish downstream. fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result { let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); match event { @@ -132,8 +141,7 @@ impl NetworkManager { let Some(peer) = self.peers.get(&announcer) else { continue; }; - if let Err(e) = peer.request_block(header.hash, header.slot) - { + if let Err(e) = peer.request_block(header.hash, header.slot) { warn!("could not request block from {}: {e}", peer.address); self.handle_disconnect(announcer); } @@ -153,20 +161,24 @@ impl NetworkManager { if is_preferred { match point { Point::Origin => { + self.rolled_back = !self.chain_prefix.is_empty(); + self.chain_prefix.clear(); self.blocks_to_fetch.clear(); - self.rolled_back = true; } Point::Specific(slot, _) => { - let mut already_sent = true; - while let Some(newest) = self.blocks_to_fetch.back() { - if newest.slot == slot { - already_sent = false; - break; - } else { - self.blocks_to_fetch.pop_back(); - } + // don't bother fetching any blocks from after the rollback point + while self.blocks_to_fetch.back().is_some_and(|b| b.slot > slot) { + self.blocks_to_fetch.pop_back(); } - if already_sent { + + // If we're rolling back to before a block which we've emitted events for, + // set rolled_back to true so that we 
signal that in the next message. + while self + .chain_prefix + .back() + .is_some_and(|point| is_point_after(point, slot)) + { + self.chain_prefix.pop_back(); self.rolled_back = true; } } @@ -188,32 +200,38 @@ impl NetworkManager { } } - fn handle_disconnect(&mut self, peer: PeerId) { - let Some(conn) = self.peers.remove(&peer) else { + fn handle_disconnect(&mut self, id: PeerId) { + let Some(peer) = self.peers.remove(&id) else { return; }; - warn!("disconnected from {}", conn.address); - let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); - if is_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { + warn!("disconnected from {}", peer.address); + let was_preferred = self.preferred_upstream.is_some_and(|i| i == id); + if was_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { self.set_preferred_upstream(new_preferred); } if self.peers.is_empty() { warn!("no upstream peers!"); } - let address = conn.address.clone(); - drop(conn); + let address = peer.address.clone(); + drop(peer); self.handle_new_connection(address, Duration::from_secs(5)); } - fn set_preferred_upstream(&mut self, peer: PeerId) { - if let Some(conn) = self.peers.get(&peer) { - info!("setting preferred upstream to {}", conn.address); - } else { - warn!("setting preferred upstream to unrecognized node {peer:?}"); - } - self.preferred_upstream = Some(peer); - if let Some(head) = self.head.clone() { - self.sync_to_point(head); + fn set_preferred_upstream(&mut self, id: PeerId) { + let Some(peer) = self.peers.get(&id) else { + warn!("setting preferred upstream to unrecognized node {id:?}"); + return; + }; + info!("setting preferred upstream to {}", peer.address); + self.preferred_upstream = Some(id); + + // If our preferred upstream changed, resync all connections. + // That will trigger a rollback if needed. 
+ let points = self.choose_points_for_find_intersect(); + for peer in self.peers.values() { + if let Err(error) = peer.find_intersect(points.clone()) { + warn!("could not sync {}: {error:#}", peer.address) + } } } @@ -227,12 +245,54 @@ impl NetworkManager { if self.published_blocks.is_multiple_of(100) { info!("Published block {}", header.number); } - self.head = Some(Point::Specific(header.slot, header.hash.to_vec())); + let point = Point::Specific(header.slot, header.hash.to_vec()); + self.chain_prefix.push_back(point); + while self.chain_prefix.len() > self.security_param as usize { + self.chain_prefix.pop_front(); + } self.rolled_back = false; self.blocks_to_fetch.pop_front(); } Ok(()) } + + fn choose_points_for_find_intersect(&self) -> Vec { + let mut iterator = self.chain_prefix.iter().rev(); + let mut result = vec![]; + + // send the 5 most recent points + for _ in 0..5 { + if let Some(next) = iterator.next() { + result.push(next.clone()); + } + } + + // then 5 more points, spaced out by 10 block heights each + let mut iterator = iterator.step_by(10); + for _ in 0..5 { + if let Some(next) = iterator.next() { + result.push(next.clone()); + } + } + + // then 5 more points, spaced out by a total of 100 block heights each + // (in case of an implausibly long rollback) + let mut iterator = iterator.step_by(10); + for _ in 0..5 { + if let Some(next) = iterator.next() { + result.push(next.clone()); + } + } + + result + } +} + +const fn is_point_after(point: &Point, slot: u64) -> bool { + match point { + Point::Origin => false, + Point::Specific(s, _) => *s > slot, + } } pub enum NetworkEvent { diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 3e970313..d4ee84f6 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -56,7 +56,8 @@ impl PeerNetworkInterface { genesis_values, }; - let mut 
manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); + let mut manager = + NetworkManager::new(cfg.magic_number, 2160, events, events_sender, sink); for address in cfg.node_addresses { manager.handle_new_connection(address, Duration::ZERO); } From ece87d3b1194c68efc16290b66d19ae84e277b1b Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 12:34:06 -0500 Subject: [PATCH 06/15] feat: integrate upstream cache --- common/src/lib.rs | 1 + .../src/upstream_cache.rs | 40 +++++++-------- .../src/configuration.rs | 1 - .../peer_network_interface/src/connection.rs | 21 ++------ .../src/peer_network_interface.rs | 51 +++++++++++++++++-- .../src/body_fetcher.rs | 7 ++- .../src/upstream_chain_fetcher.rs | 3 +- 7 files changed, 77 insertions(+), 47 deletions(-) rename {modules/upstream_chain_fetcher => common}/src/upstream_cache.rs (89%) diff --git a/common/src/lib.rs b/common/src/lib.rs index 0ca5a58a..c5da58cf 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -21,6 +21,7 @@ pub mod snapshot; pub mod stake_addresses; pub mod state_history; pub mod types; +pub mod upstream_cache; pub mod validation; // Flattened re-exports diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/common/src/upstream_cache.rs similarity index 89% rename from modules/upstream_chain_fetcher/src/upstream_cache.rs rename to common/src/upstream_cache.rs index 29e42f3f..ebc75d24 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/common/src/upstream_cache.rs @@ -1,9 +1,9 @@ -use acropolis_common::{messages::RawBlockMessage, BlockInfo}; -use anyhow::{anyhow, bail, Result}; +use crate::{messages::RawBlockMessage, BlockInfo}; +use anyhow::{anyhow, bail, Context, Result}; use std::{ fs::File, io::{BufReader, Write}, - path::Path, + path::{Path, PathBuf}, sync::Arc, }; @@ -19,25 +19,25 @@ pub trait Storage { } pub struct FileStorage { - path: String, + path: PathBuf, } impl FileStorage { - pub fn new(path: &str) -> Self { + pub fn 
new>(path: P) -> Self { Self { - path: path.to_string(), + path: path.as_ref().to_path_buf(), } } - fn get_file_name(&self, chunk_no: usize) -> String { - format!("{}/chunk-{chunk_no}.json", self.path) + fn get_file_name(&self, chunk_no: usize) -> PathBuf { + self.path.join(format!("chunk-{chunk_no}.json")) } } pub type UpstreamCache = UpstreamCacheImpl; impl UpstreamCache { - pub fn new(path: &str) -> Self { + pub fn new>(path: P) -> Self { UpstreamCache::new_impl(FileStorage::new(path)) } } @@ -139,26 +139,24 @@ impl UpstreamCacheImpl { impl Storage for FileStorage { fn read_chunk(&mut self, chunk_no: usize) -> Result> { - let name = self.get_file_name(chunk_no); - let path = Path::new(&name); + let path = self.get_file_name(chunk_no); if !path.try_exists()? { return Ok(vec![]); } - let file = File::open(&name)?; + let file = File::open(&path)?; let reader = BufReader::new(file); - match serde_json::from_reader::, Vec>(reader) - { - Ok(res) => Ok(res.clone()), - Err(err) => Err(anyhow!( - "Error reading upstream cache chunk JSON from {name}: '{err}'" - )), - } + serde_json::from_reader(reader).with_context(|| { + format!( + "Error reading upstream cache chunk JSON from {}", + path.display() + ) + }) } fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> { let mut file = File::create(self.get_file_name(chunk_no))?; - file.write_all(serde_json::to_string(data)?.as_bytes())?; + file.write_all(&serde_json::to_vec(data)?)?; Ok(()) } } @@ -166,7 +164,7 @@ impl Storage for FileStorage { #[cfg(test)] mod test { use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; - use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; + use crate::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; use anyhow::Result; use std::{collections::HashMap, sync::Arc}; diff --git a/modules/peer_network_interface/src/configuration.rs 
b/modules/peer_network_interface/src/configuration.rs index 8aca18a7..9f02ebb0 100644 --- a/modules/peer_network_interface/src/configuration.rs +++ b/modules/peer_network_interface/src/configuration.rs @@ -22,7 +22,6 @@ pub struct InterfaceConfig { pub genesis_completion_topic: String, pub node_addresses: Vec, pub magic_number: u64, - #[expect(unused)] pub cache_dir: PathBuf, #[serde(flatten)] pub genesis_values: Option, diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs index 75d41a7d..5afdd9a6 100644 --- a/modules/peer_network_interface/src/connection.rs +++ b/modules/peer_network_interface/src/connection.rs @@ -201,22 +201,11 @@ impl PeerConnectionWorker { let hdr_tag = header.byron_prefix.map(|p| p.0); let hdr_variant = header.variant; let hdr = MultiEraHeader::decode(hdr_variant, hdr_tag, &header.cbor)?; - let era = match hdr { - MultiEraHeader::EpochBoundary(_) => return Ok(None), - MultiEraHeader::Byron(_) => Era::Byron, - MultiEraHeader::ShelleyCompatible(_) => match hdr_variant { - 1 => Era::Shelley, - 2 => Era::Allegra, - 3 => Era::Mary, - 4 => Era::Alonzo, - x => bail!("Impossible header variant {x} for ShelleyCompatible (TPraos)"), - }, - MultiEraHeader::BabbageCompatible(_) => match hdr_variant { - 5 => Era::Babbage, - 6 => Era::Conway, - x => bail!("Impossible header variant {x} for BabbageCompatible (Praos)"), - }, - }; + if hdr.as_eb().is_some() { + // skip EpochBoundary blocks + return Ok(None); + } + let era = Era::try_from(hdr_variant)?; Ok(Some(Header { hash: BlockHash::new(*hdr.hash()), slot: hdr.slot(), diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index d4ee84f6..1ed9f672 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -6,6 +6,7 @@ use acropolis_common::{ BlockInfo, BlockStatus, 
genesis_values::GenesisValues, messages::{CardanoMessage, Message, RawBlockMessage}, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, }; use anyhow::{Result, bail}; use caryatid_sdk::{Context, Module, Subscription, module}; @@ -50,10 +51,28 @@ impl PeerNetworkInterface { } else { cfg.genesis_values.expect("genesis values not found") }; + + let mut upstream_cache = None; + let mut cache_sync_point = None; + if cfg.sync_point == SyncPoint::Cache { + let mut cache = UpstreamCache::new(&cfg.cache_dir); + cache.start_reading()?; + while let Some(record) = cache.read_record()? { + cache_sync_point = Some((record.id.slot, record.id.hash)); + let message = Arc::new(Message::Cardano(( + record.id, + CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)), + ))); + context.message_bus.publish(&cfg.block_topic, message).await?; + } + upstream_cache = Some(cache); + } + let sink = BlockSink { context, topic: cfg.block_topic, genesis_values, + upstream_cache, }; let mut manager = @@ -65,7 +84,13 @@ impl PeerNetworkInterface { match cfg.sync_point { SyncPoint::Origin => manager.sync_to_point(Point::Origin), SyncPoint::Tip => manager.sync_to_tip().await?, - SyncPoint::Cache => unimplemented!(), + SyncPoint::Cache => { + let point = match cache_sync_point { + Some((slot, hash)) => Point::Specific(slot, hash.to_vec()), + None => Point::Origin, + }; + manager.sync_to_point(point); + } SyncPoint::Snapshot => { let mut subscription = snapshot_complete.expect("Snapshot topic subscription missing"); @@ -109,15 +134,31 @@ struct BlockSink { context: Arc>, topic: String, genesis_values: GenesisValues, + upstream_cache: Option, } impl BlockSink { - pub async fn announce(&self, header: &Header, body: &[u8], rolled_back: bool) -> Result<()> { + pub async fn announce( + &mut self, + header: &Header, + body: &[u8], + rolled_back: bool, + ) -> Result<()> { let info = self.make_block_info(header, rolled_back); - let available = CardanoMessage::BlockAvailable(RawBlockMessage { 
+ let raw_block = RawBlockMessage { header: header.bytes.clone(), body: body.to_vec(), - }); - let message = Arc::new(Message::Cardano((info, available))); + }; + if let Some(cache) = self.upstream_cache.as_mut() { + let record = UpstreamCacheRecord { + id: info.clone(), + message: Arc::new(raw_block.clone()), + }; + cache.write_record(&record)?; + } + let message = Arc::new(Message::Cardano(( + info, + CardanoMessage::BlockAvailable(raw_block), + ))); self.context.publish(&self.topic, message).await } diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index 50c0631e..6b453b3c 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -1,7 +1,11 @@ //! Acropolis Miniprotocols module for Caryatid //! Multi-connection, block body fetching part of the client (in separate thread). -use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era}; +use acropolis_common::{ + messages::RawBlockMessage, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, + BlockHash, BlockInfo, BlockStatus, Era, +}; use anyhow::{bail, Result}; use crossbeam::channel::{Receiver, TryRecvError}; use pallas::{ @@ -15,7 +19,6 @@ use std::{sync::Arc, time::Duration}; use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error, info}; -use crate::upstream_cache::{UpstreamCache, UpstreamCacheRecord}; use crate::{ utils, utils::{ diff --git a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs index 71322189..e2ae7b71 100644 --- a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs @@ -4,6 +4,7 @@ use acropolis_common::{ genesis_values::GenesisValues, messages::{CardanoMessage, Message}, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, BlockInfo, }; use anyhow::{anyhow, bail, 
Result}; @@ -24,12 +25,10 @@ use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error, info}; mod body_fetcher; -mod upstream_cache; mod utils; use crate::utils::FetchResult; use body_fetcher::BodyFetcher; -use upstream_cache::{UpstreamCache, UpstreamCacheRecord}; use utils::{FetcherConfig, SyncPoint}; const MAX_BODY_FETCHER_CHANNEL_LENGTH: usize = 100; From 4647b72d52f8697d13c78ec822aa586d51cebcbc Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 13:33:51 -0500 Subject: [PATCH 07/15] fix: handle cache errors more gracefully --- common/src/upstream_cache.rs | 16 ++-- .../config.default.toml | 2 + modules/peer_network_interface/src/network.rs | 16 ++-- .../src/peer_network_interface.rs | 77 ++++++++++++------- .../src/upstream_chain_fetcher.rs | 2 +- processes/omnibus/.gitignore | 1 + processes/omnibus/omnibus-local.toml | 2 +- 7 files changed, 72 insertions(+), 44 deletions(-) diff --git a/common/src/upstream_cache.rs b/common/src/upstream_cache.rs index ebc75d24..93bbf27f 100644 --- a/common/src/upstream_cache.rs +++ b/common/src/upstream_cache.rs @@ -23,10 +23,10 @@ pub struct FileStorage { } impl FileStorage { - pub fn new>(path: P) -> Self { - Self { - path: path.as_ref().to_path_buf(), - } + pub fn new>(path: P) -> Result { + let path = path.as_ref().to_path_buf(); + std::fs::create_dir_all(&path)?; + Ok(Self { path }) } fn get_file_name(&self, chunk_no: usize) -> PathBuf { @@ -37,8 +37,8 @@ impl FileStorage { pub type UpstreamCache = UpstreamCacheImpl; impl UpstreamCache { - pub fn new>(path: P) -> Self { - UpstreamCache::new_impl(FileStorage::new(path)) + pub fn new>(path: P) -> Result { + Ok(UpstreamCache::new_impl(FileStorage::new(path)?)) } } @@ -124,7 +124,7 @@ impl UpstreamCacheImpl { pub fn write_record(&mut self, record: &UpstreamCacheRecord) -> Result<()> { self.chunk_cached.push(record.clone()); - self.storage.write_chunk(self.current_chunk, &self.chunk_cached)?; + self.storage.write_chunk(self.current_chunk, 
&self.chunk_cached).context("could not write cache record")?; self.current_record += 1; if self.current_record >= self.density { @@ -155,7 +155,7 @@ impl Storage for FileStorage { } fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> { - let mut file = File::create(self.get_file_name(chunk_no))?; + let mut file = File::create(self.get_file_name(chunk_no)).context("could not write chunk")?; file.write_all(&serde_json::to_vec(data)?)?; Ok(()) } diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml index da216c60..ca4d15c7 100644 --- a/modules/peer_network_interface/config.default.toml +++ b/modules/peer_network_interface/config.default.toml @@ -4,6 +4,8 @@ genesis-completion-topic = "cardano.sequence.bootstrapped" node-addresses = [ "backbone.cardano.iog.io:3001", + "backbone.mainnet.cardanofoundation.org:3001", + "backbone.mainnet.emurgornd.com:3001", ] magic-number = 764824073 diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 5a5675c0..2142b6d9 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -58,7 +58,7 @@ impl NetworkManager { while let Some(event) = self.events.recv().await { match event { NetworkEvent::PeerUpdate { peer, event } => { - let maybe_publish_blocks = self.handle_peer_update(peer, event)?; + let maybe_publish_blocks = self.handle_peer_update(peer, event); if maybe_publish_blocks { self.publish_blocks().await?; } @@ -124,7 +124,7 @@ impl NetworkManager { // or when publishing messages to other modules. This avoids deadlock; if our event queue // is full and this method is blocked on writing to it, the queue can never drain. // Returns true if we might have new events to publish downstream. 
- fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result { + fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> bool { let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); match event { PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => { @@ -148,12 +148,12 @@ impl NetworkManager { break; // only fetch from one } } - Ok(false) + false } BlockStatus::Fetched(_) => { // If chainsync has requested a block which we've already fetched, // we might be able to publish one or more. - Ok(is_preferred) + is_preferred } } } @@ -184,18 +184,18 @@ impl NetworkManager { } } } - Ok(false) + false } PeerEvent::BlockFetched(fetched) => { let Some(block) = self.blocks.get_mut(&fetched.hash) else { - return Ok(false); + return false; }; block.set_body(&fetched.body); - Ok(true) + true } PeerEvent::Disconnected => { self.handle_disconnect(peer); - Ok(false) + false } } } diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 1ed9f672..7bd9e870 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -3,18 +3,16 @@ mod connection; mod network; use acropolis_common::{ - BlockInfo, BlockStatus, - genesis_values::GenesisValues, - messages::{CardanoMessage, Message, RawBlockMessage}, - upstream_cache::{UpstreamCache, UpstreamCacheRecord}, + BlockInfo, BlockStatus, genesis_values::GenesisValues, messages::{CardanoMessage, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord} }; use anyhow::{Result, bail}; use caryatid_sdk::{Context, Module, Subscription, module}; use config::Config; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; +use tracing::{error, warn}; -use std::{sync::Arc, time::Duration}; +use std::{path::Path, sync::Arc, time::Duration}; use crate::{ configuration::{InterfaceConfig, SyncPoint}, @@ 
-53,19 +51,17 @@ impl PeerNetworkInterface { }; let mut upstream_cache = None; - let mut cache_sync_point = None; + let mut cache_sync_point = Point::Origin; if cfg.sync_point == SyncPoint::Cache { - let mut cache = UpstreamCache::new(&cfg.cache_dir); - cache.start_reading()?; - while let Some(record) = cache.read_record()? { - cache_sync_point = Some((record.id.slot, record.id.hash)); - let message = Arc::new(Message::Cardano(( - record.id, - CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)), - ))); - context.message_bus.publish(&cfg.block_topic, message).await?; + match Self::init_cache(&cfg.cache_dir, &cfg.block_topic, &context).await { + Ok((cache, sync_point)) => { + upstream_cache = Some(cache); + cache_sync_point = sync_point; + } + Err(e) => { + warn!("could not initialize upstream cache: {e:#}"); + } } - upstream_cache = Some(cache); } let sink = BlockSink { @@ -83,28 +79,57 @@ impl PeerNetworkInterface { match cfg.sync_point { SyncPoint::Origin => manager.sync_to_point(Point::Origin), - SyncPoint::Tip => manager.sync_to_tip().await?, - SyncPoint::Cache => { - let point = match cache_sync_point { - Some((slot, hash)) => Point::Specific(slot, hash.to_vec()), - None => Point::Origin, - }; - manager.sync_to_point(point); + SyncPoint::Tip => { + if let Err(error) = manager.sync_to_tip().await { + warn!("could not sync to tip: {error:#}"); + return; + } } + SyncPoint::Cache => manager.sync_to_point(cache_sync_point), SyncPoint::Snapshot => { let mut subscription = snapshot_complete.expect("Snapshot topic subscription missing"); - let point = Self::wait_snapshot_completion(&mut subscription).await?; - manager.sync_to_point(point); + match Self::wait_snapshot_completion(&mut subscription).await { + Ok(point) => manager.sync_to_point(point), + Err(error) => { + warn!("snapshot restoration never completed: {error:#}"); + return; + } + } } } - manager.run().await + if let Err(err) = manager.run().await { + error!("chain sync failed: {err:#}"); + } 
}); Ok(()) } + async fn init_cache( + cache_dir: &Path, + block_topic: &str, + context: &Context, + ) -> Result<(UpstreamCache, Point)> { + let mut cache = UpstreamCache::new(cache_dir)?; + let mut cache_sync_point = None; + cache.start_reading()?; + while let Some(record) = cache.read_record()? { + cache_sync_point = Some((record.id.slot, record.id.hash)); + let message = Arc::new(Message::Cardano(( + record.id, + CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)), + ))); + context.message_bus.publish(block_topic, message).await?; + } + let sync_point = match cache_sync_point { + None => Point::Origin, + Some((slot, hash)) => Point::Specific(slot, hash.to_vec()), + }; + Ok((cache, sync_point)) + } + async fn wait_genesis_completion( subscription: &mut Box>, ) -> Result { diff --git a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs index e2ae7b71..6cbdca43 100644 --- a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs @@ -227,7 +227,7 @@ impl UpstreamChainFetcher { Self::sync_to_point(cfg, None, Point::Origin).await?; } SyncPoint::Cache => { - let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir); + let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir)?; let point = match Self::read_cache(cfg.clone(), &mut upstream_cache).await? 
{ None => Point::Origin, Some(blk) => Point::Specific(blk.slot, blk.hash.to_vec()), diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore index 1fa2d34e..ce63bc1e 100644 --- a/processes/omnibus/.gitignore +++ b/processes/omnibus/.gitignore @@ -3,6 +3,7 @@ sled-immutable-utxos fjall-blocks fjall-immutable-utxos cache +upstream-cache # DB files *_db diff --git a/processes/omnibus/omnibus-local.toml b/processes/omnibus/omnibus-local.toml index 488e7ead..d90b9371 100644 --- a/processes/omnibus/omnibus-local.toml +++ b/processes/omnibus/omnibus-local.toml @@ -1,7 +1,7 @@ # Top-level configuration for Acropolis omnibus process [module.genesis-bootstrapper] -network-name = "preview" # "sanchonet", "mainnet" +network-name = "preview" # "sanchonet", "preview", "mainnet" #[module.mithril-snapshot-fetcher] #Turned off with SanchoNet From dabb272bafa61da9e08b822dee5a64755f69d18c Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 15:06:32 -0500 Subject: [PATCH 08/15] feat: extract chain logic to separate helper --- .../peer_network_interface/src/chain_state.rs | 199 ++++++++++++++++++ .../peer_network_interface/src/connection.rs | 7 +- modules/peer_network_interface/src/network.rs | 179 +++------------- .../src/peer_network_interface.rs | 3 +- 4 files changed, 239 insertions(+), 149 deletions(-) create mode 100644 modules/peer_network_interface/src/chain_state.rs diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs new file mode 100644 index 00000000..ae0c3125 --- /dev/null +++ b/modules/peer_network_interface/src/chain_state.rs @@ -0,0 +1,199 @@ +use std::collections::{BTreeMap, VecDeque}; + +use acropolis_common::{BlockHash, params::SECURITY_PARAMETER_K}; +use pallas::network::miniprotocols::Point; + +use crate::{connection::Header, network::PeerId}; + +struct BlockData { + header: Header, + announced_by: Vec, + body: Option>, +} + +#[derive(Default)] +struct SlotBlockData { + 
blocks: Vec, +} +impl SlotBlockData { + fn track_announcement(&mut self, id: PeerId, header: Header) { + if let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == header.hash) { + block.announced_by.push(id); + } else { + self.blocks.push(BlockData { header, announced_by: vec![id], body: None }); + } + } + + fn announced(&self, id: PeerId, hash: BlockHash) -> bool { + self.blocks.iter().any(|b| b.header.hash == hash && b.announced_by.contains(&id)) + } + + fn announcers(&self, hash: BlockHash) -> Vec { + match self.blocks.iter().find(|b| b.header.hash == hash) { + Some(b) => b.announced_by.clone(), + None => vec![] + } + } + + fn track_body(&mut self, hash: BlockHash, body: Vec) { + let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == hash) else { + return; + }; + if block.body.is_none() { + block.body = Some(body); + } + } + + fn body(&self, hash: BlockHash) -> Option<(&Header, &[u8])> { + for block in &self.blocks { + if block.header.hash != hash { + continue; + } + return Some((&block.header, block.body.as_ref()?)); + } + None + } +} + +#[derive(Default)] +pub struct ChainState { + pub preferred_upstream: Option, + blocks: BTreeMap, + published_blocks: VecDeque<(u64, BlockHash)>, + unpublished_blocks: VecDeque<(u64, BlockHash)>, + rolled_back: bool, +} + +impl ChainState { + pub fn new() -> Self { + Self::default() + } + + pub fn handle_roll_forward(&mut self, id: PeerId, header: Header) -> Vec { + let is_preferred = self.preferred_upstream == Some(id); + let slot = header.slot; + let hash = header.hash; + let slot_blocks = self.blocks.entry(header.slot).or_default(); + slot_blocks.track_announcement(id, header); + if is_preferred { + self.unpublished_blocks.push_back((slot, hash)); + self.block_announcers(slot, hash) + } else { + vec![] + } + } + + pub fn handle_roll_backward(&mut self, id: PeerId, point: Point) -> bool { + let is_preferred = self.preferred_upstream == Some(id); + if !is_preferred { + return false; + } + match point { 
+ Point::Origin => { + self.rolled_back = !self.published_blocks.is_empty(); + self.published_blocks.clear(); + self.unpublished_blocks.clear(); + self.rolled_back + } + Point::Specific(slot, _) => { + while let Some((s, _)) = self.unpublished_blocks.back() { + if *s > slot { + self.unpublished_blocks.pop_back(); + } else { + break; + } + } + self.rolled_back = false; + while let Some((s, _)) = self.published_blocks.back() { + if *s > slot { + self.rolled_back = true; + self.published_blocks.pop_back(); + } else { + break; + } + } + self.rolled_back + } + } + } + + pub fn handle_body_fetched(&mut self, slot: u64, hash: BlockHash, body: Vec) { + let Some(slot_blocks) = self.blocks.get_mut(&slot) else { + return; + }; + slot_blocks.track_body(hash, body); + } + + pub fn handle_new_preferred_upstream(&mut self, id: PeerId) { + if self.preferred_upstream == Some(id) { + return; + } + self.preferred_upstream = Some(id); + while let Some((slot, hash)) = self.unpublished_blocks.back() { + let Some(slot_blocks) = self.blocks.get(slot) else { + break; + }; + if !slot_blocks.announced(id, *hash) { + self.unpublished_blocks.pop_back(); + } + } + } + + pub fn next_unpublished_block(&self) -> Option<(&Header, &[u8], bool)> { + let (slot, hash) = self.unpublished_blocks.front()?; + let slot_blocks = self.blocks.get(slot)?; + let (header, body) = slot_blocks.body(*hash)?; + Some((header, body, self.rolled_back)) + } + + pub fn handle_block_published(&mut self) { + if let Some(published) = self.unpublished_blocks.pop_front() { + self.published_blocks.push_back(published); + self.rolled_back = false; + while self.published_blocks.len() > SECURITY_PARAMETER_K as usize { + let Some((slot, _)) = self.published_blocks.pop_back() else { + break; + }; + self.blocks.remove(&slot); + } + } + } + + pub fn choose_points_for_find_intersect(&self) -> Vec { + let mut iterator = self.published_blocks.iter().rev(); + let mut result = vec![]; + + // send the 5 most recent points + for _ in 0..5 { 
+ if let Some((slot, hash)) = iterator.next() { + result.push(Point::Specific(*slot, hash.to_vec())); + } + } + + // then 5 more points, spaced out by 10 block heights each + let mut iterator = iterator.step_by(10); + for _ in 0..5 { + if let Some((slot, hash)) = iterator.next() { + result.push(Point::Specific(*slot, hash.to_vec())); + } + } + + // then 5 more points, spaced out by a total of 100 block heights each + // (in case of an implausibly long rollback) + let mut iterator = iterator.step_by(10); + for _ in 0..5 { + if let Some((slot, hash)) = iterator.next() { + result.push(Point::Specific(*slot, hash.to_vec())); + } + } + + result + } + + fn block_announcers(&self, slot: u64, hash: BlockHash) -> Vec { + match self.blocks.get(&slot) { + Some(slot_blocks) => slot_blocks.announcers(hash), + None => vec![] + } + } +} \ No newline at end of file diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs index 5afdd9a6..a2fdabb5 100644 --- a/modules/peer_network_interface/src/connection.rs +++ b/modules/peer_network_interface/src/connection.rs @@ -55,8 +55,8 @@ impl PeerConnection { Ok(()) } - pub fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> { - self.blockfetch.send(BlockfetchCommand::Fetch(hash, height))?; + pub fn request_block(&self, hash: BlockHash, slot: u64) -> Result<()> { + self.blockfetch.send(BlockfetchCommand::Fetch(hash, slot))?; Ok(()) } } @@ -85,6 +85,7 @@ pub struct Header { #[derive(Debug)] pub struct BlockFetched { + pub slot: u64, pub hash: BlockHash, pub body: Vec, } @@ -169,7 +170,7 @@ impl PeerConnectionWorker { while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await { let point = Point::Specific(slot, hash.to_vec()); let body = client.fetch_single(point).await?; - self.sender.write(PeerEvent::BlockFetched(BlockFetched { hash, body })).await?; + self.sender.write(PeerEvent::BlockFetched(BlockFetched { slot, hash, body })).await?; } bail!("parent 
process has disconnected"); } diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 2142b6d9..30c431ee 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -1,13 +1,11 @@ use std::{ - collections::{BTreeMap, HashMap, VecDeque}, + collections::BTreeMap, time::Duration, }; use crate::{ - BlockSink, - connection::{Header, PeerChainSyncEvent, PeerConnection, PeerEvent}, + BlockSink, chain_state::ChainState, connection::{PeerChainSyncEvent, PeerConnection, PeerEvent} }; -use acropolis_common::BlockHash; use anyhow::{Context as _, Result, bail}; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; @@ -15,13 +13,9 @@ use tracing::{info, warn}; pub struct NetworkManager { network_magic: u64, - security_param: u64, next_id: u64, peers: BTreeMap, - preferred_upstream: Option, - blocks_to_fetch: VecDeque
, - blocks: HashMap, - chain_prefix: VecDeque, + chain: ChainState, rolled_back: bool, events: mpsc::Receiver, events_sender: mpsc::Sender, @@ -32,20 +26,15 @@ pub struct NetworkManager { impl NetworkManager { pub fn new( network_magic: u64, - security_param: u64, events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, ) -> Self { Self { network_magic, - security_param, next_id: 0, peers: BTreeMap::new(), - preferred_upstream: None, - blocks_to_fetch: VecDeque::new(), - blocks: HashMap::new(), - chain_prefix: VecDeque::new(), + chain: ChainState::new(), rolled_back: false, events, events_sender, @@ -58,8 +47,8 @@ impl NetworkManager { while let Some(event) = self.events.recv().await { match event { NetworkEvent::PeerUpdate { peer, event } => { - let maybe_publish_blocks = self.handle_peer_update(peer, event); - if maybe_publish_blocks { + self.handle_peer_update(peer, event); + if true { self.publish_blocks().await?; } } @@ -76,8 +65,8 @@ impl NetworkManager { id, }; let conn = PeerConnection::new(address, self.network_magic, sender, delay); - if self.preferred_upstream.is_some() { - let points = self.choose_points_for_find_intersect(); + if self.chain.preferred_upstream.is_some() { + let points = self.chain.choose_points_for_find_intersect(); if !points.is_empty() && let Err(error) = conn.find_intersect(points) { @@ -92,7 +81,7 @@ impl NetworkManager { pub async fn sync_to_tip(&mut self) -> Result<()> { loop { - let Some(upstream) = self.preferred_upstream else { + let Some(upstream) = self.chain.preferred_upstream else { bail!("no peers"); }; let Some(peer) = self.peers.get(&upstream) else { @@ -124,78 +113,37 @@ impl NetworkManager { // or when publishing messages to other modules. This avoids deadlock; if our event queue // is full and this method is blocked on writing to it, the queue can never drain. // Returns true if we might have new events to publish downstream. 
- fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> bool { - let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer); + fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) { match event { PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => { - let id = header.hash; - let status = - self.blocks.entry(id).or_insert(BlockStatus::Announced(header, vec![])); - match status { - BlockStatus::Announced(header, peers) => { - peers.push(peer); - if is_preferred { - self.blocks_to_fetch.push_back(header.clone()); - // Request the block from the first peer which announced it - for announcer in peers.clone() { - let Some(peer) = self.peers.get(&announcer) else { - continue; - }; - if let Err(e) = peer.request_block(header.hash, header.slot) { - warn!("could not request block from {}: {e}", peer.address); - self.handle_disconnect(announcer); - } - break; // only fetch from one - } + let slot = header.slot; + let hash = header.hash; + let request_body_from = self.chain.handle_roll_forward(peer, header); + if !request_body_from.is_empty() { + // Request the block from the first peer which announced it + for announcer in request_body_from { + let Some(peer) = self.peers.get(&announcer) else { + continue; + }; + if let Err(e) = peer.request_block(hash, slot) { + warn!("could not request block from {}: {e}", peer.address); + self.handle_disconnect(announcer); } - false - } - BlockStatus::Fetched(_) => { - // If chainsync has requested a block which we've already fetched, - // we might be able to publish one or more. 
- is_preferred + break; // only fetch from one } } } PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => { - if is_preferred { - match point { - Point::Origin => { - self.rolled_back = !self.chain_prefix.is_empty(); - self.chain_prefix.clear(); - self.blocks_to_fetch.clear(); - } - Point::Specific(slot, _) => { - // don't bother fetching any blocks from after the rollback point - while self.blocks_to_fetch.back().is_some_and(|b| b.slot > slot) { - self.blocks_to_fetch.pop_back(); - } - - // If we're rolling back to before a block which we've emitted events for, - // set rolled_back to true so that we signal that in the next message. - while self - .chain_prefix - .back() - .is_some_and(|point| is_point_after(point, slot)) - { - self.chain_prefix.pop_back(); - self.rolled_back = true; - } - } - } + let rolled_back = self.chain.handle_roll_backward(peer, point); + if rolled_back { + self.rolled_back = true; } - false } PeerEvent::BlockFetched(fetched) => { - let Some(block) = self.blocks.get_mut(&fetched.hash) else { - return false; - }; - block.set_body(&fetched.body); - true + self.chain.handle_body_fetched(fetched.slot, fetched.hash, fetched.body); } PeerEvent::Disconnected => { self.handle_disconnect(peer); - false } } } @@ -205,7 +153,7 @@ impl NetworkManager { return; }; warn!("disconnected from {}", peer.address); - let was_preferred = self.preferred_upstream.is_some_and(|i| i == id); + let was_preferred = self.chain.preferred_upstream.is_some_and(|i| i == id); if was_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { self.set_preferred_upstream(new_preferred); } @@ -223,11 +171,11 @@ impl NetworkManager { return; }; info!("setting preferred upstream to {}", peer.address); - self.preferred_upstream = Some(id); + self.chain.handle_new_preferred_upstream(id); // If our preferred upstream changed, resync all connections. // That will trigger a rollback if needed. 
- let points = self.choose_points_for_find_intersect(); + let points = self.chain.choose_points_for_find_intersect(); for peer in self.peers.values() { if let Err(error) = peer.find_intersect(points.clone()) { warn!("could not sync {}: {error:#}", peer.address) @@ -236,63 +184,16 @@ impl NetworkManager { } async fn publish_blocks(&mut self) -> Result<()> { - while let Some(header) = self.blocks_to_fetch.front() { - let Some(BlockStatus::Fetched(body)) = self.blocks.get(&header.hash) else { - break; - }; - self.block_sink.announce(header, body, self.rolled_back).await?; + while let Some((header, body, rolled_back)) = self.chain.next_unpublished_block() { + self.block_sink.announce(header, body, rolled_back).await?; self.published_blocks += 1; - if self.published_blocks.is_multiple_of(100) { + if self.published_blocks.is_multiple_of(1) { info!("Published block {}", header.number); } - let point = Point::Specific(header.slot, header.hash.to_vec()); - self.chain_prefix.push_back(point); - while self.chain_prefix.len() > self.security_param as usize { - self.chain_prefix.pop_front(); - } - self.rolled_back = false; - self.blocks_to_fetch.pop_front(); + self.chain.handle_block_published(); } Ok(()) } - - fn choose_points_for_find_intersect(&self) -> Vec { - let mut iterator = self.chain_prefix.iter().rev(); - let mut result = vec![]; - - // send the 5 most recent points - for _ in 0..5 { - if let Some(next) = iterator.next() { - result.push(next.clone()); - } - } - - // then 5 more points, spaced out by 10 block heights each - let mut iterator = iterator.step_by(10); - for _ in 0..5 { - if let Some(next) = iterator.next() { - result.push(next.clone()); - } - } - - // then 5 more points, spaced out by a total of 100 block heights each - // (in case of an implausibly long rollback) - let mut iterator = iterator.step_by(10); - for _ in 0..5 { - if let Some(next) = iterator.next() { - result.push(next.clone()); - } - } - - result - } -} - -const fn is_point_after(point: 
&Point, slot: u64) -> bool { - match point { - Point::Origin => false, - Point::Specific(s, _) => *s > slot, - } } pub enum NetworkEvent { @@ -317,15 +218,3 @@ impl PeerMessageSender { .context("network manager has shut down") } } - -enum BlockStatus { - Announced(Header, Vec), - Fetched(Vec), -} -impl BlockStatus { - fn set_body(&mut self, body: &[u8]) { - if let Self::Announced(_, _) = self { - *self = Self::Fetched(body.to_vec()); - } - } -} diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 7bd9e870..ae87eead 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -1,3 +1,4 @@ +mod chain_state; mod configuration; mod connection; mod network; @@ -72,7 +73,7 @@ impl PeerNetworkInterface { }; let mut manager = - NetworkManager::new(cfg.magic_number, 2160, events, events_sender, sink); + NetworkManager::new(cfg.magic_number, events, events_sender, sink); for address in cfg.node_addresses { manager.handle_new_connection(address, Duration::ZERO); } From 61595347e294d76556f3d3e8805212af48658c16 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 15:45:46 -0500 Subject: [PATCH 09/15] fix: gracefully handle disconnection during in-flight block reqs --- .../peer_network_interface/src/chain_state.rs | 11 +- modules/peer_network_interface/src/network.rs | 100 ++++++++++++------ 2 files changed, 79 insertions(+), 32 deletions(-) diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs index ae0c3125..a0a93b85 100644 --- a/modules/peer_network_interface/src/chain_state.rs +++ b/modules/peer_network_interface/src/chain_state.rs @@ -167,6 +167,9 @@ impl ChainState { for _ in 0..5 { if let Some((slot, hash)) = iterator.next() { result.push(Point::Specific(*slot, hash.to_vec())); + } else { + result.push(Point::Origin); + return 
result; } } @@ -175,6 +178,9 @@ impl ChainState { for _ in 0..5 { if let Some((slot, hash)) = iterator.next() { result.push(Point::Specific(*slot, hash.to_vec())); + } else { + result.push(Point::Origin); + return result; } } @@ -184,13 +190,16 @@ impl ChainState { for _ in 0..5 { if let Some((slot, hash)) = iterator.next() { result.push(Point::Specific(*slot, hash.to_vec())); + } else { + result.push(Point::Origin); + return result; } } result } - fn block_announcers(&self, slot: u64, hash: BlockHash) -> Vec { + pub fn block_announcers(&self, slot: u64, hash: BlockHash) -> Vec { match self.blocks.get(&slot) { Some(slot_blocks) => slot_blocks.announcers(hash), None => vec![] diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 30c431ee..3af911e7 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -6,15 +6,48 @@ use std::{ use crate::{ BlockSink, chain_state::ChainState, connection::{PeerChainSyncEvent, PeerConnection, PeerEvent} }; +use acropolis_common::BlockHash; use anyhow::{Context as _, Result, bail}; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; use tracing::{info, warn}; +struct PeerData { + conn: PeerConnection, + reqs: Vec<(BlockHash, u64)>, +} +impl PeerData { + fn new(conn: PeerConnection) -> Self { + Self { + conn, + reqs: vec![] + } + } + + fn find_intersect(&self, points: Vec) { + if let Err(error) = self.conn.find_intersect(points) { + warn!("could not sync {}: {error:#}", self.conn.address); + } + } + + fn request_block(&mut self, hash: BlockHash, slot: u64) -> bool { + if let Err(error) = self.conn.request_block(hash, slot) { + warn!("could not request block from {}: {error:#}", self.conn.address); + return false; + } + self.reqs.push((hash, slot)); + true + } + + fn ack_block(&mut self, hash: BlockHash) { + self.reqs.retain(|(h, _)| *h != hash); + } +} + pub struct NetworkManager { network_magic: u64, 
next_id: u64, - peers: BTreeMap, + peers: BTreeMap, chain: ChainState, rolled_back: bool, events: mpsc::Receiver, @@ -65,16 +98,15 @@ impl NetworkManager { id, }; let conn = PeerConnection::new(address, self.network_magic, sender, delay); + let peer = PeerData::new(conn); if self.chain.preferred_upstream.is_some() { let points = self.chain.choose_points_for_find_intersect(); - if !points.is_empty() - && let Err(error) = conn.find_intersect(points) - { - warn!("could not sync {}: {error:#}", conn.address); + if !points.is_empty() { + peer.find_intersect(points); } - self.peers.insert(id, conn); + self.peers.insert(id, peer); } else { - self.peers.insert(id, conn); + self.peers.insert(id, peer); self.set_preferred_upstream(id); } } @@ -87,13 +119,13 @@ impl NetworkManager { let Some(peer) = self.peers.get(&upstream) else { bail!("preferred upstream not found"); }; - match peer.find_tip().await { + match peer.conn.find_tip().await { Ok(point) => { self.sync_to_point(point); return Ok(()); } Err(e) => { - warn!("could not fetch tip from {}: {e:#}", peer.address); + warn!("could not fetch tip from {}: {e:#}", peer.conn.address); self.handle_disconnect(upstream); } } @@ -102,9 +134,7 @@ impl NetworkManager { pub fn sync_to_point(&mut self, point: Point) { for peer in self.peers.values() { - if let Err(error) = peer.find_intersect(vec![point.clone()]) { - warn!("could not sync {}: {error:#}", peer.address); - } + peer.find_intersect(vec![point.clone()]); } } @@ -112,7 +142,6 @@ impl NetworkManager { // The task which handles network events should only block when waiting for new messages, // or when publishing messages to other modules. This avoids deadlock; if our event queue // is full and this method is blocked on writing to it, the queue can never drain. - // Returns true if we might have new events to publish downstream. 
fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) { match event { PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => { @@ -121,16 +150,7 @@ impl NetworkManager { let request_body_from = self.chain.handle_roll_forward(peer, header); if !request_body_from.is_empty() { // Request the block from the first peer which announced it - for announcer in request_body_from { - let Some(peer) = self.peers.get(&announcer) else { - continue; - }; - if let Err(e) = peer.request_block(hash, slot) { - warn!("could not request block from {}: {e}", peer.address); - self.handle_disconnect(announcer); - } - break; // only fetch from one - } + self.request_block(slot, hash, request_body_from); } } PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => { @@ -140,6 +160,9 @@ impl NetworkManager { } } PeerEvent::BlockFetched(fetched) => { + for peer in self.peers.values_mut() { + peer.ack_block(fetched.hash); + } self.chain.handle_body_fetched(fetched.slot, fetched.hash, fetched.body); } PeerEvent::Disconnected => { @@ -152,7 +175,7 @@ impl NetworkManager { let Some(peer) = self.peers.remove(&id) else { return; }; - warn!("disconnected from {}", peer.address); + warn!("disconnected from {}", peer.conn.address); let was_preferred = self.chain.preferred_upstream.is_some_and(|i| i == id); if was_preferred && let Some(new_preferred) = self.peers.keys().next().copied() { self.set_preferred_upstream(new_preferred); @@ -160,26 +183,41 @@ impl NetworkManager { if self.peers.is_empty() { warn!("no upstream peers!"); } - let address = peer.address.clone(); - drop(peer); + for (requested_hash, requested_slot) in peer.reqs { + let announcers = self.chain.block_announcers(requested_slot, requested_hash); + self.request_block(requested_slot, requested_hash, announcers); + } + + let address = peer.conn.address.clone(); self.handle_new_connection(address, Duration::from_secs(5)); } + fn request_block(&mut self, slot: u64, hash: BlockHash, announcers: Vec) { + for 
announcer in announcers { + let Some(peer) = self.peers.get_mut(&announcer) else { + continue; + }; + if peer.request_block(hash, slot) { + break; // only fetch from one + } else { + self.handle_disconnect(announcer); + } + } + } + fn set_preferred_upstream(&mut self, id: PeerId) { let Some(peer) = self.peers.get(&id) else { warn!("setting preferred upstream to unrecognized node {id:?}"); return; }; - info!("setting preferred upstream to {}", peer.address); + info!("setting preferred upstream to {}", peer.conn.address); self.chain.handle_new_preferred_upstream(id); // If our preferred upstream changed, resync all connections. // That will trigger a rollback if needed. let points = self.chain.choose_points_for_find_intersect(); for peer in self.peers.values() { - if let Err(error) = peer.find_intersect(points.clone()) { - warn!("could not sync {}: {error:#}", peer.address) - } + peer.find_intersect(points.clone()); } } @@ -187,7 +225,7 @@ impl NetworkManager { while let Some((header, body, rolled_back)) = self.chain.next_unpublished_block() { self.block_sink.announce(header, body, rolled_back).await?; self.published_blocks += 1; - if self.published_blocks.is_multiple_of(1) { + if self.published_blocks.is_multiple_of(100) { info!("Published block {}", header.number); } self.chain.handle_block_published(); From a91d460dae5eb16dfa7b7c75188ea9e6c8b0951e Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 17:13:59 -0500 Subject: [PATCH 10/15] fix: correctly implement and test chain switching --- .../peer_network_interface/src/chain_state.rs | 314 ++++++++++++++++-- .../peer_network_interface/src/connection.rs | 2 +- modules/peer_network_interface/src/network.rs | 12 +- 3 files changed, 291 insertions(+), 37 deletions(-) diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs index a0a93b85..086c16bf 100644 --- a/modules/peer_network_interface/src/chain_state.rs +++ 
b/modules/peer_network_interface/src/chain_state.rs @@ -5,13 +5,14 @@ use pallas::network::miniprotocols::Point; use crate::{connection::Header, network::PeerId}; +#[derive(Debug)] struct BlockData { header: Header, announced_by: Vec, body: Option>, } -#[derive(Default)] +#[derive(Debug, Default)] struct SlotBlockData { blocks: Vec, } @@ -24,10 +25,28 @@ impl SlotBlockData { } } - fn announced(&self, id: PeerId, hash: BlockHash) -> bool { + fn track_rollback(&mut self, id: PeerId) -> bool { + self.blocks.retain_mut(|block| { + block.announced_by.retain(|p| *p != id); + !block.announced_by.is_empty() + }); + !self.blocks.is_empty() + } + + fn was_hash_announced(&self, id: PeerId, hash: BlockHash) -> bool { self.blocks.iter().any(|b| b.header.hash == hash && b.announced_by.contains(&id)) } + fn find_announced_hash(&self, id: PeerId) -> Option { + self.blocks.iter().find_map(|b| { + if b.announced_by.contains(&id) { + Some(b.header.hash) + } else { + None + } + }) + } + fn announcers(&self, hash: BlockHash) -> Vec { match self.blocks.iter().find(|b| b.header.hash == hash) { Some(b) => b.announced_by.clone(), @@ -55,7 +74,7 @@ impl SlotBlockData { } } -#[derive(Default)] +#[derive(Debug, Default)] pub struct ChainState { pub preferred_upstream: Option, blocks: BTreeMap, @@ -77,42 +96,42 @@ impl ChainState { slot_blocks.track_announcement(id, header); if is_preferred { self.unpublished_blocks.push_back((slot, hash)); - self.block_announcers(slot, hash) - } else { - vec![] } + self.block_announcers(slot, hash) } - pub fn handle_roll_backward(&mut self, id: PeerId, point: Point) -> bool { + pub fn handle_roll_backward(&mut self, id: PeerId, point: Point) { let is_preferred = self.preferred_upstream == Some(id); - if !is_preferred { - return false; - } match point { Point::Origin => { - self.rolled_back = !self.published_blocks.is_empty(); - self.published_blocks.clear(); - self.unpublished_blocks.clear(); - self.rolled_back + self.blocks.retain(|_, b| 
b.track_rollback(id)); + if is_preferred { + if !self.published_blocks.is_empty() { + self.rolled_back = true; + } + self.published_blocks.clear(); + self.unpublished_blocks.clear(); + } } Point::Specific(slot, _) => { - while let Some((s, _)) = self.unpublished_blocks.back() { - if *s > slot { - self.unpublished_blocks.pop_back(); - } else { - break; + self.blocks.retain(|s, b| *s <= slot || b.track_rollback(id)); + if is_preferred { + while let Some((s, _)) = self.unpublished_blocks.back() { + if *s > slot { + self.unpublished_blocks.pop_back(); + } else { + break; + } } - } - self.rolled_back = false; - while let Some((s, _)) = self.published_blocks.back() { - if *s > slot { - self.rolled_back = true; - self.published_blocks.pop_back(); - } else { - break; + while let Some((s, _)) = self.published_blocks.back() { + if *s > slot { + self.rolled_back = true; + self.published_blocks.pop_back(); + } else { + break; + } } } - self.rolled_back } } } @@ -129,12 +148,42 @@ impl ChainState { return; } self.preferred_upstream = Some(id); + + // If there are any blocks queued to be published which our preferred upstream never announced, + // unqueue them now. 
while let Some((slot, hash)) = self.unpublished_blocks.back() { let Some(slot_blocks) = self.blocks.get(slot) else { break; }; - if !slot_blocks.announced(id, *hash) { + if !slot_blocks.was_hash_announced(id, *hash) { self.unpublished_blocks.pop_back(); + } else { + break; + } + } + + // If we published any blocks which our preferred upstream never announced, + // we'll have to publish that we rolled them back + while let Some((slot, hash)) = self.published_blocks.back() { + let Some(slot_blocks) = self.blocks.get(slot) else { + break; + }; + if !slot_blocks.was_hash_announced(id, *hash) { + self.rolled_back = true; + self.published_blocks.pop_back(); + } else { + break; + } + } + + // If this other chain has announced blocks which we haven't published yet, + // queue them to be published as soon as we have their bodies + let head_slot = self.published_blocks.back().map(|(s, _)| *s); + if let Some(slot) = head_slot { + for (slot, blocks) in self.blocks.range(slot+1..) { + if let Some(hash) = blocks.find_announced_hash(id) { + self.unpublished_blocks.push_back((*slot, hash)); + } } } } @@ -205,4 +254,211 @@ impl ChainState { None => vec![] } } +} + +#[cfg(test)] +mod tests { + use acropolis_common::Era; + use pallas::crypto::hash::Hasher; + + use super::*; + + fn make_block(slot: u64, desc: &str) -> (Header, Vec) { + let mut hasher = Hasher::<256>::new(); + hasher.input(&slot.to_le_bytes()); + hasher.input(desc.as_bytes()); + let hash = BlockHash::new(*hasher.finalize()); + let header = Header { + hash, + slot, + number: slot, + bytes: desc.as_bytes().to_vec(), + era: Era::Conway, + }; + let body = desc.as_bytes().to_vec(); + (header, body) + } + + #[test] + fn should_work_in_happy_path() { + let mut state = ChainState::new(); + let peer = PeerId(0); + state.handle_new_preferred_upstream(peer); + + let (header, body) = make_block(0, "first block"); + + // simulate a roll forward from our peer + let announced = state.handle_roll_forward(peer, header.clone()); + 
assert_eq!(announced, vec![peer]); + + // we don't have any new blocks to report yet + assert_eq!(state.next_unpublished_block(), None); + + // report that our peer returned the body + state.handle_body_fetched(header.slot, header.hash, body.clone()); + + // NOW we have a new block to report + assert_eq!(state.next_unpublished_block(), Some((&header, body.as_slice(), false))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } + + #[test] + fn should_handle_blocks_fetched_out_of_order() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2, b2) = make_block(1, "second block"); + + // simulate a roll forward + state.handle_roll_forward(p1, h1.clone()); + state.handle_roll_forward(p1, h2.clone()); + + // we don't have any new blocks to report yet + assert_eq!(state.next_unpublished_block(), None); + + // report that our peer returned the SECOND body first. + state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); + + // without the first block, we can't use that yet. + assert_eq!(state.next_unpublished_block(), None); + + // but once it reports the first body... 
+ state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + + // NOW we have TWO new blocks to report + assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), Some((&h2, b2.as_slice(), false))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } + + #[test] + fn should_handle_rollback() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2, b2) = make_block(1, "second block pre-rollback"); + let (h3, b3) = make_block(1, "second block post-rollback"); + let (h4, b4) = make_block(1, "third block post-rollback"); + + // publish the first block + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + state.handle_block_published(); + + // publish the second block + assert_eq!(state.handle_roll_forward(p1, h2.clone()), vec![p1]); + state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h2, b2.as_slice(), false))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + + // now, roll the chain back to the first block + state.handle_roll_backward(p1, Point::Specific(h1.slot, h1.hash.to_vec())); + assert_eq!(state.next_unpublished_block(), None); + + // and when we advance to the new second block, the system should report it as a rollback + assert_eq!(state.handle_roll_forward(p1, h3.clone()), vec![p1]); + state.handle_body_fetched(h3.slot, h3.hash, b3.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h3, b3.as_slice(), true))); + state.handle_block_published(); + + // and the new third block should not be a rollback + assert_eq!(state.handle_roll_forward(p1, h4.clone()), 
vec![p1]); + state.handle_body_fetched(h4.slot, h4.hash, b4.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h4, b4.as_slice(), false))); + state.handle_block_published(); + } + + #[test] + fn should_not_report_rollback_for_unpublished_portion_of_chain() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2, b2) = make_block(1, "second block pre-rollback"); + let (h3, b3) = make_block(1, "second block post-rollback"); + + // publish the first block + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + state.handle_block_published(); + + // roll forward to the second block, but pretend the body is taking a while to download + assert_eq!(state.handle_roll_forward(p1, h2.clone()), vec![p1]); + + // oops, we just received a rollback + state.handle_roll_backward(p1, Point::Specific(h1.slot, h1.hash.to_vec())); + + // and THEN we got the second body + state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); + + // don't publish the old second block, since it isn't part of the chain + assert_eq!(state.next_unpublished_block(), None); + + // and when we advance to the new second block, the system should not report it as a rollback + assert_eq!(state.handle_roll_forward(p1, h3.clone()), vec![p1]); + state.handle_body_fetched(h3.slot, h3.hash, b3.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h3, b3.as_slice(), false))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + } + + #[test] + fn should_gracefully_handle_switching_chains() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + let p2 = PeerId(1); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (p1h2, p1b2) = make_block(1, "our 
preferred upstream's second block"); + let (p1h3, p1b3) = make_block(2, "our preferred upstream's third block"); + let (p2h2, p2b2) = make_block(1, "another upstream's second block"); + let (p2h3, p2b3) = make_block(2, "another upstream's third block"); + + // publish three blocks on our current chain + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + state.handle_block_published(); + + assert_eq!(state.handle_roll_forward(p1, p1h2.clone()), vec![p1]); + state.handle_body_fetched(p1h2.slot, p1h2.hash, p1b2.clone()); + assert_eq!(state.next_unpublished_block(), Some((&p1h2, p1b2.as_slice(), false))); + state.handle_block_published(); + + assert_eq!(state.handle_roll_forward(p1, p1h3.clone()), vec![p1]); + state.handle_body_fetched(p1h3.slot, p1h3.hash, p1b3.clone()); + assert_eq!(state.next_unpublished_block(), Some((&p1h3, p1b3.as_slice(), false))); + state.handle_block_published(); + + // that other chain forked + assert_eq!(state.handle_roll_forward(p2, h1.clone()), vec![p1, p2]); + assert_eq!(state.handle_roll_forward(p2, p2h2.clone()), vec![p2]); + state.handle_body_fetched(p2h2.slot, p2h2.hash, p2b2.clone()); + assert_eq!(state.handle_roll_forward(p2, p2h3.clone()), vec![p2]); + state.handle_body_fetched(p2h3.slot, p2h3.hash, p2b3.clone()); + assert_eq!(state.next_unpublished_block(), None); + + // and then we decided to switch to it + state.handle_new_preferred_upstream(p2); + + // now we should publish two blocks, and the first should be marked as "rollback" + assert_eq!(state.next_unpublished_block(), Some((&p2h2, p2b2.as_slice(), true))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), Some((&p2h3, p2b3.as_slice(), false))); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + + } } \ No newline at end of file diff --git 
a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs index a2fdabb5..f8b8f148 100644 --- a/modules/peer_network_interface/src/connection.rs +++ b/modules/peer_network_interface/src/connection.rs @@ -74,7 +74,7 @@ pub enum PeerChainSyncEvent { RollBackward(Point), } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Header { pub hash: BlockHash, pub slot: u64, diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 3af911e7..e4a072e1 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -31,6 +31,9 @@ impl PeerData { } fn request_block(&mut self, hash: BlockHash, slot: u64) -> bool { + if self.reqs.contains(&(hash, slot)) { + return true; + } if let Err(error) = self.conn.request_block(hash, slot) { warn!("could not request block from {}: {error:#}", self.conn.address); return false; @@ -49,7 +52,6 @@ pub struct NetworkManager { next_id: u64, peers: BTreeMap, chain: ChainState, - rolled_back: bool, events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, @@ -68,7 +70,6 @@ impl NetworkManager { next_id: 0, peers: BTreeMap::new(), chain: ChainState::new(), - rolled_back: false, events, events_sender, block_sink, @@ -154,10 +155,7 @@ impl NetworkManager { } } PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => { - let rolled_back = self.chain.handle_roll_backward(peer, point); - if rolled_back { - self.rolled_back = true; - } + self.chain.handle_roll_backward(peer, point); } PeerEvent::BlockFetched(fetched) => { for peer in self.peers.values_mut() { @@ -239,7 +237,7 @@ pub enum NetworkEvent { } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct PeerId(u64); +pub struct PeerId(pub(crate) u64); pub struct PeerMessageSender { id: PeerId, From 21f7e678b3a2a7033035821fcb60049fc5545d6e Mon Sep 17 00:00:00 2001 From: Simon 
Gellis Date: Tue, 4 Nov 2025 20:18:12 -0500 Subject: [PATCH 11/15] docs: docs --- modules/peer_network_interface/NOTES.md | 18 ++++++++++++++++++ modules/peer_network_interface/README.md | 18 ++++++++++++++++++ .../peer_network_interface/config.default.toml | 13 ++++++++++++- 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 modules/peer_network_interface/NOTES.md create mode 100644 modules/peer_network_interface/README.md diff --git a/modules/peer_network_interface/NOTES.md b/modules/peer_network_interface/NOTES.md new file mode 100644 index 00000000..626901fd --- /dev/null +++ b/modules/peer_network_interface/NOTES.md @@ -0,0 +1,18 @@ +# Architecture + +This module uses an event-queue-based architecture. A `NetworkManager` is responsible for creating a set of `PeerConnection`s and sending commands to them. Each `PeerConnection` maintains a connection to a single peer; it responds to commands from the `NetworkManager`, and emits events to an event queue. The `NetworkManager` reads from that queue to decide which chain to follow. When blocks from the preferred chain have been fetched, it publishes those blocks to the message bus. + +This module requests the body for every block announced by any chain, from the first chain which announced it. When it has the body for the next block announced + +```mermaid +graph LR + EQ[Event Queue]-->NM[NetworkManager] + subgraph Peers + P1[PeerConnection 1] + P2[PeerConnection 2] + P3[PeerConnection 3] + end + NM -->|RequestBlock
FindIntersect| P1 & P2 & P3 + Peers -->|ChainSync
BlockFetched
Disconnect|EQ + NM -->|BlockAvailable| MB[Message Bus] +``` \ No newline at end of file diff --git a/modules/peer_network_interface/README.md b/modules/peer_network_interface/README.md new file mode 100644 index 00000000..5e488841 --- /dev/null +++ b/modules/peer_network_interface/README.md @@ -0,0 +1,18 @@ +# Peer network interface module + +The peer network interface module uses the ChainSync and BlockFetch protocols to fetch blocks from one of several upstream sources. It chooses one peer to treat as the "preferred" chain to follow, but will gracefully switch which peer it follows during network issues. + +It can run independently, either from the origin or current tip, or +be triggered by a Mithril snapshot event (the default) where it starts from +where the snapshot left off, and follows the chain from there. + +Rollbacks are handled by signalling in the block data - it is downstream +subscribers' responsibility to deal with the effects of this. + +## Configuration + +See [./config.default.toml](./config.default.toml) for the available configuration options and their default values. + +## Messages + +This module publishes "raw block messages" to the configured `block-topic`. Each message includes the raw bytes composing the header and body of a block. The module follows the head of one chain at any given time, though that chain may switch during runtime. If that chain reports a rollback (or if this module switches to a different chain), the next message it emits will be the new head of the chain and have the status `RolledBack`.
diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml index ca4d15c7..c48d02dc 100644 --- a/modules/peer_network_interface/config.default.toml +++ b/modules/peer_network_interface/config.default.toml @@ -1,13 +1,24 @@ +# The topic to publish blocks on block-topic = "cardano.block.available" +# The topic to wait for when sync-point is "snapshot" snapshot-completion-topic = "cardano.snapshot.complete" +# The topic to wait for when listening for genesis values from another module genesis-completion-topic = "cardano.sequence.bootstrapped" +# Upstream node connections node-addresses = [ "backbone.cardano.iog.io:3001", "backbone.mainnet.cardanofoundation.org:3001", "backbone.mainnet.emurgornd.com:3001", ] +# The network magic for the chain to connect to magic-number = 764824073 -sync-point = "origin" +# The initial point to start syncing from. Options: +# - "origin": sync from the very start of the chain +# - "tip": sync from the very end of the chain +# - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache. +# - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot. 
+sync-point = "snapshot" +# The cache dir to use when sync-point is "cache" cache-dir = "upstream-cache" \ No newline at end of file From 03ec42154f0b272a4b75feeeb22f204c6d20b813 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 20:20:06 -0500 Subject: [PATCH 12/15] fix: credit where credit is due --- modules/peer_network_interface/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/peer_network_interface/Cargo.toml b/modules/peer_network_interface/Cargo.toml index f9a0804a..c15ee1b2 100644 --- a/modules/peer_network_interface/Cargo.toml +++ b/modules/peer_network_interface/Cargo.toml @@ -4,7 +4,7 @@ name = "acropolis_module_peer_network_interface" version = "0.2.0" edition = "2024" -authors = ["Paul Clark "] +authors = ["Simon Gellis "] description = "Multiplexed chain fetcher Caryatid module for Acropolis" license = "Apache-2.0" From f9b1a4eee218b7a2f65b9844846515018c9bd60b Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 20:24:01 -0500 Subject: [PATCH 13/15] fix: finish a sentence --- modules/peer_network_interface/NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/peer_network_interface/NOTES.md b/modules/peer_network_interface/NOTES.md index 626901fd..5d084cbe 100644 --- a/modules/peer_network_interface/NOTES.md +++ b/modules/peer_network_interface/NOTES.md @@ -2,7 +2,7 @@ This module uses an event-queue-based architecture. A `NetworkManager` is responsible for creating a set of `PeerConnection`s and sending commands to them. Each `PeerConnection` maintains a connection to a single peer; it responds to commands from the `NetworkManager`, and emits events to an event queue. The `NetworkManager` reads from that queue to decide which chain to follow. When blocks from the preferred chain have been fetched, it publishes those blocks to the message bus. -This module requests the body for every block announced by any chain, from the first chain which announced it. 
When it has the body for the next block announced +This module requests the body for every block announced by any chain, from the first chain which announced it. When it has the body for the next block announced, it will publish it to the message bus. ```mermaid graph LR From b0c5fc7fd1655eec4a5790495726382c8f188975 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 4 Nov 2025 20:25:53 -0500 Subject: [PATCH 14/15] fix: run fmt on whole project --- common/src/upstream_cache.rs | 7 +- .../peer_network_interface/src/chain_state.rs | 85 ++++++++++++++----- modules/peer_network_interface/src/network.rs | 19 ++--- .../src/peer_network_interface.rs | 10 ++- 4 files changed, 85 insertions(+), 36 deletions(-) diff --git a/common/src/upstream_cache.rs b/common/src/upstream_cache.rs index 93bbf27f..c8cae2a3 100644 --- a/common/src/upstream_cache.rs +++ b/common/src/upstream_cache.rs @@ -124,7 +124,9 @@ impl UpstreamCacheImpl { pub fn write_record(&mut self, record: &UpstreamCacheRecord) -> Result<()> { self.chunk_cached.push(record.clone()); - self.storage.write_chunk(self.current_chunk, &self.chunk_cached).context("could not write cache record")?; + self.storage + .write_chunk(self.current_chunk, &self.chunk_cached) + .context("could not write cache record")?; self.current_record += 1; if self.current_record >= self.density { @@ -155,7 +157,8 @@ impl Storage for FileStorage { } fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> { - let mut file = File::create(self.get_file_name(chunk_no)).context("could not write chunk")?; + let mut file = + File::create(self.get_file_name(chunk_no)).context("could not write chunk")?; file.write_all(&serde_json::to_vec(data)?)?; Ok(()) } diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs index 086c16bf..74f75dd9 100644 --- a/modules/peer_network_interface/src/chain_state.rs +++ b/modules/peer_network_interface/src/chain_state.rs @@ -21,7 
+21,11 @@ impl SlotBlockData { if let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == header.hash) { block.announced_by.push(id); } else { - self.blocks.push(BlockData { header, announced_by: vec![id], body: None }); + self.blocks.push(BlockData { + header, + announced_by: vec![id], + body: None, + }); } } @@ -50,7 +54,7 @@ impl SlotBlockData { fn announcers(&self, hash: BlockHash) -> Vec { match self.blocks.iter().find(|b| b.header.hash == hash) { Some(b) => b.announced_by.clone(), - None => vec![] + None => vec![], } } @@ -180,7 +184,7 @@ impl ChainState { // queue them to be published as soon as we have their bodies let head_slot = self.published_blocks.back().map(|(s, _)| *s); if let Some(slot) = head_slot { - for (slot, blocks) in self.blocks.range(slot+1..) { + for (slot, blocks) in self.blocks.range(slot + 1..) { if let Some(hash) = blocks.find_announced_hash(id) { self.unpublished_blocks.push_back((*slot, hash)); } @@ -251,7 +255,7 @@ impl ChainState { pub fn block_announcers(&self, slot: u64, hash: BlockHash) -> Vec { match self.blocks.get(&slot) { Some(slot_blocks) => slot_blocks.announcers(hash), - None => vec![] + None => vec![], } } } @@ -298,7 +302,10 @@ mod tests { state.handle_body_fetched(header.slot, header.hash, body.clone()); // NOW we have a new block to report - assert_eq!(state.next_unpublished_block(), Some((&header, body.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&header, body.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.next_unpublished_block(), None); } @@ -329,9 +336,15 @@ mod tests { state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); // NOW we have TWO new blocks to report - assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); state.handle_block_published(); - assert_eq!(state.next_unpublished_block(), Some((&h2, b2.as_slice(), false))); + 
assert_eq!( + state.next_unpublished_block(), + Some((&h2, b2.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.next_unpublished_block(), None); } @@ -350,13 +363,19 @@ mod tests { // publish the first block assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); - assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); state.handle_block_published(); // publish the second block assert_eq!(state.handle_roll_forward(p1, h2.clone()), vec![p1]); state.handle_body_fetched(h2.slot, h2.hash, b2.clone()); - assert_eq!(state.next_unpublished_block(), Some((&h2, b2.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h2, b2.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.next_unpublished_block(), None); @@ -367,13 +386,19 @@ mod tests { // and when we advance to the new second block, the system should report it as a rollback assert_eq!(state.handle_roll_forward(p1, h3.clone()), vec![p1]); state.handle_body_fetched(h3.slot, h3.hash, b3.clone()); - assert_eq!(state.next_unpublished_block(), Some((&h3, b3.as_slice(), true))); + assert_eq!( + state.next_unpublished_block(), + Some((&h3, b3.as_slice(), true)) + ); state.handle_block_published(); // and the new third block should not be a rollback assert_eq!(state.handle_roll_forward(p1, h4.clone()), vec![p1]); state.handle_body_fetched(h4.slot, h4.hash, b4.clone()); - assert_eq!(state.next_unpublished_block(), Some((&h4, b4.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h4, b4.as_slice(), false)) + ); state.handle_block_published(); } @@ -390,7 +415,10 @@ mod tests { // publish the first block assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); - 
assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); state.handle_block_published(); // roll forward to the second block, but pretend the body is taking a while to download @@ -408,7 +436,10 @@ mod tests { // and when we advance to the new second block, the system should not report it as a rollback assert_eq!(state.handle_roll_forward(p1, h3.clone()), vec![p1]); state.handle_body_fetched(h3.slot, h3.hash, b3.clone()); - assert_eq!(state.next_unpublished_block(), Some((&h3, b3.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h3, b3.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.next_unpublished_block(), None); } @@ -429,17 +460,26 @@ mod tests { // publish three blocks on our current chain assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); - assert_eq!(state.next_unpublished_block(), Some((&h1, b1.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.handle_roll_forward(p1, p1h2.clone()), vec![p1]); state.handle_body_fetched(p1h2.slot, p1h2.hash, p1b2.clone()); - assert_eq!(state.next_unpublished_block(), Some((&p1h2, p1b2.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&p1h2, p1b2.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.handle_roll_forward(p1, p1h3.clone()), vec![p1]); state.handle_body_fetched(p1h3.slot, p1h3.hash, p1b3.clone()); - assert_eq!(state.next_unpublished_block(), Some((&p1h3, p1b3.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&p1h3, p1b3.as_slice(), false)) + ); state.handle_block_published(); // that other chain forked @@ -454,11 +494,16 @@ mod tests { state.handle_new_preferred_upstream(p2); // now we should 
publish two blocks, and the first should be marked as "rollback" - assert_eq!(state.next_unpublished_block(), Some((&p2h2, p2b2.as_slice(), true))); + assert_eq!( + state.next_unpublished_block(), + Some((&p2h2, p2b2.as_slice(), true)) + ); state.handle_block_published(); - assert_eq!(state.next_unpublished_block(), Some((&p2h3, p2b3.as_slice(), false))); + assert_eq!( + state.next_unpublished_block(), + Some((&p2h3, p2b3.as_slice(), false)) + ); state.handle_block_published(); assert_eq!(state.next_unpublished_block(), None); - } -} \ No newline at end of file +} diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index e4a072e1..e4e26e4d 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -1,10 +1,9 @@ -use std::{ - collections::BTreeMap, - time::Duration, -}; +use std::{collections::BTreeMap, time::Duration}; use crate::{ - BlockSink, chain_state::ChainState, connection::{PeerChainSyncEvent, PeerConnection, PeerEvent} + BlockSink, + chain_state::ChainState, + connection::{PeerChainSyncEvent, PeerConnection, PeerEvent}, }; use acropolis_common::BlockHash; use anyhow::{Context as _, Result, bail}; @@ -18,10 +17,7 @@ struct PeerData { } impl PeerData { fn new(conn: PeerConnection) -> Self { - Self { - conn, - reqs: vec![] - } + Self { conn, reqs: vec![] } } fn find_intersect(&self, points: Vec) { @@ -35,7 +31,10 @@ impl PeerData { return true; } if let Err(error) = self.conn.request_block(hash, slot) { - warn!("could not request block from {}: {error:#}", self.conn.address); + warn!( + "could not request block from {}: {error:#}", + self.conn.address + ); return false; } self.reqs.push((hash, slot)); diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index ae87eead..5bc39af1 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ 
b/modules/peer_network_interface/src/peer_network_interface.rs @@ -4,7 +4,10 @@ mod connection; mod network; use acropolis_common::{ - BlockInfo, BlockStatus, genesis_values::GenesisValues, messages::{CardanoMessage, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord} + BlockInfo, BlockStatus, + genesis_values::GenesisValues, + messages::{CardanoMessage, Message, RawBlockMessage}, + upstream_cache::{UpstreamCache, UpstreamCacheRecord}, }; use anyhow::{Result, bail}; use caryatid_sdk::{Context, Module, Subscription, module}; @@ -72,8 +75,7 @@ impl PeerNetworkInterface { upstream_cache, }; - let mut manager = - NetworkManager::new(cfg.magic_number, events, events_sender, sink); + let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); for address in cfg.node_addresses { manager.handle_new_connection(address, Duration::ZERO); } @@ -94,7 +96,7 @@ impl PeerNetworkInterface { Ok(point) => manager.sync_to_point(point), Err(error) => { warn!("snapshot restoration never completed: {error:#}"); - return; + return; } } } From 1b857075bd4d44ed38d943fd00d26b2bc36bf48c Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Wed, 5 Nov 2025 09:40:15 -0500 Subject: [PATCH 15/15] fix: gracefully handle connecting to lagging-behind peer --- .../peer_network_interface/src/chain_state.rs | 9 ------ .../peer_network_interface/src/connection.rs | 6 +++- modules/peer_network_interface/src/network.rs | 28 +++++++++---------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs index 74f75dd9..3ec72e09 100644 --- a/modules/peer_network_interface/src/chain_state.rs +++ b/modules/peer_network_interface/src/chain_state.rs @@ -220,9 +220,6 @@ impl ChainState { for _ in 0..5 { if let Some((slot, hash)) = iterator.next() { result.push(Point::Specific(*slot, hash.to_vec())); - } else { - result.push(Point::Origin); - return result; } 
} @@ -231,9 +228,6 @@ impl ChainState { for _ in 0..5 { if let Some((slot, hash)) = iterator.next() { result.push(Point::Specific(*slot, hash.to_vec())); - } else { - result.push(Point::Origin); - return result; } } @@ -243,9 +237,6 @@ impl ChainState { for _ in 0..5 { if let Some((slot, hash)) = iterator.next() { result.push(Point::Specific(*slot, hash.to_vec())); - } else { - result.push(Point::Origin); - return result; } } diff --git a/modules/peer_network_interface/src/connection.rs b/modules/peer_network_interface/src/connection.rs index f8b8f148..ec4e38c8 100644 --- a/modules/peer_network_interface/src/connection.rs +++ b/modules/peer_network_interface/src/connection.rs @@ -72,6 +72,7 @@ pub enum PeerEvent { pub enum PeerChainSyncEvent { RollForward(Header), RollBackward(Point), + IntersectNotFound(Point), } #[derive(Clone, Debug, PartialEq, Eq)] @@ -146,8 +147,11 @@ impl PeerConnectionWorker { }; match cmd { ChainsyncCommand::FindIntersect(points) => { - let (point, _) = client.find_intersect(points).await?; + let (point, tip) = client.find_intersect(points).await?; reached = point; + if reached.is_none() { + self.sender.write(PeerEvent::ChainSync(PeerChainSyncEvent::IntersectNotFound(tip.0))).await?; + } } ChainsyncCommand::FindTip(done) => { let points = reached.as_slice().to_vec(); diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index e4e26e4d..ff7f494a 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -99,14 +99,12 @@ impl NetworkManager { }; let conn = PeerConnection::new(address, self.network_magic, sender, delay); let peer = PeerData::new(conn); - if self.chain.preferred_upstream.is_some() { - let points = self.chain.choose_points_for_find_intersect(); - if !points.is_empty() { - peer.find_intersect(points); - } - self.peers.insert(id, peer); - } else { - self.peers.insert(id, peer); + let points = 
self.chain.choose_points_for_find_intersect(); + if !points.is_empty() { + peer.find_intersect(points); + } + self.peers.insert(id, peer); + if self.chain.preferred_upstream.is_none() { + self.set_preferred_upstream(id); + } } @@ -156,6 +154,13 @@ impl NetworkManager { PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => { self.chain.handle_roll_backward(peer, point); } + PeerEvent::ChainSync(PeerChainSyncEvent::IntersectNotFound(tip)) => { + // We called find_intersect on a peer, and it didn't recognize any of the points we passed. + // That peer must either be behind us or on a different fork; either way, that peer should sync from its own tip. + if let Some(peer) = self.peers.get(&peer) { + peer.find_intersect(vec![tip]); + } + } PeerEvent::BlockFetched(fetched) => { for peer in self.peers.values_mut() { peer.ack_block(fetched.hash); @@ -209,13 +214,6 @@ impl NetworkManager { }; info!("setting preferred upstream to {}", peer.conn.address); self.chain.handle_new_preferred_upstream(id); - - // If our preferred upstream changed, resync all connections. - // That will trigger a rollback if needed. - let points = self.chain.choose_points_for_find_intersect(); - for peer in self.peers.values() { - peer.find_intersect(points.clone()); - } } async fn publish_blocks(&mut self) -> Result<()> {