diff --git a/common/src/messages.rs b/common/src/messages.rs index 25b24fea..6ae7ff7f 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -311,6 +311,7 @@ pub enum CardanoMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum SnapshotMessage { + Startup, // subscribers should listen for incremental snapshot data Bootstrap(SnapshotStateMessage), DumpRequest(SnapshotDumpMessage), Dump(SnapshotStateMessage), diff --git a/common/src/snapshot/NOTES.md b/common/src/snapshot/NOTES.md new file mode 100644 index 00000000..549a20a7 --- /dev/null +++ b/common/src/snapshot/NOTES.md @@ -0,0 +1,82 @@ +# Bootstrapping from a Snapshot file +We can boot an Acropolis node either from genesis and replay all of the blocks up to +some point, or we can boot from a snapshot file. This module provides the components +needed to boot from a snapshot file. See [snapshot_bootstrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) for the process that references and runs with these helpers. + +Booting from a snapshot takes minutes instead of the hours it takes to boot from +genesis. It also allows booting from a given epoch which allows one to create tests +that rely only on that epoch of data. We're also skipping some of the problematic +eras and will typically boot from Conway around epoch 305, 306, and 307. It takes +three epochs to have enough context to correctly calculate the rewards. + +The required data for bootstrapping are: +- snapshot files (each has an associated epoch number and point) +- nonces +- headers + +## Snapshot Files +The snapshots come from the Amaru project. In their words, +"the snapshots we generated are different [from a Mithril snapshot]: they're +the actual ledger state; i.e. the in-memory state that is constructed by iterating over each block up to a specific point. So, it's all the UTxOs, the set of pending governance actions, the account balance, etc.
+If you get this from a trusted source, you don't need to do any replay, you can just start up and load this from disk. +The format of these is completely non-standard; we just forked the haskell node and spit out whatever we needed to in CBOR." + +Snapshot files are referenced by their epoch number in the config.json file below. + +See [Amaru snapshot format](../../../docs/amaru-snapshot-structure.md) + +## Configuration files +There is a path for each network bootstrap configuration file. The network should +be one of 'mainnet', 'preprod', 'preview' or 'testnet_<magic>' where +`magic` is a 32-bit unsigned value denoting a particular testnet. + +Data structure, e.g. as [Amaru mainnet](https://github.com/pragma-org/amaru/tree/main/data/mainnet) + +The bootstrapper will be given a path to a directory that is expected to contain +the following files: config.json, snapshots.json, nonces.json, and headers.json. The path will +be used as a prefix to resolve per-network configuration files +needed for bootstrapping. Given a source directory `data`, and a +network name of `preview`, the expected layout for configuration files would be: + +* `data/preview/config.json`: a list of epochs to load. +* `data/preview/snapshots.json`: a list of `Snapshot` values (epoch, point, url) +* `data/preview/nonces.json`: a list of `InitialNonces` values, +* `data/preview/headers.json`: a list of `Point`s. + +These files are loaded by [snapshot_bootstrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) during bootup. + +## Bootstrapping sequence + +The bootstrapper will be started with an argument that specifies a network, +e.g. "mainnet". From the network, it will build a path to the configuration +and snapshot files as shown above, then load the data contained or described +in those files. config.json holds a list of typically 3 epochs that can be +used to index into snapshots.json to find the corresponding URLs and meta-data +for each of the three snapshot files.
Loading occurs in this order: + +* publish `SnapshotMessage::Startup` +* download the snapshots (on demand; may have already been done externally) +* parse each snapshot and publish its data on the message bus +* read nonces and publish +* read headers and publish +* publish `CardanoMessage::GenesisComplete(GenesisCompleteMessage {...})` + +Modules in the system will have subscribed to the Startup message and also +to individual structural data update messages before the +bootstrapper runs the above sequence. Upon receiving the `Startup` message, +they will use data messages to populate their state, history (for BlockFrost), +and any other state required to achieve readiness to operate on reception of +the `GenesisCompleteMessage`. + +## Data update messages + +The bootstrapper will publish data as it parses the snapshot files, nonces, and +headers. Snapshot parsing is done while streaming the data to keep the memory +footprint lower. As elements of the file are parsed, callbacks provide the data +to the bootstrapper, which publishes the data on the message bus. + +There are TODO markers in [snapshot_bootstrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) that show where to add the +publishing of the parsed snapshot data.
+ + + diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index b9e61b20..8da7b006 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -1,16 +1,41 @@ use std::sync::Arc; use acropolis_common::{ - ledger_state::LedgerState, - messages::{Message, SnapshotMessage, SnapshotStateMessage}, + genesis_values::GenesisValues, + messages::{CardanoMessage, GenesisCompleteMessage, Message}, + snapshot::{ + streaming_snapshot::{ + DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo, ProposalCallback, + SnapshotCallbacks, SnapshotMetadata, StakeCallback, UtxoCallback, UtxoEntry, + }, + StreamingSnapshotParser, + }, + stake_addresses::AccountState, + BlockHash, BlockInfo, BlockStatus, Era, }; -use anyhow::{Context as AnyhowContext, Result}; +use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; +use tokio::time::Instant; use tracing::{error, info, info_span, Instrument}; const DEFAULT_SNAPSHOT_TOPIC: &str = "cardano.snapshot"; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; +const DEFAULT_COMPLETION_TOPIC: &str = "cardano.sequence.bootstrapped"; + +/// Callback handler that accumulates snapshot data and builds state +struct SnapshotHandler { + context: Arc>, + snapshot_topic: String, + + // Accumulated data from callbacks + metadata: Option, + utxo_count: u64, + pools: Vec, + accounts: Vec, + dreps: Vec, + proposals: Vec, +} #[module( message_type(Message), @@ -19,15 +44,188 @@ const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; )] pub struct SnapshotBootstrapper; +impl SnapshotHandler { + fn new(context: Arc>, snapshot_topic: String) -> Self { + Self { + context, + snapshot_topic, + metadata: None, + utxo_count: 0, + pools: Vec::new(), + accounts: Vec::new(), + dreps: Vec::new(), + proposals: Vec::new(), + } + } + + /// Build BlockInfo from 
accumulated metadata + fn build_block_info(&self) -> Result { + let metadata = + self.metadata.as_ref().ok_or_else(|| anyhow::anyhow!("No metadata available"))?; + + // Create a synthetic BlockInfo representing the snapshot state + // This represents the last block included in the snapshot + Ok(BlockInfo { + status: BlockStatus::Immutable, // Snapshot blocks are immutable + slot: 0, // TODO: Extract from snapshot metadata if available + number: 0, // TODO: Extract from snapshot metadata if available + hash: BlockHash::default(), // TODO: Extract from snapshot metadata if available + epoch: metadata.epoch, + epoch_slot: 0, // TODO: Extract from snapshot metadata if available + new_epoch: false, // Not necessarily a new epoch + timestamp: 0, // TODO: Extract from snapshot metadata if available + era: Era::Conway, // TODO: Determine from snapshot or config + }) + } + + /// Build GenesisValues from snapshot data + fn build_genesis_values(&self) -> Result { + // TODO: These values should ideally come from the snapshot or configuration + // For now, using defaults for Conway era + Ok(GenesisValues { + byron_timestamp: 1506203091, // Byron mainnet genesis timestamp + shelley_epoch: 208, // Shelley started at epoch 208 on mainnet + shelley_epoch_len: 432000, // 5 days in seconds + shelley_genesis_hash: [ + // Shelley mainnet genesis hash (placeholder - should be from config) + 0x1a, 0x3d, 0x98, 0x7a, 0x95, 0xad, 0xd2, 0x3e, 0x4f, 0x4d, 0x2d, 0x78, 0x74, 0x9f, + 0x96, 0x65, 0xd4, 0x1e, 0x48, 0x3e, 0xf2, 0xa2, 0x22, 0x9c, 0x4b, 0x0b, 0xf3, 0x9f, + 0xad, 0x7d, 0x5e, 0x27, + ], + }) + } + + async fn publish_start(&self) -> Result<()> { + anyhow::Context::context( + self.context + .message_bus + .publish( + &self.snapshot_topic, + Arc::new(Message::Snapshot( + acropolis_common::messages::SnapshotMessage::Startup, + )), + ) + .await, + "Failed to publish start message", + ) + } + + async fn publish_completion( + &self, + block_info: BlockInfo, + genesis_values: GenesisValues, + 
) -> Result<()> { + let message = Message::Cardano(( + block_info, + CardanoMessage::GenesisComplete(GenesisCompleteMessage { + values: genesis_values, + }), + )); + + anyhow::Context::context( + self.context.message_bus.publish(&self.snapshot_topic, Arc::new(message)).await, + "Failed to publish completion", + ) + } +} + +impl UtxoCallback for SnapshotHandler { + fn on_utxo(&mut self, _utxo: UtxoEntry) -> Result<()> { + self.utxo_count += 1; + + // Log progress every million UTXOs + if self.utxo_count.is_multiple_of(1_000_000) { + info!("Processed {} UTXOs", self.utxo_count); + } + // TODO: Accumulate UTXO data if needed or send in chunks to UTXOState processor + Ok(()) + } +} + +impl PoolCallback for SnapshotHandler { + fn on_pools(&mut self, pools: Vec) -> Result<()> { + info!("Received {} pools", pools.len()); + self.pools.extend(pools); + // TODO: Publish pool data. + Ok(()) + } +} + +impl StakeCallback for SnapshotHandler { + fn on_accounts(&mut self, accounts: Vec) -> Result<()> { + info!("Received {} accounts", accounts.len()); + self.accounts.extend(accounts); + // TODO: Publish account data. + Ok(()) + } +} + +impl DRepCallback for SnapshotHandler { + fn on_dreps(&mut self, dreps: Vec) -> Result<()> { + info!("Received {} DReps", dreps.len()); + self.dreps.extend(dreps); + // TODO: Publish DRep data. + + Ok(()) + } +} + +impl ProposalCallback for SnapshotHandler { + fn on_proposals(&mut self, proposals: Vec) -> Result<()> { + info!("Received {} proposals", proposals.len()); + self.proposals.extend(proposals); + // TODO: Publish proposal data. 
+ Ok(()) + } +} + +impl SnapshotCallbacks for SnapshotHandler { + fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> { + info!("Received snapshot metadata for epoch {}", metadata.epoch); + info!(" - UTXOs: {:?}", metadata.utxo_count); + info!( + " - Pot balances: treasury={}, reserves={}, deposits={}", + metadata.pot_balances.treasury, + metadata.pot_balances.reserves, + metadata.pot_balances.deposits + ); + info!( + " - Previous epoch blocks: {}", + metadata.blocks_previous_epoch.len() + ); + info!( + " - Current epoch blocks: {}", + metadata.blocks_current_epoch.len() + ); + + self.metadata = Some(metadata); + Ok(()) + } + + fn on_complete(&mut self) -> Result<()> { + info!("Snapshot parsing completed"); + info!("Final statistics:"); + info!(" - UTXOs processed: {}", self.utxo_count); + info!(" - Pools: {}", self.pools.len()); + info!(" - Accounts: {}", self.accounts.len()); + info!(" - DReps: {}", self.dreps.len()); + info!(" - Proposals: {}", self.proposals.len()); + + // We could send a Resolver reference from here for large data, i.e. the UTXO set, + // which could be a file reference. For a file reference, we'd extend the parser to + // give us a callback value with the offset into the file; and we'd make the streaming + // UTXO parser public and reusable, adding it to the resolver implementation. + Ok(()) + } +} + impl SnapshotBootstrapper { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // TODO: read a config file path, not the snapshot-path; implement TODOs below. 
let file_path = config .get_string("snapshot-path") .inspect_err(|e| error!("failed to find snapshot-path config: {e}"))?; - let ledger_state = - LedgerState::from_directory(file_path).context("failed to load ledger state")?; - let startup_topic = config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string()); @@ -35,23 +233,31 @@ impl SnapshotBootstrapper { config.get_string("snapshot-topic").unwrap_or(DEFAULT_SNAPSHOT_TOPIC.to_string()); info!("Publishing snapshots on '{snapshot_topic}'"); + let completion_topic = + config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + info!("Completing with '{completion_topic}'"); + let mut subscription = context.subscribe(&startup_topic).await?; + context.clone().run(async move { let Ok(_) = subscription.read().await else { return; }; info!("Received startup message"); + // TODO: + // Read config file per docs in NOTES.md + // read nonces + // read headers + // read and process ALL of the snapshot files, not just one. 
+ let span = info_span!("snapshot_bootstrapper.handle"); async { - let spo_state_message = Message::Snapshot(SnapshotMessage::Bootstrap( - SnapshotStateMessage::SPOState(ledger_state.spo_state), - )); - context - .message_bus - .publish(&snapshot_topic, Arc::new(spo_state_message)) - .await - .unwrap_or_else(|e| error!("failed to publish: {e}")); + if let Err(e) = + Self::process_snapshot(&file_path, context.clone(), &completion_topic).await + { + error!("Failed to process snapshot: {}", e); + } } .instrument(span) .await; @@ -59,4 +265,40 @@ impl SnapshotBootstrapper { Ok(()) } + + async fn process_snapshot( + file_path: &str, + context: Arc>, + completion_topic: &str, + ) -> Result<()> { + let parser = StreamingSnapshotParser::new(file_path); + let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); + + info!( + "Starting snapshot parsing and publishing from: {}", + file_path + ); + let start = Instant::now(); + + callbacks.publish_start().await?; + + // Parse the snapshot with our callback handler + parser.parse(&mut callbacks)?; + + let duration = start.elapsed(); + info!( + "✓ Parse and publish completed successfully in {:.2?}", + duration + ); + + // Build the final state from accumulated data + let block_info = callbacks.build_block_info()?; + let genesis_values = callbacks.build_genesis_values()?; + + // Publish completion message to trigger next phase (e.g., Mithril) + callbacks.publish_completion(block_info, genesis_values).await?; + + info!("Snapshot bootstrap completed successfully"); + Ok(()) + } }