diff --git a/.github/workflows/run-tests-on-push-to-main.yml b/.github/workflows/run-tests-on-push-to-main.yml index 742b5784..89066338 100644 --- a/.github/workflows/run-tests-on-push-to-main.yml +++ b/.github/workflows/run-tests-on-push-to-main.yml @@ -40,8 +40,10 @@ jobs: --package acropolis_module_snapshot_bootstrapper \ --package acropolis_module_spdd_state \ --package acropolis_module_stake_delta_filter \ + --package acropolis_module_tx_submitter \ --package acropolis_module_upstream_chain_fetcher \ - --package acropolis_module_utxo_state + --package acropolis_module_utxo_state \ + --package acropolis_process_tx_submitter_cli - name: Run Build run: cargo build --verbose diff --git a/Cargo.lock b/Cargo.lock index bd584c53..04d9375e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -397,6 +397,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_tx_submitter" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "futures", + "hex", + "pallas 0.33.0", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_tx_unpacker" version = "0.2.1" @@ -569,6 +584,24 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "acropolis_process_tx_submitter_cli" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "acropolis_module_tx_submitter", + "anyhow", + "caryatid_process", + "caryatid_sdk", + "clap", + "config", + "hex", + "pallas 0.33.0", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "adler2" version = "2.0.1" diff --git a/Cargo.toml b/Cargo.toml index 9efddb72..035c1edb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,11 +25,13 @@ members = [ "modules/historical_accounts_state", # Tracks historical account information "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs + "modules/tx_submitter", # Submits TXs to peers # Process builds - "processes/omnibus", # All-inclusive omnibus process - "processes/replayer", # All-inclusive process to replay messages - "processes/golden_tests", # All-inclusive golden tests process + "processes/omnibus", # All-inclusive omnibus process + "processes/replayer", # All-inclusive process to replay messages + "processes/golden_tests", # All-inclusive golden tests process + "processes/tx_submitter_cli", # CLI wrapper for TX submitter ] resolver = "2" @@ -41,7 +43,7 @@ caryatid_module_clock = "0.12" caryatid_module_spy = "0.12" anyhow = "1.0" chrono = "0.4" -clap = { version = "4.5", features = ["derive"] } +clap = { version = "4.5", features = ["derive", "string"] } config = "0.15.11" dashmap = "6.1.0" hex = "0.4" diff --git a/common/src/commands/mod.rs b/common/src/commands/mod.rs new file mode 100644 index 00000000..0824d7a9 --- /dev/null +++ b/common/src/commands/mod.rs @@ -0,0 +1 @@ +pub mod transactions; diff --git a/common/src/commands/transactions.rs b/common/src/commands/transactions.rs new file mode 100644 index 00000000..92453109 --- /dev/null +++ b/common/src/commands/transactions.rs @@ -0,0 +1,19 @@ +use serde_with::{hex::Hex, serde_as}; + +use crate::TxHash; + +#[serde_as] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum TransactionsCommand { + Submit { + #[serde_as(as = "Hex")] + cbor: Vec, + wait_for_ack: bool, + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum TransactionsCommandResponse { + Submitted { id: TxHash }, + Error(String), +} diff --git a/common/src/lib.rs b/common/src/lib.rs index abf55551..a9cf24c3 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,6 +4,7 @@ pub mod address; pub mod byte_array; pub mod calculations; pub mod cip19; +pub mod commands; pub mod crypto; pub mod genesis_values; pub mod hash; diff --git a/common/src/messages.rs b/common/src/messages.rs index 2cd325a6..8ed9856e 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -3,6 +3,7 @@ // We don't use these messages in the acropolis_common crate itself #![allow(dead_code)] +use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; use crate::ledger_state::SPOState; use crate::protocol_params::{NonceHash, ProtocolParams}; @@ -350,6 +351,10 @@ pub enum Message { // State query messages StateQuery(StateQuery), StateQueryResponse(StateQueryResponse), + + // Commands + Command(Command), + CommandResponse(CommandResponse), } // Casts from specific Caryatid messages @@ -422,3 +427,13 @@ pub enum StateQueryResponse { UTxOs(UTxOStateQueryResponse), SPDD(SPDDStateQueryResponse), } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum Command { + Transactions(TransactionsCommand), +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum CommandResponse { + Transactions(TransactionsCommandResponse), +} diff --git a/modules/tx_submitter/Cargo.toml b/modules/tx_submitter/Cargo.toml new file mode 100644 index 00000000..f8bb86a5 --- /dev/null +++ b/modules/tx_submitter/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "acropolis_module_tx_submitter" +version = "0.1.0" +edition = "2024" +authors = ["Simon Gellis "] +description = "TX submission module for Acropolis" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +futures = "0.3.31" +hex = { workspace = true } +pallas = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/tx_submitter.rs" \ No newline at end of file diff --git a/modules/tx_submitter/README.md b/modules/tx_submitter/README.md new file mode 100644 index 00000000..a9b2e836 --- /dev/null +++ b/modules/tx_submitter/README.md @@ -0,0 +1,21 @@ +# TX submission module + +The TX submission module implements the TXSubmission node-to-node protocol to submit transactions to a single upstream source. + +## Messages + +The TX submission module listens for requests to submit transactions on the `cardano.txs.submit` topic. It will send a response once any upstream server has acknowledged the transaction. + +## Default configuration + +```toml +[module.tx-submitter] + +# Upstream node connection +node-address = "backbone.cardano.iog.io:3001" +magic-number = 764824073 + +# Message topics +subscribe-topic = "cardano.txs.submit" + +``` \ No newline at end of file diff --git a/modules/tx_submitter/src/peer.rs b/modules/tx_submitter/src/peer.rs new file mode 100644 index 00000000..3aecaede --- /dev/null +++ b/modules/tx_submitter/src/peer.rs @@ -0,0 +1,333 @@ +use std::{collections::VecDeque, sync::Arc, time::Duration}; + +use anyhow::{Context, Result, bail}; +use config::Config; +use pallas::network::{facades::PeerClient, miniprotocols::txsubmission}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tracing::{debug, error, instrument, warn}; + +use crate::{SubmitterConfig, tx::Transaction}; + +pub struct PeerConfig { + address: String, +} +impl PeerConfig { + pub fn parse(config: &Config) -> Result { + let address = + config.get_string("node-address").unwrap_or("backbone.cardano.iog.io:3001".to_string()); + Ok(Self { address }) + } +} + +pub struct PeerConnection { + pub name: String, + tx_sink: mpsc::UnboundedSender, +} +impl PeerConnection { + pub fn open(submitter: &SubmitterConfig, peer: PeerConfig) -> Self { + let (tx_sink, tx_source) = mpsc::unbounded_channel(); + let worker = PeerWorker { + tx_source, + tx_queue: TxQueue::new(), + address: peer.address.clone(), + magic: submitter.magic, + }; + tokio::task::spawn(worker.run()); + Self { + name: peer.address, + tx_sink, + } + } + + pub fn queue(&self, tx: Arc) -> Result> { + let (done, done_rx) = oneshot::channel(); + let queued_tx = QueuedTx { tx, done }; + self.tx_sink.send(queued_tx).context("could not queue tx")?; + Ok(done_rx) + } +} + +struct PeerWorker { + tx_source: mpsc::UnboundedReceiver, + tx_queue: TxQueue, + address: String, + magic: u64, +} +impl PeerWorker { + async fn run(mut self) { + while !self.tx_source.is_closed() { + if let Err(error) = self.run_connection().await { + error!("error connecting to {}: {:#}", self.address, error); + debug!("reconnecting in 5 seconds"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + + #[instrument(skip(self), fields(address = %self.address))] + async fn run_connection(&mut self) -> Result<()> { + let mut client = + PeerClient::connect(&self.address, self.magic).await.context("could not connect")?; + let submission = client.txsubmission(); + submission.send_init().await.context("failed to init")?; + debug!("initialized connection"); + let mut pending_tx_requests = None; + self.tx_queue.requeue_sent(); + loop { + select! { + new_tx = self.tx_source.recv() => { + let Some(tx) = new_tx else { + // parent process must have disconnected + break; + }; + debug!("received tx {tx}"); + self.tx_queue.push(tx); + if let Some(req) = pending_tx_requests.take() { + let ids = self.tx_queue.req(req); + let count = ids.len(); + submission.reply_tx_ids(ids).await.context("could not send tx ids")?; + self.tx_queue.mark_requested(count); + } + } + request = submission.next_request(), if pending_tx_requests.is_none() => { + let req = request.context("could not receive request")?; + pending_tx_requests = self.handle_request(submission, req).await?; + } + } + } + if !matches!(submission.state(), txsubmission::State::Idle) { + submission.send_done().await?; + } + Ok(()) + } + + async fn handle_request( + &mut self, + submission: &mut txsubmission::GenericClient< + txsubmission::EraTxId, + txsubmission::EraTxBody, + >, + req: txsubmission::Request, + ) -> Result> { + match req { + txsubmission::Request::TxIds(ack, req) => { + debug!("received TxIds({ack}, {req})"); + self.tx_queue.ack(ack)?; + + let ids = self.tx_queue.req(req); + if ids.is_empty() { + Ok(Some(req)) + } else { + let count = ids.len(); + submission.reply_tx_ids(ids).await.context("could not send tx ids")?; + self.tx_queue.mark_requested(count); + Ok(None) + } + } + txsubmission::Request::TxIdsNonBlocking(ack, req) => { + debug!("received TxIdsNonBlocking({ack}, {req})"); + self.tx_queue.ack(ack)?; + + let ids = self.tx_queue.req(req); + let count = ids.len(); + submission.reply_tx_ids(ids).await.context("could not send tx ids")?; + self.tx_queue.mark_requested(count); + Ok(None) + } + txsubmission::Request::Txs(ids) => { + debug!( + "received Txs({:?})", + ids.iter().map(|id| hex::encode(&id.1)).collect::>() + ); + let mut txs = vec![]; + for id in ids { + match self.tx_queue.announced_tx_body(&id) { + Some(body) => { + debug!("Sending TX {}", hex::encode(id.1)); + txs.push(body); + } + None => { + warn!("Server requested unrecognized TX {}", hex::encode(id.1)); + } + } + } + submission.reply_txs(txs).await.context("could not send tx bodies")?; + Ok(None) + } + } + } +} + +struct QueuedTx { + tx: Arc, + done: oneshot::Sender<()>, +} +impl std::fmt::Display for QueuedTx { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&hex::encode(&self.tx.id)) + } +} +impl QueuedTx { + fn tx_id_and_size(&self) -> txsubmission::TxIdAndSize { + txsubmission::TxIdAndSize( + txsubmission::EraTxId(self.tx.era, self.tx.id.to_vec()), + self.tx.body.len() as u32, + ) + } + fn era_tx_body(&self) -> txsubmission::EraTxBody { + txsubmission::EraTxBody(self.tx.era, self.tx.body.clone()) + } +} + +#[derive(Default)] +struct TxQueue { + unsent: VecDeque, + sent: VecDeque, +} +impl TxQueue { + pub fn new() -> Self { + Self::default() + } + + pub fn push(&mut self, tx: QueuedTx) { + self.unsent.push_back(tx); + } + + pub fn ack(&mut self, count: u16) -> Result<()> { + for _ in 0..count { + match self.sent.pop_front() { + Some(tx) => { + debug!("TX {tx} has been acknowledged"); + let _ = tx.done.send(()); + } + None => bail!("Server acked a TX which we never sent"), + } + } + Ok(()) + } + + pub fn req(&self, count: u16) -> Vec> { + self.unsent.iter().take(count as usize).map(|tx| tx.tx_id_and_size()).collect() + } + + pub fn mark_requested(&mut self, count: usize) { + for _ in 0..count { + let tx = self.unsent.pop_front().expect("logic error"); + self.sent.push_back(tx); + } + } + + pub fn announced_tx_body(&self, id: &txsubmission::EraTxId) -> Option { + self.sent.iter().find(|tx| *tx.tx.id == *id.1).map(|tx| tx.era_tx_body()) + } + + pub fn requeue_sent(&mut self) { + while let Some(tx) = self.sent.pop_back() { + self.unsent.push_front(tx); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use acropolis_common::TxHash; + use tokio::sync::oneshot; + + use crate::{peer::QueuedTx, tx::Transaction}; + + use super::TxQueue; + + #[test] + fn tx_queue_should_not_ack_unsubmitted_requests() { + let mut queue = TxQueue::new(); + assert!(queue.ack(1).is_err()); + } + + #[test] + fn tx_queue_should_return_no_txs_when_empty() { + let queue = TxQueue::new(); + assert!(queue.req(1).is_empty()); + } + + #[test] + fn tx_queue_should_acknowledge_request() { + let (done, done_rx) = oneshot::channel(); + let tx = QueuedTx { + tx: Arc::new(Transaction { + id: TxHash::default(), + body: vec![], + era: 6, + }), + done, + }; + let id = tx.tx_id_and_size().0; + let mut queue = TxQueue::new(); + queue.push(tx); + + // the TX hasn't been announced yet + assert!(queue.announced_tx_body(&id).is_none()); + assert!(done_rx.is_empty()); + + // now the server requests it + let ids = queue.req(2); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0].0.1, id.1); + queue.mark_requested(1); + + // the TX has been announced, so the server can request the body + assert!(queue.announced_tx_body(&id).is_some()); + assert!(done_rx.is_empty()); + + // the server acks the request. now we're done! + assert!(queue.ack(1).is_ok()); + assert!(queue.announced_tx_body(&id).is_none()); + assert!(!done_rx.is_empty()); + } + + #[test] + fn tx_queue_should_restart_submission_after_connection_lost() { + let (done, done_rx) = oneshot::channel(); + let tx = QueuedTx { + tx: Arc::new(Transaction { + id: TxHash::default(), + body: vec![], + era: 6, + }), + done, + }; + let id = tx.tx_id_and_size().0; + let mut queue = TxQueue::new(); + queue.push(tx); + + // the TX hasn't been announced yet + assert!(queue.announced_tx_body(&id).is_none()); + assert!(done_rx.is_empty()); + + // now the server requests it + let ids = queue.req(2); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0].0.1, id.1); + queue.mark_requested(1); + + // the TX has been announced, so the server can request the body + assert!(queue.announced_tx_body(&id).is_some()); + assert!(done_rx.is_empty()); + + // uh oh! we disconnected and reconnected before the server acked it. + queue.requeue_sent(); + + // now we pretend we never sent it + assert!(queue.announced_tx_body(&id).is_none()); + assert!(done_rx.is_empty()); + + // and the server can request it again + let ids = queue.req(2); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0].0.1, id.1); + } +} diff --git a/modules/tx_submitter/src/tx.rs b/modules/tx_submitter/src/tx.rs new file mode 100644 index 00000000..c1b038e5 --- /dev/null +++ b/modules/tx_submitter/src/tx.rs @@ -0,0 +1,25 @@ +use acropolis_common::TxHash; +use anyhow::{Result, bail}; +use pallas::ledger::traverse::{Era, MultiEraTx}; + +pub struct Transaction { + pub id: TxHash, + pub body: Vec, + pub era: u16, +} + +impl Transaction { + pub fn from_bytes(bytes: &[u8]) -> Result { + let parsed = MultiEraTx::decode(bytes)?; + let id = TxHash::from(*parsed.hash()); + let era = match parsed.era() { + Era::Conway => 6, + other => bail!("cannot submit {other} era transactions"), + }; + Ok(Self { + id, + body: bytes.to_vec(), + era, + }) + } +} diff --git a/modules/tx_submitter/src/tx_submitter.rs b/modules/tx_submitter/src/tx_submitter.rs new file mode 100644 index 00000000..a8237380 --- /dev/null +++ b/modules/tx_submitter/src/tx_submitter.rs @@ -0,0 +1,98 @@ +mod peer; +mod tx; + +use std::sync::Arc; + +use acropolis_common::{ + commands::transactions::{TransactionsCommand, TransactionsCommandResponse}, + messages::{Command, CommandResponse, Message}, +}; +use anyhow::{Context as _, Result, bail}; +use caryatid_sdk::{Context, Module, module}; +use config::Config; +use futures::stream::{FuturesUnordered, StreamExt}; +use peer::PeerConfig; +use tokio::sync::RwLock; +use tracing::warn; + +use crate::{peer::PeerConnection, tx::Transaction}; + +#[module( + message_type(Message), + name = "tx-submitter", + description = "TX submission module" +)] +pub struct TxSubmitter; + +impl TxSubmitter { + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let submitter = Arc::new(SubmitterConfig::parse(&config)?); + let peer = PeerConfig::parse(&config)?; + let state = Arc::new(RwLock::new(SubmitterState { + peers: vec![PeerConnection::open(&submitter, peer)], + })); + context.handle(&submitter.subscribe_topic, move |message| { + let state = state.clone(); + async move { + let state = state.read().await; + let res = Self::handle_command(message, &state.peers) + .await + .unwrap_or_else(|e| TransactionsCommandResponse::Error(e.to_string())); + Arc::new(Message::CommandResponse(CommandResponse::Transactions(res))) + } + }); + Ok(()) + } + + async fn handle_command( + message: Arc, + peers: &Vec, + ) -> Result { + let Message::Command(Command::Transactions(TransactionsCommand::Submit { + cbor, + wait_for_ack, + })) = message.as_ref() + else { + bail!("unexpected tx request") + }; + let tx = Arc::new(Transaction::from_bytes(cbor)?); + let mut waiting = FuturesUnordered::new(); + for peer in peers { + let peer_name = peer.name.clone(); + let receiver = peer.queue(tx.clone())?; + waiting.push(async move { + receiver.await.context(format!("could not send tx to {peer_name}")) + }); + } + if !*wait_for_ack { + return Ok(TransactionsCommandResponse::Submitted { id: tx.id }); + } + while let Some(result) = waiting.next().await { + match result { + Ok(()) => return Ok(TransactionsCommandResponse::Submitted { id: tx.id }), + Err(err) => warn!("{err:#}"), + } + } + bail!("could not send tx to any peers"); + } +} + +struct SubmitterConfig { + subscribe_topic: String, + magic: u64, +} +impl SubmitterConfig { + pub fn parse(config: &Config) -> Result { + let subscribe_topic = + config.get_string("subscribe-topic").unwrap_or("cardano.txs.submit".to_string()); + let magic = config.get("magic-number").unwrap_or(764824073); + Ok(Self { + subscribe_topic, + magic, + }) + } +} + +struct SubmitterState { + peers: Vec, +} diff --git a/processes/README.md b/processes/README.md index 42ab8c57..c21ff0c9 100644 --- a/processes/README.md +++ b/processes/README.md @@ -5,3 +5,4 @@ These are the process builds for the Acropolis architecture: * [Omnibus](omnibus/) - All-you-can-eat container for testing * [Replayer](replayer/) - Locally replay previously downloaded selected messages, stored in JSON on disk * [Golden Tests](golden_tests/) - Provides a testing module to execute end to end golden tests +* [TX Submitter CLI](tx_submitter_cli/) - Provides a CLI wrapper for the tx submitter module diff --git a/processes/tx_submitter_cli/Cargo.toml b/processes/tx_submitter_cli/Cargo.toml new file mode 100644 index 00000000..a58ce2b2 --- /dev/null +++ b/processes/tx_submitter_cli/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "acropolis_process_tx_submitter_cli" +version = "0.1.0" +edition = "2024" +authors = ["Simon Gellis "] +description = "CLI tool to submit transactions" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } +acropolis_module_tx_submitter = { path = "../../modules/tx_submitter" } + +caryatid_sdk = { workspace = true } +caryatid_process = { workspace = true} + +anyhow = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +hex = { workspace = true } +pallas = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } + +[[bin]] +name = "tx-submitter-cli" +path = "src/main.rs" \ No newline at end of file diff --git a/processes/tx_submitter_cli/README.md b/processes/tx_submitter_cli/README.md new file mode 100644 index 00000000..4c38244a --- /dev/null +++ b/processes/tx_submitter_cli/README.md @@ -0,0 +1,11 @@ +# Acropolis tx-submitter-cli tool + +This process is a CLI wrapper for the [tx_submitter module](../../modules/tx_submitter/). It allows you to submit transactions to upstream peers. + +## How to run it + +```shell +cd processes/tx_submitter_cli +cargo run -- +``` +The `tx-file` arg should be the path to a file containing a raw signed transaction. diff --git a/processes/tx_submitter_cli/src/main.rs b/processes/tx_submitter_cli/src/main.rs new file mode 100644 index 00000000..5a3ee682 --- /dev/null +++ b/processes/tx_submitter_cli/src/main.rs @@ -0,0 +1,125 @@ +use std::{path::PathBuf, sync::Arc}; + +use acropolis_common::{ + commands::transactions::{TransactionsCommand, TransactionsCommandResponse}, + messages::{Command, CommandResponse, Message}, +}; +use acropolis_module_tx_submitter::TxSubmitter; +use anyhow::{Result, bail}; +use caryatid_process::Process; +use caryatid_sdk::{Context, Module, module}; +use clap::Parser; +use config::{Config, File}; +use tokio::{fs, select, sync::mpsc}; +use tracing::info; +use tracing_subscriber::{ + EnvFilter, Layer as _, Registry, filter, fmt, layer::SubscriberExt as _, + util::SubscriberInitExt as _, +}; + +fn default_config_path() -> PathBuf { + PathBuf::from( + option_env!("ACROPOLIS_TX_SUBMITTER_DEFAULT_CONFIG").unwrap_or("tx-submitter.toml"), + ) +} + +#[derive(clap::Parser, Clone)] +struct Args { + /// Path to configuration. + #[arg(long, default_value = default_config_path().into_os_string())] + config: PathBuf, + /// File containing the raw bytes of a transaction. + tx_file: PathBuf, +} + +#[derive(Clone)] +struct CliState { + args: Args, + done: mpsc::Sender>, +} +impl CliState { + pub fn run(self, ctx: Arc>, fut: F) + where + F: FnOnce(Args, Arc>) -> Fut + Send + 'static, + Fut: Future> + Send + 'static, + { + let args = self.args.clone(); + let c = ctx.clone(); + ctx.run(async move { + let result = fut(args, c).await; + let _ = self.done.send(result).await; + }); + } +} + +tokio::task_local!(static CLI: CliState); +async fn run_process(process: Process, args: Args) -> Result<()> { + let (tx, mut rx) = mpsc::channel(1); + let state = CliState { args, done: tx }; + select! { + res = CLI.scope(state, process.run()) => { + res?; + bail!("process terminated") + } + res = rx.recv() => { + match res { + Some(result) => { + info!("process completed"); + result + } + None => bail!("process terminated") + } + } + } +} + +#[tokio::main] +pub async fn main() -> Result<()> { + let args = Args::try_parse()?; + + // Standard logging using RUST_LOG for log levels default to INFO for events only + let fmt_layer = fmt::layer() + .with_filter(EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into())) + .with_filter(filter::filter_fn(|meta| meta.is_event())); + Registry::default().with(fmt_layer).init(); + + let config = Arc::new(Config::builder().add_source(File::from(args.config.as_path())).build()?); + let mut process = Process::::create(config).await; + + TxSubmitter::register(&mut process); + CliDriver::register(&mut process); + + run_process(process, args).await +} + +#[module( + message_type(Message), + name = "cli-driver", + description = "Module to interface with the CLI tool" +)] +struct CliDriver; +impl CliDriver { + pub async fn init(&self, context: Arc>, _config: Arc) -> Result<()> { + let state = CLI.get(); + state.run(context, move |args, context| async move { + let tx = fs::read(args.tx_file).await?; + let request = Arc::new(Message::Command(Command::Transactions( + TransactionsCommand::Submit { + cbor: tx, + wait_for_ack: true, + }, + ))); + let response = context.request("cardano.txs.submit", request).await?; + if let Message::CommandResponse(CommandResponse::Transactions( + TransactionsCommandResponse::Submitted { id }, + )) = response.as_ref() + { + info!("Submitted TX {}", hex::encode(id)); + } else { + info!("{response:?}"); + } + Ok(()) + }); + Ok(()) + } +} diff --git a/processes/tx_submitter_cli/tx-submitter.toml b/processes/tx_submitter_cli/tx-submitter.toml new file mode 100644 index 00000000..1c894c94 --- /dev/null +++ b/processes/tx_submitter_cli/tx-submitter.toml @@ -0,0 +1,18 @@ +[module.tx-submitter] + +[module.cli-driver] + +[message-bus.internal] +class = "in-memory" +workers = 50 +dispatch-queue-size = 1000 +worker-queue-size = 100 +bulk-block-capacity = 50 +bulk-resume-capacity = 75 + +# Message routing +[message-router] +request-timeout = 300 +[[message-router.route]] # Everything is internal only +pattern = "#" +bus = "internal"