From 7927b20bb039743b145c50a6a29fbb0c2e63361b Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 22 Oct 2025 12:08:40 -0700 Subject: [PATCH 1/5] Initial update of the snapshot parser which just lays out the scafolding for processing a snapshot file. --- common/src/messages.rs | 8 +- .../src/snapshot_bootstrapper.rs | 233 ++++++++++++++++-- 2 files changed, 226 insertions(+), 15 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 23dc362c..06daba19 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -60,6 +60,12 @@ pub struct RawTxsMessage { pub txs: Vec>, } +/// Bootup completion message, sent by any module that has finished its startup sequence, +/// e.g. GenesisBootstrapper after sending all genesis UTxOs or SnapshotBootstrapper after +/// loading a snapshot. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct BootupCompleteMessage {} + /// Genesis completion message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct GenesisCompleteMessage { @@ -283,7 +289,7 @@ pub struct SPOStateMessage { pub enum CardanoMessage { BlockHeader(BlockHeaderMessage), // Block header available BlockBody(BlockBodyMessage), // Block body available - SnapshotComplete, // Mithril snapshot loaded + SnapshotComplete, // Mithril or Node (file) snapshot loaded ReceivedTxs(RawTxsMessage), // Transaction available GenesisComplete(GenesisCompleteMessage), // Genesis UTXOs done + genesis params GenesisUTxOs(GenesisUTxOsMessage), // Genesis UTxOs with their UTxOIdentifiers diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index b9e61b20..937b4a2d 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -1,16 +1,43 @@ use std::sync::Arc; use acropolis_common::{ - ledger_state::LedgerState, - messages::{Message, SnapshotMessage, SnapshotStateMessage}, + BlockInfo, Era, BlockStatus, BlockHash, genesis_values::GenesisValues, + snapshot::{ + streaming_snapshot::{ + SnapshotCallbacks, UtxoCallback, PoolCallback, StakeCallback, + DRepCallback, ProposalCallback, UtxoEntry, PoolInfo, DRepInfo, + GovernanceProposal, SnapshotMetadata + }, + StreamingSnapshotParser + }, + messages::{ + CardanoMessage, GenesisCompleteMessage, Message + }, + stake_addresses::AccountState, }; -use anyhow::{Context as AnyhowContext, Result}; +use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; +use tokio::time::Instant; use tracing::{error, info, info_span, Instrument}; const DEFAULT_SNAPSHOT_TOPIC: &str = "cardano.snapshot"; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; +const DEFAULT_COMPLETION_TOPIC: &str = "cardano.sequence.bootstrapped"; + +/// Callback handler that accumulates snapshot data and builds state +struct SnapshotHandler { + context: Arc>, + snapshot_topic: String, + + // Accumulated data from callbacks + metadata: Option, + utxo_count: u64, + pools: Vec, + accounts: Vec, + dreps: Vec, + proposals: Vec, +} #[module( message_type(Message), @@ -19,15 +46,161 @@ const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; )] pub struct SnapshotBootstrapper; +impl SnapshotHandler { + fn new(context: Arc>, snapshot_topic: String) -> Self { + Self { + context, + snapshot_topic, + metadata: None, + utxo_count: 0, + pools: Vec::new(), + accounts: Vec::new(), + dreps: Vec::new(), + proposals: Vec::new(), + } + } + + /// Build BlockInfo from accumulated metadata + fn build_block_info(&self) -> Result { + let metadata = self.metadata.as_ref() + .ok_or_else(|| anyhow::anyhow!("No metadata available"))?; + + // Create a synthetic BlockInfo representing the snapshot state + // This represents the last block included in the snapshot + Ok(BlockInfo { + status: BlockStatus::Immutable, // Snapshot blocks are immutable + slot: 0, // TODO: Extract from snapshot metadata if available + number: 0, // TODO: Extract from snapshot metadata if available + hash: BlockHash::default(), // TODO: Extract from snapshot metadata if available + epoch: metadata.epoch, + epoch_slot: 0, // TODO: Extract from snapshot metadata if available + new_epoch: false, // Not necessarily a new epoch + timestamp: 0, // TODO: Extract from snapshot metadata if available + era: Era::Conway, // TODO: Determine from snapshot or config + }) + } + + /// Build GenesisValues from snapshot data + fn build_genesis_values(&self) -> Result { + // TODO: These values should ideally come from the snapshot or configuration + // For now, using defaults for Conway era + Ok(GenesisValues { + byron_timestamp: 1506203091, // Byron mainnet genesis timestamp + shelley_epoch: 208, // Shelley started at epoch 208 on mainnet + shelley_epoch_len: 432000, // 5 days in seconds + shelley_genesis_hash: [ + // Shelley mainnet genesis hash (placeholder - should be from config) + 0x1a, 0x3d, 0x98, 0x7a, 0x95, 0xad, 0xd2, 0x3e, + 0x4f, 0x4d, 0x2d, 0x78, 0x74, 0x9f, 0x96, 0x65, + 0xd4, 0x1e, 0x48, 0x3e, 0xf2, 0xa2, 0x22, 0x9c, + 0x4b, 0x0b, 0xf3, 0x9f, 0xad, 0x7d, 0x5e, 0x27, + ], + }) + } + + async fn publish_completion(&self, block_info: BlockInfo, genesis_values: GenesisValues) -> Result<()> { + let message = Message::Cardano(( + block_info, + CardanoMessage::GenesisComplete(GenesisCompleteMessage { + values: genesis_values + }), + )); + + self.context + .message_bus + .publish(&self.snapshot_topic, Arc::new(message)) + .await + .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) + } +} + +impl UtxoCallback for SnapshotHandler { + fn on_utxo(&mut self, _utxo: UtxoEntry) -> Result<()> { + self.utxo_count += 1; + + // Log progress every million UTXOs + if self.utxo_count % 1_000_000 == 0 { + info!("Processed {} UTXOs", self.utxo_count); + } + // TODO: Accumulate UTXO data if needed or send in chunks to UTXOState processor + Ok(()) + } +} + +impl PoolCallback for SnapshotHandler { + fn on_pools(&mut self, pools: Vec) -> Result<()> { + info!("Received {} pools", pools.len()); + self.pools = pools; + // TODO: Publish pool data. + Ok(()) + } +} + +impl StakeCallback for SnapshotHandler { + fn on_accounts(&mut self, accounts: Vec) -> Result<()> { + info!("Received {} accounts", accounts.len()); + self.accounts = accounts; + // TODO: Publish account data. + Ok(()) + } +} + +impl DRepCallback for SnapshotHandler { + fn on_dreps(&mut self, dreps: Vec) -> Result<()> { + info!("Received {} DReps", dreps.len()); + self.dreps = dreps; + // TODO: Publish DRep data. + Ok(()) + } +} + +impl ProposalCallback for SnapshotHandler { + fn on_proposals(&mut self, proposals: Vec) -> Result<()> { + info!("Received {} proposals", proposals.len()); + self.proposals = proposals; + // TODO: Publish proposal data. + Ok(()) + } +} + +impl SnapshotCallbacks for SnapshotHandler { + fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> { + info!("Received snapshot metadata for epoch {}", metadata.epoch); + info!(" - UTXOs: {:?}", metadata.utxo_count); + info!(" - Pot balances: treasury={}, reserves={}, deposits={}", + metadata.pot_balances.treasury, + metadata.pot_balances.reserves, + metadata.pot_balances.deposits); + info!(" - Previous epoch blocks: {}", metadata.blocks_previous_epoch.len()); + info!(" - Current epoch blocks: {}", metadata.blocks_current_epoch.len()); + + self.metadata = Some(metadata); + Ok(()) + } + + fn on_complete(&mut self) -> Result<()> { + info!("Snapshot parsing completed"); + info!("Final statistics:"); + info!(" - UTXOs processed: {}", self.utxo_count); + info!(" - Pools: {}", self.pools.len()); + info!(" - Accounts: {}", self.accounts.len()); + info!(" - DReps: {}", self.dreps.len()); + info!(" - Proposals: {}", self.proposals.len()); + + // We could send a Resolver reference from here for large data, i.e. the UTXO set, + // which could be a file reference. For a file reference, we'd extend the parser to + // give us a callback value with the offset into the file; and we'd make the streaming + // UTXO parser public and reusable, adding it to the resolver implementation. + Ok(()) + } +} + impl SnapshotBootstrapper { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { let file_path = config .get_string("snapshot-path") .inspect_err(|e| error!("failed to find snapshot-path config: {e}"))?; - let ledger_state = - LedgerState::from_directory(file_path).context("failed to load ledger state")?; - let startup_topic = config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string()); @@ -35,7 +208,12 @@ impl SnapshotBootstrapper { config.get_string("snapshot-topic").unwrap_or(DEFAULT_SNAPSHOT_TOPIC.to_string()); info!("Publishing snapshots on '{snapshot_topic}'"); + let completion_topic = + config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + info!("Completing with '{completion_topic}'"); + let mut subscription = context.subscribe(&startup_topic).await?; + context.clone().run(async move { let Ok(_) = subscription.read().await else { return; @@ -44,14 +222,13 @@ impl SnapshotBootstrapper { let span = info_span!("snapshot_bootstrapper.handle"); async { - let spo_state_message = Message::Snapshot(SnapshotMessage::Bootstrap( - SnapshotStateMessage::SPOState(ledger_state.spo_state), - )); - context - .message_bus - .publish(&snapshot_topic, Arc::new(spo_state_message)) - .await - .unwrap_or_else(|e| error!("failed to publish: {e}")); + if let Err(e) = Self::process_snapshot( + &file_path, + context.clone(), + &completion_topic, + ).await { + error!("Failed to process snapshot: {}", e); + } } .instrument(span) .await; @@ -59,4 +236,32 @@ impl SnapshotBootstrapper { Ok(()) } + + async fn process_snapshot( + file_path: &str, + context: Arc>, + completion_topic: &str, + ) -> Result<()> { + let parser = StreamingSnapshotParser::new(file_path); + let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); + + info!("Starting snapshot parsing from: {}", file_path); + let start = Instant::now(); + + // Parse the snapshot with our callback handler + parser.parse(&mut callbacks)?; + + let duration = start.elapsed(); + info!("✓ Parse completed successfully in {:.2?}", duration); + + // Build the final state from accumulated data + let block_info = callbacks.build_block_info()?; + let genesis_values = callbacks.build_genesis_values()?; + + // Publish completion message to trigger next phase (e.g., Mithril) + callbacks.publish_completion(block_info, genesis_values).await?; + + info!("Snapshot bootstrap completed successfully"); + Ok(()) + } } From bdc16eb1e11d78260d9c1756eff86bf193ef2313 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 22 Oct 2025 12:15:54 -0700 Subject: [PATCH 2/5] Remove a message type that was just for protoyping --- common/src/messages.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index 06daba19..c2b45437 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -60,12 +60,6 @@ pub struct RawTxsMessage { pub txs: Vec>, } -/// Bootup completion message, sent by any module that has finished its startup sequence, -/// e.g. GenesisBootstrapper after sending all genesis UTxOs or SnapshotBootstrapper after -/// loading a snapshot. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BootupCompleteMessage {} - /// Genesis completion message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct GenesisCompleteMessage { From a8e77b47cb77f640eaf8a47c51e7bd0d59011413 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 22 Oct 2025 12:19:38 -0700 Subject: [PATCH 3/5] Format. --- .../src/snapshot_bootstrapper.rs | 103 ++++++++++-------- 1 file changed, 55 insertions(+), 48 deletions(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 937b4a2d..a31e7cfc 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -1,19 +1,17 @@ use std::sync::Arc; use acropolis_common::{ - BlockInfo, Era, BlockStatus, BlockHash, genesis_values::GenesisValues, + genesis_values::GenesisValues, + messages::{CardanoMessage, GenesisCompleteMessage, Message}, snapshot::{ streaming_snapshot::{ - SnapshotCallbacks, UtxoCallback, PoolCallback, StakeCallback, - DRepCallback, ProposalCallback, UtxoEntry, PoolInfo, DRepInfo, - GovernanceProposal, SnapshotMetadata + DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo, ProposalCallback, + SnapshotCallbacks, SnapshotMetadata, StakeCallback, UtxoCallback, UtxoEntry, }, - StreamingSnapshotParser + StreamingSnapshotParser, }, - messages::{ - CardanoMessage, GenesisCompleteMessage, Message - }, stake_addresses::AccountState, + BlockHash, BlockInfo, BlockStatus, Era, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; @@ -29,7 +27,7 @@ const DEFAULT_COMPLETION_TOPIC: &str = "cardano.sequence.bootstrapped"; struct SnapshotHandler { context: Arc>, snapshot_topic: String, - + // Accumulated data from callbacks metadata: Option, utxo_count: u64, @@ -59,53 +57,56 @@ impl SnapshotHandler { proposals: Vec::new(), } } - + /// Build BlockInfo from accumulated metadata fn build_block_info(&self) -> Result { - let metadata = self.metadata.as_ref() - .ok_or_else(|| anyhow::anyhow!("No metadata available"))?; - + let metadata = + self.metadata.as_ref().ok_or_else(|| anyhow::anyhow!("No metadata available"))?; + // Create a synthetic BlockInfo representing the snapshot state // This represents the last block included in the snapshot Ok(BlockInfo { status: BlockStatus::Immutable, // Snapshot blocks are immutable - slot: 0, // TODO: Extract from snapshot metadata if available - number: 0, // TODO: Extract from snapshot metadata if available - hash: BlockHash::default(), // TODO: Extract from snapshot metadata if available + slot: 0, // TODO: Extract from snapshot metadata if available + number: 0, // TODO: Extract from snapshot metadata if available + hash: BlockHash::default(), // TODO: Extract from snapshot metadata if available epoch: metadata.epoch, - epoch_slot: 0, // TODO: Extract from snapshot metadata if available + epoch_slot: 0, // TODO: Extract from snapshot metadata if available new_epoch: false, // Not necessarily a new epoch - timestamp: 0, // TODO: Extract from snapshot metadata if available + timestamp: 0, // TODO: Extract from snapshot metadata if available era: Era::Conway, // TODO: Determine from snapshot or config }) } - + /// Build GenesisValues from snapshot data fn build_genesis_values(&self) -> Result { // TODO: These values should ideally come from the snapshot or configuration // For now, using defaults for Conway era Ok(GenesisValues { byron_timestamp: 1506203091, // Byron mainnet genesis timestamp - shelley_epoch: 208, // Shelley started at epoch 208 on mainnet - shelley_epoch_len: 432000, // 5 days in seconds + shelley_epoch: 208, // Shelley started at epoch 208 on mainnet + shelley_epoch_len: 432000, // 5 days in seconds shelley_genesis_hash: [ // Shelley mainnet genesis hash (placeholder - should be from config) - 0x1a, 0x3d, 0x98, 0x7a, 0x95, 0xad, 0xd2, 0x3e, - 0x4f, 0x4d, 0x2d, 0x78, 0x74, 0x9f, 0x96, 0x65, - 0xd4, 0x1e, 0x48, 0x3e, 0xf2, 0xa2, 0x22, 0x9c, - 0x4b, 0x0b, 0xf3, 0x9f, 0xad, 0x7d, 0x5e, 0x27, + 0x1a, 0x3d, 0x98, 0x7a, 0x95, 0xad, 0xd2, 0x3e, 0x4f, 0x4d, 0x2d, 0x78, 0x74, 0x9f, + 0x96, 0x65, 0xd4, 0x1e, 0x48, 0x3e, 0xf2, 0xa2, 0x22, 0x9c, 0x4b, 0x0b, 0xf3, 0x9f, + 0xad, 0x7d, 0x5e, 0x27, ], }) } - - async fn publish_completion(&self, block_info: BlockInfo, genesis_values: GenesisValues) -> Result<()> { + + async fn publish_completion( + &self, + block_info: BlockInfo, + genesis_values: GenesisValues, + ) -> Result<()> { let message = Message::Cardano(( block_info, - CardanoMessage::GenesisComplete(GenesisCompleteMessage { - values: genesis_values + CardanoMessage::GenesisComplete(GenesisCompleteMessage { + values: genesis_values, }), )); - + self.context .message_bus .publish(&self.snapshot_topic, Arc::new(message)) @@ -117,7 +118,7 @@ impl SnapshotHandler { impl UtxoCallback for SnapshotHandler { fn on_utxo(&mut self, _utxo: UtxoEntry) -> Result<()> { self.utxo_count += 1; - + // Log progress every million UTXOs if self.utxo_count % 1_000_000 == 0 { info!("Processed {} UTXOs", self.utxo_count); @@ -167,17 +168,25 @@ impl SnapshotCallbacks for SnapshotHandler { fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> { info!("Received snapshot metadata for epoch {}", metadata.epoch); info!(" - UTXOs: {:?}", metadata.utxo_count); - info!(" - Pot balances: treasury={}, reserves={}, deposits={}", - metadata.pot_balances.treasury, - metadata.pot_balances.reserves, - metadata.pot_balances.deposits); - info!(" - Previous epoch blocks: {}", metadata.blocks_previous_epoch.len()); - info!(" - Current epoch blocks: {}", metadata.blocks_current_epoch.len()); - + info!( + " - Pot balances: treasury={}, reserves={}, deposits={}", + metadata.pot_balances.treasury, + metadata.pot_balances.reserves, + metadata.pot_balances.deposits + ); + info!( + " - Previous epoch blocks: {}", + metadata.blocks_previous_epoch.len() + ); + info!( + " - Current epoch blocks: {}", + metadata.blocks_current_epoch.len() + ); + self.metadata = Some(metadata); Ok(()) } - + fn on_complete(&mut self) -> Result<()> { info!("Snapshot parsing completed"); info!("Final statistics:"); @@ -213,7 +222,7 @@ impl SnapshotBootstrapper { info!("Completing with '{completion_topic}'"); let mut subscription = context.subscribe(&startup_topic).await?; - + context.clone().run(async move { let Ok(_) = subscription.read().await else { return; @@ -222,11 +231,9 @@ impl SnapshotBootstrapper { let span = info_span!("snapshot_bootstrapper.handle"); async { - if let Err(e) = Self::process_snapshot( - &file_path, - context.clone(), - &completion_topic, - ).await { + if let Err(e) = + Self::process_snapshot(&file_path, context.clone(), &completion_topic).await + { error!("Failed to process snapshot: {}", e); } } @@ -236,7 +243,7 @@ impl SnapshotBootstrapper { Ok(()) } - + async fn process_snapshot( file_path: &str, context: Arc>, @@ -250,7 +257,7 @@ impl SnapshotBootstrapper { // Parse the snapshot with our callback handler parser.parse(&mut callbacks)?; - + let duration = start.elapsed(); info!("✓ Parse completed successfully in {:.2?}", duration); @@ -260,7 +267,7 @@ impl SnapshotBootstrapper { // Publish completion message to trigger next phase (e.g., Mithril) callbacks.publish_completion(block_info, genesis_values).await?; - + info!("Snapshot bootstrap completed successfully"); Ok(()) } From 5d3685439218a95f9ada941e3d66f92430371753 Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 22 Oct 2025 13:16:34 -0700 Subject: [PATCH 4/5] Apply some co-pilot feedback that seemed nice --- .../snapshot_bootstrapper/src/snapshot_bootstrapper.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index a31e7cfc..4eb7500e 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -131,7 +131,7 @@ impl UtxoCallback for SnapshotHandler { impl PoolCallback for SnapshotHandler { fn on_pools(&mut self, pools: Vec) -> Result<()> { info!("Received {} pools", pools.len()); - self.pools = pools; + self.pools.extend(pools); // TODO: Publish pool data. Ok(()) } @@ -140,7 +140,7 @@ impl PoolCallback for SnapshotHandler { impl StakeCallback for SnapshotHandler { fn on_accounts(&mut self, accounts: Vec) -> Result<()> { info!("Received {} accounts", accounts.len()); - self.accounts = accounts; + self.accounts.extend(accounts); // TODO: Publish account data. Ok(()) } @@ -149,7 +149,7 @@ impl StakeCallback for SnapshotHandler { impl DRepCallback for SnapshotHandler { fn on_dreps(&mut self, dreps: Vec) -> Result<()> { info!("Received {} DReps", dreps.len()); - self.dreps = dreps; + self.dreps.extend(dreps); // TODO: Publish DRep data. Ok(()) } @@ -158,7 +158,7 @@ impl DRepCallback for SnapshotHandler { impl ProposalCallback for SnapshotHandler { fn on_proposals(&mut self, proposals: Vec) -> Result<()> { info!("Received {} proposals", proposals.len()); - self.proposals = proposals; + self.proposals.extend(proposals); // TODO: Publish proposal data. Ok(()) } From 0d124e3c68230c6174baeaf6ee652d5cd6d6221b Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Fri, 24 Oct 2025 10:40:47 -0700 Subject: [PATCH 5/5] Add publish Start message for snapshot parsing/publishing for synchronization with consumer modules --- common/src/messages.rs | 1 + .../src/snapshot_bootstrapper.rs | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/common/src/messages.rs b/common/src/messages.rs index c2b45437..fd37e68b 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -313,6 +313,7 @@ pub enum CardanoMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum SnapshotMessage { + Startup(), // subscirbers should listen for incremental snapshot data Bootstrap(SnapshotStateMessage), DumpRequest(SnapshotDumpMessage), Dump(SnapshotStateMessage), diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 4eb7500e..eaa1e052 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -95,6 +95,16 @@ impl SnapshotHandler { }) } + async fn publish_start( + &self, + ) -> Result<()> { + self.context + .message_bus + .publish(&self.snapshot_topic, Arc::new(Message::Snapshot(acropolis_common::messages::SnapshotMessage::Startup()))) + .await + .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) + } + async fn publish_completion( &self, block_info: BlockInfo, @@ -151,6 +161,7 @@ impl DRepCallback for SnapshotHandler { info!("Received {} DReps", dreps.len()); self.dreps.extend(dreps); // TODO: Publish DRep data. + Ok(()) } } @@ -252,14 +263,16 @@ impl SnapshotBootstrapper { let parser = StreamingSnapshotParser::new(file_path); let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); - info!("Starting snapshot parsing from: {}", file_path); + info!("Starting snapshot parsing and publishing from: {}", file_path); let start = Instant::now(); + callbacks.publish_start().await?; + // Parse the snapshot with our callback handler parser.parse(&mut callbacks)?; let duration = start.elapsed(); - info!("✓ Parse completed successfully in {:.2?}", duration); + info!("✓ Parse and publish completed successfully in {:.2?}", duration); // Build the final state from accumulated data let block_info = callbacks.build_block_info()?;