diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index acfcc45ea6b..7b2c17e93df 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -177,7 +177,7 @@ jobs: include: - os: ubuntu-24.04 - test-args: --features full,unstable --workspace + test-args: --features full,unstable,future_dmq --workspace # Exclude nodes not officially supported on Windows and macOS (only mithril-client is supported) - os: macos-14 test-args: > @@ -805,6 +805,7 @@ jobs: # the same name (we only want to document those anyway) cargo doc --no-deps --lib -p mithril-stm -p mithril-common \ -p mithril-cardano-node-chain -p mithril-cardano-node-internal-database \ + -p mithril-dmq \ -p mithril-build-script -p mithril-cli-helper -p mithril-doc -p mithril-doc-derive \ -p mithril-era -p mithril-metric -p mithril-persistence -p mithril-resource-pool \ -p mithril-ticker -p mithril-signed-entity-lock -p mithril-signed-entity-preloader \ diff --git a/CHANGELOG.md b/CHANGELOG.md index 516c50366c0..8373e2f867f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,9 @@ As a minor extension, we have adopted a slightly different versioning convention - Abstracted the implementation of KES signature and verification to allow multiple and reusable implementations. +- **UNSTABLE** : + - Support for DMQ signature publisher in the signer and signature consumer in the aggregator. + - Crates versions: | Crate | Version | diff --git a/Cargo.lock b/Cargo.lock index 09f2d150883..44d0d601941 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3837,7 +3837,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0452a60c1863c1f50b5f77cd295e8d2786849f35883f0b9e18e7e6e1b5691b0" dependencies = [ "half", - "minicbor-derive", + "minicbor-derive 0.15.3", +] + +[[package]] +name = "minicbor" +version = "0.26.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a309f581ade7597820083bc275075c4c6986e57e53f8d26f88507cfefc8c987" +dependencies = [ + "half", + "minicbor-derive 0.16.2", ] [[package]] @@ -3851,6 +3861,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "minicbor-derive" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9882ef5c56df184b8ffc107fc6c61e33ee3a654b021961d790a78571bb9d67a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "minicov" version = "0.3.7" @@ -3889,7 +3910,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.7.69" +version = "0.7.70" dependencies = [ "anyhow", "async-trait", @@ -3908,6 +3929,7 @@ dependencies = [ "mithril-cardano-node-internal-database", "mithril-cli-helper", "mithril-common", + "mithril-dmq", "mithril-doc", "mithril-era", "mithril-metric", @@ -3999,9 +4021,9 @@ dependencies = [ "mockall", "nom 8.0.0", "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-network", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "pallas-network 0.32.1", "pallas-primitives", "pallas-traverse", "rand_core 0.6.4", @@ -4138,7 +4160,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.6.3" +version = "0.6.4" dependencies = [ "anyhow", "async-trait", @@ -4178,6 +4200,23 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "mithril-dmq" +version = "0.1.1" +dependencies = [ + "anyhow", + "async-trait", + "blake2 0.10.6", + "mithril-cardano-node-chain", + "mithril-common", + "mockall", + "pallas-network 1.0.0-alpha.2", + "slog", + "slog-async", + "slog-term", + "tokio", +] + [[package]] name = "mithril-doc" version = "0.1.24" @@ -4331,7 +4370,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.256" +version = "0.2.257" dependencies = [ "anyhow", "async-trait", @@ -4346,6 +4385,7 @@ dependencies = [ "mithril-cardano-node-internal-database", "mithril-cli-helper", "mithril-common", + "mithril-dmq", "mithril-doc", "mithril-era", "mithril-metric", @@ -4930,8 +4970,8 @@ dependencies = [ "crc", "cryptoxide", "hex", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", "thiserror 1.0.69", ] @@ -4942,7 +4982,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e344b3e39ca3bd79bb7547b65b980869c3c377a00c48ece70430f4611c32a18b" dependencies = [ "hex", - "minicbor", + "minicbor 0.25.1", + "serde", + "thiserror 1.0.69", +] + +[[package]] +name = "pallas-codec" +version = "1.0.0-alpha.2" +source = "git+https://github.com/txpipe/pallas.git?branch=main#a97bd93cdc55fa2b061a6ad5fd572f5528a912b8" +dependencies = [ + "hex", + "minicbor 0.26.5", "serde", "thiserror 1.0.69", ] @@ -4955,13 +5006,26 @@ checksum = "59c89ea16190a87a1d8bd36923093740a2b659ed6129f4636329319a70cc4db3" dependencies = [ "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.32.1", "rand_core 0.6.4", "serde", "thiserror 1.0.69", "zeroize", ] +[[package]] +name = "pallas-crypto" +version = "1.0.0-alpha.2" +source = "git+https://github.com/txpipe/pallas.git?branch=main#a97bd93cdc55fa2b061a6ad5fd572f5528a912b8" +dependencies = [ + "cryptoxide", + "hex", + "pallas-codec 1.0.0-alpha.2", + "rand_core 0.6.4", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "pallas-network" version = "0.32.1" @@ -4971,8 +5035,25 @@ dependencies = [ "byteorder", "hex", "itertools 0.13.0", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "rand 0.8.5", + "socket2", + "thiserror 1.0.69", + "tokio", + "tracing", +] + +[[package]] +name = "pallas-network" +version = "1.0.0-alpha.2" +source = "git+https://github.com/txpipe/pallas.git?branch=main#a97bd93cdc55fa2b061a6ad5fd572f5528a912b8" +dependencies = [ + "byteorder", + "hex", + "itertools 0.13.0", + "pallas-codec 1.0.0-alpha.2", + "pallas-crypto 1.0.0-alpha.2", "rand 0.8.5", "socket2", "thiserror 1.0.69", @@ -4990,8 +5071,8 @@ dependencies = [ "bech32 0.9.1", "hex", "log", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", "serde", "serde_json", ] @@ -5005,8 +5086,8 @@ dependencies = [ "hex", "itertools 0.13.0", "pallas-addresses", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", "pallas-primitives", "paste", "serde", diff --git a/Cargo.toml b/Cargo.toml index fe906161c2a..018a45120c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "internal/cardano-node/mithril-cardano-node-internal-database", "internal/mithril-build-script", "internal/mithril-cli-helper", + "internal/mithril-dmq", "internal/mithril-doc", "internal/mithril-doc-derive", "internal/mithril-era", diff --git a/Makefile b/Makefile index 05a1a270dd5..3057609c4fc 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ COMPONENTS = mithril-aggregator mithril-client mithril-client-cli mithril-client-wasm \ mithril-common mithril-relay mithril-signer mithril-stm \ internal/mithril-build-script internal/mithril-cli-helper internal/mithril-doc \ + internal/mithril-dmq \ internal/mithril-doc-derive internal/mithril-era internal/mithril-metric internal/mithril-persistence \ internal/mithril-resource-pool internal/mithril-ticker \ internal/cardano-node/mithril-cardano-node-chain internal/cardano-node/mithril-cardano-node-internal-database \ diff --git a/README.md b/README.md index 95b9b93297e..c03f744d740 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,8 @@ This repository consists of the following parts: - [**Mithril cli helper**](./internal/mithril-cli-helper): **CLI** tools for **Mithril** binaries. + - [**Mithril DMQ node**](./internal/mithril-dmq): mechanisms to publish and consume messages of a **Decentralized Message Queue network** through a DMQ node, used by Mithril network nodes. + - [**Mithril doc**](./internal/mithril-doc): an API that generates Markdown documentation for crate command line arguments. - [**Mithril doc derive**](./internal/mithril-doc-derive): a macro implementation used by **Mithril doc**. diff --git a/docs/website/root/manual/develop/nodes/mithril-aggregator.md b/docs/website/root/manual/develop/nodes/mithril-aggregator.md index a0297073009..574b006bdc6 100644 --- a/docs/website/root/manual/develop/nodes/mithril-aggregator.md +++ b/docs/website/root/manual/develop/nodes/mithril-aggregator.md @@ -495,21 +495,21 @@ The configuration parameters can be set in either of the following ways: Here is a list of the available parameters for the serve command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| ------------------------------ | -------------------- | :------------------: | ------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------ | -------------------------- | ----------------------------------------------------------------------------------------------------------------------- | :----------------: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `config_directory` | `--config-directory` | - | - | Directory of the configuration file | `./config` | - | - | -| `data_stores_directory` | - | - | `data_stores_directory` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `db_directory` | `--db-directory` | - | `DB_DIRECTORY` | Directory of the **Cardano node** stores | `/db` | - | :heavy_check_mark: | -| `genesis_verification_key` | - | - | `GENESIS_VERIFICATION_KEY` | Genesis verification key | - | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | -| `protocol_parameters` | - | - | `PROTOCOL_PARAMETERS__K`, `PROTOCOL_PARAMETERS__M`, and `PROTOCOL_PARAMETERS__PHI_F` | Mithril protocol parameters | - | `{ k: 5, m: 100, phi_f: 0.65 }` | :heavy_check_mark: | -| `run_mode` | `--run-mode` | `-r` | `RUN_MODE` | Runtime mode | `dev` | - | :heavy_check_mark: | -| `store_retention_limit` | - | - | `STORE_RETENTION_LIMIT` | Maximum number of records in stores. If not set, no limit is set. | - | - | - | -| `custom_origin_tag_white_list` | - | - | `CUSTOM_ORIGIN_TAG_WHITE_LIST` | Custom origin tag of client request added to the whitelist (comma separated list). | `EXPLORER,BENCHMARK,CI,NA` | `EXAMPLE` | - | -| `verbose` | `--verbose` | `-v` | `VERBOSE` | Verbosity level | - | Parsed from the number of occurrences: `-v` for `Warning`, `-vv` for `Info`, `-vvv` for `Debug` and `-vvvv` for `Trace` | :heavy_check_mark: | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| ------------------------------ | -------------------- | :------------------: | ------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------- | -------------------------- | ----------------------------------------------------------------------------------------------------------------------- | :----------------: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `config_directory` | `--config-directory` | - | - | Directory of the configuration file | `./config` | - | - | +| `data_stores_directory` | - | - | `data_stores_directory` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `db_directory` | `--db-directory` | - | `DB_DIRECTORY` | Directory of the **Cardano node** stores | `/db` | - | :heavy_check_mark: | +| `genesis_verification_key` | - | - | `GENESIS_VERIFICATION_KEY` | Genesis verification key | - | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| `protocol_parameters` | - | - | `PROTOCOL_PARAMETERS__K`, `PROTOCOL_PARAMETERS__M`, and `PROTOCOL_PARAMETERS__PHI_F` | Mithril protocol parameters | - | `{ k: 5, m: 100, phi_f: 0.65 }` | :heavy_check_mark: | +| `run_mode` | `--run-mode` | `-r` | `RUN_MODE` | Runtime mode | `dev` | - | :heavy_check_mark: | +| `store_retention_limit` | - | - | `STORE_RETENTION_LIMIT` | Maximum number of records in stores. If not set, no limit is set. | - | - | - | +| `custom_origin_tag_white_list` | - | - | `CUSTOM_ORIGIN_TAG_WHITE_LIST` | Custom origin tag of client request added to the whitelist (comma separated list). | `EXPLORER,BENCHMARK,CI,NA` | `EXAMPLE` | - | +| `verbose` | `--verbose` | `-v` | `VERBOSE` | Verbosity level | - | Parsed from the number of occurrences: `-v` for `Warning`, `-vv` for `Info`, `-vvv` for `Debug` and `-vvvv` for `Trace` | :heavy_check_mark: | `serve` command: @@ -545,40 +545,40 @@ Here is a list of the available parameters for the serve command: `genesis bootstrap` command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | -| `genesis_secret_key` | - | - | `GENESIS_SECRET_KEY` | Genesis secret key, :warning: for test only | - | - | :heavy_check_mark: | -| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | -| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | +| `genesis_secret_key` | - | - | `GENESIS_SECRET_KEY` | Genesis secret key, :warning: for test only | - | - | :heavy_check_mark: | +| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | +| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | `genesis export` command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | -| `target_path` | `--target-path` | - | - | Path of the file to export the payload to. | - | - | :heavy_check_mark: | -| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | -| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | +| `target_path` | `--target-path` | - | - | Path of the file to export the payload to. | - | - | :heavy_check_mark: | +| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | +| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | `genesis import` command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| -------------------------- | ---------------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | -| `signed_payload_path` | `--signed-payload-path` | - | - | Path of the payload to import. | - | - | :heavy_check_mark: | -| `genesis_verification_key` | `--genesis-verification-key` | - | - | Genesis verification key | - | - | :heavy_check_mark: | -| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | -| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| -------------------------- | ---------------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | +| `signed_payload_path` | `--signed-payload-path` | - | - | Path of the payload to import. | - | - | :heavy_check_mark: | +| `genesis_verification_key` | `--genesis-verification-key` | - | - | Genesis verification key | - | - | :heavy_check_mark: | +| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | +| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | `genesis sign` command: diff --git a/docs/website/root/manual/develop/nodes/mithril-signer.md b/docs/website/root/manual/develop/nodes/mithril-signer.md index 1d2fcc15922..df6b0a08e15 100644 --- a/docs/website/root/manual/develop/nodes/mithril-signer.md +++ b/docs/website/root/manual/develop/nodes/mithril-signer.md @@ -236,7 +236,7 @@ Here is a list of the available parameters: | `verbose` | `--verbose` | `-v` | `VERBOSE` | Verbosity level | - | Parsed from the number of occurrences: `-v` for `Warning`, `-vv` for `Info`, `-vvv` for `Debug` and `-vvvv` for `Trace` | :heavy_check_mark: | | `run_mode` | `--run-mode` | `-r` | `RUN_MODE` | Runtime mode | `dev` | - | :heavy_check_mark: | | `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | | `db_directory` | `--db-directory` | - | `DB_DIRECTORY` | Directory to snapshot from the **Cardano node** | `/db` | - | :heavy_check_mark: | | `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | | `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | diff --git a/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-aggregator.md b/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-aggregator.md index a0297073009..574b006bdc6 100644 --- a/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-aggregator.md +++ b/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-aggregator.md @@ -495,21 +495,21 @@ The configuration parameters can be set in either of the following ways: Here is a list of the available parameters for the serve command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| ------------------------------ | -------------------- | :------------------: | ------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------ | -------------------------- | ----------------------------------------------------------------------------------------------------------------------- | :----------------: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `config_directory` | `--config-directory` | - | - | Directory of the configuration file | `./config` | - | - | -| `data_stores_directory` | - | - | `data_stores_directory` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `db_directory` | `--db-directory` | - | `DB_DIRECTORY` | Directory of the **Cardano node** stores | `/db` | - | :heavy_check_mark: | -| `genesis_verification_key` | - | - | `GENESIS_VERIFICATION_KEY` | Genesis verification key | - | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | -| `protocol_parameters` | - | - | `PROTOCOL_PARAMETERS__K`, `PROTOCOL_PARAMETERS__M`, and `PROTOCOL_PARAMETERS__PHI_F` | Mithril protocol parameters | - | `{ k: 5, m: 100, phi_f: 0.65 }` | :heavy_check_mark: | -| `run_mode` | `--run-mode` | `-r` | `RUN_MODE` | Runtime mode | `dev` | - | :heavy_check_mark: | -| `store_retention_limit` | - | - | `STORE_RETENTION_LIMIT` | Maximum number of records in stores. If not set, no limit is set. | - | - | - | -| `custom_origin_tag_white_list` | - | - | `CUSTOM_ORIGIN_TAG_WHITE_LIST` | Custom origin tag of client request added to the whitelist (comma separated list). | `EXPLORER,BENCHMARK,CI,NA` | `EXAMPLE` | - | -| `verbose` | `--verbose` | `-v` | `VERBOSE` | Verbosity level | - | Parsed from the number of occurrences: `-v` for `Warning`, `-vv` for `Info`, `-vvv` for `Debug` and `-vvvv` for `Trace` | :heavy_check_mark: | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| ------------------------------ | -------------------- | :------------------: | ------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------- | -------------------------- | ----------------------------------------------------------------------------------------------------------------------- | :----------------: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `config_directory` | `--config-directory` | - | - | Directory of the configuration file | `./config` | - | - | +| `data_stores_directory` | - | - | `data_stores_directory` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `db_directory` | `--db-directory` | - | `DB_DIRECTORY` | Directory of the **Cardano node** stores | `/db` | - | :heavy_check_mark: | +| `genesis_verification_key` | - | - | `GENESIS_VERIFICATION_KEY` | Genesis verification key | - | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| `protocol_parameters` | - | - | `PROTOCOL_PARAMETERS__K`, `PROTOCOL_PARAMETERS__M`, and `PROTOCOL_PARAMETERS__PHI_F` | Mithril protocol parameters | - | `{ k: 5, m: 100, phi_f: 0.65 }` | :heavy_check_mark: | +| `run_mode` | `--run-mode` | `-r` | `RUN_MODE` | Runtime mode | `dev` | - | :heavy_check_mark: | +| `store_retention_limit` | - | - | `STORE_RETENTION_LIMIT` | Maximum number of records in stores. If not set, no limit is set. | - | - | - | +| `custom_origin_tag_white_list` | - | - | `CUSTOM_ORIGIN_TAG_WHITE_LIST` | Custom origin tag of client request added to the whitelist (comma separated list). | `EXPLORER,BENCHMARK,CI,NA` | `EXAMPLE` | - | +| `verbose` | `--verbose` | `-v` | `VERBOSE` | Verbosity level | - | Parsed from the number of occurrences: `-v` for `Warning`, `-vv` for `Info`, `-vvv` for `Debug` and `-vvvv` for `Trace` | :heavy_check_mark: | `serve` command: @@ -545,40 +545,40 @@ Here is a list of the available parameters for the serve command: `genesis bootstrap` command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | -| `genesis_secret_key` | - | - | `GENESIS_SECRET_KEY` | Genesis secret key, :warning: for test only | - | - | :heavy_check_mark: | -| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | -| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | +| `genesis_secret_key` | - | - | `GENESIS_SECRET_KEY` | Genesis secret key, :warning: for test only | - | - | :heavy_check_mark: | +| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | +| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | `genesis export` command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | -| `target_path` | `--target-path` | - | - | Path of the file to export the payload to. | - | - | :heavy_check_mark: | -| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | -| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| -------------------------- | ------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | +| `target_path` | `--target-path` | - | - | Path of the file to export the payload to. | - | - | :heavy_check_mark: | +| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | +| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | `genesis import` command: -| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | -| -------------------------- | ---------------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | -| `signed_payload_path` | `--signed-payload-path` | - | - | Path of the payload to import. | - | - | :heavy_check_mark: | -| `genesis_verification_key` | `--genesis-verification-key` | - | - | Genesis verification key | - | - | :heavy_check_mark: | -| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | -| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | -| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | -| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | -| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | +| Parameter | Command line (long) | Command line (short) | Environment variable | Description | Default value | Example | Mandatory | +| -------------------------- | ---------------------------- | :------------------: | -------------------------- | ------------------------------------------------------------------ | ------------- | ---------------------------------- | :----------------: | +| `signed_payload_path` | `--signed-payload-path` | - | - | Path of the payload to import. | - | - | :heavy_check_mark: | +| `genesis_verification_key` | `--genesis-verification-key` | - | - | Genesis verification key | - | - | :heavy_check_mark: | +| `data_stores_directory` | - | - | `DATA_STORES_DIRECTORY` | Directory to store aggregator databases | - | `./mithril-aggregator/stores` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | +| `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | - | +| `chain_observer_type` | - | - | `CHAIN_OBSERVER_TYPE` | Chain observer type that can be `cardano-cli`, `pallas` or `fake`. | `pallas` | - | :heavy_check_mark: | +| `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | +| `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | `genesis sign` command: diff --git a/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-signer.md b/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-signer.md index 1d2fcc15922..df6b0a08e15 100644 --- a/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-signer.md +++ b/docs/website/versioned_docs/version-maintained/manual/develop/nodes/mithril-signer.md @@ -236,7 +236,7 @@ Here is a list of the available parameters: | `verbose` | `--verbose` | `-v` | `VERBOSE` | Verbosity level | - | Parsed from the number of occurrences: `-v` for `Warning`, `-vv` for `Info`, `-vvv` for `Debug` and `-vvvv` for `Trace` | :heavy_check_mark: | | `run_mode` | `--run-mode` | `-r` | `RUN_MODE` | Runtime mode | `dev` | - | :heavy_check_mark: | | `cardano_cli_path` | - | - | `CARDANO_CLI_PATH` | Cardano CLI tool path | - | `cardano-cli` | :heavy_check_mark: | -| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket used by the Cardano CLI tool to communicate with the Cardano node | - | `/tmp/cardano.sock` | :heavy_check_mark: | +| `cardano_node_socket_path` | - | - | `CARDANO_NODE_SOCKET_PATH` | Path of the socket opened by the Cardano node | - | `/ipc/node.socket` | :heavy_check_mark: | | `db_directory` | `--db-directory` | - | `DB_DIRECTORY` | Directory to snapshot from the **Cardano node** | `/db` | - | :heavy_check_mark: | | `network` | - | - | `NETWORK` | Cardano network | - | `testnet` or `mainnet` or `devnet` | :heavy_check_mark: | | `network_magic` | - | - | `NETWORK_MAGIC` | Cardano network magic number (for `testnet` and `devnet`) | - | `1097911063` or `42` | - | diff --git a/internal/mithril-dmq/Cargo.toml b/internal/mithril-dmq/Cargo.toml new file mode 100644 index 00000000000..493a0ae77d2 --- /dev/null +++ b/internal/mithril-dmq/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "mithril-dmq" +description = "Mechanisms to publish and consume messages of a 'Decentralized Message Queue network' through a DMQ node" +version = "0.1.1" +authors.workspace = true +documentation.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true +include = ["**/*.rs", "Cargo.toml", "README.md", ".gitignore"] + +[lib] +crate-type = ["lib", "cdylib", "staticlib"] + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +blake2 = "0.10.6" +mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" } +mithril-common = { path = "../../mithril-common" } +pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" } +slog = { workspace = true } +tokio = { workspace = true, features = ["sync"] } + +[dev-dependencies] +mithril-common = { path = "../../mithril-common", features = ["test_tools"] } +mockall = { workspace = true } +slog-async = { workspace = true } +slog-term = { workspace = true } diff --git a/internal/mithril-dmq/Makefile b/internal/mithril-dmq/Makefile new file mode 100644 index 00000000000..e503264d97d --- /dev/null +++ b/internal/mithril-dmq/Makefile @@ -0,0 +1,19 @@ +.PHONY: all build test check doc + +CARGO = cargo + +all: test build + +build: + ${CARGO} build --release + +test: + ${CARGO} test + +check: + ${CARGO} check --release --all-features --all-targets + ${CARGO} clippy --release --all-features --all-targets + ${CARGO} fmt --check + +doc: + ${CARGO} doc --no-deps --open \ No newline at end of file diff --git a/internal/mithril-dmq/README.md b/internal/mithril-dmq/README.md new file mode 100644 index 00000000000..fdeb4df373e --- /dev/null +++ b/internal/mithril-dmq/README.md @@ -0,0 +1,5 @@ +# Mithril-dmq-node + +This crate provides mechanisms to publish and consume messages of a **Decentralized Message Queue network** through a DMQ node. + +The full protocol is defined in the [CIP-0137](https://github.com/cardano-foundation/CIPs/tree/master/CIP-0137#local-message-notification-mini-protocol). diff --git a/internal/mithril-dmq/src/consumer/interface.rs b/internal/mithril-dmq/src/consumer/interface.rs new file mode 100644 index 00000000000..afad017a47b --- /dev/null +++ b/internal/mithril-dmq/src/consumer/interface.rs @@ -0,0 +1,11 @@ +use std::fmt::Debug; + +use mithril_common::{crypto_helper::TryFromBytes, entities::PartyId, StdResult}; + +/// Trait for consuming messages from a DMQ node. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait DmqConsumer: Send + Sync { + /// Consume messages from the DMQ node. + async fn consume_messages(&self) -> StdResult>; +} diff --git a/internal/mithril-dmq/src/consumer/mod.rs b/internal/mithril-dmq/src/consumer/mod.rs new file mode 100644 index 00000000000..4035f6c0659 --- /dev/null +++ b/internal/mithril-dmq/src/consumer/mod.rs @@ -0,0 +1,5 @@ +mod interface; +mod pallas; + +pub use interface::*; +pub use pallas::*; diff --git a/internal/mithril-dmq/src/consumer/pallas.rs b/internal/mithril-dmq/src/consumer/pallas.rs new file mode 100644 index 00000000000..c6438797a1c --- /dev/null +++ b/internal/mithril-dmq/src/consumer/pallas.rs @@ -0,0 +1,343 @@ +use std::{fmt::Debug, marker::PhantomData, path::PathBuf}; + +use anyhow::{anyhow, Context}; +use pallas_network::facades::DmqClient; +use slog::{debug, error, Logger}; +use tokio::sync::{Mutex, MutexGuard}; + +use mithril_common::{ + crypto_helper::{OpCert, TryFromBytes}, + entities::PartyId, + logging::LoggerExtensions, + CardanoNetwork, StdResult, +}; + +use crate::DmqConsumer; + +/// A DMQ consumer implementation. +/// +/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas. +pub struct DmqConsumerPallas { + socket: PathBuf, + network: CardanoNetwork, + client: Mutex>, + logger: Logger, + phantom: PhantomData, +} + +impl DmqConsumerPallas { + /// Creates a new `DmqConsumerPallas` instance. + pub fn new(socket: PathBuf, network: CardanoNetwork, logger: Logger) -> Self { + Self { + socket, + network, + client: Mutex::new(None), + logger: logger.new_with_component_name::(), + phantom: PhantomData, + } + } + + /// Creates and returns a new `DmqClient` connected to the specified socket. + async fn new_client(&self) -> StdResult { + debug!( + self.logger, + "Create new DMQ client"; + "socket" => ?self.socket, + "network" => ?self.network + ); + DmqClient::connect(&self.socket, self.network.code()) + .await + .with_context(|| "DmqConsumerPallas failed to create a new client") + } + + /// Gets the cached `DmqClient`, creating a new one if it does not exist. + async fn get_client(&self) -> StdResult>> { + { + // Run this in a separate block to avoid dead lock on the Mutex + let client_lock = self.client.lock().await; + if client_lock.as_ref().is_some() { + return Ok(client_lock); + } + } + + let mut client_lock = self.client.lock().await; + *client_lock = Some(self.new_client().await?); + + Ok(client_lock) + } + + /// Drops the current `DmqClient`, if it exists. + async fn drop_client(&self) -> StdResult<()> { + debug!( + self.logger, + "Drop existing DMQ client"; + "socket" => ?self.socket, + "network" => ?self.network + ); + let mut client_lock = self.client.lock().await; + if let Some(client) = client_lock.take() { + client.abort().await; + } + + Ok(()) + } + + #[cfg(test)] + /// Check if the client already exists (test only). + async fn has_client(&self) -> bool { + let client_lock = self.client.lock().await; + + client_lock.as_ref().is_some() + } + + async fn consume_messages_internal(&self) -> StdResult> { + debug!(self.logger, "Waiting for messages from DMQ..."); + let mut client_guard = self.get_client().await?; + let client = client_guard + .as_mut() + .ok_or(anyhow!("DMQ client does not exist"))?; + client + .msg_notification() + .send_request_messages_blocking() + .await + .with_context(|| "Failed to request notifications from DMQ server: {}")?; + + let reply = client + .msg_notification() + .recv_next_reply() + .await + .with_context(|| "Failed to receive notifications from DMQ server")?; + debug!(self.logger, "Received single signatures from DMQ"; "messages" => ?reply); + if let Err(e) = client.msg_notification().send_done().await { + error!(self.logger, "Failed to send Done"; "error" => ?e); + } + + reply + .0 + .into_iter() + .map(|dmq_message| { + let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate) + .with_context(|| "Failed to parse operational certificate")?; + let party_id = opcert.compute_protocol_party_id()?; + let payload = M::try_from_bytes(&dmq_message.msg_body) + .with_context(|| "Failed to parse DMQ message body")?; + + Ok((payload, party_id)) + }) + .collect::>>() + .with_context(|| "Failed to parse DMQ messages") + } +} + +#[async_trait::async_trait] +impl DmqConsumer for DmqConsumerPallas { + async fn consume_messages(&self) -> StdResult> { + let messages = self.consume_messages_internal().await; + if messages.is_err() { + self.drop_client().await?; + } + + messages + } +} + +#[cfg(all(test, unix))] +mod tests { + + use std::{fs, future, time::Duration, vec}; + + use mithril_common::{crypto_helper::TryToBytes, current_function, test_utils::TempDir}; + use pallas_network::{ + facades::DmqServer, + miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg}, + }; + use tokio::{net::UnixListener, task::JoinHandle, time::sleep}; + + use crate::{test::payload::DmqMessageTestPayload, test_tools::TestLogger}; + + use super::*; + + fn create_temp_dir(folder_name: &str) -> PathBuf { + TempDir::create_with_short_path("dmq_consumer", folder_name) + } + + fn fake_msgs() -> Vec { + vec![ + DmqMsg { + msg_id: vec![0, 1], + msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), + block_number: 10, + ttl: 100, + kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![ + 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, + 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, + 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, + 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, + 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, + 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88, + 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, + 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68, + ], + }, + DmqMsg { + msg_id: vec![1, 2], + msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), + block_number: 11, + ttl: 100, + kes_signature: vec![1, 2, 3, 4], + operational_certificate: vec![ + 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, + 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, + 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62, + 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209, + 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172, + 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172, + 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, + 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, + 245, 200, + ], + }, + ] + } + + fn setup_dmq_server( + socket_path: PathBuf, + reply_messages: Vec, + ) -> JoinHandle { + tokio::spawn({ + async move { + // server setup + if socket_path.exists() { + fs::remove_file(socket_path.clone()).unwrap(); + } + let listener = UnixListener::bind(socket_path).unwrap(); + let mut server = pallas_network::facades::DmqServer::accept(&listener, 0) + .await + .unwrap(); + + // init local msg notification server + let server_msg = server.msg_notification(); + + // server waits for blocking request from client + let request = server_msg.recv_next_request().await.unwrap(); + assert_eq!(request, localmsgnotification::Request::Blocking); + + if !reply_messages.is_empty() { + // server replies with messages if any + server_msg + .send_reply_messages_blocking(reply_messages) + .await + .unwrap(); + assert_eq!(*server_msg.state(), localmsgnotification::State::Idle); + + // server receives done from client + server_msg.recv_done().await.unwrap(); + assert_eq!(*server_msg.state(), localmsgnotification::State::Done); + } else { + // server waits if no message available + future::pending().await + } + + server + } + }) + } + + #[tokio::test] + async fn pallas_dmq_consumer_publisher_succeeds_when_messages_are_available() { + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let reply_messages = fake_msgs(); + let server = setup_dmq_server(socket_path.clone(), reply_messages); + let client = tokio::spawn(async move { + let consumer = DmqConsumerPallas::new( + socket_path, + CardanoNetwork::TestNet(0), + TestLogger::stdout(), + ); + + consumer.consume_messages().await.unwrap() + }); + + let (_, client_res) = tokio::join!(server, client); + let messages = client_res.unwrap(); + + assert_eq!( + vec![ + ( + DmqMessageTestPayload::new(b"msg_1"), + "pool1mxyec46067n3querj9cxkk0g0zlag93pf3ya9vuyr3wgkq2e6t7".to_string() + ), + ( + DmqMessageTestPayload::new(b"msg_2"), + "pool17sln0evyk5tfj6zh2qrlk9vttgy6264sfe2fkec5mheasnlx3yd".to_string() + ), + ], + messages + ); + } + + #[tokio::test] + async fn pallas_dmq_consumer_publisher_blocks_when_no_message_available() { + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let reply_messages = vec![]; + let server = setup_dmq_server(socket_path.clone(), reply_messages); + let client = tokio::spawn(async move { + let consumer = DmqConsumerPallas::::new( + socket_path, + CardanoNetwork::TestNet(0), + TestLogger::stdout(), + ); + + consumer.consume_messages().await.unwrap(); + }); + + let result = tokio::select!( + _res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}, + _res = client => {Ok(())}, + _res = server => {Ok(())}, + ); + + result.expect_err("Should have timed out"); + } + + #[tokio::test] + async fn pallas_dmq_consumer_client_is_dropped_when_returning_error() { + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let reply_messages = fake_msgs(); + let server = setup_dmq_server(socket_path.clone(), reply_messages); + let client = tokio::spawn(async move { + let consumer = DmqConsumerPallas::::new( + socket_path, + CardanoNetwork::TestNet(0), + TestLogger::stdout(), + ); + + consumer.consume_messages().await.unwrap(); + + consumer + }); + + let (server_res, client_res) = tokio::join!(server, client); + let consumer = client_res.unwrap(); + let server = server_res.unwrap(); + server.abort().await; + + let client = tokio::spawn(async move { + assert!(consumer.has_client().await, "Client should exist"); + + consumer + .consume_messages() + .await + .expect_err("Consuming messages should fail"); + + assert!( + !consumer.has_client().await, + "Client should have been dropped after error" + ); + + consumer + }); + client.await.unwrap(); + } +} diff --git a/internal/mithril-dmq/src/lib.rs b/internal/mithril-dmq/src/lib.rs new file mode 100644 index 00000000000..db7536fc64e --- /dev/null +++ b/internal/mithril-dmq/src/lib.rs @@ -0,0 +1,36 @@ +#![warn(missing_docs)] +//! This crate provides mechanisms to publish and consume messages of a Decentralized Message Queue network through a DMQ node. + +mod consumer; +mod message; +mod publisher; +pub mod test; + +pub use consumer::{DmqConsumer, DmqConsumerPallas}; +pub use message::DmqMessageBuilder; +pub use publisher::{DmqPublisher, DmqPublisherPallas}; + +#[cfg(test)] +pub(crate) mod test_tools { + use std::io; + use std::sync::Arc; + + use slog::{Drain, Logger}; + use slog_async::Async; + use slog_term::{CompactFormat, PlainDecorator}; + + pub struct TestLogger; + + impl TestLogger { + fn from_writer(writer: W) -> Logger { + let decorator = PlainDecorator::new(writer); + let drain = CompactFormat::new(decorator).build().fuse(); + let drain = Async::new(drain).build().fuse(); + Logger::root(Arc::new(drain), slog::o!()) + } + + pub fn stdout() -> Logger { + Self::from_writer(slog_term::TestStdoutWriter) + } + } +} diff --git a/internal/mithril-dmq/src/message.rs b/internal/mithril-dmq/src/message.rs new file mode 100644 index 00000000000..8b05dab8263 --- /dev/null +++ b/internal/mithril-dmq/src/message.rs @@ -0,0 +1,147 @@ +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use blake2::{digest::consts::U64, Blake2b, Digest}; +use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; + +use mithril_cardano_node_chain::chain_observer::ChainObserver; +use mithril_common::{ + crypto_helper::{KesSigner, TryToBytes}, + StdResult, +}; + +/// The TTL (Time To Live) for DMQ messages in blocks. +const DMQ_MESSAGE_TTL_IN_BLOCKS: u16 = 100; + +/// A builder for creating DMQ messages. +pub struct DmqMessageBuilder { + kes_signer: Arc, + chain_observer: Arc, + ttl_blocks: u16, +} + +impl DmqMessageBuilder { + /// Creates a new instance of `DmqMessageBuilder`. + pub fn new(kes_signer: Arc, chain_observer: Arc) -> Self { + Self { + kes_signer, + chain_observer, + ttl_blocks: DMQ_MESSAGE_TTL_IN_BLOCKS, + } + } + + /// Set the TTL (Time To Live) for DMQ messages in blocks. + pub fn set_ttl(mut self, ttl_blocks: u16) -> Self { + self.ttl_blocks = ttl_blocks; + + self + } + + /// Builds a DMQ message from the provided message bytes. + pub async fn build(&self, message_bytes: &[u8]) -> StdResult { + fn compute_msg_id(dmq_message: &DmqMsg) -> Vec { + let mut hasher = Blake2b::::new(); + hasher.update(&dmq_message.msg_body); + hasher.update(dmq_message.block_number.to_be_bytes()); + hasher.update(dmq_message.ttl.to_be_bytes()); + hasher.update(&dmq_message.kes_signature); + hasher.update(&dmq_message.operational_certificate); + + hasher.finalize().to_vec() + } + + let block_number = self + .chain_observer + .get_current_chain_point() + .await + .with_context(|| "Failed to get current chain point while building DMQ message")? + .ok_or(anyhow!( + "No current chain point available while building DMQ message" + ))? + .block_number; + let block_number = (*block_number) + .try_into() + .with_context(|| "Failed to convert block number to u32")?; + let (kes_signature, operational_certificate) = self + .kes_signer + .sign(message_bytes, block_number) + .with_context(|| "Failed to KES sign message while building DMQ message")?; + let mut dmq_message = DmqMsg { + msg_id: vec![], + msg_body: message_bytes.to_vec(), + block_number, + ttl: self.ttl_blocks, + kes_signature: kes_signature.to_bytes_vec()?, + operational_certificate: operational_certificate.to_bytes_vec()?, + }; + dmq_message.msg_id = compute_msg_id(&dmq_message); + + Ok(dmq_message) + } +} + +#[cfg(test)] +mod tests { + use mithril_cardano_node_chain::test::double::FakeChainObserver; + use mithril_common::{ + crypto_helper::{KesSignerFake, TryToBytes}, + entities::{BlockNumber, ChainPoint, TimePoint}, + }; + + use super::*; + + mod test_utils { + use super::*; + + pub(super) struct TestMessage { + pub(super) content: Vec, + } + + impl TryToBytes for TestMessage { + fn to_bytes_vec(&self) -> StdResult> { + Ok(self.content.clone()) + } + } + } + + #[tokio::test] + async fn test_build_dmq_message() { + let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature(); + let kes_signer = Arc::new(KesSignerFake::new(vec![Ok(( + kes_signature, + operational_certificate.clone(), + ))])); + let chain_observer = Arc::new(FakeChainObserver::new(Some(TimePoint { + chain_point: ChainPoint { + block_number: BlockNumber(123), + ..ChainPoint::dummy() + }, + ..TimePoint::dummy() + }))); + let builder = DmqMessageBuilder::new(kes_signer, chain_observer).set_ttl(100); + let message = test_utils::TestMessage { + content: b"test".to_vec(), + }; + + let dmq_message = builder + .build(&message.to_bytes_vec().unwrap()) + .await + .unwrap(); + + assert!(!dmq_message.msg_id.is_empty()); + assert_eq!( + DmqMsg { + msg_id: vec![], + msg_body: b"test".to_vec(), + block_number: 123, + ttl: 100, + kes_signature: kes_signature.to_bytes_vec().unwrap(), + operational_certificate: operational_certificate.to_bytes_vec().unwrap(), + }, + DmqMsg { + msg_id: vec![], + ..dmq_message + } + ); + } +} diff --git a/internal/mithril-dmq/src/publisher/interface.rs b/internal/mithril-dmq/src/publisher/interface.rs new file mode 100644 index 00000000000..6f41a23bf3a --- /dev/null +++ b/internal/mithril-dmq/src/publisher/interface.rs @@ -0,0 +1,9 @@ +use mithril_common::{crypto_helper::TryToBytes, StdResult}; + +/// Trait for publishing messages from a DMQ node. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait DmqPublisher: Send + Sync { + /// Publishes a message to the DMQ node. + async fn publish_message(&self, message: M) -> StdResult<()>; +} diff --git a/internal/mithril-dmq/src/publisher/mod.rs b/internal/mithril-dmq/src/publisher/mod.rs new file mode 100644 index 00000000000..4035f6c0659 --- /dev/null +++ b/internal/mithril-dmq/src/publisher/mod.rs @@ -0,0 +1,5 @@ +mod interface; +mod pallas; + +pub use interface::*; +pub use pallas::*; diff --git a/internal/mithril-dmq/src/publisher/pallas.rs b/internal/mithril-dmq/src/publisher/pallas.rs new file mode 100644 index 00000000000..3d9a57dc19a --- /dev/null +++ b/internal/mithril-dmq/src/publisher/pallas.rs @@ -0,0 +1,213 @@ +use std::{fmt::Debug, marker::PhantomData, path::PathBuf}; + +use anyhow::Context; +use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response}; +use slog::{debug, error, Logger}; + +use mithril_common::{ + crypto_helper::TryToBytes, logging::LoggerExtensions, CardanoNetwork, StdResult, +}; + +use crate::{DmqMessageBuilder, DmqPublisher}; + +/// A DMQ publisher implementation. +/// +/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas. +pub struct DmqPublisherPallas { + socket: PathBuf, + network: CardanoNetwork, + dmq_message_builder: DmqMessageBuilder, + logger: Logger, + phantom: PhantomData, +} + +impl DmqPublisherPallas { + /// Creates a new instance of [DmqPublisherPallas]. + pub fn new( + socket: PathBuf, + network: CardanoNetwork, + dmq_message_builder: DmqMessageBuilder, + logger: Logger, + ) -> Self { + Self { + socket, + network, + dmq_message_builder, + logger: logger.new_with_component_name::(), + phantom: PhantomData, + } + } + + /// Creates and returns a new `DmqClient` connected to the specified socket. + async fn new_client(&self) -> StdResult { + let magic = self.network.code(); + DmqClient::connect(&self.socket, magic) + .await + .with_context(|| "DmqPublisherPallas failed to create a new client") + } +} + +#[async_trait::async_trait] +impl DmqPublisher for DmqPublisherPallas { + async fn publish_message(&self, message: M) -> StdResult<()> { + debug!( + self.logger, + "Publish message to DMQ"; + "message" => ?message + ); + let mut client = self.new_client().await?; + let message_bytes = &message.to_bytes_vec()?; + let dmq_message = self + .dmq_message_builder + .build(message_bytes) + .await + .with_context(|| "Failed to build DMQ message")?; + client + .msg_submission() + .send_submit_tx(dmq_message) + .await + .with_context(|| "Failed to submit DMQ message")?; + let response = client.msg_submission().recv_submit_tx_response().await?; + if let Err(e) = client.msg_submission().terminate_gracefully().await { + error!(self.logger, "Failed to send Done"; "error" => ?e); + } + + if response != Response::Accepted { + anyhow::bail!("Failed to publish DMQ message: {:?}", response); + } + + Ok(()) + } +} + +#[cfg(all(test, unix))] +mod tests { + + use std::{fs, sync::Arc}; + + use pallas_network::miniprotocols::{ + localmsgsubmission::DmqMsgValidationError, localtxsubmission, + }; + use tokio::{net::UnixListener, task::JoinHandle}; + + use mithril_cardano_node_chain::test::double::FakeChainObserver; + use mithril_common::{crypto_helper::KesSignerFake, current_function, test_utils::TempDir}; + + use crate::{test::payload::DmqMessageTestPayload, test_tools::TestLogger}; + + use super::*; + + fn create_temp_dir(folder_name: &str) -> PathBuf { + TempDir::create_with_short_path("dmq_publisher", folder_name) + } + + fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> { + tokio::spawn({ + async move { + // server setup + if socket_path.exists() { + fs::remove_file(socket_path.clone()).unwrap(); + } + let listener = UnixListener::bind(socket_path).unwrap(); + let mut server = pallas_network::facades::DmqServer::accept(&listener, 0) + .await + .unwrap(); + + // init local msg submission server + let server_msg = server.msg_submission(); + + // server waits for request from client and replies to it + let request = server_msg.recv_next_request().await.unwrap(); + match &request { + localtxsubmission::Request::Submit(_) => (), + request => panic!("Expected a Submit request, but received: {request:?}"), + } + let response = if reply_success { + localtxsubmission::Response::Accepted + } else { + localtxsubmission::Response::Rejected(DmqMsgValidationError( + "fake error".to_string(), + )) + }; + server_msg.send_submit_tx_response(response).await.unwrap(); + + // server receives done from client + let request = server_msg.recv_next_request().await.unwrap(); + assert_eq!(localtxsubmission::Request::Done, request); + } + }) + } + + #[tokio::test] + async fn pallas_dmq_signature_publisher_success() { + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let reply_success = true; + let server = setup_dmq_server(socket_path.clone(), reply_success); + let client = tokio::spawn(async move { + let publisher = DmqPublisherPallas::new( + socket_path, + CardanoNetwork::TestNet(0), + DmqMessageBuilder::new( + { + let (kes_signature, operational_certificate) = + KesSignerFake::dummy_signature(); + let kes_signer = KesSignerFake::new(vec![Ok(( + kes_signature, + operational_certificate.clone(), + ))]); + + Arc::new(kes_signer) + }, + Arc::new(FakeChainObserver::default()), + ) + .set_ttl(100), + TestLogger::stdout(), + ); + + publisher + .publish_message(DmqMessageTestPayload::dummy()) + .await + }); + + let (_, res) = tokio::join!(server, client); + + res.unwrap().unwrap(); + } + + #[tokio::test] + async fn pallas_dmq_signature_publisher_fails() { + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let reply_success = false; + let server = setup_dmq_server(socket_path.clone(), reply_success); + let client = tokio::spawn(async move { + let publisher = DmqPublisherPallas::new( + socket_path, + CardanoNetwork::TestNet(0), + DmqMessageBuilder::new( + { + let (kes_signature, operational_certificate) = + KesSignerFake::dummy_signature(); + let kes_signer = KesSignerFake::new(vec![Ok(( + kes_signature, + operational_certificate.clone(), + ))]); + + Arc::new(kes_signer) + }, + Arc::new(FakeChainObserver::default()), + ) + .set_ttl(100), + TestLogger::stdout(), + ); + + publisher + .publish_message(DmqMessageTestPayload::dummy()) + .await + }); + + let (_, res) = tokio::join!(server, client); + + res.unwrap() + .expect_err("Publishing DMQ message should fail"); + } +} diff --git a/internal/mithril-dmq/src/test/double/consumer.rs b/internal/mithril-dmq/src/test/double/consumer.rs new file mode 100644 index 00000000000..612b9e73141 --- /dev/null +++ b/internal/mithril-dmq/src/test/double/consumer.rs @@ -0,0 +1,81 @@ +use std::{collections::VecDeque, fmt::Debug}; + +use tokio::sync::Mutex; + +use mithril_common::{crypto_helper::TryFromBytes, entities::PartyId, StdResult}; + +use crate::DmqConsumer; + +type ConsumerReturn = StdResult>; + +/// A fake implementation of the [DmqConsumer] trait for testing purposes. +pub struct DmqConsumerFake { + results: Mutex>>, +} + +impl DmqConsumerFake { + /// Creates a new `DmqConsumerFake` instance with the provided results. + pub fn new(results: Vec>>) -> Self { + Self { + results: Mutex::new(VecDeque::from(results)), + } + } +} + +#[async_trait::async_trait] +impl DmqConsumer for DmqConsumerFake { + async fn consume_messages(&self) -> ConsumerReturn { + let mut results = self.results.lock().await; + + results + .pop_front() + .ok_or_else(|| anyhow::anyhow!("No more results available in DmqConsumerFake"))? + } +} + +#[cfg(test)] +mod tests { + use crate::test::payload::DmqMessageTestPayload; + + use super::*; + + #[tokio::test] + async fn consume_messages_success() { + let consumer = DmqConsumerFake::new(vec![ + Ok(vec![( + DmqMessageTestPayload::new(b"test-1"), + "pool-id-1".to_string(), + )]), + Ok(vec![( + DmqMessageTestPayload::new(b"test-2"), + "pool-id-2".to_string(), + )]), + ]); + + let messages = consumer.consume_messages().await.unwrap(); + + assert_eq!( + vec![( + DmqMessageTestPayload::new(b"test-1"), + "pool-id-1".to_string(), + )], + messages + ); + } + + #[tokio::test] + async fn consume_messages_failure() { + let consumer = DmqConsumerFake::new(vec![ + Err(anyhow::anyhow!("Test error")), + Ok(vec![( + DmqMessageTestPayload::new(b"test-2"), + "pool-id-2".to_string(), + )]), + ]); + + consumer + .consume_messages() + .await + .expect_err("DmqConsumerFake should return an error"); + } +} diff --git a/internal/mithril-dmq/src/test/double/mod.rs b/internal/mithril-dmq/src/test/double/mod.rs new file mode 100644 index 00000000000..fd20a88aa0d --- /dev/null +++ b/internal/mithril-dmq/src/test/double/mod.rs @@ -0,0 +1,9 @@ +//! Test doubles +//! +//! Enable unit testing with controlled inputs and predictable behavior. + +mod consumer; +mod publisher; + +pub use consumer::*; +pub use publisher::*; diff --git a/internal/mithril-dmq/src/test/double/publisher.rs b/internal/mithril-dmq/src/test/double/publisher.rs new file mode 100644 index 00000000000..4960ca25320 --- /dev/null +++ b/internal/mithril-dmq/src/test/double/publisher.rs @@ -0,0 +1,61 @@ +use std::{collections::VecDeque, fmt::Debug, marker::PhantomData}; + +use tokio::sync::Mutex; + +use mithril_common::{crypto_helper::TryToBytes, StdResult}; + +use crate::DmqPublisher; + +/// A fake implementation of the [DmqPublisher] trait for testing purposes. +pub struct DmqPublisherFake { + results: Mutex>>, + phantom: PhantomData, +} + +impl DmqPublisherFake { + /// Creates a new `DmqPublisherFake` instance with the provided results. + pub fn new(results: Vec>) -> Self { + Self { + results: Mutex::new(VecDeque::from(results)), + phantom: PhantomData, + } + } +} + +#[async_trait::async_trait] +impl DmqPublisher for DmqPublisherFake { + async fn publish_message(&self, _message: M) -> StdResult<()> { + let mut results = self.results.lock().await; + + results + .pop_front() + .ok_or_else(|| anyhow::anyhow!("No more results available in DmqPublisherFake"))? + } +} + +#[cfg(test)] +mod tests { + use crate::test::payload::DmqMessageTestPayload; + + use super::*; + + #[tokio::test] + async fn publish_messages_success() { + let publisher = DmqPublisherFake::new(vec![Ok(()), Err(anyhow::anyhow!("Test error"))]); + + publisher + .publish_message(DmqMessageTestPayload::new(b"test-1")) + .await + .unwrap(); + } + + #[tokio::test] + async fn publish_messages_failure() { + let publisher = DmqPublisherFake::new(vec![Err(anyhow::anyhow!("Test error")), Ok(())]); + + publisher + .publish_message(DmqMessageTestPayload::new(b"test-1")) + .await + .expect_err("DmqPublisherFake should return an error"); + } +} diff --git a/internal/mithril-dmq/src/test/mod.rs b/internal/mithril-dmq/src/test/mod.rs new file mode 100644 index 00000000000..a1d1c1930d9 --- /dev/null +++ b/internal/mithril-dmq/src/test/mod.rs @@ -0,0 +1,10 @@ +//! Test utilities. +//! +//! ⚠ Do not use in production code ⚠ +//! +//! This module provides in particular test doubles for the traits defined in this crate. + +pub mod double; + +#[cfg(test)] +pub(crate) mod payload; diff --git a/internal/mithril-dmq/src/test/payload.rs b/internal/mithril-dmq/src/test/payload.rs new file mode 100644 index 00000000000..4dbb829afac --- /dev/null +++ b/internal/mithril-dmq/src/test/payload.rs @@ -0,0 +1,50 @@ +use std::fmt::Debug; + +use mithril_common::{ + crypto_helper::{TryFromBytes, TryToBytes}, + StdResult, +}; + +/// A test message payload for the DMQ. +#[derive(PartialEq, Eq)] +pub struct DmqMessageTestPayload { + message: Vec, +} + +impl DmqMessageTestPayload { + /// Creates a new `DmqMessageTestPayload` with the given bytes. + pub fn new(bytes: &[u8]) -> Self { + Self { + message: bytes.to_vec(), + } + } + + /// Creates a dummy `DmqMessageTestPayload` with a predefined message. + pub fn dummy() -> Self { + Self { + message: b"dummy message".to_vec(), + } + } +} + +impl Debug for DmqMessageTestPayload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DmqMessageTestPayload") + .field("message", &self.message) + .finish() + } +} + +impl TryToBytes for DmqMessageTestPayload { + fn to_bytes_vec(&self) -> StdResult> { + Ok(self.message.clone()) + } +} + +impl TryFromBytes for DmqMessageTestPayload { + fn try_from_bytes(bytes: &[u8]) -> StdResult { + Ok(Self { + message: bytes.to_vec(), + }) + } +} diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index ddf7d805559..256228e502b 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.7.69" +version = "0.7.70" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } @@ -14,6 +14,7 @@ default = ["jemallocator"] bundle_tls = ["reqwest/native-tls-vendored"] jemallocator = ["dep:tikv-jemallocator"] +future_dmq = ["dep:mithril-dmq"] [dependencies] anyhow = { workspace = true } @@ -29,6 +30,7 @@ mithril-cardano-node-chain = { path = "../internal/cardano-node/mithril-cardano- mithril-cardano-node-internal-database = { path = "../internal/cardano-node/mithril-cardano-node-internal-database" } mithril-cli-helper = { path = "../internal/mithril-cli-helper" } mithril-common = { path = "../mithril-common", features = ["full"] } +mithril-dmq = { path = "../internal/mithril-dmq", optional = true } mithril-doc = { path = "../internal/mithril-doc" } mithril-era = { path = "../internal/mithril-era" } mithril-metric = { path = "../internal/mithril-metric" } diff --git a/mithril-aggregator/config/dev.json b/mithril-aggregator/config/dev.json index e9beb33babf..d950ad05650 100644 --- a/mithril-aggregator/config/dev.json +++ b/mithril-aggregator/config/dev.json @@ -1,7 +1,7 @@ { "environment": "Production", "cardano_cli_path": "cardano-cli", - "cardano_node_socket_path": "/tmp/cardano.sock", + "cardano_node_socket_path": "/ipc/node.socket", "network": "devnet", "network_magic": 42, "run_interval": 30000, diff --git a/mithril-aggregator/config/preview.json b/mithril-aggregator/config/preview.json index 99038a2cfee..b7af38ed360 100644 --- a/mithril-aggregator/config/preview.json +++ b/mithril-aggregator/config/preview.json @@ -1,7 +1,7 @@ { "environment": "Production", "cardano_cli_path": "cardano-cli", - "cardano_node_socket_path": "/tmp/cardano.sock", + "cardano_node_socket_path": "/ipc/node.socket", "network": "preview", "run_interval": 60000, "protocol_parameters": { diff --git a/mithril-aggregator/src/commands/genesis_command.rs b/mithril-aggregator/src/commands/genesis_command.rs index f864569e320..94f6e93578c 100644 --- a/mithril-aggregator/src/commands/genesis_command.rs +++ b/mithril-aggregator/src/commands/genesis_command.rs @@ -27,9 +27,8 @@ pub struct GenesisCommandConfiguration { #[example = "`cardano-cli`"] pub cardano_cli_path: Option, - /// Path of the socket used by the Cardano CLI tool - /// to communicate with the Cardano node - #[example = "`/tmp/cardano.sock`"] + /// Path of the socket opened by the Cardano node + #[example = "`/ipc/node.socket`"] pub cardano_node_socket_path: PathBuf, /// Cardano Network Magic number diff --git a/mithril-aggregator/src/configuration.rs b/mithril-aggregator/src/configuration.rs index 76509df4c69..b925a1c14e4 100644 --- a/mithril-aggregator/src/configuration.rs +++ b/mithril-aggregator/src/configuration.rs @@ -61,12 +61,16 @@ pub trait ConfigurationSource { panic!("cardano_cli_path is not implemented."); } - /// Path of the socket used by the Cardano CLI tool - /// to communicate with the Cardano node + /// Path of the socket opened by the Cardano node fn cardano_node_socket_path(&self) -> PathBuf { panic!("cardano_node_socket_path is not implemented."); } + /// Path of the socket opened by the DMQ node + fn dmq_node_socket_path(&self) -> Option { + panic!("dmq_node_socket_path is not implemented."); + } + /// Cardano node version. /// /// **NOTE**: This cannot be verified for now (see [this @@ -390,11 +394,14 @@ pub struct ServeCommandConfiguration { #[example = "`cardano-cli`"] pub cardano_cli_path: PathBuf, - /// Path of the socket used by the Cardano CLI tool - /// to communicate with the Cardano node - #[example = "`/tmp/cardano.sock`"] + /// Path of the socket opened by the Cardano node + #[example = "`/ipc/node.socket`"] pub cardano_node_socket_path: PathBuf, + /// Path of the socket opened by the DMQ node + #[example = "`/ipc/dmq.socket`"] + pub dmq_node_socket_path: Option, + /// Cardano node version. /// /// **NOTE**: This cannot be verified for now (see [this @@ -628,6 +635,7 @@ impl ServeCommandConfiguration { environment: ExecutionEnvironment::Test, cardano_cli_path: PathBuf::new(), cardano_node_socket_path: PathBuf::new(), + dmq_node_socket_path: None, cardano_node_version: "0.0.1".to_string(), network_magic: Some(42), network: "devnet".to_string(), @@ -707,6 +715,10 @@ impl ConfigurationSource for ServeCommandConfiguration { self.cardano_node_socket_path.clone() } + fn dmq_node_socket_path(&self) -> Option { + self.dmq_node_socket_path.clone() + } + fn cardano_node_version(&self) -> String { self.cardano_node_version.clone() } diff --git a/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs b/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs index 1cf29aa7fec..0c377a35f84 100644 --- a/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs +++ b/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs @@ -7,11 +7,17 @@ use std::sync::Arc; use std::time::Duration; +#[cfg(feature = "future_dmq")] +use mithril_common::messages::RegisterSignatureMessageDmq; +#[cfg(feature = "future_dmq")] +use mithril_dmq::DmqConsumerPallas; use mithril_signed_entity_lock::SignedEntityTypeLock; use crate::database::repository::CertificateRepository; use crate::dependency_injection::{DependenciesBuilder, Result}; use crate::get_dependency; +#[cfg(feature = "future_dmq")] +use crate::services::SignatureConsumerDmq; use crate::services::{ AggregatorClient, AggregatorHTTPClient, MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer, SignatureConsumerNoop, SignatureProcessor, @@ -74,9 +80,22 @@ impl DependenciesBuilder { /// Builds a [SignatureConsumer] pub async fn build_signature_consumer(&mut self) -> Result> { - let signature_consumer = SignatureConsumerNoop; + #[cfg(feature = "future_dmq")] + let signature_consumer = match self.configuration.dmq_node_socket_path() { + Some(dmq_node_socket_path) => { + let dmq_consumer = Arc::new(DmqConsumerPallas::::new( + dmq_node_socket_path, + self.configuration.get_network()?, + self.root_logger(), + )); + Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc + } + _ => Arc::new(SignatureConsumerNoop) as Arc, + }; + #[cfg(not(feature = "future_dmq"))] + let signature_consumer = Arc::new(SignatureConsumerNoop) as Arc; - Ok(Arc::new(signature_consumer)) + Ok(signature_consumer) } /// Builds a [SignatureProcessor] diff --git a/mithril-aggregator/src/services/signature_consumer/dmq.rs b/mithril-aggregator/src/services/signature_consumer/dmq.rs new file mode 100644 index 00000000000..4b14be060d8 --- /dev/null +++ b/mithril-aggregator/src/services/signature_consumer/dmq.rs @@ -0,0 +1,104 @@ +use std::sync::Arc; + +use anyhow::Context; +use async_trait::async_trait; + +use mithril_common::{ + entities::{SignedEntityType, SingleSignature}, + messages::RegisterSignatureMessageDmq, + StdResult, +}; + +use mithril_dmq::DmqConsumer; + +use super::SignatureConsumer; + +/// DMQ implementation of the [SignatureConsumer] trait. +pub struct SignatureConsumerDmq { + dmq_consumer: Arc>, +} + +impl SignatureConsumerDmq { + /// Creates a new instance of [SignatureConsumerDmq]. + pub fn new(dmq_consumer: Arc>) -> Self { + Self { dmq_consumer } + } +} + +#[async_trait] +impl SignatureConsumer for SignatureConsumerDmq { + async fn get_signatures(&self) -> StdResult> { + self.dmq_consumer + .consume_messages() + .await + .map(|messages| { + messages + .into_iter() + .map(|(message, party_id)| { + let signature = message.signature; + let won_indexes = signature.indexes.clone(); + let single_signature = + SingleSignature::new(party_id, signature, won_indexes); + let signed_entity_type = message.signed_entity_type; + + (single_signature, signed_entity_type) + }) + .collect() + }) + .with_context(|| "Failed to get signatures from DMQ") + } + + fn get_origin_tag(&self) -> String { + "DMQ".to_string() + } +} + +#[cfg(test)] +mod tests { + use mithril_common::{crypto_helper::ProtocolSingleSignature, test_utils::fake_keys}; + use mithril_dmq::test::double::DmqConsumerFake; + + use super::*; + + #[tokio::test] + async fn get_signatures_success() { + let signed_entity_type = SignedEntityType::dummy(); + let single_signature: ProtocolSingleSignature = + fake_keys::single_signature()[0].try_into().unwrap(); + let dmq_consumer = Arc::new(DmqConsumerFake::new(vec![Ok(vec![( + RegisterSignatureMessageDmq { + signature: single_signature.clone(), + signed_entity_type: signed_entity_type.to_owned(), + }, + "pool-id-1".to_string(), + )])])); + let consumer = SignatureConsumerDmq::new(dmq_consumer); + + let signatures = consumer.get_signatures().await.unwrap(); + + assert_eq!( + vec![( + SingleSignature::new( + "pool-id-1".to_string(), + single_signature.clone(), + single_signature.indexes.clone() + ), + signed_entity_type + )], + signatures + ); + } + + #[tokio::test] + async fn get_signatures_failure() { + let dmq_consumer = Arc::new(DmqConsumerFake::new(vec![Err(anyhow::anyhow!( + "Test error" + ))])); + let consumer = SignatureConsumerDmq::new(dmq_consumer); + + consumer + .get_signatures() + .await + .expect_err("SignatureConsumerDmq should return an error"); + } +} diff --git a/mithril-aggregator/src/services/signature_consumer/mod.rs b/mithril-aggregator/src/services/signature_consumer/mod.rs index 96610b2b029..52522ee135e 100644 --- a/mithril-aggregator/src/services/signature_consumer/mod.rs +++ b/mithril-aggregator/src/services/signature_consumer/mod.rs @@ -1,7 +1,11 @@ +#[cfg(feature = "future_dmq")] +mod dmq; mod fake; mod interface; mod noop; +#[cfg(feature = "future_dmq")] +pub use dmq::*; pub use fake::*; pub use interface::*; pub use noop::*; diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 509dfeefaab..70ca27fcd5a 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.6.3" +version = "0.6.4" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/crypto_helper/cardano/kes/mod.rs b/mithril-common/src/crypto_helper/cardano/kes/mod.rs index 1dc2cc27ba3..b555fe2843f 100644 --- a/mithril-common/src/crypto_helper/cardano/kes/mod.rs +++ b/mithril-common/src/crypto_helper/cardano/kes/mod.rs @@ -1,12 +1,16 @@ mod error; mod interface; mod signer_with_key; -#[cfg(test)] -pub(crate) mod tests_setup; - mod verifier_standard; pub use error::*; pub use interface::*; pub use signer_with_key::*; pub use verifier_standard::*; + +cfg_test_tools! { + mod signer_fake; + pub mod tests_setup; + + pub use signer_fake::*; +} diff --git a/mithril-common/src/crypto_helper/cardano/kes/signer_fake.rs b/mithril-common/src/crypto_helper/cardano/kes/signer_fake.rs new file mode 100644 index 00000000000..935eeefbc75 --- /dev/null +++ b/mithril-common/src/crypto_helper/cardano/kes/signer_fake.rs @@ -0,0 +1,106 @@ +use std::collections::VecDeque; + +use kes_summed_ed25519::kes::Sum6KesSig; +use std::sync::Mutex; + +use crate::{ + crypto_helper::{ + cardano::kes::{ + tests_setup::{ + create_kes_cryptographic_material, KesCryptographicMaterialForTest, + KesPartyIndexForTest, + }, + KesSignerStandard, + }, + KesPeriod, KesSigner, OpCert, + }, + StdResult, +}; + +type KesSignatureResult = StdResult<(Sum6KesSig, OpCert)>; + +/// Fake KES Signer implementation. +pub struct KesSignerFake { + results: Mutex>, +} + +impl KesSignerFake { + /// Creates a new `KesSignerFake` instance. + pub fn new(results: Vec) -> Self { + Self { + results: Mutex::new(results.into()), + } + } + + /// Returns a dummy signature result that is always successful. + pub fn dummy_signature() -> (Sum6KesSig, OpCert) { + let KesCryptographicMaterialForTest { + party_id: _, + operational_certificate_file, + kes_secret_key_file, + } = create_kes_cryptographic_material( + 1 as KesPartyIndexForTest, + 0 as KesPeriod, + "fake_kes_signer_returns_signature_batches_in_expected_order", + ); + let message = b"Test message for KES signing"; + let kes_signer = KesSignerStandard::new(kes_secret_key_file, operational_certificate_file); + let kes_signing_period = 1; + let (kes_signature, op_cert) = kes_signer + .sign(message, kes_signing_period) + .expect("Signing should not fail"); + + (kes_signature, op_cert) + } + + /// Returns a dummy signature result that always fails. + pub fn dummy_signature_result_err() -> KesSignatureResult { + Err(anyhow::anyhow!("Dummy error")) + } +} + +impl KesSigner for KesSignerFake { + fn sign(&self, _message: &[u8], _kes_period: KesPeriod) -> KesSignatureResult { + let mut results = self.results.lock().unwrap(); + + results.pop_front().unwrap() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fake_kes_signer_returns_signature_batches_in_expected_order() { + let KesCryptographicMaterialForTest { + party_id: _, + operational_certificate_file, + kes_secret_key_file, + } = create_kes_cryptographic_material( + 1 as KesPartyIndexForTest, + 0 as KesPeriod, + "fake_kes_signer_returns_signature_batches_in_expected_order", + ); + let message = b"Test message for KES signing"; + let kes_signer = KesSignerStandard::new(kes_secret_key_file, operational_certificate_file); + let kes_signing_period = 1; + let (kes_signature, op_cert) = kes_signer + .sign(message, kes_signing_period) + .expect("Signing should not fail"); + let fake_kes_signer = KesSignerFake::new(vec![ + Ok((kes_signature, op_cert.clone())), + Err(anyhow::anyhow!("Fake error")), + ]); + + let (kes_signature_1, op_cert_1) = fake_kes_signer + .sign(message, kes_signing_period) + .expect("Signing should not fail"); + assert_eq!(kes_signature, kes_signature_1); + assert_eq!(op_cert, op_cert_1); + + fake_kes_signer + .sign(message, kes_signing_period) + .expect_err("Signing should fail"); + } +} diff --git a/mithril-common/src/crypto_helper/cardano/kes/signer_with_key.rs b/mithril-common/src/crypto_helper/cardano/kes/signer_with_key.rs index 07fdd577df5..1d1e08dd4e4 100644 --- a/mithril-common/src/crypto_helper/cardano/kes/signer_with_key.rs +++ b/mithril-common/src/crypto_helper/cardano/kes/signer_with_key.rs @@ -65,13 +65,14 @@ impl KesSigner for KesSignerStandard { mod tests { use super::*; - use crate::crypto_helper::cardano::{ - kes::tests_setup::create_kes_cryptographic_material, - tests_setup::KesCryptographicMaterialForTest, KesVerifier, KesVerifierStandard, + use crate::crypto_helper::cardano::kes::{ + tests_setup::{ + create_kes_cryptographic_material, KesCryptographicMaterialForTest, + KesPartyIndexForTest, + }, + KesVerifier, KesVerifierStandard, }; - type PartyIndex = u64; - #[test] fn create_valid_signature_for_message() { let KesCryptographicMaterialForTest { @@ -79,7 +80,7 @@ mod tests { operational_certificate_file, kes_secret_key_file, } = create_kes_cryptographic_material( - 1 as PartyIndex, + 1 as KesPartyIndexForTest, 0 as KesPeriod, "create_valid_signature_for_message", ); @@ -103,7 +104,7 @@ mod tests { operational_certificate_file, kes_secret_key_file, } = create_kes_cryptographic_material( - 1 as PartyIndex, + 1 as KesPartyIndexForTest, 0 as KesPeriod, "create_invalid_signature_for_different_message", ); @@ -133,7 +134,7 @@ mod tests { operational_certificate_file, kes_secret_key_file, } = create_kes_cryptographic_material( - 1 as PartyIndex, + 1 as KesPartyIndexForTest, kes_period_start, "create_invalid_signature_for_invalid_kes_period", ); diff --git a/mithril-common/src/crypto_helper/cardano/kes/tests_setup.rs b/mithril-common/src/crypto_helper/cardano/kes/tests_setup.rs index 20473797f79..e64db4d8bd9 100644 --- a/mithril-common/src/crypto_helper/cardano/kes/tests_setup.rs +++ b/mithril-common/src/crypto_helper/cardano/kes/tests_setup.rs @@ -6,16 +6,20 @@ use kes_summed_ed25519::traits::KesSk; use crate::crypto_helper::{cardano::ColdKeyGenerator, OpCert}; use crate::crypto_helper::{KesPeriod, ProtocolPartyId, SerDeShelleyFileFormat, Sum6KesBytes}; +/// A type alias for the party index used in KES cryptographic material. +pub type KesPartyIndexForTest = u64; + /// A struct to hold KES cryptographic material for testing purposes. pub(crate) struct KesCryptographicMaterialForTest { + #[allow(dead_code)] pub party_id: ProtocolPartyId, pub operational_certificate_file: PathBuf, pub kes_secret_key_file: PathBuf, } /// Create KES cryptographic material for testing purposes. -pub(crate) fn create_kes_cryptographic_material( - party_idx: u64, +pub fn create_kes_cryptographic_material( + party_idx: KesPartyIndexForTest, kes_period: KesPeriod, test_directory: &str, ) -> KesCryptographicMaterialForTest { diff --git a/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs b/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs index ecc4f55ee62..82cc0a222ed 100644 --- a/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs +++ b/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs @@ -47,9 +47,12 @@ impl KesVerifier for KesVerifierStandard { #[cfg(test)] mod tests { - use crate::crypto_helper::cardano::{ - kes::tests_setup::create_kes_cryptographic_material, - tests_setup::KesCryptographicMaterialForTest, KesSigner, KesSignerStandard, + use crate::crypto_helper::cardano::kes::{ + tests_setup::{ + create_kes_cryptographic_material, KesCryptographicMaterialForTest, + KesPartyIndexForTest, + }, + KesSigner, KesSignerStandard, }; use super::*; @@ -60,7 +63,11 @@ mod tests { party_id: _, operational_certificate_file, kes_secret_key_file, - } = create_kes_cryptographic_material(1, 0 as KesPeriod, "verify_valid_signature_succeeds"); + } = create_kes_cryptographic_material( + 1 as KesPartyIndexForTest, + 0 as KesPeriod, + "verify_valid_signature_succeeds", + ); let message = b"Test message for KES signing"; let kes_signer = KesSignerStandard::new(kes_secret_key_file, operational_certificate_file); let kes_signing_period = 1; @@ -79,7 +86,11 @@ mod tests { party_id: _, operational_certificate_file, kes_secret_key_file, - } = create_kes_cryptographic_material(1, 0 as KesPeriod, "verify_invalid_signature_fails"); + } = create_kes_cryptographic_material( + 1 as KesPartyIndexForTest, + 0 as KesPeriod, + "verify_invalid_signature_fails", + ); let message = b"Test message for KES signing"; let kes_signer = KesSignerStandard::new(kes_secret_key_file, operational_certificate_file); let kes_signing_period = 1; diff --git a/mithril-common/src/crypto_helper/cardano/key_certification.rs b/mithril-common/src/crypto_helper/cardano/key_certification.rs index aafe18c0f9a..fdacbe0e447 100644 --- a/mithril-common/src/crypto_helper/cardano/key_certification.rs +++ b/mithril-common/src/crypto_helper/cardano/key_certification.rs @@ -299,16 +299,20 @@ impl KeyRegWrapper { #[cfg(test)] mod test { - use super::*; - use crate::crypto_helper::cardano::tests_setup::KesCryptographicMaterialForTest; - use crate::crypto_helper::cardano::{ - tests_setup::create_kes_cryptographic_material, KesSignerStandard, + use crate::crypto_helper::cardano::kes::{ + tests_setup::{ + create_kes_cryptographic_material, KesCryptographicMaterialForTest, + KesPartyIndexForTest, + }, + KesSignerStandard, }; use crate::crypto_helper::{OpCert, SerDeShelleyFileFormat}; use rand_chacha::ChaCha20Rng; use rand_core::SeedableRng; + use super::*; + #[test] fn test_vector_key_reg() { let params = StmParameters { @@ -321,12 +325,20 @@ mod test { party_id: party_id_1, operational_certificate_file: operational_certificate_file_1, kes_secret_key_file: kes_secret_key_file_1, - } = create_kes_cryptographic_material(1, 0 as KesPeriod, "test_vector_key_reg"); + } = create_kes_cryptographic_material( + 1 as KesPartyIndexForTest, + 0 as KesPeriod, + "test_vector_key_reg", + ); let KesCryptographicMaterialForTest { party_id: party_id_2, operational_certificate_file: operational_certificate_file_2, kes_secret_key_file: kes_secret_key_file_2, - } = create_kes_cryptographic_material(2, 0 as KesPeriod, "test_vector_key_reg"); + } = create_kes_cryptographic_material( + 2 as KesPartyIndexForTest, + 0 as KesPeriod, + "test_vector_key_reg", + ); let mut key_reg = KeyRegWrapper::init(&vec![(party_id_1, 10), (party_id_2, 3)]); diff --git a/mithril-common/src/crypto_helper/merkle_map.rs b/mithril-common/src/crypto_helper/merkle_map.rs index fd8f5b0beba..9e680b869b4 100644 --- a/mithril-common/src/crypto_helper/merkle_map.rs +++ b/mithril-common/src/crypto_helper/merkle_map.rs @@ -360,8 +360,7 @@ impl Deserialize<'de>> MKMapProof { /// Convert the proof from bytes pub fn from_bytes(bytes: &[u8]) -> StdResult { let (res, _) = - bincode::serde::decode_from_slice::(bytes, bincode::config::standard()) - .map_err(|e| anyhow!(e))?; + bincode::serde::decode_from_slice::(bytes, bincode::config::standard())?; Ok(res) } diff --git a/mithril-common/src/crypto_helper/merkle_tree.rs b/mithril-common/src/crypto_helper/merkle_tree.rs index 85c4f87d959..80f447af734 100644 --- a/mithril-common/src/crypto_helper/merkle_tree.rs +++ b/mithril-common/src/crypto_helper/merkle_tree.rs @@ -204,8 +204,7 @@ impl MKProof { /// Convert the proof from bytes pub fn from_bytes(bytes: &[u8]) -> StdResult { let (res, _) = - bincode::serde::decode_from_slice::(bytes, bincode::config::standard()) - .map_err(|e| anyhow!(e))?; + bincode::serde::decode_from_slice::(bytes, bincode::config::standard())?; Ok(res) } diff --git a/mithril-common/src/crypto_helper/mod.rs b/mithril-common/src/crypto_helper/mod.rs index b6dc97bf9cb..6ae2dc7615b 100644 --- a/mithril-common/src/crypto_helper/mod.rs +++ b/mithril-common/src/crypto_helper/mod.rs @@ -20,6 +20,9 @@ pub use cardano::{ OpCert, ProtocolInitializerErrorWrapper, ProtocolRegistrationErrorWrapper, SerDeShelleyFileFormat, Sum6KesBytes, }; +cfg_test_tools! { + pub use cardano::KesSignerFake; +} pub use codec::*; pub use ed25519_alias::{era::*, genesis::*, manifest::*}; pub use merkle_map::{MKMap, MKMapKey, MKMapNode, MKMapProof, MKMapValue}; diff --git a/mithril-common/src/entities/signed_entity_type.rs b/mithril-common/src/entities/signed_entity_type.rs index a733061c5ea..fef43ec9c7b 100644 --- a/mithril-common/src/entities/signed_entity_type.rs +++ b/mithril-common/src/entities/signed_entity_type.rs @@ -8,7 +8,10 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use strum::{AsRefStr, Display, EnumDiscriminants, EnumIter, EnumString, IntoEnumIterator}; -use crate::StdResult; +use crate::{ + crypto_helper::{TryFromBytes, TryToBytes}, + StdResult, +}; use super::{BlockNumber, CardanoDbBeacon, Epoch}; @@ -154,6 +157,21 @@ impl SignedEntityType { } } +impl TryFromBytes for SignedEntityType { + fn try_from_bytes(bytes: &[u8]) -> StdResult { + let (res, _) = + bincode::serde::decode_from_slice::(bytes, bincode::config::standard())?; + + Ok(res) + } +} + +impl TryToBytes for SignedEntityType { + fn to_bytes_vec(&self) -> StdResult> { + bincode::serde::encode_to_vec(self, bincode::config::standard()).map_err(|e| e.into()) + } +} + impl SignedEntityTypeDiscriminants { /// Get all the discriminants pub fn all() -> BTreeSet { @@ -406,6 +424,19 @@ mod tests { ); } + #[test] + fn bytes_encoding() { + let cardano_stake_distribution = SignedEntityType::CardanoStakeDistribution(Epoch(25)); + let cardano_stake_distribution_bytes = cardano_stake_distribution.to_bytes_vec().unwrap(); + let cardano_stake_distribution_from_bytes = + SignedEntityType::try_from_bytes(&cardano_stake_distribution_bytes).unwrap(); + + assert_eq!( + cardano_stake_distribution, + cardano_stake_distribution_from_bytes + ); + } + // Expected ord: // MithrilStakeDistribution < CardanoStakeDistribution < CardanoImmutableFilesFull < CardanoDatabase < CardanoTransactions #[test] diff --git a/mithril-common/src/messages/mod.rs b/mithril-common/src/messages/mod.rs index 1de4ccee9ce..d25b57103ab 100644 --- a/mithril-common/src/messages/mod.rs +++ b/mithril-common/src/messages/mod.rs @@ -61,7 +61,7 @@ pub use mithril_stake_distribution::MithrilStakeDistributionMessage; pub use mithril_stake_distribution_list::{ MithrilStakeDistributionListItemMessage, MithrilStakeDistributionListMessage, }; -pub use register_signature::RegisterSignatureMessageHttp; +pub use register_signature::{RegisterSignatureMessageDmq, RegisterSignatureMessageHttp}; pub use register_signer::RegisterSignerMessage; pub use snapshot::SnapshotMessage; pub use snapshot_download::SnapshotDownloadMessage; diff --git a/mithril-common/src/messages/register_signature.rs b/mithril-common/src/messages/register_signature.rs index 3a595cbfddb..5f6dbf2223f 100644 --- a/mithril-common/src/messages/register_signature.rs +++ b/mithril-common/src/messages/register_signature.rs @@ -1,7 +1,13 @@ -use serde::{Deserialize, Serialize}; use std::fmt::{Debug, Formatter}; -use crate::entities::{HexEncodedSingleSignature, LotteryIndex, PartyId, SignedEntityType}; +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; + +use crate::{ + crypto_helper::{ProtocolSingleSignature, TryFromBytes, TryToBytes}, + entities::{HexEncodedSingleSignature, LotteryIndex, PartyId, SignedEntityType}, + StdResult, +}; #[cfg(any(test, feature = "test_tools"))] use crate::test_utils::fake_keys; @@ -65,11 +71,124 @@ impl Debug for RegisterSignatureMessageHttp { } } +/// Message structure to register single signature through the DMQ network. +#[derive(Clone, PartialEq, Eq)] +pub struct RegisterSignatureMessageDmq { + /// Signed entity type + pub signed_entity_type: SignedEntityType, + + /// The single signature. + pub signature: ProtocolSingleSignature, +} + +impl RegisterSignatureMessageDmq { + cfg_test_tools! { + /// Return a dummy test entity (test-only). + pub fn dummy() -> Self { + use crate::entities::Epoch; + Self { + signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(5)), + signature: fake_keys::single_signature()[0].try_into().unwrap(), + } + } + } + + /// Convert a `RegisterSignatureMessageDmq` into bytes + /// + /// # Layout + /// * Signed entity type length (u16) + /// * Signed entity type + /// * Protocol signature length (u32) + /// * Protocol signature + pub fn try_to_bytes_vec(&self) -> StdResult> { + let mut bytes = Vec::new(); + + let signed_entity_bytes = self.signed_entity_type.to_bytes_vec()?; + bytes.extend_from_slice(&(signed_entity_bytes.len() as u16).to_be_bytes()); + bytes.extend_from_slice(&signed_entity_bytes); + + let signature_bytes = self.signature.to_bytes_vec()?; + bytes.extend_from_slice(&(signature_bytes.len() as u32).to_be_bytes()); + bytes.extend_from_slice(&signature_bytes); + + Ok(bytes) + } + + /// Extract a `RegisterSignatureMessageDmq` from bytes. + pub fn try_from_bytes_vec(bytes: &[u8]) -> StdResult { + const SIGNED_ENTITY_TYPE_LENGTH_BYTES_SIZE: usize = 2; + const SIGNATURE_LENGTH_BYTES_SIZE: usize = 4; + let mut bytes_index = 0; + + let mut u16_bytes = [0u8; SIGNED_ENTITY_TYPE_LENGTH_BYTES_SIZE]; + u16_bytes.copy_from_slice( + bytes + .get(bytes_index..bytes_index + SIGNED_ENTITY_TYPE_LENGTH_BYTES_SIZE) + .ok_or(anyhow!("Failed to read `Signed entity type length` bytes"))?, + ); + let signed_entity_bytes_length = u16::from_be_bytes(u16_bytes) as usize; + bytes_index += SIGNED_ENTITY_TYPE_LENGTH_BYTES_SIZE; + + let signed_entity_bytes = bytes + .get(bytes_index..bytes_index + signed_entity_bytes_length) + .ok_or(anyhow!("Failed to read `Signed entity type` bytes"))?; + let signed_entity_type = SignedEntityType::try_from_bytes(signed_entity_bytes)?; + bytes_index += signed_entity_bytes_length; + + let mut u32_bytes = [0u8; SIGNATURE_LENGTH_BYTES_SIZE]; + u32_bytes.copy_from_slice( + bytes + .get(bytes_index..bytes_index + SIGNATURE_LENGTH_BYTES_SIZE) + .ok_or(anyhow!("Failed to read `Signature length` bytes"))?, + ); + let signature_bytes_length = u32::from_be_bytes(u32_bytes) as usize; + bytes_index += SIGNATURE_LENGTH_BYTES_SIZE; + + let signature_bytes = bytes + .get(bytes_index..bytes_index + signature_bytes_length) + .ok_or(anyhow!("Failed to read `Signature` bytes"))?; + let signature = ProtocolSingleSignature::from_bytes(signature_bytes)?; + + Ok(Self { + signed_entity_type, + signature, + }) + } +} + +impl Debug for RegisterSignatureMessageDmq { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let is_pretty_printing = f.alternate(); + let mut debug = f.debug_struct("RegisterSignatureMessageDmq"); + debug.field( + "signed_entity_type", + &format_args!("{:?}", self.signed_entity_type), + ); + + match is_pretty_printing { + true => debug.field("signature", &self.signature).finish(), + false => debug.finish_non_exhaustive(), + } + } +} + +impl TryFromBytes for RegisterSignatureMessageDmq { + fn try_from_bytes(bytes: &[u8]) -> StdResult { + Self::try_from_bytes_vec(bytes) + } +} + +impl TryToBytes for RegisterSignatureMessageDmq { + fn to_bytes_vec(&self) -> StdResult> { + self.try_to_bytes_vec() + } +} + #[cfg(test)] mod tests { use super::*; - mod http_message { + mod golden_http_message { use crate::entities::{CardanoDbBeacon, Epoch}; use super::*; @@ -127,4 +246,44 @@ mod tests { assert_eq!(golden_message_current(), message); } } + + mod golden_dmq_message { + use hex::FromHex; + + use crate::entities::{CardanoDbBeacon, Epoch}; + + use super::*; + + const CURRENT_BYTES_HEX: &str = r#"0005020afbc006000001b8000000000000002f0000000000000000000000000000000100000000000000030000000000000004000000000000000600000000000000080000000000000009000000000000000a000000000000000b000000000000000c000000000000000e00000000000000120000000000000015000000000000001600000000000000170000000000000019000000000000001a000000000000001b000000000000001e0000000000000021000000000000002200000000000000260000000000000029000000000000002b0000000000000032000000000000003a000000000000003b000000000000003c000000000000003d000000000000003e0000000000000043000000000000004500000000000000470000000000000049000000000000004b000000000000004c000000000000004d0000000000000051000000000000005200000000000000530000000000000054000000000000005a000000000000005b000000000000005c000000000000005d0000000000000061000000000000006282b10fe518fbf7abc4d28f7156bd5c387021c1d436d61cd8e3ad647fb22862571db5ff6f9de0eb2e64a9e8bdfc528b240000000000000002"#; + + fn golden_message_current() -> RegisterSignatureMessageDmq { + RegisterSignatureMessageDmq { + signed_entity_type: SignedEntityType::CardanoImmutableFilesFull( + CardanoDbBeacon::new(*Epoch(10), 1728), + ), + signature: "7b227369676d61223a5b3133302c3137372c31352c3232392c32342c3235312c3234372c3137312c3139362c3231302c3134332c3131332c38362c3138392c39322c35362c3131322c33332c3139332c3231322c35342c3231342c32382c3231362c3232372c3137332c3130302c3132372c3137382c34302c39382c38372c32392c3138312c3235352c3131312c3135372c3232342c3233352c34362c3130302c3136392c3233322c3138392c3235322c38322c3133392c33365d2c22696e6465786573223a5b302c312c332c342c362c382c392c31302c31312c31322c31342c31382c32312c32322c32332c32352c32362c32372c33302c33332c33342c33382c34312c34332c35302c35382c35392c36302c36312c36322c36372c36392c37312c37332c37352c37362c37372c38312c38322c38332c38342c39302c39312c39322c39332c39372c39385d2c227369676e65725f696e646578223a327d".to_string().try_into().unwrap(), + } + } + + #[test] + fn test_current_bytes_decoded_into_current_message() { + let message_from_bytes_hex = RegisterSignatureMessageDmq::try_from_bytes_vec( + &Vec::from_hex(CURRENT_BYTES_HEX).unwrap(), + ) + .unwrap(); + + assert_eq!(golden_message_current(), message_from_bytes_hex); + } + + #[test] + fn test_current_bijective_bytes_codec() { + let message_to_bytes = golden_message_current().try_to_bytes_vec().unwrap(); + let message_from_bytes = + RegisterSignatureMessageDmq::try_from_bytes_vec(&message_to_bytes).unwrap(); + let message_from_bytes_to_bytes = message_from_bytes.try_to_bytes_vec().unwrap(); + + assert_eq!(golden_message_current(), message_from_bytes); + assert_eq!(message_to_bytes, message_from_bytes_to_bytes); + } + } } diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index 8c2e6247bb8..8dc2c7a1d96 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.256" +version = "0.2.257" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } @@ -14,6 +14,7 @@ default = ["jemallocator"] bundle_tls = ["reqwest/native-tls-vendored"] jemallocator = ["dep:tikv-jemallocator"] +future_dmq = ["dep:mithril-dmq"] [dependencies] anyhow = { workspace = true } @@ -26,6 +27,7 @@ mithril-cardano-node-chain = { path = "../internal/cardano-node/mithril-cardano- mithril-cardano-node-internal-database = { path = "../internal/cardano-node/mithril-cardano-node-internal-database" } mithril-cli-helper = { path = "../internal/mithril-cli-helper" } mithril-common = { path = "../mithril-common", features = ["full"] } +mithril-dmq = { path = "../internal/mithril-dmq", optional = true } mithril-doc = { path = "../internal/mithril-doc" } mithril-era = { path = "../internal/mithril-era" } mithril-metric = { path = "../internal/mithril-metric" } diff --git a/mithril-signer/src/configuration.rs b/mithril-signer/src/configuration.rs index efaa211b1c1..5f1f9698e65 100644 --- a/mithril-signer/src/configuration.rs +++ b/mithril-signer/src/configuration.rs @@ -35,11 +35,14 @@ pub struct Configuration { #[example = "`cardano-cli`"] pub cardano_cli_path: PathBuf, - /// Path of the socket used by the Cardano CLI tool - /// to communicate with the Cardano node - #[example = "`/tmp/cardano.sock`"] + /// Path of the socket opened by the Cardano node + #[example = "`/ipc/node.socket`"] pub cardano_node_socket_path: PathBuf, + /// Path of the socket opened by the DMQ node + #[example = "`/ipc/dmq.socket`"] + pub dmq_node_socket_path: Option, + /// Cardano network #[example = "`testnet` or `mainnet` or `devnet`"] pub network: String, @@ -150,6 +153,7 @@ impl Configuration { relay_endpoint: None, cardano_cli_path: PathBuf::new(), cardano_node_socket_path: PathBuf::new(), + dmq_node_socket_path: None, db_directory: PathBuf::new(), network: "devnet".to_string(), network_magic: Some(42), diff --git a/mithril-signer/src/dependency_injection/builder.rs b/mithril-signer/src/dependency_injection/builder.rs index cb232dfaace..5abb7723b2f 100644 --- a/mithril-signer/src/dependency_injection/builder.rs +++ b/mithril-signer/src/dependency_injection/builder.rs @@ -20,7 +20,11 @@ use mithril_cardano_node_internal_database::{ ImmutableFileObserver, ImmutableFileSystemObserver, }; use mithril_common::api_version::APIVersionProvider; -use mithril_common::crypto_helper::{OpCert, ProtocolPartyId, SerDeShelleyFileFormat}; +use mithril_common::crypto_helper::{ + KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat, +}; +#[cfg(feature = "future_dmq")] +use mithril_common::messages::RegisterSignatureMessageDmq; use mithril_common::signable_builder::{ CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder, MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder, @@ -37,10 +41,12 @@ use mithril_persistence::database::repository::CardanoTransactionRepository; use mithril_persistence::database::{ApplicationNodeType, SqlMigration}; use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool}; -use crate::database::repository::{ - ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore, -}; +#[cfg(feature = "future_dmq")] +use mithril_dmq::{DmqMessageBuilder, DmqPublisherPallas}; + use crate::dependency_injection::SignerDependencyContainer; +#[cfg(feature = "future_dmq")] +use crate::services::SignaturePublisherDmq; use crate::services::{ AggregatorHTTPClient, CardanoTransactionsImporter, CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner, @@ -50,6 +56,10 @@ use crate::services::{ TransactionsImporterWithPruner, TransactionsImporterWithVacuum, }; use crate::store::MKTreeStoreSqlite; +use crate::{ + database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore}, + services::SignaturePublisher, +}; use crate::{ Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE, SQLITE_FILE_CARDANO_TRANSACTION, @@ -390,10 +400,53 @@ impl<'a> DependenciesBuilder<'a> { self.root_logger(), )); + let kes_signer = match ( + &self.config.kes_secret_key_path, + &self.config.operational_certificate_path, + ) { + (Some(kes_secret_key_path), Some(operational_certificate_path)) => { + Some(Arc::new(KesSignerStandard::new( + kes_secret_key_path.clone(), + operational_certificate_path.clone(), + )) as Arc) + } + (Some(_), None) | (None, Some(_)) => { + return Err(anyhow!( + "kes_secret_key and operational_certificate are both mandatory".to_string(), + )) + } + _ => None, + }; + let signature_publisher = { - // Temporary no-op publisher before a DMQ-based implementation is available. let first_publisher = SignaturePublisherRetrier::new( - Arc::new(SignaturePublisherNoop {}), + { + #[cfg(feature = "future_dmq")] + let publisher = match &self.config.dmq_node_socket_path { + Some(dmq_node_socket_path) => { + let cardano_network = &self.config.get_network()?; + let dmq_message_builder = DmqMessageBuilder::new( + kes_signer.clone().ok_or(anyhow!( + "A KES signer is mandatory to sign DMQ messages" + ))?, + chain_observer.clone(), + ); + Arc::new(SignaturePublisherDmq::new(Arc::new(DmqPublisherPallas::< + RegisterSignatureMessageDmq, + >::new( + dmq_node_socket_path.to_owned(), + *cardano_network, + dmq_message_builder, + self.root_logger(), + )))) as Arc + } + _ => Arc::new(SignaturePublisherNoop) as Arc, + }; + #[cfg(not(feature = "future_dmq"))] + let publisher = Arc::new(SignaturePublisherNoop) as Arc; + + publisher + }, SignaturePublishRetryPolicy::never(), ); @@ -442,6 +495,7 @@ impl<'a> DependenciesBuilder<'a> { upkeep_service, epoch_service, certifier, + kes_signer, }; Ok(services) diff --git a/mithril-signer/src/dependency_injection/containers.rs b/mithril-signer/src/dependency_injection/containers.rs index 5cf9b9eb8bf..0bf258e4802 100644 --- a/mithril-signer/src/dependency_injection/containers.rs +++ b/mithril-signer/src/dependency_injection/containers.rs @@ -1,3 +1,4 @@ +use mithril_common::crypto_helper::KesSigner; use std::sync::Arc; use tokio::sync::RwLock; @@ -81,4 +82,7 @@ pub struct SignerDependencyContainer { /// Certifier service pub certifier: Arc, + + /// Kes signer service + pub kes_signer: Option>, } diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs index 286fd34400d..b22afce6b7b 100644 --- a/mithril-signer/src/runtime/runner.rs +++ b/mithril-signer/src/runtime/runner.rs @@ -1,14 +1,10 @@ -use std::sync::Arc; - use anyhow::Context; use async_trait::async_trait; use slog::{debug, warn, Logger}; use thiserror::Error; use tokio::sync::RwLockReadGuard; -use mithril_common::crypto_helper::{ - KesPeriod, KesSigner, KesSignerStandard, OpCert, ProtocolOpCert, SerDeShelleyFileFormat, -}; +use mithril_common::crypto_helper::{KesPeriod, OpCert, ProtocolOpCert, SerDeShelleyFileFormat}; use mithril_common::entities::{ Epoch, PartyId, ProtocolMessage, SignedEntityType, Signer, TimePoint, }; @@ -186,28 +182,10 @@ impl Runner for SignerRunner { ), None => None, }; - let kes_signer = match ( - &self.config.kes_secret_key_path, - &self.config.operational_certificate_path, - ) { - (Some(kes_secret_key_path), Some(operational_certificate_path)) => { - Some(Arc::new(KesSignerStandard::new( - kes_secret_key_path.clone(), - operational_certificate_path.clone(), - )) as Arc) - } - (Some(_), None) | (None, Some(_)) => { - return Err(RunnerError::NoValueError( - "kes_secret_key and operational_certificate are both mandatory".to_string(), - ) - .into()) - } - _ => None, - }; let protocol_initializer = MithrilProtocolInitializerBuilder::build( stake, &protocol_parameters, - kes_signer, + self.services.kes_signer.clone(), kes_period, )?; let signer = Signer::new( @@ -528,6 +506,7 @@ mod tests { aggregator_client.clone(), logger.clone(), )); + let kes_signer = None; SignerDependencyContainer { stake_store, @@ -547,6 +526,7 @@ mod tests { upkeep_service, epoch_service, certifier, + kes_signer, } } diff --git a/mithril-signer/src/services/signature_publisher/dmq.rs b/mithril-signer/src/services/signature_publisher/dmq.rs new file mode 100644 index 00000000000..80c88c946e3 --- /dev/null +++ b/mithril-signer/src/services/signature_publisher/dmq.rs @@ -0,0 +1,83 @@ +use std::sync::Arc; + +use anyhow::Context; +use async_trait::async_trait; + +use mithril_common::{ + entities::{ProtocolMessage, SignedEntityType, SingleSignature}, + messages::RegisterSignatureMessageDmq, + StdResult, +}; +use mithril_dmq::DmqPublisher; + +use super::SignaturePublisher; + +/// DMQ implementation of the [SignaturePublisher] trait. +pub struct SignaturePublisherDmq { + dmq_publisher: Arc>, +} + +impl SignaturePublisherDmq { + /// Creates a new instance of [SignaturePublisherDmq]. + pub fn new(dmq_publisher: Arc>) -> Self { + Self { dmq_publisher } + } +} + +#[async_trait] +impl SignaturePublisher for SignaturePublisherDmq { + async fn publish( + &self, + signed_entity_type: &SignedEntityType, + signature: &SingleSignature, + _protocol_message: &ProtocolMessage, + ) -> StdResult<()> { + let message = RegisterSignatureMessageDmq { + signature: signature.signature.to_owned(), + signed_entity_type: signed_entity_type.to_owned(), + }; + + self.dmq_publisher + .publish_message(message) + .await + .with_context(|| "Failed to publish DMQ message") + } +} + +#[cfg(test)] +mod tests { + use mithril_common::test_utils::fake_data; + use mithril_dmq::test::double::DmqPublisherFake; + + use super::*; + + #[tokio::test] + async fn publish_signature_success() { + let signed_entity_type = SignedEntityType::dummy(); + let signature = fake_data::single_signature(vec![1, 2, 3]); + let protocol_message = ProtocolMessage::default(); + let dmq_publisher = Arc::new(DmqPublisherFake::new(vec![Ok(())])); + let publisher = SignaturePublisherDmq::new(dmq_publisher); + + publisher + .publish(&signed_entity_type, &signature, &protocol_message) + .await + .unwrap(); + } + + #[tokio::test] + async fn publish_signature_failure() { + let signed_entity_type = SignedEntityType::dummy(); + let signature = fake_data::single_signature(vec![1, 2, 3]); + let protocol_message = ProtocolMessage::default(); + let dmq_publisher = Arc::new(DmqPublisherFake::new(vec![Err(anyhow::anyhow!( + "Test error" + ))])); + let publisher = SignaturePublisherDmq::new(dmq_publisher); + + publisher + .publish(&signed_entity_type, &signature, &protocol_message) + .await + .expect_err("SignaturePublisherDmq should return an error"); + } +} diff --git a/mithril-signer/src/services/signature_publisher/mod.rs b/mithril-signer/src/services/signature_publisher/mod.rs index ad86537109a..def982fda6f 100644 --- a/mithril-signer/src/services/signature_publisher/mod.rs +++ b/mithril-signer/src/services/signature_publisher/mod.rs @@ -1,10 +1,14 @@ mod delayer; +#[cfg(feature = "future_dmq")] +mod dmq; mod http; mod interface; mod noop; mod retrier; pub use delayer::*; +#[cfg(feature = "future_dmq")] +pub use dmq::*; pub use interface::*; pub use noop::*; pub use retrier::*; diff --git a/mithril-signer/tests/test_extensions/state_machine_tester.rs b/mithril-signer/tests/test_extensions/state_machine_tester.rs index 1c5ffe95718..7da1eac2234 100644 --- a/mithril-signer/tests/test_extensions/state_machine_tester.rs +++ b/mithril-signer/tests/test_extensions/state_machine_tester.rs @@ -27,6 +27,7 @@ use mithril_cardano_node_internal_database::{ }; use mithril_common::{ api_version::APIVersionProvider, + crypto_helper::{KesSigner, KesSignerStandard}, entities::{ BlockNumber, CardanoTransactionsSigningConfig, ChainPoint, Epoch, SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, SignerWithStake, SlotNumber, SupportedEra, @@ -287,6 +288,10 @@ impl StateMachineTester { certificate_handler.clone(), logger.clone(), )); + let kes_signer = Some(Arc::new(KesSignerStandard::new( + config.kes_secret_key_path.clone().unwrap(), + config.operational_certificate_path.clone().unwrap(), + )) as Arc); let services = SignerDependencyContainer { certificate_handler: certificate_handler.clone(), @@ -306,6 +311,7 @@ impl StateMachineTester { upkeep_service, epoch_service, certifier, + kes_signer, }; // set up stake distribution chain_observer