From 7ccc2c636620d8f4ad0fac119605356e7e8db373 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Thu, 23 Oct 2025 19:07:07 -0400 Subject: [PATCH 01/10] feat: start working on tx submission --- .../workflows/run-tests-on-push-to-main.yml | 1 + Cargo.lock | 14 ++ Cargo.toml | 1 + modules/tx_submitter/Cargo.toml | 22 ++ modules/tx_submitter/src/peer.rs | 200 ++++++++++++++++++ modules/tx_submitter/src/tx.rs | 21 ++ modules/tx_submitter/src/tx_submitter.rs | 54 +++++ 7 files changed, 313 insertions(+) create mode 100644 modules/tx_submitter/Cargo.toml create mode 100644 modules/tx_submitter/src/peer.rs create mode 100644 modules/tx_submitter/src/tx.rs create mode 100644 modules/tx_submitter/src/tx_submitter.rs diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 742b5784..18fcab92 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -40,6 +40,7 @@ jobs: --package acropolis_module_snapshot_bootstrapper \ --package acropolis_module_spdd_state \ --package acropolis_module_stake_delta_filter \ + --package acropolis_module_tx_submitter \ --package acropolis_module_upstream_chain_fetcher \ --package acropolis_module_utxo_state diff --git a/Cargo.lock b/Cargo.lock index bd584c53..9b170e2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -397,6 +397,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_tx_submitter" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "hex", + "pallas 0.33.0", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_tx_unpacker" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 9efddb72..ba210f65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "modules/historical_accounts_state", # Tracks historical account information "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs + "modules/tx_submitter", # Submits TXs to peers # Process builds "processes/omnibus", # All-inclusive omnibus process diff --git a/modules/tx_submitter/Cargo.toml b/modules/tx_submitter/Cargo.toml new file mode 100644 index 00000000..76d94b16 --- /dev/null +++ b/modules/tx_submitter/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "acropolis_module_tx_submitter" +version = "0.1.0" +edition = "2024" +authors = ["Simon Gellis "] +description = "TX submission module for Acropolis" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +hex = { workspace = true } +pallas = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/tx_submitter.rs" \ No newline at end of file diff --git a/modules/tx_submitter/src/peer.rs b/modules/tx_submitter/src/peer.rs new file mode 100644 index 00000000..18372dfd --- /dev/null +++ b/modules/tx_submitter/src/peer.rs @@ -0,0 +1,200 @@ +use std::{collections::VecDeque, sync::Arc, time::Duration}; + +use anyhow::{Context, Result, bail}; +use config::Config; +use pallas::network::{facades::PeerClient, miniprotocols::txsubmission}; +use tokio::{select, sync::mpsc}; +use tracing::{debug, error, warn}; + +use crate::{SubmitterConfig, tx::Transaction}; + +pub struct PeerConfig { + address: String, +} +impl PeerConfig { + pub fn parse(config: &Config) -> Result { + let address = + config.get("node-address").unwrap_or("backbone.cardano.iog.io:3001").to_string(); + Ok(Self { address }) + } +} + +pub struct PeerConnection { + tx_sink: mpsc::UnboundedSender>, +} +impl PeerConnection { + pub fn open(submitter: &SubmitterConfig, peer: PeerConfig) -> Self { + let (tx_sink, tx_source) = mpsc::unbounded_channel(); + let worker = PeerWorker { + tx_source, + tx_queue: TxQueue::new(), + address: peer.address, + magic: submitter.magic, + }; + tokio::task::spawn(worker.run()); + Self { tx_sink } + } + + pub fn queue(&self, tx: Arc) -> Result<()> { + self.tx_sink.send(tx).context("could not queue tx") + } +} + +struct PeerWorker { + tx_source: mpsc::UnboundedReceiver>, + tx_queue: TxQueue, + address: String, + magic: u64, +} +impl PeerWorker { + async fn run(mut self) { + while !self.tx_source.is_closed() { + if let Err(error) = self.run_connection().await { + error!("error connecting to {}: {:#}", self.address, error); + debug!("reconnecting in 5 seconds"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + + async fn run_connection(&mut self) -> Result<()> { + let mut client = + PeerClient::connect(&self.address, self.magic).await.context("could not connect")?; + let submission = client.txsubmission(); + submission.send_init().await.context("failed to init")?; + let mut pending_tx_requests = None; + self.tx_queue.requeue_sent(); + loop { + select! { + new_tx = self.tx_source.recv() => { + let Some(tx) = new_tx else { + // parent process must have disconnected + break; + }; + self.tx_queue.push(tx); + if let Some(req) = pending_tx_requests.take() { + let ids = self.tx_queue.req(req); + submission.reply_tx_ids(ids).await.context("could not send tx ids")?; + self.tx_queue.mark_requested(req); + } + } + request = submission.next_request(), if pending_tx_requests.is_none() => { + let req = request.context("could not receive request")?; + pending_tx_requests = self.handle_request(submission, req).await?; + } + } + } + if !matches!(submission.state(), txsubmission::State::Idle) { + submission.send_done().await?; + } + Ok(()) + } + + async fn handle_request( + &mut self, + submission: &mut txsubmission::GenericClient< + txsubmission::EraTxId, + txsubmission::EraTxBody, + >, + req: txsubmission::Request, + ) -> Result> { + match req { + txsubmission::Request::TxIds(ack, req) => { + self.tx_queue.ack(ack)?; + + let ids = self.tx_queue.req(req); + if ids.is_empty() { + Ok(Some(req)) + } else { + submission.reply_tx_ids(ids).await.context("could not send tx ids")?; + self.tx_queue.mark_requested(req); + Ok(None) + } + } + txsubmission::Request::TxIdsNonBlocking(ack, req) => { + self.tx_queue.ack(ack)?; + + let ids = self.tx_queue.req(req); + submission.reply_tx_ids(ids).await.context("could not send tx ids")?; + self.tx_queue.mark_requested(req); + Ok(None) + } + txsubmission::Request::Txs(ids) => { + let mut txs = vec![]; + for id in ids { + match self.tx_queue.tx_body(&id) { + Some(body) => { + debug!("Sending TX {}", hex::encode(id.1)); + txs.push(body); + } + None => { + warn!("Server requested unrecognized TX {}", hex::encode(id.1)); + } + } + } + submission.reply_txs(txs).await.context("could not send tx bodies")?; + Ok(None) + } + } + } +} + +#[derive(Default)] +struct TxQueue { + unsent: VecDeque>, + sent: VecDeque>, +} +impl TxQueue { + pub fn new() -> Self { + Self::default() + } + + pub fn push(&mut self, tx: Arc) { + self.unsent.push_back(tx); + } + + pub fn ack(&mut self, count: u16) -> Result<()> { + for _ in 0..count { + match self.sent.pop_front() { + Some(tx) => { + debug!("TX {} has been acknowledged", hex::encode(&tx.id)) + } + None => bail!("Server acked a TX which we never sent"), + } + } + Ok(()) + } + + pub fn req(&self, count: u16) -> Vec> { + self.unsent + .iter() + .take(count as usize) + .map(|tx| { + txsubmission::TxIdAndSize( + txsubmission::EraTxId(tx.era, tx.id.clone()), + tx.body.len() as u32, + ) + }) + .collect() + } + + pub fn mark_requested(&mut self, count: u16) { + for _ in 0..count { + let tx = self.unsent.pop_front().expect("logic error"); + self.sent.push_back(tx); + } + } + + pub fn tx_body(&self, id: &txsubmission::EraTxId) -> Option { + self.sent + .iter() + .find(|tx| tx.id == id.1) + .map(|tx| txsubmission::EraTxBody(tx.era, tx.body.clone())) + } + + pub fn requeue_sent(&mut self) { + while let Some(tx) = self.sent.pop_back() { + self.unsent.push_front(tx); + } + } +} diff --git a/modules/tx_submitter/src/tx.rs b/modules/tx_submitter/src/tx.rs new file mode 100644 index 00000000..67b1e3bb --- /dev/null +++ b/modules/tx_submitter/src/tx.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use pallas::ledger::traverse::MultiEraTx; + +pub struct Transaction { + pub id: Vec, + pub body: Vec, + pub era: u16, +} + +impl Transaction { + pub fn from_bytes(bytes: &[u8]) -> Result { + let parsed = MultiEraTx::decode(bytes)?; + let id = parsed.hash().to_vec(); + let era = parsed.era().into(); + Ok(Self { + id, + body: bytes.to_vec(), + era, + }) + } +} diff --git a/modules/tx_submitter/src/tx_submitter.rs b/modules/tx_submitter/src/tx_submitter.rs new file mode 100644 index 00000000..b4d987fb --- /dev/null +++ b/modules/tx_submitter/src/tx_submitter.rs @@ -0,0 +1,54 @@ +mod peer; +mod tx; + +use std::sync::Arc; + +use acropolis_common::messages::Message; +use anyhow::Result; +use caryatid_sdk::Context; +use config::Config; +use peer::PeerConfig; +use tracing::error; + +use crate::{peer::PeerConnection, tx::Transaction}; + +pub struct TxSubmitter; + +impl TxSubmitter { + async fn run_tx_submission( + config: Arc, + peer_config: PeerConfig, + ) -> Result<()> { + let mut peers = vec![PeerConnection::open(&config, peer_config)]; + loop { + let tx_bytes = Self::get_tx().await; + let tx = Arc::new(Transaction::from_bytes(&tx_bytes)?); + peers.retain(|peer| peer.queue(tx.clone()).is_ok()); + } + } + + async fn get_tx() -> Vec { + todo!() + } + + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let submitter = Arc::new(SubmitterConfig::parse(&config)?); + let peer = PeerConfig::parse(&config)?; + context.run(async move { + Self::run_tx_submission(submitter, peer) + .await + .unwrap_or_else(|e| error!("TX submission failed: {e}")); + }); + Ok(()) + } +} + +struct SubmitterConfig { + magic: u64, +} +impl SubmitterConfig { + pub fn parse(config: &Config) -> Result { + let magic = config.get("magic-number").unwrap_or(764824073); + Ok(Self { magic }) + } +} From cb8b6adc713b0791d874862ddfae74088e4f2d29 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 24 Oct 2025 10:57:23 -0400 Subject: [PATCH 02/10] feat: handle tx submission requests --- common/src/commands/mod.rs | 1 + common/src/commands/transactions.rs | 18 +++++++ common/src/lib.rs | 1 + common/src/messages.rs | 15 ++++++ modules/tx_submitter/src/peer.rs | 4 +- modules/tx_submitter/src/tx.rs | 5 +- modules/tx_submitter/src/tx_submitter.rs | 69 +++++++++++++++--------- 7 files changed, 85 insertions(+), 28 deletions(-) create mode 100644 common/src/commands/mod.rs create mode 100644 common/src/commands/transactions.rs diff --git a/common/src/commands/mod.rs b/common/src/commands/mod.rs new file mode 100644 index 00000000..0824d7a9 --- /dev/null +++ b/common/src/commands/mod.rs @@ -0,0 +1 @@ +pub mod transactions; diff --git a/common/src/commands/transactions.rs b/common/src/commands/transactions.rs new file mode 100644 index 00000000..d63f1fbe --- /dev/null +++ b/common/src/commands/transactions.rs @@ -0,0 +1,18 @@ +use serde_with::{hex::Hex, serde_as}; + +use crate::TxHash; + +#[serde_as] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum TransactionsCommand { + Submit { + #[serde_as(as = "Hex")] + cbor: Vec, + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum TransactionsCommandResponse { + Submitted { id: TxHash }, + Error(String), +} diff --git a/common/src/lib.rs b/common/src/lib.rs index abf55551..a9cf24c3 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,6 +4,7 @@ pub mod address; pub mod byte_array; pub mod calculations; pub mod cip19; +pub mod commands; pub mod crypto; pub mod genesis_values; pub mod hash; diff --git a/common/src/messages.rs b/common/src/messages.rs index 2cd325a6..8ed9856e 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -3,6 +3,7 @@ // We don't use these messages in the acropolis_common crate itself #![allow(dead_code)] +use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; use crate::ledger_state::SPOState; use crate::protocol_params::{NonceHash, ProtocolParams}; @@ -350,6 +351,10 @@ pub enum Message { // State query messages StateQuery(StateQuery), StateQueryResponse(StateQueryResponse), + + // Commands + Command(Command), + CommandResponse(CommandResponse), } // Casts from specific Caryatid messages @@ -422,3 +427,13 @@ pub enum StateQueryResponse { UTxOs(UTxOStateQueryResponse), SPDD(SPDDStateQueryResponse), } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum Command { + Transactions(TransactionsCommand), +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum CommandResponse { + Transactions(TransactionsCommandResponse), +} diff --git a/modules/tx_submitter/src/peer.rs b/modules/tx_submitter/src/peer.rs index 18372dfd..a13fe5c0 100644 --- a/modules/tx_submitter/src/peer.rs +++ b/modules/tx_submitter/src/peer.rs @@ -171,7 +171,7 @@ impl TxQueue { .take(count as usize) .map(|tx| { txsubmission::TxIdAndSize( - txsubmission::EraTxId(tx.era, tx.id.clone()), + txsubmission::EraTxId(tx.era, tx.id.to_vec()), tx.body.len() as u32, ) }) @@ -188,7 +188,7 @@ impl TxQueue { pub fn tx_body(&self, id: &txsubmission::EraTxId) -> Option { self.sent .iter() - .find(|tx| tx.id == id.1) + .find(|tx| *tx.id == *id.1) .map(|tx| txsubmission::EraTxBody(tx.era, tx.body.clone())) } diff --git a/modules/tx_submitter/src/tx.rs b/modules/tx_submitter/src/tx.rs index 67b1e3bb..0569204a 100644 --- a/modules/tx_submitter/src/tx.rs +++ b/modules/tx_submitter/src/tx.rs @@ -1,8 +1,9 @@ +use acropolis_common::TxHash; use anyhow::Result; use pallas::ledger::traverse::MultiEraTx; pub struct Transaction { - pub id: Vec, + pub id: TxHash, pub body: Vec, pub era: u16, } @@ -10,7 +11,7 @@ pub struct Transaction { impl Transaction { pub fn from_bytes(bytes: &[u8]) -> Result { let parsed = MultiEraTx::decode(bytes)?; - let id = parsed.hash().to_vec(); + let id = TxHash::from(*parsed.hash()); let era = parsed.era().into(); Ok(Self { id, diff --git a/modules/tx_submitter/src/tx_submitter.rs b/modules/tx_submitter/src/tx_submitter.rs index b4d987fb..58083cbf 100644 --- a/modules/tx_submitter/src/tx_submitter.rs +++ b/modules/tx_submitter/src/tx_submitter.rs @@ -3,52 +3,73 @@ mod tx; use std::sync::Arc; -use acropolis_common::messages::Message; -use anyhow::Result; +use acropolis_common::{ + commands::transactions::{TransactionsCommand, TransactionsCommandResponse}, + messages::{Command, CommandResponse, Message}, +}; +use anyhow::{Result, bail}; use caryatid_sdk::Context; use config::Config; use peer::PeerConfig; -use tracing::error; +use tokio::sync::RwLock; use crate::{peer::PeerConnection, tx::Transaction}; pub struct TxSubmitter; impl TxSubmitter { - async fn run_tx_submission( - config: Arc, - peer_config: PeerConfig, - ) -> Result<()> { - let mut peers = vec![PeerConnection::open(&config, peer_config)]; - loop { - let tx_bytes = Self::get_tx().await; - let tx = Arc::new(Transaction::from_bytes(&tx_bytes)?); - peers.retain(|peer| peer.queue(tx.clone()).is_ok()); - } - } - - async fn get_tx() -> Vec { - todo!() - } - pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { let submitter = Arc::new(SubmitterConfig::parse(&config)?); let peer = PeerConfig::parse(&config)?; - context.run(async move { - Self::run_tx_submission(submitter, peer) - .await - .unwrap_or_else(|e| error!("TX submission failed: {e}")); + let state = Arc::new(RwLock::new(SubmitterState { + peers: vec![PeerConnection::open(&submitter, peer)], + })); + context.handle(&submitter.subscribe_topic, move |message| { + let state = state.clone(); + async move { + let state = state.read().await; + let res = Self::handle_command(message, &state.peers) + .await + .unwrap_or_else(|e| TransactionsCommandResponse::Error(e.to_string())); + Arc::new(Message::CommandResponse(CommandResponse::Transactions(res))) + } }); Ok(()) } + + async fn handle_command( + message: Arc, + peers: &Vec, + ) -> Result { + let Message::Command(Command::Transactions(TransactionsCommand::Submit { cbor })) = + message.as_ref() + else { + bail!("unexpected tx request") + }; + let tx = Arc::new(Transaction::from_bytes(cbor)?); + for peer in peers { + peer.queue(tx.clone())?; + } + Ok(TransactionsCommandResponse::Submitted { id: tx.id }) + } } struct SubmitterConfig { + subscribe_topic: String, magic: u64, } impl SubmitterConfig { pub fn parse(config: &Config) -> Result { + let subscribe_topic = + config.get("subscribe-topic").unwrap_or("cardano.txs.submit").to_string(); let magic = config.get("magic-number").unwrap_or(764824073); - Ok(Self { magic }) + Ok(Self { + subscribe_topic, + magic, + }) } } + +struct SubmitterState { + peers: Vec, +} From 6c6529b1c4a0d1d69b3961c0110155562f1afda0 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 24 Oct 2025 14:30:13 -0400 Subject: [PATCH 03/10] feat: add a CLI tool to submit transactions --- .../workflows/run-tests-on-push-to-main.yml | 1 + Cargo.lock | 18 +++ Cargo.toml | 3 +- modules/tx_submitter/cli/Cargo.toml | 27 +++++ modules/tx_submitter/cli/src/main.rs | 113 ++++++++++++++++++ modules/tx_submitter/cli/tx-submitter.toml | 17 +++ modules/tx_submitter/src/tx_submitter.rs | 9 +- 7 files changed, 185 insertions(+), 3 deletions(-) create mode 100644 modules/tx_submitter/cli/Cargo.toml create mode 100644 modules/tx_submitter/cli/src/main.rs create mode 100644 modules/tx_submitter/cli/tx-submitter.toml diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 18fcab92..8e505f83 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -41,6 +41,7 @@ jobs: --package acropolis_module_spdd_state \ --package acropolis_module_stake_delta_filter \ --package acropolis_module_tx_submitter \ + --package acropolis_module_tx_submitter_cli \ --package acropolis_module_upstream_chain_fetcher \ --package acropolis_module_utxo_state diff --git a/Cargo.lock b/Cargo.lock index 9b170e2c..55dc6416 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,6 +411,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_tx_submitter_cli" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "acropolis_module_tx_submitter", + "anyhow", + "caryatid_process", + "caryatid_sdk", + "clap", + "config", + "hex", + "pallas 0.33.0", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "acropolis_module_tx_unpacker" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index ba210f65..c1dd927e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs "modules/tx_submitter", # Submits TXs to peers + "modules/tx_submitter/cli", # CLI wrapper for TX submitter # Process builds "processes/omnibus", # All-inclusive omnibus process @@ -42,7 +43,7 @@ caryatid_module_clock = "0.12" caryatid_module_spy = "0.12" anyhow = "1.0" chrono = "0.4" -clap = { version = "4.5", features = ["derive"] } +clap = { version = "4.5", features = ["derive", "string"] } config = "0.15.11" dashmap = "6.1.0" hex = "0.4" diff --git a/modules/tx_submitter/cli/Cargo.toml b/modules/tx_submitter/cli/Cargo.toml new file mode 100644 index 00000000..f2277ee6 --- /dev/null +++ b/modules/tx_submitter/cli/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "acropolis_module_tx_submitter_cli" +version = "0.1.0" +edition = "2024" +authors = ["Simon Gellis "] +description = "CLI tool to submit transactions" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../../common" } +acropolis_module_tx_submitter = { path = "../" } + +caryatid_sdk = { workspace = true } +caryatid_process = { workspace = true} + +anyhow = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +hex = { workspace = true } +pallas = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } + +[[bin]] +name = "tx-submitter-cli" +path = "src/main.rs" \ No newline at end of file diff --git a/modules/tx_submitter/cli/src/main.rs b/modules/tx_submitter/cli/src/main.rs new file mode 100644 index 00000000..c2f11ba8 --- /dev/null +++ b/modules/tx_submitter/cli/src/main.rs @@ -0,0 +1,113 @@ +use std::{path::PathBuf, sync::Arc}; + +use acropolis_common::{ + commands::transactions::TransactionsCommand, + messages::{Command, Message}, +}; +use acropolis_module_tx_submitter::TxSubmitter; +use anyhow::{Result, bail}; +use caryatid_process::Process; +use caryatid_sdk::{Context, Module, module}; +use clap::Parser; +use config::{Config, File}; +use tokio::{fs, select, sync::mpsc}; +use tracing::info; +use tracing_subscriber::{ + EnvFilter, Layer as _, Registry, filter, fmt, layer::SubscriberExt as _, + util::SubscriberInitExt as _, +}; + +fn default_config_path() -> PathBuf { + PathBuf::from( + option_env!("ACROPOLIS_TX_SUBMITTER_DEFAULT_CONFIG").unwrap_or("tx-submitter.toml"), + ) +} + +#[derive(clap::Parser, Clone)] +struct Args { + #[arg(long, value_name = "PATH", default_value = default_config_path().into_os_string())] + config: PathBuf, + tx_file: PathBuf, +} + +#[derive(Clone)] +struct CliState { + args: Args, + done: mpsc::Sender>, +} +impl CliState { + pub fn run(self, ctx: Arc>, fut: F) + where + F: FnOnce(Args, Arc>) -> Fut + Send + 'static, + Fut: Future> + Send + 'static, + { + let args = self.args.clone(); + let c = ctx.clone(); + ctx.run(async move { + let result = fut(args, c).await; + let _ = self.done.send(result).await; + }); + } +} + +tokio::task_local!(static CLI: CliState); +async fn run_process(process: Process, args: Args) -> Result<()> { + let (tx, mut rx) = mpsc::channel(1); + let state = CliState { args, done: tx }; + select! { + res = CLI.scope(state, process.run()) => { + res?; + bail!("process terminated") + } + res = rx.recv() => { + match res { + Some(result) => { + info!("process completed"); + result + } + None => bail!("process terminated") + } + } + } +} + +#[tokio::main] +pub async fn main() -> Result<()> { + let args = Args::try_parse()?; + + // Standard logging using RUST_LOG for log levels default to INFO for events only + let fmt_layer = fmt::layer() + .with_filter(EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into())) + .with_filter(filter::filter_fn(|meta| meta.is_event())); + Registry::default().with(fmt_layer).init(); + + let config = Arc::new(Config::builder().add_source(File::from(args.config.as_path())).build()?); + let mut process = Process::::create(config).await; + + TxSubmitter::register(&mut process); + CliDriver::register(&mut process); + + run_process(process, args).await +} + +#[module( + message_type(Message), + name = "cli-driver", + description = "Module to interface with the CLI tool" +)] +struct CliDriver; +impl CliDriver { + pub async fn init(&self, context: Arc>, _config: Arc) -> Result<()> { + let state = CLI.get(); + state.run(context, move |args, context| async move { + let tx = fs::read(args.tx_file).await?; + let request = Arc::new(Message::Command(Command::Transactions( + TransactionsCommand::Submit { cbor: tx }, + ))); + let response = context.request("cli.tx.submit", request).await?; + info!("{response:?}"); + Ok(()) + }); + Ok(()) + } +} diff --git a/modules/tx_submitter/cli/tx-submitter.toml b/modules/tx_submitter/cli/tx-submitter.toml new file mode 100644 index 00000000..a5f5dccf --- /dev/null +++ b/modules/tx_submitter/cli/tx-submitter.toml @@ -0,0 +1,17 @@ +[module.tx-submitter] +subscribe-topic = "cli.tx.submit" + +[module.cli-driver] + +[message-bus.internal] +class = "in-memory" +workers = 50 +dispatch-queue-size = 1000 +worker-queue-size = 100 +bulk-block-capacity = 50 +bulk-resume-capacity = 75 + +# Message routing +[[message-router.route]] # Everything is internal only +pattern = "#" +bus = "internal" diff --git a/modules/tx_submitter/src/tx_submitter.rs b/modules/tx_submitter/src/tx_submitter.rs index 58083cbf..750008c5 100644 --- a/modules/tx_submitter/src/tx_submitter.rs +++ b/modules/tx_submitter/src/tx_submitter.rs @@ -8,13 +8,18 @@ use acropolis_common::{ messages::{Command, CommandResponse, Message}, }; use anyhow::{Result, bail}; -use caryatid_sdk::Context; +use caryatid_sdk::{Context, Module, module}; use config::Config; use peer::PeerConfig; use tokio::sync::RwLock; use crate::{peer::PeerConnection, tx::Transaction}; +#[module( + message_type(Message), + name = "tx-submitter", + description = "TX submission module" +)] pub struct TxSubmitter; impl TxSubmitter { @@ -61,7 +66,7 @@ struct SubmitterConfig { impl SubmitterConfig { pub fn parse(config: &Config) -> Result { let subscribe_topic = - config.get("subscribe-topic").unwrap_or("cardano.txs.submit").to_string(); + config.get_string("subscribe-topic").unwrap_or("cardano.txs.submit".to_string()); let magic = config.get("magic-number").unwrap_or(764824073); Ok(Self { subscribe_topic, From 9a1f48b0708af811842ab3cf7dbba1f4fc758825 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 24 Oct 2025 15:24:16 -0400 Subject: [PATCH 04/10] fix: test the module end-to-end using the CLI --- Cargo.lock | 1 + modules/tx_submitter/Cargo.toml | 1 + modules/tx_submitter/cli/tx-submitter.toml | 2 + modules/tx_submitter/src/peer.rs | 98 ++++++++++++++-------- modules/tx_submitter/src/tx.rs | 9 +- modules/tx_submitter/src/tx_submitter.rs | 19 ++++- 6 files changed, 91 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55dc6416..d42ffcf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -405,6 +405,7 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", + "futures", "hex", "pallas 0.33.0", "tokio", diff --git a/modules/tx_submitter/Cargo.toml b/modules/tx_submitter/Cargo.toml index 76d94b16..f8bb86a5 100644 --- a/modules/tx_submitter/Cargo.toml +++ b/modules/tx_submitter/Cargo.toml @@ -13,6 +13,7 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } +futures = "0.3.31" hex = { workspace = true } pallas = { workspace = true } tokio = { workspace = true } diff --git a/modules/tx_submitter/cli/tx-submitter.toml b/modules/tx_submitter/cli/tx-submitter.toml index a5f5dccf..135773fe 100644 --- a/modules/tx_submitter/cli/tx-submitter.toml +++ b/modules/tx_submitter/cli/tx-submitter.toml @@ -12,6 +12,8 @@ bulk-block-capacity = 50 bulk-resume-capacity = 75 # Message routing +[message-router] +request-timeout = 300 [[message-router.route]] # Everything is internal only pattern = "#" bus = "internal" diff --git a/modules/tx_submitter/src/peer.rs b/modules/tx_submitter/src/peer.rs index a13fe5c0..e17b0f66 100644 --- a/modules/tx_submitter/src/peer.rs +++ b/modules/tx_submitter/src/peer.rs @@ -3,8 +3,11 @@ use std::{collections::VecDeque, sync::Arc, time::Duration}; use anyhow::{Context, Result, bail}; use config::Config; use pallas::network::{facades::PeerClient, miniprotocols::txsubmission}; -use tokio::{select, sync::mpsc}; -use tracing::{debug, error, warn}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tracing::{debug, error, instrument, warn}; use crate::{SubmitterConfig, tx::Transaction}; @@ -14,13 +17,14 @@ pub struct PeerConfig { impl PeerConfig { pub fn parse(config: &Config) -> Result { let address = - config.get("node-address").unwrap_or("backbone.cardano.iog.io:3001").to_string(); + config.get_string("node-address").unwrap_or("backbone.cardano.iog.io:3001".to_string()); Ok(Self { address }) } } pub struct PeerConnection { - tx_sink: mpsc::UnboundedSender>, + pub name: String, + tx_sink: mpsc::UnboundedSender, } impl PeerConnection { pub fn open(submitter: &SubmitterConfig, peer: PeerConfig) -> Self { @@ -28,20 +32,26 @@ impl PeerConnection { let worker = PeerWorker { tx_source, tx_queue: TxQueue::new(), - address: peer.address, + address: peer.address.clone(), magic: submitter.magic, }; tokio::task::spawn(worker.run()); - Self { tx_sink } + Self { + name: peer.address, + tx_sink, + } } - pub fn queue(&self, tx: Arc) -> Result<()> { - self.tx_sink.send(tx).context("could not queue tx") + pub fn queue(&self, tx: Arc) -> Result> { + let (done, done_rx) = oneshot::channel(); + let queued_tx = QueuedTx { tx, done }; + self.tx_sink.send(queued_tx).context("could not queue tx")?; + Ok(done_rx) } } struct PeerWorker { - tx_source: mpsc::UnboundedReceiver>, + tx_source: mpsc::UnboundedReceiver, tx_queue: TxQueue, address: String, magic: u64, @@ -57,11 +67,13 @@ impl PeerWorker { } } + #[instrument(skip(self), fields(address = %self.address))] async fn run_connection(&mut self) -> Result<()> { let mut client = PeerClient::connect(&self.address, self.magic).await.context("could not connect")?; let submission = client.txsubmission(); submission.send_init().await.context("failed to init")?; + debug!("initialized connection"); let mut pending_tx_requests = None; self.tx_queue.requeue_sent(); loop { @@ -71,11 +83,13 @@ impl PeerWorker { // parent process must have disconnected break; }; + debug!("received tx {tx}"); self.tx_queue.push(tx); if let Some(req) = pending_tx_requests.take() { let ids = self.tx_queue.req(req); + let count = ids.len(); submission.reply_tx_ids(ids).await.context("could not send tx ids")?; - self.tx_queue.mark_requested(req); + self.tx_queue.mark_requested(count); } } request = submission.next_request(), if pending_tx_requests.is_none() => { @@ -100,26 +114,34 @@ impl PeerWorker { ) -> Result> { match req { txsubmission::Request::TxIds(ack, req) => { + debug!("received TxIds({ack}, {req})"); self.tx_queue.ack(ack)?; let ids = self.tx_queue.req(req); if ids.is_empty() { Ok(Some(req)) } else { + let count = ids.len(); submission.reply_tx_ids(ids).await.context("could not send tx ids")?; - self.tx_queue.mark_requested(req); + self.tx_queue.mark_requested(count); Ok(None) } } txsubmission::Request::TxIdsNonBlocking(ack, req) => { + debug!("received TxIdsNonBlocking({ack}, {req})"); self.tx_queue.ack(ack)?; let ids = self.tx_queue.req(req); + let count = ids.len(); submission.reply_tx_ids(ids).await.context("could not send tx ids")?; - self.tx_queue.mark_requested(req); + self.tx_queue.mark_requested(count); Ok(None) } txsubmission::Request::Txs(ids) => { + debug!( + "received Txs({:?})", + ids.iter().map(|id| hex::encode(&id.1)).collect::>() + ); let mut txs = vec![]; for id in ids { match self.tx_queue.tx_body(&id) { @@ -139,17 +161,38 @@ impl PeerWorker { } } +struct QueuedTx { + tx: Arc, + done: oneshot::Sender<()>, +} +impl std::fmt::Display for QueuedTx { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&hex::encode(&self.tx.id)) + } +} +impl QueuedTx { + fn tx_id_and_size(&self) -> txsubmission::TxIdAndSize { + txsubmission::TxIdAndSize( + txsubmission::EraTxId(self.tx.era, self.tx.id.to_vec()), + self.tx.body.len() as u32, + ) + } + fn era_tx_body(&self) -> txsubmission::EraTxBody { + txsubmission::EraTxBody(self.tx.era, self.tx.body.clone()) + } +} + #[derive(Default)] struct TxQueue { - unsent: VecDeque>, - sent: VecDeque>, + unsent: VecDeque, + sent: VecDeque, } impl TxQueue { pub fn new() -> Self { Self::default() } - pub fn push(&mut self, tx: Arc) { + pub fn push(&mut self, tx: QueuedTx) { self.unsent.push_back(tx); } @@ -157,7 +200,8 @@ impl TxQueue { for _ in 0..count { match self.sent.pop_front() { Some(tx) => { - debug!("TX {} has been acknowledged", hex::encode(&tx.id)) + debug!("TX {tx} has been acknowledged"); + let _ = tx.done.send(()); } None => bail!("Server acked a TX which we never sent"), } @@ -166,19 +210,10 @@ impl TxQueue { } pub fn req(&self, count: u16) -> Vec> { - self.unsent - .iter() - .take(count as usize) - .map(|tx| { - txsubmission::TxIdAndSize( - txsubmission::EraTxId(tx.era, tx.id.to_vec()), - tx.body.len() as u32, - ) - }) - .collect() - } - - pub fn mark_requested(&mut self, count: u16) { + self.unsent.iter().take(count as usize).map(|tx| tx.tx_id_and_size()).collect() + } + + pub fn mark_requested(&mut self, count: usize) { for _ in 0..count { let tx = self.unsent.pop_front().expect("logic error"); self.sent.push_back(tx); @@ -186,10 +221,7 @@ impl TxQueue { } pub fn tx_body(&self, id: &txsubmission::EraTxId) -> Option { - self.sent - .iter() - .find(|tx| *tx.id == *id.1) - .map(|tx| txsubmission::EraTxBody(tx.era, tx.body.clone())) + self.sent.iter().find(|tx| *tx.tx.id == *id.1).map(|tx| tx.era_tx_body()) } pub fn requeue_sent(&mut self) { diff --git a/modules/tx_submitter/src/tx.rs b/modules/tx_submitter/src/tx.rs index 0569204a..c1b038e5 100644 --- a/modules/tx_submitter/src/tx.rs +++ b/modules/tx_submitter/src/tx.rs @@ -1,6 +1,6 @@ use acropolis_common::TxHash; -use anyhow::Result; -use pallas::ledger::traverse::MultiEraTx; +use anyhow::{Result, bail}; +use pallas::ledger::traverse::{Era, MultiEraTx}; pub struct Transaction { pub id: TxHash, @@ -12,7 +12,10 @@ impl Transaction { pub fn from_bytes(bytes: &[u8]) -> Result { let parsed = MultiEraTx::decode(bytes)?; let id = TxHash::from(*parsed.hash()); - let era = parsed.era().into(); + let era = match parsed.era() { + Era::Conway => 6, + other => bail!("cannot submit {other} era transactions"), + }; Ok(Self { id, body: bytes.to_vec(), diff --git a/modules/tx_submitter/src/tx_submitter.rs b/modules/tx_submitter/src/tx_submitter.rs index 750008c5..8b9d6a06 100644 --- a/modules/tx_submitter/src/tx_submitter.rs +++ b/modules/tx_submitter/src/tx_submitter.rs @@ -7,11 +7,13 @@ use acropolis_common::{ commands::transactions::{TransactionsCommand, TransactionsCommandResponse}, messages::{Command, CommandResponse, Message}, }; -use anyhow::{Result, bail}; +use anyhow::{Context as _, Result, bail}; use caryatid_sdk::{Context, Module, module}; use config::Config; +use futures::stream::{FuturesUnordered, StreamExt}; use peer::PeerConfig; use tokio::sync::RwLock; +use tracing::warn; use crate::{peer::PeerConnection, tx::Transaction}; @@ -52,10 +54,21 @@ impl TxSubmitter { bail!("unexpected tx request") }; let tx = Arc::new(Transaction::from_bytes(cbor)?); + let mut waiting = FuturesUnordered::new(); for peer in peers { - peer.queue(tx.clone())?; + let peer_name = peer.name.clone(); + let receiver = peer.queue(tx.clone())?; + waiting.push(async move { + receiver.await.context(format!("could not send tx to {peer_name}")) + }); } - Ok(TransactionsCommandResponse::Submitted { id: tx.id }) + while let Some(result) = waiting.next().await { + match result { + Ok(()) => return Ok(TransactionsCommandResponse::Submitted { id: tx.id }), + Err(err) => warn!("{err:#}"), + } + } + bail!("could not send tx to any peers"); } } From 00d9ca1963dc7e6f18051f3496eb034b2956d974 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 24 Oct 2025 15:53:50 -0400 Subject: [PATCH 05/10] docs: add documentation and unit tests --- modules/tx_submitter/README.md | 27 +++++++ modules/tx_submitter/cli/src/main.rs | 4 +- modules/tx_submitter/src/peer.rs | 105 ++++++++++++++++++++++++++- 3 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 modules/tx_submitter/README.md diff --git a/modules/tx_submitter/README.md b/modules/tx_submitter/README.md new file mode 100644 index 00000000..d8a06fa9 --- /dev/null +++ b/modules/tx_submitter/README.md @@ -0,0 +1,27 @@ +# TX submission module + +The TX submission module implements the TXSubmission node-to-node protocol to submit transactions to a single upstream source. It can run as part of a complete Acropolis setup, or independently through its own CLI. + +## CLI tool + +```sh +cargo run --bin tx-submitter-cli -- +``` + +## Messages + +The TX submission module listens for requests to submit transactions on the `cardano.txs.submit` topic. It will send a response once any upstream server has acknowledged the transaction. + +## Default configuration + +```toml +[module.tx-submitter] + +# Upstream node connection +node-address = "backbone.cardano.iog.io:3001" +magic-number = 764824073 + +# Message topics +subscribe-topic = "cardano.txs.submit" + +``` \ No newline at end of file diff --git a/modules/tx_submitter/cli/src/main.rs b/modules/tx_submitter/cli/src/main.rs index c2f11ba8..cdff7deb 100644 --- a/modules/tx_submitter/cli/src/main.rs +++ b/modules/tx_submitter/cli/src/main.rs @@ -25,8 +25,10 @@ fn default_config_path() -> PathBuf { #[derive(clap::Parser, Clone)] struct Args { - #[arg(long, value_name = "PATH", default_value = default_config_path().into_os_string())] + /// Path to configuration. + #[arg(long, default_value = default_config_path().into_os_string())] config: PathBuf, + /// File containing the raw bytes of a transaction. tx_file: PathBuf, } diff --git a/modules/tx_submitter/src/peer.rs b/modules/tx_submitter/src/peer.rs index e17b0f66..3aecaede 100644 --- a/modules/tx_submitter/src/peer.rs +++ b/modules/tx_submitter/src/peer.rs @@ -144,7 +144,7 @@ impl PeerWorker { ); let mut txs = vec![]; for id in ids { - match self.tx_queue.tx_body(&id) { + match self.tx_queue.announced_tx_body(&id) { Some(body) => { debug!("Sending TX {}", hex::encode(id.1)); txs.push(body); @@ -220,7 +220,7 @@ impl TxQueue { } } - pub fn tx_body(&self, id: &txsubmission::EraTxId) -> Option { + pub fn announced_tx_body(&self, id: &txsubmission::EraTxId) -> Option { self.sent.iter().find(|tx| *tx.tx.id == *id.1).map(|tx| tx.era_tx_body()) } @@ -230,3 +230,104 @@ impl TxQueue { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use acropolis_common::TxHash; + use tokio::sync::oneshot; + + use crate::{peer::QueuedTx, tx::Transaction}; + + use super::TxQueue; + + #[test] + fn tx_queue_should_not_ack_unsubmitted_requests() { + let mut queue = TxQueue::new(); + assert!(queue.ack(1).is_err()); + } + + #[test] + fn tx_queue_should_return_no_txs_when_empty() { + let queue = TxQueue::new(); + assert!(queue.req(1).is_empty()); + } + + #[test] + fn tx_queue_should_acknowledge_request() { + let (done, done_rx) = oneshot::channel(); + let tx = QueuedTx { + tx: Arc::new(Transaction { + id: TxHash::default(), + body: vec![], + era: 6, + }), + done, + }; + let id = tx.tx_id_and_size().0; + let mut queue = TxQueue::new(); + queue.push(tx); + + // the TX hasn't been announced yet + assert!(queue.announced_tx_body(&id).is_none()); + assert!(done_rx.is_empty()); + + // now the server requests it + let ids = queue.req(2); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0].0.1, id.1); + queue.mark_requested(1); + + // the TX has been announced, so the server can request the body + assert!(queue.announced_tx_body(&id).is_some()); + assert!(done_rx.is_empty()); + + // the server acks the request. now we're done! + assert!(queue.ack(1).is_ok()); + assert!(queue.announced_tx_body(&id).is_none()); + assert!(!done_rx.is_empty()); + } + + #[test] + fn tx_queue_should_restart_submission_after_connection_lost() { + let (done, done_rx) = oneshot::channel(); + let tx = QueuedTx { + tx: Arc::new(Transaction { + id: TxHash::default(), + body: vec![], + era: 6, + }), + done, + }; + let id = tx.tx_id_and_size().0; + let mut queue = TxQueue::new(); + queue.push(tx); + + // the TX hasn't been announced yet + assert!(queue.announced_tx_body(&id).is_none()); + assert!(done_rx.is_empty()); + + // now the server requests it + let ids = queue.req(2); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0].0.1, id.1); + queue.mark_requested(1); + + // the TX has been announced, so the server can request the body + assert!(queue.announced_tx_body(&id).is_some()); + assert!(done_rx.is_empty()); + + // uh oh! we disconnected and reconnected before the server acked it. + queue.requeue_sent(); + + // now we pretend we never sent it + assert!(queue.announced_tx_body(&id).is_none()); + assert!(done_rx.is_empty()); + + // and the server can request it again + let ids = queue.req(2); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0].0.1, id.1); + } +} From fc10b87ada749500d830d35485fd8845c881e53b Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 24 Oct 2025 16:07:43 -0400 Subject: [PATCH 06/10] fix: use default topic everywhere --- modules/tx_submitter/cli/src/main.rs | 2 +- modules/tx_submitter/cli/tx-submitter.toml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/tx_submitter/cli/src/main.rs b/modules/tx_submitter/cli/src/main.rs index cdff7deb..e6efda2a 100644 --- a/modules/tx_submitter/cli/src/main.rs +++ b/modules/tx_submitter/cli/src/main.rs @@ -106,7 +106,7 @@ impl CliDriver { let request = Arc::new(Message::Command(Command::Transactions( TransactionsCommand::Submit { cbor: tx }, ))); - let response = context.request("cli.tx.submit", request).await?; + let response = context.request("cardano.txs.submit", request).await?; info!("{response:?}"); Ok(()) }); diff --git a/modules/tx_submitter/cli/tx-submitter.toml b/modules/tx_submitter/cli/tx-submitter.toml index 135773fe..1c894c94 100644 --- a/modules/tx_submitter/cli/tx-submitter.toml +++ b/modules/tx_submitter/cli/tx-submitter.toml @@ -1,5 +1,4 @@ [module.tx-submitter] -subscribe-topic = "cli.tx.submit" [module.cli-driver] From eb0a84f45bb1910e09722e2fa29db58ccc0a9b1f Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 27 Oct 2025 11:53:48 -0400 Subject: [PATCH 07/10] fix: move cli tool to processes --- Cargo.lock | 36 +++++++++---------- Cargo.toml | 8 ++--- modules/tx_submitter/README.md | 8 +---- processes/README.md | 1 + .../tx_submitter_cli}/Cargo.toml | 6 ++-- processes/tx_submitter_cli/README.md | 11 ++++++ .../tx_submitter_cli}/src/main.rs | 0 .../tx_submitter_cli}/tx-submitter.toml | 0 8 files changed, 38 insertions(+), 32 deletions(-) rename {modules/tx_submitter/cli => processes/tx_submitter_cli}/Cargo.toml (78%) create mode 100644 processes/tx_submitter_cli/README.md rename {modules/tx_submitter/cli => processes/tx_submitter_cli}/src/main.rs (100%) rename {modules/tx_submitter/cli => processes/tx_submitter_cli}/tx-submitter.toml (100%) diff --git a/Cargo.lock b/Cargo.lock index d42ffcf7..04d9375e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,24 +412,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "acropolis_module_tx_submitter_cli" -version = "0.1.0" -dependencies = [ - "acropolis_common", - "acropolis_module_tx_submitter", - "anyhow", - "caryatid_process", - "caryatid_sdk", - "clap", - "config", - "hex", - "pallas 0.33.0", - "tokio", - "tracing", - "tracing-subscriber", -] - [[package]] name = "acropolis_module_tx_unpacker" version = "0.2.1" @@ -602,6 +584,24 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "acropolis_process_tx_submitter_cli" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "acropolis_module_tx_submitter", + "anyhow", + "caryatid_process", + "caryatid_sdk", + "clap", + "config", + "hex", + "pallas 0.33.0", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "adler2" version = "2.0.1" diff --git a/Cargo.toml b/Cargo.toml index c1dd927e..035c1edb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,12 +26,12 @@ members = [ "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs "modules/tx_submitter", # Submits TXs to peers - "modules/tx_submitter/cli", # CLI wrapper for TX submitter # Process builds - "processes/omnibus", # All-inclusive omnibus process - "processes/replayer", # All-inclusive process to replay messages - "processes/golden_tests", # All-inclusive golden tests process + "processes/omnibus", # All-inclusive omnibus process + "processes/replayer", # All-inclusive process to replay messages + "processes/golden_tests", # All-inclusive golden tests process + "processes/tx_submitter_cli", # CLI wrapper for TX submitter ] resolver = "2" diff --git a/modules/tx_submitter/README.md b/modules/tx_submitter/README.md index d8a06fa9..a9b2e836 100644 --- a/modules/tx_submitter/README.md +++ b/modules/tx_submitter/README.md @@ -1,12 +1,6 @@ # TX submission module -The TX submission module implements the TXSubmission node-to-node protocol to submit transactions to a single upstream source. It can run as part of a complete Acropolis setup, or independently through its own CLI. - -## CLI tool - -```sh -cargo run --bin tx-submitter-cli -- -``` +The TX submission module implements the TXSubmission node-to-node protocol to submit transactions to a single upstream source. ## Messages diff --git a/processes/README.md b/processes/README.md index 42ab8c57..c21ff0c9 100644 --- a/processes/README.md +++ b/processes/README.md @@ -5,3 +5,4 @@ These are the process builds for the Acropolis architecture: * [Omnibus](omnibus/) - All-you-can-eat container for testing * [Replayer](replayer/) - Locally replay previously downloaded selected messages, stored in JSON on disk * [Golden Tests](golden_tests/) - Provides a testing module to execute end to end golden tests +* [TX Submitter CLI](tx_submitter_cli/) - Provides a CLI wrapper for the tx submitter module diff --git a/modules/tx_submitter/cli/Cargo.toml b/processes/tx_submitter_cli/Cargo.toml similarity index 78% rename from modules/tx_submitter/cli/Cargo.toml rename to processes/tx_submitter_cli/Cargo.toml index f2277ee6..a58ce2b2 100644 --- a/modules/tx_submitter/cli/Cargo.toml +++ b/processes/tx_submitter_cli/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "acropolis_module_tx_submitter_cli" +name = "acropolis_process_tx_submitter_cli" version = "0.1.0" edition = "2024" authors = ["Simon Gellis "] @@ -7,8 +7,8 @@ description = "CLI tool to submit transactions" license = "Apache-2.0" [dependencies] -acropolis_common = { path = "../../../common" } -acropolis_module_tx_submitter = { path = "../" } +acropolis_common = { path = "../../common" } +acropolis_module_tx_submitter = { path = "../../modules/tx_submitter" } caryatid_sdk = { workspace = true } caryatid_process = { workspace = true} diff --git a/processes/tx_submitter_cli/README.md b/processes/tx_submitter_cli/README.md new file mode 100644 index 00000000..4c38244a --- /dev/null +++ b/processes/tx_submitter_cli/README.md @@ -0,0 +1,11 @@ +# Acropolis tx-submitter-cli tool + +This process is a CLI wrapper for the [tx_submitter module](../../modules/tx_submitter/). It allows you to submit transactions to upstream peers. + +## How to run it + +```shell +cd processes/tx_submitter_cli +cargo run -- +``` +The `tx-file` arg should be the path to a file containing a raw signed transaction. diff --git a/modules/tx_submitter/cli/src/main.rs b/processes/tx_submitter_cli/src/main.rs similarity index 100% rename from modules/tx_submitter/cli/src/main.rs rename to processes/tx_submitter_cli/src/main.rs diff --git a/modules/tx_submitter/cli/tx-submitter.toml b/processes/tx_submitter_cli/tx-submitter.toml similarity index 100% rename from modules/tx_submitter/cli/tx-submitter.toml rename to processes/tx_submitter_cli/tx-submitter.toml From f04da3539620d46c20a53d9dfc28790e758daeb2 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 27 Oct 2025 13:01:31 -0400 Subject: [PATCH 08/10] fix: nicer output for happy path --- processes/tx_submitter_cli/src/main.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/processes/tx_submitter_cli/src/main.rs b/processes/tx_submitter_cli/src/main.rs index e6efda2a..e2074854 100644 --- a/processes/tx_submitter_cli/src/main.rs +++ b/processes/tx_submitter_cli/src/main.rs @@ -1,8 +1,8 @@ use std::{path::PathBuf, sync::Arc}; use acropolis_common::{ - commands::transactions::TransactionsCommand, - messages::{Command, Message}, + commands::transactions::{TransactionsCommand, TransactionsCommandResponse}, + messages::{Command, CommandResponse, Message}, }; use acropolis_module_tx_submitter::TxSubmitter; use anyhow::{Result, bail}; @@ -107,7 +107,14 @@ impl CliDriver { TransactionsCommand::Submit { cbor: tx }, ))); let response = context.request("cardano.txs.submit", request).await?; - info!("{response:?}"); + if let Message::CommandResponse(CommandResponse::Transactions( + TransactionsCommandResponse::Submitted { id }, + )) = response.as_ref() + { + info!("Submitted TX {}", hex::encode(id)); + } else { + info!("{response:?}"); + } Ok(()) }); Ok(()) From 22d7156b6b9aff911e10b54dca1cf19c4d792245 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 27 Oct 2025 13:02:53 -0400 Subject: [PATCH 09/10] fix: correct CI config --- .github/workflows/run-tests-on-push-to-main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 8e505f83..89066338 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -41,9 +41,9 @@ jobs: --package acropolis_module_spdd_state \ --package acropolis_module_stake_delta_filter \ --package acropolis_module_tx_submitter \ - --package acropolis_module_tx_submitter_cli \ --package acropolis_module_upstream_chain_fetcher \ - --package acropolis_module_utxo_state + --package acropolis_module_utxo_state \ + --package acropolis_process_tx_submitter_cli - name: Run Build run: cargo build --verbose From 4a0852a18c1f64ee3b3ede9fa02e316a56c190f3 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 27 Oct 2025 13:25:46 -0400 Subject: [PATCH 10/10] fix: make blocking on submission optional --- common/src/commands/transactions.rs | 1 + modules/tx_submitter/src/tx_submitter.rs | 9 +++++++-- processes/tx_submitter_cli/src/main.rs | 5 ++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/common/src/commands/transactions.rs b/common/src/commands/transactions.rs index d63f1fbe..92453109 100644 --- a/common/src/commands/transactions.rs +++ b/common/src/commands/transactions.rs @@ -8,6 +8,7 @@ pub enum TransactionsCommand { Submit { #[serde_as(as = "Hex")] cbor: Vec, + wait_for_ack: bool, }, } diff --git a/modules/tx_submitter/src/tx_submitter.rs b/modules/tx_submitter/src/tx_submitter.rs index 8b9d6a06..a8237380 100644 --- a/modules/tx_submitter/src/tx_submitter.rs +++ b/modules/tx_submitter/src/tx_submitter.rs @@ -48,8 +48,10 @@ impl TxSubmitter { message: Arc, peers: &Vec, ) -> Result { - let Message::Command(Command::Transactions(TransactionsCommand::Submit { cbor })) = - message.as_ref() + let Message::Command(Command::Transactions(TransactionsCommand::Submit { + cbor, + wait_for_ack, + })) = message.as_ref() else { bail!("unexpected tx request") }; @@ -62,6 +64,9 @@ impl TxSubmitter { receiver.await.context(format!("could not send tx to {peer_name}")) }); } + if !*wait_for_ack { + return Ok(TransactionsCommandResponse::Submitted { id: tx.id }); + } while let Some(result) = waiting.next().await { match result { Ok(()) => return Ok(TransactionsCommandResponse::Submitted { id: tx.id }), diff --git a/processes/tx_submitter_cli/src/main.rs b/processes/tx_submitter_cli/src/main.rs index e2074854..5a3ee682 100644 --- a/processes/tx_submitter_cli/src/main.rs +++ b/processes/tx_submitter_cli/src/main.rs @@ -104,7 +104,10 @@ impl CliDriver { state.run(context, move |args, context| async move { let tx = fs::read(args.tx_file).await?; let request = Arc::new(Message::Command(Command::Transactions( - TransactionsCommand::Submit { cbor: tx }, + TransactionsCommand::Submit { + cbor: tx, + wait_for_ack: true, + }, ))); let response = context.request("cardano.txs.submit", request).await?; if let Message::CommandResponse(CommandResponse::Transactions(