From 60e8609150fa3869a5d5ddc34fe5d7cdc8383f4e Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Mon, 3 Nov 2025 07:42:23 -0800 Subject: [PATCH 01/12] Rebased on main --- common/src/messages.rs | 6 + .../src/snapshot_bootstrapper.rs | 240 +++++++++++++++++- 2 files changed, 232 insertions(+), 14 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 25b24fea..ef8e82ef 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -57,6 +57,12 @@ pub struct RawTxsMessage { pub txs: Vec>, } +/// Bootup completion message, sent by any module that has finished its startup sequence, +/// e.g. GenesisBootstrapper after sending all genesis UTxOs or SnapshotBootstrapper after +/// loading a snapshot. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct BootupCompleteMessage {} + /// Genesis completion message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct GenesisCompleteMessage { diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index b9e61b20..a31e7cfc 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -1,16 +1,41 @@ use std::sync::Arc; use acropolis_common::{ - ledger_state::LedgerState, - messages::{Message, SnapshotMessage, SnapshotStateMessage}, + genesis_values::GenesisValues, + messages::{CardanoMessage, GenesisCompleteMessage, Message}, + snapshot::{ + streaming_snapshot::{ + DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo, ProposalCallback, + SnapshotCallbacks, SnapshotMetadata, StakeCallback, UtxoCallback, UtxoEntry, + }, + StreamingSnapshotParser, + }, + stake_addresses::AccountState, + BlockHash, BlockInfo, BlockStatus, Era, }; -use anyhow::{Context as AnyhowContext, Result}; +use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; +use tokio::time::Instant; use tracing::{error, info, info_span, Instrument}; const DEFAULT_SNAPSHOT_TOPIC: &str = "cardano.snapshot"; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; +const DEFAULT_COMPLETION_TOPIC: &str = "cardano.sequence.bootstrapped"; + +/// Callback handler that accumulates snapshot data and builds state +struct SnapshotHandler { + context: Arc>, + snapshot_topic: String, + + // Accumulated data from callbacks + metadata: Option, + utxo_count: u64, + pools: Vec, + accounts: Vec, + dreps: Vec, + proposals: Vec, +} #[module( message_type(Message), @@ -19,15 +44,172 @@ const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; )] pub struct SnapshotBootstrapper; +impl SnapshotHandler { + fn new(context: Arc>, snapshot_topic: String) -> Self { + Self { + context, + snapshot_topic, + metadata: None, + utxo_count: 0, + pools: Vec::new(), + accounts: Vec::new(), + dreps: Vec::new(), + proposals: Vec::new(), + } + } + + /// Build BlockInfo from accumulated metadata + fn build_block_info(&self) -> Result { + let metadata = + self.metadata.as_ref().ok_or_else(|| anyhow::anyhow!("No metadata available"))?; + + // Create a synthetic BlockInfo representing the snapshot state + // This represents the last block included in the snapshot + Ok(BlockInfo { + status: BlockStatus::Immutable, // Snapshot blocks are immutable + slot: 0, // TODO: Extract from snapshot metadata if available + number: 0, // TODO: Extract from snapshot metadata if available + hash: BlockHash::default(), // TODO: Extract from snapshot metadata if available + epoch: metadata.epoch, + epoch_slot: 0, // TODO: Extract from snapshot metadata if available + new_epoch: false, // Not necessarily a new epoch + timestamp: 0, // TODO: Extract from snapshot metadata if available + era: Era::Conway, // TODO: Determine from snapshot or config + }) + } + + /// Build GenesisValues from snapshot data + fn build_genesis_values(&self) -> Result { + // TODO: These values should ideally come from the snapshot or configuration + // For now, using defaults for Conway era + Ok(GenesisValues { + byron_timestamp: 1506203091, // Byron mainnet genesis timestamp + shelley_epoch: 208, // Shelley started at epoch 208 on mainnet + shelley_epoch_len: 432000, // 5 days in seconds + shelley_genesis_hash: [ + // Shelley mainnet genesis hash (placeholder - should be from config) + 0x1a, 0x3d, 0x98, 0x7a, 0x95, 0xad, 0xd2, 0x3e, 0x4f, 0x4d, 0x2d, 0x78, 0x74, 0x9f, + 0x96, 0x65, 0xd4, 0x1e, 0x48, 0x3e, 0xf2, 0xa2, 0x22, 0x9c, 0x4b, 0x0b, 0xf3, 0x9f, + 0xad, 0x7d, 0x5e, 0x27, + ], + }) + } + + async fn publish_completion( + &self, + block_info: BlockInfo, + genesis_values: GenesisValues, + ) -> Result<()> { + let message = Message::Cardano(( + block_info, + CardanoMessage::GenesisComplete(GenesisCompleteMessage { + values: genesis_values, + }), + )); + + self.context + .message_bus + .publish(&self.snapshot_topic, Arc::new(message)) + .await + .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) + } +} + +impl UtxoCallback for SnapshotHandler { + fn on_utxo(&mut self, _utxo: UtxoEntry) -> Result<()> { + self.utxo_count += 1; + + // Log progress every million UTXOs + if self.utxo_count % 1_000_000 == 0 { + info!("Processed {} UTXOs", self.utxo_count); + } + // TODO: Accumulate UTXO data if needed or send in chunks to UTXOState processor + Ok(()) + } +} + +impl PoolCallback for SnapshotHandler { + fn on_pools(&mut self, pools: Vec) -> Result<()> { + info!("Received {} pools", pools.len()); + self.pools = pools; + // TODO: Publish pool data. + Ok(()) + } +} + +impl StakeCallback for SnapshotHandler { + fn on_accounts(&mut self, accounts: Vec) -> Result<()> { + info!("Received {} accounts", accounts.len()); + self.accounts = accounts; + // TODO: Publish account data. + Ok(()) + } +} + +impl DRepCallback for SnapshotHandler { + fn on_dreps(&mut self, dreps: Vec) -> Result<()> { + info!("Received {} DReps", dreps.len()); + self.dreps = dreps; + // TODO: Publish DRep data. + Ok(()) + } +} + +impl ProposalCallback for SnapshotHandler { + fn on_proposals(&mut self, proposals: Vec) -> Result<()> { + info!("Received {} proposals", proposals.len()); + self.proposals = proposals; + // TODO: Publish proposal data. + Ok(()) + } +} + +impl SnapshotCallbacks for SnapshotHandler { + fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> { + info!("Received snapshot metadata for epoch {}", metadata.epoch); + info!(" - UTXOs: {:?}", metadata.utxo_count); + info!( + " - Pot balances: treasury={}, reserves={}, deposits={}", + metadata.pot_balances.treasury, + metadata.pot_balances.reserves, + metadata.pot_balances.deposits + ); + info!( + " - Previous epoch blocks: {}", + metadata.blocks_previous_epoch.len() + ); + info!( + " - Current epoch blocks: {}", + metadata.blocks_current_epoch.len() + ); + + self.metadata = Some(metadata); + Ok(()) + } + + fn on_complete(&mut self) -> Result<()> { + info!("Snapshot parsing completed"); + info!("Final statistics:"); + info!(" - UTXOs processed: {}", self.utxo_count); + info!(" - Pools: {}", self.pools.len()); + info!(" - Accounts: {}", self.accounts.len()); + info!(" - DReps: {}", self.dreps.len()); + info!(" - Proposals: {}", self.proposals.len()); + + // We could send a Resolver reference from here for large data, i.e. the UTXO set, + // which could be a file reference. For a file reference, we'd extend the parser to + // give us a callback value with the offset into the file; and we'd make the streaming + // UTXO parser public and reusable, adding it to the resolver implementation. + Ok(()) + } +} + impl SnapshotBootstrapper { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { let file_path = config .get_string("snapshot-path") .inspect_err(|e| error!("failed to find snapshot-path config: {e}"))?; - let ledger_state = - LedgerState::from_directory(file_path).context("failed to load ledger state")?; - let startup_topic = config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string()); @@ -35,7 +217,12 @@ impl SnapshotBootstrapper { config.get_string("snapshot-topic").unwrap_or(DEFAULT_SNAPSHOT_TOPIC.to_string()); info!("Publishing snapshots on '{snapshot_topic}'"); + let completion_topic = + config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + info!("Completing with '{completion_topic}'"); + let mut subscription = context.subscribe(&startup_topic).await?; + context.clone().run(async move { let Ok(_) = subscription.read().await else { return; @@ -44,14 +231,11 @@ impl SnapshotBootstrapper { let span = info_span!("snapshot_bootstrapper.handle"); async { - let spo_state_message = Message::Snapshot(SnapshotMessage::Bootstrap( - SnapshotStateMessage::SPOState(ledger_state.spo_state), - )); - context - .message_bus - .publish(&snapshot_topic, Arc::new(spo_state_message)) - .await - .unwrap_or_else(|e| error!("failed to publish: {e}")); + if let Err(e) = + Self::process_snapshot(&file_path, context.clone(), &completion_topic).await + { + error!("Failed to process snapshot: {}", e); + } } .instrument(span) .await; @@ -59,4 +243,32 @@ impl SnapshotBootstrapper { Ok(()) } + + async fn process_snapshot( + file_path: &str, + context: Arc>, + completion_topic: &str, + ) -> Result<()> { + let parser = StreamingSnapshotParser::new(file_path); + let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); + + info!("Starting snapshot parsing from: {}", file_path); + let start = Instant::now(); + + // Parse the snapshot with our callback handler + parser.parse(&mut callbacks)?; + + let duration = start.elapsed(); + info!("✓ Parse completed successfully in {:.2?}", duration); + + // Build the final state from accumulated data + let block_info = callbacks.build_block_info()?; + let genesis_values = callbacks.build_genesis_values()?; + + // Publish completion message to trigger next phase (e.g., Mithril) + callbacks.publish_completion(block_info, genesis_values).await?; + + info!("Snapshot bootstrap completed successfully"); + Ok(()) + } } From 1815afd246381f000ec57730298079ff11fc4d2a Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 22 Oct 2025 12:15:54 -0700 Subject: [PATCH 02/12] Remove a message type that was just for protoyping --- common/src/messages.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index ef8e82ef..25b24fea 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -57,12 +57,6 @@ pub struct RawTxsMessage { pub txs: Vec>, } -/// Bootup completion message, sent by any module that has finished its startup sequence, -/// e.g. GenesisBootstrapper after sending all genesis UTxOs or SnapshotBootstrapper after -/// loading a snapshot. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BootupCompleteMessage {} - /// Genesis completion message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct GenesisCompleteMessage { From f444ae6939daa89e75c2136ad8dafc179c7d3f2c Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 22 Oct 2025 13:16:34 -0700 Subject: [PATCH 03/12] Apply some co-pilot feedback that seemed nice --- .../snapshot_bootstrapper/src/snapshot_bootstrapper.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index a31e7cfc..4eb7500e 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -131,7 +131,7 @@ impl UtxoCallback for SnapshotHandler { impl PoolCallback for SnapshotHandler { fn on_pools(&mut self, pools: Vec) -> Result<()> { info!("Received {} pools", pools.len()); - self.pools = pools; + self.pools.extend(pools); // TODO: Publish pool data. Ok(()) } @@ -140,7 +140,7 @@ impl PoolCallback for SnapshotHandler { impl StakeCallback for SnapshotHandler { fn on_accounts(&mut self, accounts: Vec) -> Result<()> { info!("Received {} accounts", accounts.len()); - self.accounts = accounts; + self.accounts.extend(accounts); // TODO: Publish account data. Ok(()) } @@ -149,7 +149,7 @@ impl StakeCallback for SnapshotHandler { impl DRepCallback for SnapshotHandler { fn on_dreps(&mut self, dreps: Vec) -> Result<()> { info!("Received {} DReps", dreps.len()); - self.dreps = dreps; + self.dreps.extend(dreps); // TODO: Publish DRep data. Ok(()) } @@ -158,7 +158,7 @@ impl DRepCallback for SnapshotHandler { impl ProposalCallback for SnapshotHandler { fn on_proposals(&mut self, proposals: Vec) -> Result<()> { info!("Received {} proposals", proposals.len()); - self.proposals = proposals; + self.proposals.extend(proposals); // TODO: Publish proposal data. Ok(()) } From d60382be7953e53742a2077052f3aeb32da4c0bb Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Fri, 24 Oct 2025 10:40:47 -0700 Subject: [PATCH 04/12] Add publish Start message for snapshot parsing/publishing for synchronization with consumer modules --- common/src/messages.rs | 1 + .../src/snapshot_bootstrapper.rs | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 25b24fea..03b47e47 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -311,6 +311,7 @@ pub enum CardanoMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum SnapshotMessage { + Startup(), // subscirbers should listen for incremental snapshot data Bootstrap(SnapshotStateMessage), DumpRequest(SnapshotDumpMessage), Dump(SnapshotStateMessage), diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 4eb7500e..eaa1e052 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -95,6 +95,16 @@ impl SnapshotHandler { }) } + async fn publish_start( + &self, + ) -> Result<()> { + self.context + .message_bus + .publish(&self.snapshot_topic, Arc::new(Message::Snapshot(acropolis_common::messages::SnapshotMessage::Startup()))) + .await + .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) + } + async fn publish_completion( &self, block_info: BlockInfo, @@ -151,6 +161,7 @@ impl DRepCallback for SnapshotHandler { info!("Received {} DReps", dreps.len()); self.dreps.extend(dreps); // TODO: Publish DRep data. + Ok(()) } } @@ -252,14 +263,16 @@ impl SnapshotBootstrapper { let parser = StreamingSnapshotParser::new(file_path); let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); - info!("Starting snapshot parsing from: {}", file_path); + info!("Starting snapshot parsing and publishing from: {}", file_path); let start = Instant::now(); + callbacks.publish_start().await?; + // Parse the snapshot with our callback handler parser.parse(&mut callbacks)?; let duration = start.elapsed(); - info!("✓ Parse completed successfully in {:.2?}", duration); + info!("✓ Parse and publish completed successfully in {:.2?}", duration); // Build the final state from accumulated data let block_info = callbacks.build_block_info()?; From 26d6b01055222dbad7b9203ca33e1dca818e3f0f Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Sun, 2 Nov 2025 21:06:20 -0800 Subject: [PATCH 05/12] Format --- .../src/snapshot_bootstrapper.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index eaa1e052..0fc87913 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -95,12 +95,15 @@ impl SnapshotHandler { }) } - async fn publish_start( - &self, - ) -> Result<()> { + async fn publish_start(&self) -> Result<()> { self.context .message_bus - .publish(&self.snapshot_topic, Arc::new(Message::Snapshot(acropolis_common::messages::SnapshotMessage::Startup()))) + .publish( + &self.snapshot_topic, + Arc::new(Message::Snapshot( + acropolis_common::messages::SnapshotMessage::Startup(), + )), + ) .await .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) } @@ -263,7 +266,10 @@ impl SnapshotBootstrapper { let parser = StreamingSnapshotParser::new(file_path); let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); - info!("Starting snapshot parsing and publishing from: {}", file_path); + info!( + "Starting snapshot parsing and publishing from: {}", + file_path + ); let start = Instant::now(); callbacks.publish_start().await?; @@ -272,7 +278,10 @@ impl SnapshotBootstrapper { parser.parse(&mut callbacks)?; let duration = start.elapsed(); - info!("✓ Parse and publish completed successfully in {:.2?}", duration); + info!( + "✓ Parse and publish completed successfully in {:.2?}", + duration + ); // Build the final state from accumulated data let block_info = callbacks.build_block_info()?; From 121d85331f1e99a087f28899042973eb170f9454 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Mon, 3 Nov 2025 07:47:57 -0800 Subject: [PATCH 06/12] Apply spelling/word changes suggested by co-pilot review --- common/src/messages.rs | 2 +- modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 03b47e47..a6bf190b 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -311,7 +311,7 @@ pub enum CardanoMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum SnapshotMessage { - Startup(), // subscirbers should listen for incremental snapshot data + Startup(), // subscribers should listen for incremental snapshot data Bootstrap(SnapshotStateMessage), DumpRequest(SnapshotDumpMessage), Dump(SnapshotStateMessage), diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 0fc87913..0b569c2d 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -105,7 +105,7 @@ impl SnapshotHandler { )), ) .await - .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to publish start message: {}", e)) } async fn publish_completion( From 9bb2a4bfbb12f9b52f7467faa91baf54b44f22d7 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Mon, 3 Nov 2025 07:57:49 -0800 Subject: [PATCH 07/12] Apply clippy feedback --- modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 0b569c2d..48ed8569 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -133,7 +133,7 @@ impl UtxoCallback for SnapshotHandler { self.utxo_count += 1; // Log progress every million UTXOs - if self.utxo_count % 1_000_000 == 0 { + if self.utxo_count.is_multiple_of(1_000_000) { info!("Processed {} UTXOs", self.utxo_count); } // TODO: Accumulate UTXO data if needed or send in chunks to UTXOState processor From 9cd3e9984250ee9e8f69b29cf30a19f770b7e9dd Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Mon, 3 Nov 2025 12:01:02 -0800 Subject: [PATCH 08/12] Clippy feedback is good. --- common/src/messages.rs | 2 +- modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index a6bf190b..6ae7ff7f 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -311,7 +311,7 @@ pub enum CardanoMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum SnapshotMessage { - Startup(), // subscribers should listen for incremental snapshot data + Startup, // subscribers should listen for incremental snapshot data Bootstrap(SnapshotStateMessage), DumpRequest(SnapshotDumpMessage), Dump(SnapshotStateMessage), diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 48ed8569..51eac778 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -101,11 +101,11 @@ impl SnapshotHandler { .publish( &self.snapshot_topic, Arc::new(Message::Snapshot( - acropolis_common::messages::SnapshotMessage::Startup(), + acropolis_common::messages::SnapshotMessage::Startup, )), ) .await - .map_err(|e| anyhow::anyhow!("Failed to publish start message: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to publish start message: {e}")) } async fn publish_completion( @@ -124,7 +124,7 @@ impl SnapshotHandler { .message_bus .publish(&self.snapshot_topic, Arc::new(message)) .await - .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to publish completion: {e}")) } } From 011d40674fe8ae004c44f36939615b056645e4f2 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 5 Nov 2025 07:55:10 -0800 Subject: [PATCH 09/12] Use anyhow::Context for better error messages per PR feedback --- .../src/snapshot_bootstrapper.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 51eac778..5f6f0815 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -13,7 +13,7 @@ use acropolis_common::{ stake_addresses::AccountState, BlockHash, BlockInfo, BlockStatus, Era, }; -use anyhow::Result; +use anyhow::{Result}; use caryatid_sdk::{module, Context, Module}; use config::Config; use tokio::time::Instant; @@ -96,7 +96,7 @@ impl SnapshotHandler { } async fn publish_start(&self) -> Result<()> { - self.context + anyhow::Context::context(self.context .message_bus .publish( &self.snapshot_topic, @@ -104,8 +104,7 @@ impl SnapshotHandler { acropolis_common::messages::SnapshotMessage::Startup, )), ) - .await - .map_err(|e| anyhow::anyhow!("Failed to publish start message: {e}")) + .await, "Failed to publish start message") } async fn publish_completion( @@ -120,11 +119,10 @@ impl SnapshotHandler { }), )); - self.context + anyhow::Context::context(self.context .message_bus .publish(&self.snapshot_topic, Arc::new(message)) - .await - .map_err(|e| anyhow::anyhow!("Failed to publish completion: {e}")) + .await, "Failed to publish completion") } } From fe4a9718454ef75154804c4f901fd3a4d535b5e7 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 5 Nov 2025 11:23:16 -0800 Subject: [PATCH 10/12] Oh yeah, run the formatter --- .../src/snapshot_bootstrapper.rs | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 5f6f0815..534085da 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -13,7 +13,7 @@ use acropolis_common::{ stake_addresses::AccountState, BlockHash, BlockInfo, BlockStatus, Era, }; -use anyhow::{Result}; +use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; use tokio::time::Instant; @@ -96,15 +96,18 @@ impl SnapshotHandler { } async fn publish_start(&self) -> Result<()> { - anyhow::Context::context(self.context - .message_bus - .publish( - &self.snapshot_topic, - Arc::new(Message::Snapshot( - acropolis_common::messages::SnapshotMessage::Startup, - )), - ) - .await, "Failed to publish start message") + anyhow::Context::context( + self.context + .message_bus + .publish( + &self.snapshot_topic, + Arc::new(Message::Snapshot( + acropolis_common::messages::SnapshotMessage::Startup, + )), + ) + .await, + "Failed to publish start message", + ) } async fn publish_completion( @@ -119,10 +122,10 @@ impl SnapshotHandler { }), )); - anyhow::Context::context(self.context - .message_bus - .publish(&self.snapshot_topic, Arc::new(message)) - .await, "Failed to publish completion") + anyhow::Context::context( + self.context.message_bus.publish(&self.snapshot_topic, Arc::new(message)).await, + "Failed to publish completion", + ) } } From 5aa10b72862c5476e5deb6b688054c043bc73b2b Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 5 Nov 2025 12:30:29 -0800 Subject: [PATCH 11/12] Add documentation of the boostrapping process --- common/src/snapshot/NOTES.md | 80 ++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 common/src/snapshot/NOTES.md diff --git a/common/src/snapshot/NOTES.md b/common/src/snapshot/NOTES.md new file mode 100644 index 00000000..f313300b --- /dev/null +++ b/common/src/snapshot/NOTES.md @@ -0,0 +1,80 @@ +# Bootstrapping from a Snapshot file +We can boot an Acropolis node either from geneis and replay all of the blocks up to +some point, or we can boot from a snapshot file. This module provides the components +needed to boot from a snapshot file. See [snapshot_bootsrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) for the process that references and runs with these helpers. + +Booting from a snapshot takes minutes instead of the hours it takes to boot from +genesis. It also allows booting from a given epoch which allows one to create tests +that rely only on that epoch of data. We're also skipping some of the problematic +eras and will typically boot from Conway around epoch 305, 306, and 307. It takes +three epochs to have enough context to correctly calculate the rewards. + +The required data for boostrapping are: +- snapshot files (each has an associated epoch number and point) +- nonces +- headers + +## Snapshot Files +The snapshots come from the Amaru project. In their words, +"the snapshots we generated are different [from a Mithril snapshot]: they're +the actual ledger state; i.e. the in-memory state that is constructed by iterating over each block up to a specific point. So, it's all the UTxOs, the set of pending governance actions, the account balance, etc. +If you get this from a trusted source, you don't need to do any replay, you can just start up and load this from disk. +The format of these is completely non-standard; we just forked the haskell node and spit out whatever we needed to in CBOR." + +Snapshot files are referenced by their epoch number in the config.json file below. + +See [Amaru snapshot format](../../../docs/amaru-snapshot-structure.md) + +## Configuration files +There is a path for each network bootstrap configuration file. Network Should +be one of 'mainnet', 'preprod', 'preview' or 'testnet_' where +`magic` is a 32-bits unsigned value denoting a particular testnet. + +The bootstrapper will be given a path to a directory that is expected to contain +the following files: snapshots.json, nonces.json, and headers.json. The path will +be used as a prefix to resolve per-network configuration files +needed for bootstrapping. Given a source directory `data`, and a +a network name of `preview`, the expected layout for configuration files would be: + +* `data/preview/config.json`: a list of epochs to load. +* `data/preview/snapshots.json`: a list of `Snapshot` values (epoch, point, url) +* `data/preview/nonces.json`: a list of `InitialNonces` values, +* `data/preview/headers.json`: a list of `Point`s. + +These files are loaded by [snapshot_bootsrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) during bootup. + +## Bootstrapping sequence + +The bootstrapper will be started with an argument that specifies a network, +e.g. "mainnet". From the network, it will build a path to the configuration +and snapshot files as shown above, then load the data contained or described +in those files. config.json holds a list of typically 3 epochs that can be +used to index into snapshots.json to find the corresponding URLs and meta-data +for each of the three snapshot files. Loading occurs in this order: + +* publish `SnapshotMessage::Startup` +* download the snapshots (on demand; may have already been done externally) +* parse each snapshot and publish their data on the message bus +* read nonces and publish +* read headers and publish +* publish `CardanoMessage::GenesisComplete(GenesisCompleteMessage {...})` + +Modules in the system will have subscribed to the Startup message and also +to individual structural data update messages before the +boostrapper runs the above sequence. Upon receiving the `Startup` message, +they will use data messages to populate their state, history (for BlockFrost), +and any other state required to achieve readiness to operate on reception of +the `GenesisCompleteMessage`. + +## Data update messages + +The bootstrapper will publish data as it parses the snapshot files, nonces, and +headers. Snapshot parsing is done while streaming the data to keep the memory +footprint lower. As elements of the file are parsed, callbacks provide the data +to the boostrapper which publishes the data on the message bus. + +There are TODO markers in [snapshot_bootsrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) that show where to add the +publishing of the parsed snapshot data. + + + From 0b170bbd88185cad78f9aed0991a4117250efed9 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Thu, 6 Nov 2025 11:23:24 -0800 Subject: [PATCH 12/12] Update TODOs and docs re: reading a config file and multiple snapshot files --- common/src/snapshot/NOTES.md | 2 ++ modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/common/src/snapshot/NOTES.md b/common/src/snapshot/NOTES.md index f313300b..549a20a7 100644 --- a/common/src/snapshot/NOTES.md +++ b/common/src/snapshot/NOTES.md @@ -30,6 +30,8 @@ There is a path for each network bootstrap configuration file. Network Should be one of 'mainnet', 'preprod', 'preview' or 'testnet_' where `magic` is a 32-bits unsigned value denoting a particular testnet. +Data structure, e.g. as [Amaru mainnet](https://github.com/pragma-org/amaru/tree/main/data/mainnet) + The bootstrapper will be given a path to a directory that is expected to contain the following files: snapshots.json, nonces.json, and headers.json. The path will be used as a prefix to resolve per-network configuration files diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 534085da..8da7b006 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -221,6 +221,7 @@ impl SnapshotCallbacks for SnapshotHandler { impl SnapshotBootstrapper { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // TODO: read a config file path, not the snapshot-path; implement TODOs below. let file_path = config .get_string("snapshot-path") .inspect_err(|e| error!("failed to find snapshot-path config: {e}"))?; @@ -244,6 +245,12 @@ impl SnapshotBootstrapper { }; info!("Received startup message"); + // TODO: + // Read config file per docs in NOTES.md + // read nonces + // read headers + // read and process ALL of the snapshot files, not just one. + let span = info_span!("snapshot_bootstrapper.handle"); async { if let Err(e) =