From 2ece3294bcfc151908741dbf4ca8aaa423c74224 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Tue, 2 Sep 2025 12:53:33 +0200 Subject: [PATCH 01/24] refactor(dmq): update fake 'DMQMsg' used in tests --- internal/mithril-dmq/src/consumer/client/pallas.rs | 12 ++++++------ internal/mithril-dmq/src/consumer/server/pallas.rs | 6 +++--- internal/mithril-dmq/src/consumer/server/queue.rs | 6 +++--- internal/mithril-dmq/src/model/message.rs | 14 +++++++------- .../mithril-dmq/src/publisher/server/pallas.rs | 8 ++++---- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/client/pallas.rs b/internal/mithril-dmq/src/consumer/client/pallas.rs index 6030e7b9bcb..149dc55c818 100644 --- a/internal/mithril-dmq/src/consumer/client/pallas.rs +++ b/internal/mithril-dmq/src/consumer/client/pallas.rs @@ -175,9 +175,8 @@ mod tests { DmqMsg { msg_id: vec![0, 1], msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), - block_number: 10, - ttl: 100, kes_signature: vec![0, 1, 2, 3], + kes_period: 10, operational_certificate: vec![ 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, @@ -188,14 +187,14 @@ mod tests { 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68, ], - kes_period: 10, + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, }, DmqMsg { msg_id: vec![1, 2], msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), - block_number: 11, - ttl: 100, kes_signature: vec![1, 2, 3, 4], + kes_period: 11, operational_certificate: vec![ 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, @@ -207,7 +206,8 @@ mod tests { 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, 245, 200, ], - kes_period: 11, + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 101, }, ] } diff --git a/internal/mithril-dmq/src/consumer/server/pallas.rs b/internal/mithril-dmq/src/consumer/server/pallas.rs index 0a7ac3d0c3c..e7e5736488a 100644 --- a/internal/mithril-dmq/src/consumer/server/pallas.rs +++ b/internal/mithril-dmq/src/consumer/server/pallas.rs @@ -301,11 +301,11 @@ mod tests { DmqMsg { msg_id: vec![0, 1], msg_body: vec![0, 1, 2], - block_number: 10, - ttl: 100, kes_signature: vec![0, 1, 2, 3], - operational_certificate: vec![0, 1, 2, 3, 4], kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, } } diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs index 5ee5c8190f0..c16ba85e9bb 100644 --- a/internal/mithril-dmq/src/consumer/server/queue.rs +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -94,11 +94,11 @@ mod tests { DmqMsg { msg_id: vec![0, 1], msg_body: vec![0, 1, 2], - block_number: 10, - ttl: 100, kes_signature: vec![0, 1, 2, 3], - operational_certificate: vec![0, 1, 2, 3, 4], kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, } } diff --git a/internal/mithril-dmq/src/model/message.rs b/internal/mithril-dmq/src/model/message.rs index 848704bce99..7b263bf8d4a 100644 --- a/internal/mithril-dmq/src/model/message.rs +++ b/internal/mithril-dmq/src/model/message.rs @@ -77,13 +77,13 @@ mod tests { #[test] fn test_dmq_message_serialize_deserialize() { let dmq_msg = DmqMsg { - msg_id: vec![1, 2, 3], - msg_body: vec![4, 5, 6], - block_number: 123, - ttl: 10, - kes_signature: vec![7, 8, 9], - operational_certificate: vec![10, 11, 12], - kes_period: 0, + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + kes_signature: vec![0, 1, 2, 3], + kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, }; let dmq_message = DmqMessage::from(dmq_msg.clone()); diff --git a/internal/mithril-dmq/src/publisher/server/pallas.rs b/internal/mithril-dmq/src/publisher/server/pallas.rs index deb0ea250f1..fcd466f20dd 100644 --- a/internal/mithril-dmq/src/publisher/server/pallas.rs +++ b/internal/mithril-dmq/src/publisher/server/pallas.rs @@ -259,12 +259,12 @@ mod tests { async fn fake_msg() -> DmqMsg { DmqMsg { msg_id: vec![0, 1], - msg_body: vec![2, 3, 4, 5], - block_number: 10, - ttl: 100, + msg_body: vec![0, 1, 2], kes_signature: vec![0, 1, 2, 3], - operational_certificate: vec![0, 1, 2, 3, 4, 5], kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, } } From aeb811e0d259397de43c59079b4fb3049464e2ee Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Tue, 2 Sep 2025 17:39:04 +0200 Subject: [PATCH 02/24] refactor(dmq): adapt consumer client with new 'DmqMsg' structure --- .../mithril-dmq/src/consumer/client/pallas.rs | 81 +++++++++++-------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/client/pallas.rs b/internal/mithril-dmq/src/consumer/client/pallas.rs index 149dc55c818..c383f8f4b7d 100644 --- a/internal/mithril-dmq/src/consumer/client/pallas.rs +++ b/internal/mithril-dmq/src/consumer/client/pallas.rs @@ -113,10 +113,11 @@ impl DmqConsumerClientPallas { .0 .into_iter() .map(|dmq_message| { - let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate) - .with_context(|| "Failed to parse operational certificate")?; + let opcert = + OpCert::try_from_bytes(&dmq_message.msg_payload.operational_certificate) + .with_context(|| "Failed to parse operational certificate")?; let party_id = opcert.compute_protocol_party_id()?; - let payload = M::try_from_bytes(&dmq_message.msg_body) + let payload = M::try_from_bytes(&dmq_message.msg_payload.msg_body) .with_context(|| "Failed to parse DMQ message body")?; Ok((payload, party_id)) @@ -158,7 +159,10 @@ mod tests { use mithril_common::{crypto_helper::TryToBytes, current_function, test::TempDir}; use pallas_network::{ facades::DmqServer, - miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg}, + miniprotocols::{ + localmsgnotification, + localmsgsubmission::{DmqMsg, DmqMsgPayload}, + }, }; use tokio::{net::UnixListener, task::JoinHandle, time::sleep}; @@ -173,41 +177,48 @@ mod tests { fn fake_msgs() -> Vec { vec![ DmqMsg { - msg_id: vec![0, 1], - msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), + + kes_period: 10, + operational_certificate: vec![ + 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, + 40, 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, + 125, 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, + 198, 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, + 240, 9, 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, + 169, 26, 86, 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, + 234, 95, 5, 88, 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, + 188, 22, 59, 206, 105, 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, + 134, 137, 247, 161, 68, + ], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], - kes_period: 10, - operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, - 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, - 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, - 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, - 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, - 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88, - 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, - 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68, - ], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - expires_at: 100, }, DmqMsg { - msg_id: vec![1, 2], - msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), + msg_payload: DmqMsgPayload { + msg_id: vec![1, 2], + msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), + + kes_period: 11, + operational_certificate: vec![ + 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, + 40, 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, + 125, 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, + 55, 62, 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, + 44, 130, 209, 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, + 150, 247, 87, 172, 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, + 195, 107, 135, 172, 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, + 224, 69, 202, 123, 105, 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, + 58, 152, 138, 220, 35, 65, 245, 200, + ], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 101, + }, kes_signature: vec![1, 2, 3, 4], - kes_period: 11, - operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, - 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, - 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62, - 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209, - 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172, - 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172, - 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, - 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, - 245, 200, - ], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - expires_at: 101, }, ] } From fedd9f4d5d2ae250426e4db2a40b07f57100462d Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Tue, 2 Sep 2025 17:39:21 +0200 Subject: [PATCH 03/24] refactor(dmq): adapt consumer server with new 'DmqMsg' structure --- .../mithril-dmq/src/consumer/server/pallas.rs | 20 +++++++++------ .../mithril-dmq/src/consumer/server/queue.rs | 25 ++++++++++--------- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/server/pallas.rs b/internal/mithril-dmq/src/consumer/server/pallas.rs index e7e5736488a..721fe7abb56 100644 --- a/internal/mithril-dmq/src/consumer/server/pallas.rs +++ b/internal/mithril-dmq/src/consumer/server/pallas.rs @@ -282,7 +282,10 @@ mod tests { use pallas_network::{ facades::DmqClient, - miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg}, + miniprotocols::{ + localmsgnotification, + localmsgsubmission::{DmqMsg, DmqMsgPayload}, + }, }; use tokio::sync::{mpsc::unbounded_channel, watch}; use tokio::time::sleep; @@ -299,13 +302,16 @@ mod tests { fn fake_msg() -> DmqMsg { DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + + kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - expires_at: 100, } } diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs index c16ba85e9bb..7927090eb14 100644 --- a/internal/mithril-dmq/src/consumer/server/queue.rs +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -85,31 +85,32 @@ mod tests { use std::{ops::RangeInclusive, time::Duration}; use anyhow::anyhow; - use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; + use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload}; use tokio::time::sleep; use super::*; fn fake_msg() -> DmqMsg { DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + + kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - expires_at: 100, } } fn fake_messages(range: RangeInclusive) -> Vec { range .map(|i| { - DmqMsg { - msg_id: vec![i], - ..fake_msg() - } - .into() + let mut message = fake_msg(); + message.msg_payload.msg_id = vec![i]; + message.into() }) .collect::>() } From 841671ca0c97104f45042968bb7918b5ab02c31b Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Tue, 2 Sep 2025 17:40:16 +0200 Subject: [PATCH 04/24] refactor(dmq): adapt publisher server with new 'DmqMsg' structure --- .../src/publisher/server/pallas.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/mithril-dmq/src/publisher/server/pallas.rs b/internal/mithril-dmq/src/publisher/server/pallas.rs index fcd466f20dd..040ec5a67b3 100644 --- a/internal/mithril-dmq/src/publisher/server/pallas.rs +++ b/internal/mithril-dmq/src/publisher/server/pallas.rs @@ -242,7 +242,10 @@ mod tests { use pallas_network::{ facades::DmqClient, - miniprotocols::{localmsgsubmission::DmqMsg, localtxsubmission}, + miniprotocols::{ + localmsgsubmission::{DmqMsg, DmqMsgPayload}, + localtxsubmission, + }, }; use tokio::sync::{mpsc::unbounded_channel, watch}; @@ -258,13 +261,16 @@ mod tests { async fn fake_msg() -> DmqMsg { DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + + kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - expires_at: 100, } } From 812c79828d1166035d0ac8a1511fedc5ddaeadf8 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Tue, 2 Sep 2025 17:41:17 +0200 Subject: [PATCH 05/24] feat(dmq): add a Unix timestamp provider trait --- internal/mithril-dmq/src/model/mod.rs | 2 ++ internal/mithril-dmq/src/model/timestamp.rs | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 internal/mithril-dmq/src/model/timestamp.rs diff --git a/internal/mithril-dmq/src/model/mod.rs b/internal/mithril-dmq/src/model/mod.rs index 959c5f7ee05..d5e78173523 100644 --- a/internal/mithril-dmq/src/model/mod.rs +++ b/internal/mithril-dmq/src/model/mod.rs @@ -1,5 +1,7 @@ mod builder; mod message; +mod timestamp; pub use builder::*; pub use message::*; +pub use timestamp::*; diff --git a/internal/mithril-dmq/src/model/timestamp.rs b/internal/mithril-dmq/src/model/timestamp.rs new file mode 100644 index 00000000000..c1c7f8bfe34 --- /dev/null +++ b/internal/mithril-dmq/src/model/timestamp.rs @@ -0,0 +1,19 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use mithril_common::StdResult; + +/// Provides the current timestamp in seconds since the UNIX epoch. +#[cfg_attr(test, mockall::automock)] +pub trait UnixTimestampProvider: Send + Sync { + /// Returns the current timestamp in seconds since the UNIX epoch. + fn current_timestamp(&self) -> StdResult; +} + +/// Provides the current timestamp in seconds since the UNIX epoch from the system. +pub struct SystemUnixTimestampProvider; + +impl UnixTimestampProvider for SystemUnixTimestampProvider { + fn current_timestamp(&self) -> StdResult { + Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()) + } +} From 22b57202069dafb4746423435e8b2640c500e8f7 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Tue, 2 Sep 2025 17:42:26 +0200 Subject: [PATCH 06/24] refactor(dmq): update the message builder and its KES signature --- internal/mithril-dmq/src/model/builder.rs | 129 +++++++++++++--------- internal/mithril-dmq/src/model/message.rs | 17 ++- 2 files changed, 87 insertions(+), 59 deletions(-) diff --git a/internal/mithril-dmq/src/model/builder.rs b/internal/mithril-dmq/src/model/builder.rs index 2d49ae5afcc..211cf527c13 100644 --- a/internal/mithril-dmq/src/model/builder.rs +++ b/internal/mithril-dmq/src/model/builder.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use anyhow::{Context, anyhow}; +use anyhow::Context; use blake2::{Blake2b, Digest, digest::consts::U64}; -use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; +use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload}; use mithril_cardano_node_chain::chain_observer::ChainObserver; use mithril_common::{ @@ -10,16 +10,17 @@ use mithril_common::{ crypto_helper::{KesSigner, TryToBytes}, }; -use crate::model::DmqMessage; +use crate::model::{DmqMessage, SystemUnixTimestampProvider, UnixTimestampProvider}; -/// The TTL (Time To Live) for DMQ messages in blocks. -const DMQ_MESSAGE_TTL_IN_BLOCKS: u16 = 100; +/// The TTL (Time To Live) for DMQ messages in seconds (default is 30 minutes). +const DMQ_MESSAGE_TTL_IN_SECONDS: u16 = 1800; /// A builder for creating DMQ messages. pub struct DmqMessageBuilder { kes_signer: Arc, chain_observer: Arc, - ttl_blocks: u16, + timestamp_provider: Arc, + ttl_seconds: u16, } impl DmqMessageBuilder { @@ -28,43 +29,45 @@ impl DmqMessageBuilder { Self { kes_signer, chain_observer, - ttl_blocks: DMQ_MESSAGE_TTL_IN_BLOCKS, + timestamp_provider: Arc::new(SystemUnixTimestampProvider), + ttl_seconds: DMQ_MESSAGE_TTL_IN_SECONDS, } } - /// Set the TTL (Time To Live) for DMQ messages in blocks. - pub fn set_ttl(mut self, ttl_blocks: u16) -> Self { - self.ttl_blocks = ttl_blocks; + /// Sets the timestamp provider for the DMQ message builder. + pub fn set_timestamp_provider( + mut self, + timestamp_provider: Arc, + ) -> Self { + self.timestamp_provider = timestamp_provider; + self + } + + /// Sets the TTL (Time To Live) for DMQ messages in seconds. + pub fn set_ttl(mut self, ttl_seconds: u16) -> Self { + self.ttl_seconds = ttl_seconds; self } + /// Computes a message id for a DMQ message payload. + fn compute_msg_id(dmq_message_payload: &DmqMsgPayload) -> Vec { + let mut hasher = Blake2b::::new(); + hasher.update(&dmq_message_payload.msg_body); + hasher.update(dmq_message_payload.kes_period.to_be_bytes()); + hasher.update(&dmq_message_payload.operational_certificate); + hasher.update(&dmq_message_payload.cold_verification_key); + hasher.update(dmq_message_payload.expires_at.to_be_bytes()); + + hasher.finalize().to_vec() + } + /// Builds a DMQ message from the provided message bytes. pub async fn build(&self, message_bytes: &[u8]) -> StdResult { - fn compute_msg_id(dmq_message: &DmqMsg) -> Vec { - let mut hasher = Blake2b::::new(); - hasher.update(&dmq_message.msg_body); - hasher.update(dmq_message.block_number.to_be_bytes()); - hasher.update(dmq_message.ttl.to_be_bytes()); - hasher.update(&dmq_message.kes_signature); - hasher.update(&dmq_message.operational_certificate); - hasher.update(dmq_message.kes_period.to_be_bytes()); - - hasher.finalize().to_vec() - } - - let block_number = self - .chain_observer - .get_current_chain_point() - .await - .with_context(|| "Failed to get current chain point while building DMQ message")? - .ok_or(anyhow!( - "No current chain point available while building DMQ message" - ))? - .block_number; - let block_number = (*block_number) + let expires_at: u32 = (self.timestamp_provider.current_timestamp()? + + self.ttl_seconds as u64) .try_into() - .with_context(|| "Failed to convert block number to u32")?; + .with_context(|| "Failed to compute expires_at while building DMQ message")?; let kes_period = self .chain_observer .get_current_kes_period() @@ -75,16 +78,23 @@ impl DmqMessageBuilder { .kes_signer .sign(message_bytes, kes_period) .with_context(|| "Failed to KES sign message while building DMQ message")?; - let mut dmq_message = DmqMsg { - msg_id: vec![], - msg_body: message_bytes.to_vec(), - block_number, - ttl: self.ttl_blocks, + + let dmq_message = DmqMsg { + msg_payload: { + let mut dmq_message_payload = DmqMsgPayload { + msg_id: vec![], + msg_body: message_bytes.to_vec(), + kes_period: kes_period as u64, + operational_certificate: operational_certificate.to_bytes_vec()?, // TODO: remove the cold verification key in the op cert + cold_verification_key: vec![], + expires_at, + }; + dmq_message_payload.msg_id = Self::compute_msg_id(&dmq_message_payload); + + dmq_message_payload + }, kes_signature: kes_signature.to_bytes_vec()?, - operational_certificate: operational_certificate.to_bytes_vec()?, - kes_period, }; - dmq_message.msg_id = compute_msg_id(&dmq_message); Ok(dmq_message.into()) } @@ -100,6 +110,8 @@ mod tests { test::{crypto_helper::KesSignerFake, double::Dummy}, }; + use crate::model::MockUnixTimestampProvider; + use super::*; mod test_utils { @@ -131,28 +143,39 @@ mod tests { }, ..TimePoint::dummy() }))); - let builder = DmqMessageBuilder::new(kes_signer, chain_observer).set_ttl(100); + let builder = DmqMessageBuilder::new(kes_signer, chain_observer) + .set_ttl(1000) + .set_timestamp_provider(Arc::new({ + let mut mock_timestamp_provider = MockUnixTimestampProvider::new(); + mock_timestamp_provider + .expect_current_timestamp() + .returning(|| Ok(234)); + + mock_timestamp_provider + })); let message = test_utils::TestMessage { content: b"test".to_vec(), }; let dmq_message = builder.build(&message.to_bytes_vec().unwrap()).await.unwrap(); - assert!(!dmq_message.msg_id.is_empty()); + let DmqMsg { + msg_payload, + kes_signature: _, + } = &*dmq_message; assert_eq!( DmqMsg { - msg_id: vec![], - msg_body: b"test".to_vec(), - block_number: 123, - ttl: 100, + msg_payload: DmqMsgPayload { + msg_id: DmqMessageBuilder::compute_msg_id(msg_payload), + msg_body: b"test".to_vec(), + kes_period: 0, + operational_certificate: operational_certificate.to_bytes_vec().unwrap(), + cold_verification_key: vec![], // TODO: fix + expires_at: 1234, + }, kes_signature: kes_signature.to_bytes_vec().unwrap(), - operational_certificate: operational_certificate.to_bytes_vec().unwrap(), - kes_period: 0, }, - DmqMsg { - msg_id: vec![], - ..dmq_message.into() - } + dmq_message.into() ); } } diff --git a/internal/mithril-dmq/src/model/message.rs b/internal/mithril-dmq/src/model/message.rs index 7b263bf8d4a..1ba5fe032c7 100644 --- a/internal/mithril-dmq/src/model/message.rs +++ b/internal/mithril-dmq/src/model/message.rs @@ -72,18 +72,23 @@ impl<'de> Deserialize<'de> for DmqMessage { #[cfg(test)] mod tests { + use pallas_network::miniprotocols::localmsgsubmission::DmqMsgPayload; + use super::*; #[test] fn test_dmq_message_serialize_deserialize() { let dmq_msg = DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + + kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - expires_at: 100, }; let dmq_message = DmqMessage::from(dmq_msg.clone()); From fdcb5d9fe592eeed8c322c3672204fc5445c8ec1 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 11:39:36 +0200 Subject: [PATCH 07/24] feat(common): add signed messages recording in 'FakeKesSigner' --- .../crypto_helper/cardano/kes/signer_fake.rs | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs b/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs index 7eb9073ee3d..87b5cac4fcc 100644 --- a/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs +++ b/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs @@ -16,6 +16,7 @@ type KesSignatureResult = StdResult<(Sum6KesSig, OpCert)>; /// Fake KES Signer implementation. pub struct KesSignerFake { results: Mutex>, + signed_messages: Mutex>>, } impl KesSignerFake { @@ -23,9 +24,16 @@ impl KesSignerFake { pub fn new(results: Vec) -> Self { Self { results: Mutex::new(results.into()), + signed_messages: Mutex::new(VecDeque::new()), } } + /// Returns the messages that were requested to be signed + pub fn get_signed_messages(&self) -> Vec> { + let messages = self.signed_messages.lock().unwrap(); + messages.iter().cloned().collect() + } + /// Returns a dummy signature result that is always successful. pub fn dummy_signature(test_directory: &str) -> (Sum6KesSig, OpCert) { let KesCryptographicMaterialForTest { @@ -55,7 +63,10 @@ impl KesSignerFake { } impl KesSigner for KesSignerFake { - fn sign(&self, _message: &[u8], _kes_period: KesPeriod) -> KesSignatureResult { + fn sign(&self, message: &[u8], _kes_period: KesPeriod) -> KesSignatureResult { + let mut messages = self.signed_messages.lock().unwrap(); + messages.push_back(message.to_vec()); + let mut results = self.results.lock().unwrap(); results.pop_front().unwrap() @@ -77,11 +88,11 @@ mod tests { 0 as KesPeriod, "fake_kes_signer_returns_signature_batches_in_expected_order", ); - let message = b"Test message for KES signing"; + let message1 = b"Test message 1 for KES signing"; let kes_signer = KesSignerStandard::new(kes_secret_key_file, operational_certificate_file); let kes_signing_period = 1; let (kes_signature, op_cert) = kes_signer - .sign(message, kes_signing_period) + .sign(message1, kes_signing_period) .expect("Signing should not fail"); let fake_kes_signer = KesSignerFake::new(vec![ Ok((kes_signature, op_cert.clone())), @@ -89,13 +100,17 @@ mod tests { ]); let (kes_signature_1, op_cert_1) = fake_kes_signer - .sign(message, kes_signing_period) + .sign(message1, kes_signing_period) .expect("Signing should not fail"); assert_eq!(kes_signature, kes_signature_1); assert_eq!(op_cert, op_cert_1); + let message2 = b"Test message 2 for KES signing"; fake_kes_signer - .sign(message, kes_signing_period) + .sign(message2, kes_signing_period) .expect_err("Signing should fail"); + + let signed_messages = fake_kes_signer.get_signed_messages(); + assert_eq!(vec![message1.to_vec(), message2.to_vec()], signed_messages); } } From 504759717ce6089a1e49c5d4a6cc2d0906424de4 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 12:26:22 +0200 Subject: [PATCH 08/24] refactor(dmq): update message builder signature --- internal/mithril-dmq/src/model/builder.rs | 38 ++++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/internal/mithril-dmq/src/model/builder.rs b/internal/mithril-dmq/src/model/builder.rs index 211cf527c13..fa1dd41fa68 100644 --- a/internal/mithril-dmq/src/model/builder.rs +++ b/internal/mithril-dmq/src/model/builder.rs @@ -74,11 +74,25 @@ impl DmqMessageBuilder { .await .with_context(|| "Failed to get KES period while building DMQ message")? .unwrap_or_default(); + let mut dmq_message_payload = DmqMsgPayload { + msg_id: vec![], + msg_body: message_bytes.to_vec(), + kes_period: kes_period as u64, + operational_certificate: vec![], + cold_verification_key: vec![], + expires_at, + }; + dmq_message_payload.msg_id = Self::compute_msg_id(&dmq_message_payload); + let (kes_signature, operational_certificate) = self .kes_signer - .sign(message_bytes, kes_period) + .sign(&dmq_message_payload.bytes_to_sign()?, kes_period) .with_context(|| "Failed to KES sign message while building DMQ message")?; + // TODO: remove the cold verification key in the op cert + dmq_message_payload.operational_certificate = operational_certificate.to_bytes_vec()?; + dmq_message_payload.cold_verification_key = vec![]; + let dmq_message = DmqMsg { msg_payload: { let mut dmq_message_payload = DmqMsgPayload { @@ -86,7 +100,7 @@ impl DmqMessageBuilder { msg_body: message_bytes.to_vec(), kes_period: kes_period as u64, operational_certificate: operational_certificate.to_bytes_vec()?, // TODO: remove the cold verification key in the op cert - cold_verification_key: vec![], + cold_verification_key: vec![], // TODO: fix expires_at, }; dmq_message_payload.msg_id = Self::compute_msg_id(&dmq_message_payload); @@ -143,7 +157,7 @@ mod tests { }, ..TimePoint::dummy() }))); - let builder = DmqMessageBuilder::new(kes_signer, chain_observer) + let builder = DmqMessageBuilder::new(kes_signer.clone(), chain_observer) .set_ttl(1000) .set_timestamp_provider(Arc::new({ let mut mock_timestamp_provider = MockUnixTimestampProvider::new(); @@ -163,6 +177,7 @@ mod tests { msg_payload, kes_signature: _, } = &*dmq_message; + assert_eq!( DmqMsg { msg_payload: DmqMsgPayload { @@ -175,7 +190,22 @@ mod tests { }, kes_signature: kes_signature.to_bytes_vec().unwrap(), }, - dmq_message.into() + dmq_message.clone().into() + ); + + let signed_messages = kes_signer.get_signed_messages(); + let mut expected_msg_payload = DmqMsgPayload { + msg_id: vec![], + msg_body: b"test".to_vec(), + kes_period: 0, + operational_certificate: vec![], // TODO: fix + cold_verification_key: vec![], // TODO: fix + expires_at: 1234, + }; + expected_msg_payload.msg_id = DmqMessageBuilder::compute_msg_id(&expected_msg_payload); + assert_eq!( + vec![expected_msg_payload.bytes_to_sign().unwrap()], + signed_messages ); } } From 9e0baf6769b057d530d5f84fb4e15887324d84d8 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 15:51:47 +0200 Subject: [PATCH 09/24] refactor(dmq): adapt consumer client with new 'DmqMsg' structure Following the move of operational certificate and cold verification out of the signed payload. --- .../mithril-dmq/src/consumer/client/pallas.rs | 54 +++++++++---------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/client/pallas.rs b/internal/mithril-dmq/src/consumer/client/pallas.rs index c383f8f4b7d..d398aed1af6 100644 --- a/internal/mithril-dmq/src/consumer/client/pallas.rs +++ b/internal/mithril-dmq/src/consumer/client/pallas.rs @@ -113,9 +113,8 @@ impl DmqConsumerClientPallas { .0 .into_iter() .map(|dmq_message| { - let opcert = - OpCert::try_from_bytes(&dmq_message.msg_payload.operational_certificate) - .with_context(|| "Failed to parse operational certificate")?; + let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate) + .with_context(|| "Failed to parse operational certificate")?; let party_id = opcert.compute_protocol_party_id()?; let payload = M::try_from_bytes(&dmq_message.msg_payload.msg_body) .with_context(|| "Failed to parse DMQ message body")?; @@ -180,45 +179,42 @@ mod tests { msg_payload: DmqMsgPayload { msg_id: vec![0, 1], msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), - kes_period: 10, - operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, - 40, 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, - 125, 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, - 198, 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, - 240, 9, 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, - 169, 26, 86, 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, - 234, 95, 5, 88, 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, - 188, 22, 59, 206, 105, 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, - 134, 137, 247, 161, 68, - ], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], expires_at: 100, }, kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![ + 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, + 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, + 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, + 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, + 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, + 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88, + 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, + 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68, + ], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], }, DmqMsg { msg_payload: DmqMsgPayload { msg_id: vec![1, 2], msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), - kes_period: 11, - operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, - 40, 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, - 125, 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, - 55, 62, 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, - 44, 130, 209, 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, - 150, 247, 87, 172, 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, - 195, 107, 135, 172, 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, - 224, 69, 202, 123, 105, 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, - 58, 152, 138, 220, 35, 65, 245, 200, - ], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], expires_at: 101, }, kes_signature: vec![1, 2, 3, 4], + operational_certificate: vec![ + 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, + 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, + 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62, + 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209, + 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172, + 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172, + 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, + 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, + 245, 200, + ], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], }, ] } From f8cd590d2ad08d1d8071c0d708c64e40606f6c6d Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 15:52:00 +0200 Subject: [PATCH 10/24] refactor(dmq): adapt consumer server with new 'DmqMsg' structure Following the move of operational certificate and cold verification out of the signed payload. --- internal/mithril-dmq/src/consumer/server/pallas.rs | 5 ++--- internal/mithril-dmq/src/consumer/server/queue.rs | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/server/pallas.rs b/internal/mithril-dmq/src/consumer/server/pallas.rs index 721fe7abb56..7d418bb7b59 100644 --- a/internal/mithril-dmq/src/consumer/server/pallas.rs +++ b/internal/mithril-dmq/src/consumer/server/pallas.rs @@ -305,13 +305,12 @@ mod tests { msg_payload: DmqMsgPayload { msg_id: vec![0, 1], msg_body: vec![0, 1, 2], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], expires_at: 100, }, kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], } } diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs index 7927090eb14..dc20fb97cc8 100644 --- a/internal/mithril-dmq/src/consumer/server/queue.rs +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -95,13 +95,12 @@ mod tests { msg_payload: DmqMsgPayload { msg_id: vec![0, 1], msg_body: vec![0, 1, 2], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], expires_at: 100, }, kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], } } From 4d85e0d7ba0c28ea94ba3304ab6a2bd0b241fb83 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 15:52:31 +0200 Subject: [PATCH 11/24] refactor(dmq): adapt publisher server with new 'DmqMsg' structure Following the move of operational certificate and cold verification out of the signed payload. --- internal/mithril-dmq/src/publisher/server/pallas.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/mithril-dmq/src/publisher/server/pallas.rs b/internal/mithril-dmq/src/publisher/server/pallas.rs index 040ec5a67b3..383cff1695f 100644 --- a/internal/mithril-dmq/src/publisher/server/pallas.rs +++ b/internal/mithril-dmq/src/publisher/server/pallas.rs @@ -264,13 +264,12 @@ mod tests { msg_payload: DmqMsgPayload { msg_id: vec![0, 1], msg_body: vec![0, 1, 2], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], expires_at: 100, }, kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], } } From 76befa1a7763b1893db89db9db93dc19dfb3afe9 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 15:54:04 +0200 Subject: [PATCH 12/24] refactor(dmq): adapt 'DmqMessage' with new 'DmqMsg' structure Following the move of operational certificate and cold verification out of the signed payload. --- internal/mithril-dmq/src/model/message.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/mithril-dmq/src/model/message.rs b/internal/mithril-dmq/src/model/message.rs index 1ba5fe032c7..516eb28bb8b 100644 --- a/internal/mithril-dmq/src/model/message.rs +++ b/internal/mithril-dmq/src/model/message.rs @@ -82,13 +82,12 @@ mod tests { msg_payload: DmqMsgPayload { msg_id: vec![0, 1], msg_body: vec![0, 1, 2], - kes_period: 10, - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], expires_at: 100, }, kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], }; let dmq_message = DmqMessage::from(dmq_msg.clone()); From ec8d488ef7f9ccd99ca6546fe05f70f4fdf6cbac Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Wed, 3 Sep 2025 15:57:26 +0200 Subject: [PATCH 13/24] refactor(dmq): update the message builder and KES signature Following the move of operational certificate and cold verification out of the signed payload. --- internal/mithril-dmq/src/model/builder.rs | 74 +++++++++-------------- 1 file changed, 28 insertions(+), 46 deletions(-) diff --git a/internal/mithril-dmq/src/model/builder.rs b/internal/mithril-dmq/src/model/builder.rs index fa1dd41fa68..af1469468f6 100644 --- a/internal/mithril-dmq/src/model/builder.rs +++ b/internal/mithril-dmq/src/model/builder.rs @@ -55,13 +55,20 @@ impl DmqMessageBuilder { let mut hasher = Blake2b::::new(); hasher.update(&dmq_message_payload.msg_body); hasher.update(dmq_message_payload.kes_period.to_be_bytes()); - hasher.update(&dmq_message_payload.operational_certificate); - hasher.update(&dmq_message_payload.cold_verification_key); hasher.update(dmq_message_payload.expires_at.to_be_bytes()); hasher.finalize().to_vec() } + /// Enriches a DMQ message payload with a computed message ID. + fn enrich_msg_payload_with_id(dmq_message_payload: DmqMsgPayload) -> DmqMsgPayload { + let msg_id = Self::compute_msg_id(&dmq_message_payload); + let mut dmq_message_payload_with_id = dmq_message_payload; + dmq_message_payload_with_id.msg_id = msg_id; + + dmq_message_payload_with_id + } + /// Builds a DMQ message from the provided message bytes. pub async fn build(&self, message_bytes: &[u8]) -> StdResult { let expires_at: u32 = (self.timestamp_provider.current_timestamp()? @@ -74,40 +81,28 @@ impl DmqMessageBuilder { .await .with_context(|| "Failed to get KES period while building DMQ message")? .unwrap_or_default(); - let mut dmq_message_payload = DmqMsgPayload { + let dmq_message_payload = Self::enrich_msg_payload_with_id(DmqMsgPayload { msg_id: vec![], msg_body: message_bytes.to_vec(), kes_period: kes_period as u64, - operational_certificate: vec![], - cold_verification_key: vec![], expires_at, - }; - dmq_message_payload.msg_id = Self::compute_msg_id(&dmq_message_payload); + }); let (kes_signature, operational_certificate) = self .kes_signer .sign(&dmq_message_payload.bytes_to_sign()?, kes_period) .with_context(|| "Failed to KES sign message while building DMQ message")?; - // TODO: remove the cold verification key in the op cert - dmq_message_payload.operational_certificate = operational_certificate.to_bytes_vec()?; - dmq_message_payload.cold_verification_key = vec![]; - let dmq_message = DmqMsg { - msg_payload: { - let mut dmq_message_payload = DmqMsgPayload { - msg_id: vec![], - msg_body: message_bytes.to_vec(), - kes_period: kes_period as u64, - operational_certificate: operational_certificate.to_bytes_vec()?, // TODO: remove the cold verification key in the op cert - cold_verification_key: vec![], // TODO: fix - expires_at, - }; - dmq_message_payload.msg_id = Self::compute_msg_id(&dmq_message_payload); - - dmq_message_payload - }, + msg_payload: Self::enrich_msg_payload_with_id(DmqMsgPayload { + msg_id: vec![], + msg_body: message_bytes.to_vec(), + kes_period: kes_period as u64, + expires_at, + }), kes_signature: kes_signature.to_bytes_vec()?, + operational_certificate: operational_certificate.to_bytes_vec()?, // TODO: remove the cold verification key in the op cert + cold_verification_key: vec![], // TODO: fix }; Ok(dmq_message.into()) @@ -173,36 +168,23 @@ mod tests { let dmq_message = builder.build(&message.to_bytes_vec().unwrap()).await.unwrap(); - let DmqMsg { - msg_payload, - kes_signature: _, - } = &*dmq_message; - + let expected_msg_payload = DmqMessageBuilder::enrich_msg_payload_with_id(DmqMsgPayload { + msg_id: vec![], + msg_body: b"test".to_vec(), + kes_period: 0, + expires_at: 1234, + }); assert_eq!( DmqMsg { - msg_payload: DmqMsgPayload { - msg_id: DmqMessageBuilder::compute_msg_id(msg_payload), - msg_body: b"test".to_vec(), - kes_period: 0, - operational_certificate: operational_certificate.to_bytes_vec().unwrap(), - cold_verification_key: vec![], // TODO: fix - expires_at: 1234, - }, + msg_payload: expected_msg_payload.clone(), kes_signature: kes_signature.to_bytes_vec().unwrap(), + operational_certificate: operational_certificate.to_bytes_vec().unwrap(), + cold_verification_key: vec![], }, dmq_message.clone().into() ); let signed_messages = kes_signer.get_signed_messages(); - let mut expected_msg_payload = DmqMsgPayload { - msg_id: vec![], - msg_body: b"test".to_vec(), - kes_period: 0, - operational_certificate: vec![], // TODO: fix - cold_verification_key: vec![], // TODO: fix - expires_at: 1234, - }; - expected_msg_payload.msg_id = DmqMessageBuilder::compute_msg_id(&expected_msg_payload); assert_eq!( vec![expected_msg_payload.bytes_to_sign().unwrap()], signed_messages From e89e93f92a34e77c3a20ca7ba6d4d45a56bf760e Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Thu, 4 Sep 2025 15:00:30 +0200 Subject: [PATCH 14/24] feat(common): add 'into_inner' function to 'ProtocolKey' --- mithril-common/src/crypto_helper/types/protocol_key.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mithril-common/src/crypto_helper/types/protocol_key.rs b/mithril-common/src/crypto_helper/types/protocol_key.rs index aa4fed3961e..923010949da 100644 --- a/mithril-common/src/crypto_helper/types/protocol_key.rs +++ b/mithril-common/src/crypto_helper/types/protocol_key.rs @@ -129,6 +129,11 @@ where pub fn key_to_bytes_hex(key: &T) -> StdResult { key.to_bytes_hex() } + + /// Consume self and return the inner key + pub fn into_inner(self) -> T { + self.key + } } impl Deref for ProtocolKey From 1e9be5f5a7fc11be8a0fd214d8271f5d13d65262 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Thu, 4 Sep 2025 15:03:04 +0200 Subject: [PATCH 15/24] refactor(common): add support for operational certificate without cold verification key --- .../cardano/kes/verifier_standard.rs | 8 +- .../cardano/key_certification.rs | 2 +- .../src/crypto_helper/cardano/opcert.rs | 168 ++++++++++++++---- .../src/crypto_helper/codec/binary.rs | 14 +- mithril-common/src/crypto_helper/mod.rs | 4 +- 5 files changed, 159 insertions(+), 37 deletions(-) diff --git a/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs b/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs index da660bb22e9..5b3e28b78c3 100644 --- a/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs +++ b/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs @@ -30,7 +30,11 @@ impl KesVerifier for KesVerifierStandard { let kes_period_try_max = std::cmp::min(64, kes_period.saturating_add(1)); for kes_period_try in kes_period_try_min..kes_period_try_max { if signature - .verify(kes_period_try, &operational_certificate.kes_vk, message) + .verify( + kes_period_try, + &operational_certificate.get_kes_verification_key(), + message, + ) .is_ok() { return Ok(()); @@ -39,7 +43,7 @@ impl KesVerifier for KesVerifierStandard { Err(KesVerifyError::SignatureInvalid( kes_period, - operational_certificate.start_kes_period as u32, + operational_certificate.get_start_kes_period() as u32, ) .into()) } diff --git a/mithril-common/src/crypto_helper/cardano/key_certification.rs b/mithril-common/src/crypto_helper/cardano/key_certification.rs index 3bf8b12095a..0a6449a78bb 100644 --- a/mithril-common/src/crypto_helper/cardano/key_certification.rs +++ b/mithril-common/src/crypto_helper/cardano/key_certification.rs @@ -266,7 +266,7 @@ impl KeyRegWrapper { } else { return Err(ProtocolRegistrationErrorWrapper::KesSignatureInvalid( kes_period, - opcert.start_kes_period, + opcert.get_start_kes_period(), )); } } else { diff --git a/mithril-common/src/crypto_helper/cardano/opcert.rs b/mithril-common/src/crypto_helper/cardano/opcert.rs index dd62c5c95e8..4d21e9f06aa 100644 --- a/mithril-common/src/crypto_helper/cardano/opcert.rs +++ b/mithril-common/src/crypto_helper/cardano/opcert.rs @@ -26,7 +26,7 @@ pub enum OpCertError { /// Raw Fields of the operational certificates (without including the cold VK) #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] -struct RawFields( +struct RawOpCertWithoutColdVerificationKey( #[serde(with = "serde_bytes")] Vec, u64, u64, @@ -35,16 +35,72 @@ struct RawFields( /// Raw Operational Certificate #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] -struct RawOpCert(RawFields, EdVerificationKey); +struct RawOpCert(RawOpCertWithoutColdVerificationKey, EdVerificationKey); -/// Parsed Operational Certificate +/// Parsed Operational Certificate without cold verification key #[derive(Clone, Debug, PartialEq, Eq)] -pub struct OpCert { +pub struct OpCertWithoutColdVerificationKey { pub(crate) kes_vk: KesPublicKey, pub(crate) issue_number: u64, /// KES period at which KES key is initalized pub start_kes_period: u64, pub(crate) cert_sig: EdSignature, +} + +impl SerDeShelleyFileFormat for OpCertWithoutColdVerificationKey { + const TYPE: &'static str = "NodeOperationalCertificateWithoutColdVerificationKey"; + const DESCRIPTION: &'static str = ""; +} + +impl Serialize for OpCertWithoutColdVerificationKey { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let raw_cert = RawOpCertWithoutColdVerificationKey( + self.kes_vk.as_bytes().to_vec(), + self.issue_number, + self.start_kes_period, + self.cert_sig.to_bytes().to_vec(), + ); + + raw_cert.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for OpCertWithoutColdVerificationKey { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let raw_cert = RawOpCertWithoutColdVerificationKey::deserialize(deserializer)?; + + Ok(Self { + kes_vk: KesPublicKey::from_bytes(&raw_cert.0) + .map_err(|_| Error::custom("KES vk serialisation error"))?, + issue_number: raw_cert.1, + start_kes_period: raw_cert.2, + cert_sig: EdSignature::from_slice(&raw_cert.3) + .map_err(|_| Error::custom("ed25519 signature serialisation error"))?, + }) + } +} + +impl From<&OpCertWithoutColdVerificationKey> for RawOpCertWithoutColdVerificationKey { + fn from(opcert: &OpCertWithoutColdVerificationKey) -> Self { + RawOpCertWithoutColdVerificationKey( + opcert.kes_vk.as_bytes().to_vec(), + opcert.issue_number, + opcert.start_kes_period, + opcert.cert_sig.to_bytes().to_vec(), + ) + } +} + +/// Parsed Operational Certificate +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct OpCert { + pub(crate) opcert_without_vk: OpCertWithoutColdVerificationKey, pub(crate) cold_vk: EdVerificationKey, } @@ -69,14 +125,46 @@ impl OpCert { )); Self { - kes_vk, - issue_number, - start_kes_period, - cert_sig, + opcert_without_vk: OpCertWithoutColdVerificationKey { + kes_vk, + issue_number, + start_kes_period, + cert_sig, + }, cold_vk, } } + /// Get the KES verification key + pub fn get_kes_verification_key(&self) -> KesPublicKey { + self.opcert_without_vk.kes_vk + } + + /// Get the issue number + pub fn get_issue_number(&self) -> u64 { + self.opcert_without_vk.issue_number + } + + /// Get the start KES period + pub fn get_start_kes_period(&self) -> u64 { + self.opcert_without_vk.start_kes_period + } + + /// Get the certificate signature + pub fn get_certificate_signature(&self) -> EdSignature { + self.opcert_without_vk.cert_sig + } + + /// Get the OpCert without cold verification key + pub fn get_opcert_without_cold_verification_key(&self) -> OpCertWithoutColdVerificationKey { + self.opcert_without_vk.clone() + } + + /// Get the cold verification key + pub fn get_cold_verification_key(&self) -> EdVerificationKey { + self.cold_vk + } + /// Compute message to sign pub(crate) fn compute_message_to_sign( kes_vk: &KesPublicKey, @@ -96,11 +184,11 @@ impl OpCert { .cold_vk .verify( &Self::compute_message_to_sign( - &self.kes_vk, - self.issue_number, - self.start_kes_period, + &self.opcert_without_vk.kes_vk, + self.opcert_without_vk.issue_number, + self.opcert_without_vk.start_kes_period, ), - &self.cert_sig, + &self.opcert_without_vk.cert_sig, ) .is_ok() { @@ -129,10 +217,10 @@ impl OpCert { /// Compute the hash of an OpCert pub fn compute_hash(&self) -> String { let mut hasher = Sha256::new(); - hasher.update(self.kes_vk.as_bytes()); - hasher.update(self.issue_number.to_be_bytes()); - hasher.update(self.start_kes_period.to_be_bytes()); - hasher.update(self.cert_sig.to_bytes()); + hasher.update(self.opcert_without_vk.kes_vk.as_bytes()); + hasher.update(self.opcert_without_vk.issue_number.to_be_bytes()); + hasher.update(self.opcert_without_vk.start_kes_period.to_be_bytes()); + hasher.update(self.opcert_without_vk.cert_sig.to_bytes()); hasher.update(self.cold_vk.as_bytes()); hex::encode(hasher.finalize()) } @@ -143,15 +231,9 @@ impl Serialize for OpCert { where S: Serializer, { - let raw_cert = RawOpCert( - RawFields( - self.kes_vk.as_bytes().to_vec(), - self.issue_number, - self.start_kes_period, - self.cert_sig.to_bytes().to_vec(), - ), - self.cold_vk, - ); + let raw_opcert_without_vk: RawOpCertWithoutColdVerificationKey = + (&self.opcert_without_vk).into(); + let raw_cert = RawOpCert(raw_opcert_without_vk, self.cold_vk); raw_cert.serialize(serializer) } @@ -163,18 +245,33 @@ impl<'de> Deserialize<'de> for OpCert { D: Deserializer<'de>, { let raw_cert = RawOpCert::deserialize(deserializer)?; + let raw_opcert_without_vk = &raw_cert.0; + Ok(Self { - kes_vk: KesPublicKey::from_bytes(&raw_cert.0.0) - .map_err(|_| Error::custom("KES vk serialisation error"))?, - issue_number: raw_cert.0.1, - start_kes_period: raw_cert.0.2, - cert_sig: EdSignature::from_slice(&raw_cert.0.3) - .map_err(|_| Error::custom("ed25519 signature serialisation error"))?, + opcert_without_vk: OpCertWithoutColdVerificationKey { + kes_vk: KesPublicKey::from_bytes(&raw_opcert_without_vk.0) + .map_err(|_| Error::custom("KES vk serialisation error"))?, + issue_number: raw_opcert_without_vk.1, + start_kes_period: raw_opcert_without_vk.2, + cert_sig: EdSignature::from_slice(&raw_opcert_without_vk.3) + .map_err(|_| Error::custom("ed25519 signature serialisation error"))?, + }, cold_vk: raw_cert.1, }) } } +impl From<(OpCertWithoutColdVerificationKey, EdVerificationKey)> for OpCert { + fn from( + (opcert_without_vk, cold_vk): (OpCertWithoutColdVerificationKey, EdVerificationKey), + ) -> Self { + Self { + opcert_without_vk, + cold_vk, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -220,5 +317,14 @@ mod tests { "d9899c574fd7a710732391706b59e878bfd416214c49d2b3841c5c8b".to_string(), party_id_as_hash ); + + let operational_certificate_bytes_without_cold_vk = operational_certificate + .get_opcert_without_cold_verification_key() + .to_cbor_bytes() + .expect("compute CBOR bytes should not fail"); + assert_eq!( + "845820e650d7531509bb6cffd7998c28c68e4ec8fa621a0952206ea11eb03fcd7dcb2900005840d4abce27da05ff03c1342cc6ab53135072e1babf9cc05492f59f1ff009f70457aaa862c7158b13be0cfb41d7a91a562589bc110eb2cdaf5d2756048abbea5f05", + hex::encode(operational_certificate_bytes_without_cold_vk) + ); } } diff --git a/mithril-common/src/crypto_helper/codec/binary.rs b/mithril-common/src/crypto_helper/codec/binary.rs index ab5f356d302..98d92a437af 100644 --- a/mithril-common/src/crypto_helper/codec/binary.rs +++ b/mithril-common/src/crypto_helper/codec/binary.rs @@ -210,7 +210,7 @@ mod binary_kes_sig { } mod binary_opcert { - use crate::crypto_helper::{OpCert, SerDeShelleyFileFormat}; + use crate::crypto_helper::{OpCert, OpCertWithoutColdVerificationKey, SerDeShelleyFileFormat}; use anyhow::anyhow; use super::*; @@ -226,6 +226,18 @@ mod binary_opcert { Self::from_cbor_bytes(bytes).map_err(|e| anyhow!(format!("{e:?}"))) } } + + impl TryToBytes for OpCertWithoutColdVerificationKey { + fn to_bytes_vec(&self) -> StdResult> { + self.to_cbor_bytes().map_err(|e| anyhow!(format!("{e:?}"))) + } + } + + impl TryFromBytes for OpCertWithoutColdVerificationKey { + fn try_from_bytes(bytes: &[u8]) -> StdResult { + Self::from_cbor_bytes(bytes).map_err(|e| anyhow!(format!("{e:?}"))) + } + } } mod binary_mk_proof { diff --git a/mithril-common/src/crypto_helper/mod.rs b/mithril-common/src/crypto_helper/mod.rs index 23b31c81e25..e6a44e116dc 100644 --- a/mithril-common/src/crypto_helper/mod.rs +++ b/mithril-common/src/crypto_helper/mod.rs @@ -13,8 +13,8 @@ pub use cardano::ColdKeyGenerator; pub use cardano::{ KesPeriod, KesSigner, KesSignerStandard, KesVerifier, KesVerifierStandard, KesVerifyError, - OpCert, ProtocolInitializerErrorWrapper, ProtocolRegistrationErrorWrapper, - SerDeShelleyFileFormat, Sum6KesBytes, + OpCert, OpCertWithoutColdVerificationKey, ProtocolInitializerErrorWrapper, + ProtocolRegistrationErrorWrapper, SerDeShelleyFileFormat, Sum6KesBytes, }; pub use codec::*; pub use ed25519_alias::{era::*, genesis::*, manifest::*}; From 1cf06c53bb4e57fd17ed63c2e162c8f03e2e0f56 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Thu, 4 Sep 2025 15:03:15 +0200 Subject: [PATCH 16/24] refactor(aggregator): add support for operational certificate without cold verification key --- mithril-aggregator/src/services/signer_registration/verifier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mithril-aggregator/src/services/signer_registration/verifier.rs b/mithril-aggregator/src/services/signer_registration/verifier.rs index bc5712bf498..edef6172934 100644 --- a/mithril-aggregator/src/services/signer_registration/verifier.rs +++ b/mithril-aggregator/src/services/signer_registration/verifier.rs @@ -48,7 +48,7 @@ impl SignerRegistrationVerifier for MithrilSignerRegistrationVerifier { .get_current_kes_period() .await? .unwrap_or_default() - - operational_certificate.start_kes_period as KesPeriod, + - operational_certificate.get_start_kes_period() as KesPeriod, ), None => None, }; From fc6cd06a0c1c6d031d903275d329717fadd2b7c4 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Thu, 4 Sep 2025 15:03:25 +0200 Subject: [PATCH 17/24] refactor(signer): add support for operational certificate without cold verification key --- mithril-signer/src/runtime/runner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs index fefa145ba31..d259f181757 100644 --- a/mithril-signer/src/runtime/runner.rs +++ b/mithril-signer/src/runtime/runner.rs @@ -178,7 +178,7 @@ impl Runner for SignerRunner { .get_current_kes_period() .await? .unwrap_or_default() - - operational_certificate.start_kes_period as KesPeriod, + - operational_certificate.get_start_kes_period() as KesPeriod, ), None => None, }; From 68436e850ea4a021e1d12be1899cd3358391affb Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Thu, 4 Sep 2025 15:04:35 +0200 Subject: [PATCH 18/24] feat(dmq): add support for separate operational certificate and cold verification key in message --- .../mithril-dmq/src/consumer/client/pallas.rs | 52 ++++++++++++------- internal/mithril-dmq/src/model/builder.rs | 27 ++++++---- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/client/pallas.rs b/internal/mithril-dmq/src/consumer/client/pallas.rs index d398aed1af6..fa9ee422951 100644 --- a/internal/mithril-dmq/src/consumer/client/pallas.rs +++ b/internal/mithril-dmq/src/consumer/client/pallas.rs @@ -7,7 +7,9 @@ use tokio::sync::{Mutex, MutexGuard}; use mithril_common::{ CardanoNetwork, StdResult, - crypto_helper::{OpCert, TryFromBytes}, + crypto_helper::{ + OpCert, OpCertWithoutColdVerificationKey, TryFromBytes, ed25519::Ed25519VerificationKey, + }, entities::PartyId, logging::LoggerExtensions, }; @@ -113,8 +115,17 @@ impl DmqConsumerClientPallas { .0 .into_iter() .map(|dmq_message| { - let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate) + let opcert_without_verification_key = + OpCertWithoutColdVerificationKey::try_from_bytes( + &dmq_message.operational_certificate, + ) .with_context(|| "Failed to parse operational certificate")?; + let cold_verification_key = + Ed25519VerificationKey::from_bytes(&dmq_message.cold_verification_key) + .with_context(|| "Failed to parse cold verification key")? + .into_inner(); + let opcert: OpCert = + (opcert_without_verification_key, cold_verification_key).into(); let party_id = opcert.compute_protocol_party_id()?; let payload = M::try_from_bytes(&dmq_message.msg_payload.msg_body) .with_context(|| "Failed to parse DMQ message body")?; @@ -184,16 +195,17 @@ mod tests { }, kes_signature: vec![0, 1, 2, 3], operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, - 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, - 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, - 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, - 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, - 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88, - 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, + 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198, + 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41, + 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19, + 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87, + 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137, + 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, + ], + cold_verification_key: vec![ + 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68, ], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], }, DmqMsg { msg_payload: DmqMsgPayload { @@ -204,17 +216,17 @@ mod tests { }, kes_signature: vec![1, 2, 3, 4], operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, - 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, - 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62, - 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209, - 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172, - 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172, - 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, - 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, - 245, 200, + 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198, + 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41, + 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19, + 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87, + 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137, + 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, + ], + cold_verification_key: vec![ + 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, 240, 103, 245, 159, + 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, 245, 200, ], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], }, ] } diff --git a/internal/mithril-dmq/src/model/builder.rs b/internal/mithril-dmq/src/model/builder.rs index af1469468f6..2a14c301312 100644 --- a/internal/mithril-dmq/src/model/builder.rs +++ b/internal/mithril-dmq/src/model/builder.rs @@ -7,7 +7,7 @@ use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload}; use mithril_cardano_node_chain::chain_observer::ChainObserver; use mithril_common::{ StdResult, - crypto_helper::{KesSigner, TryToBytes}, + crypto_helper::{KesSigner, SerDeShelleyFileFormat, TryToBytes}, }; use crate::model::{DmqMessage, SystemUnixTimestampProvider, UnixTimestampProvider}; @@ -92,17 +92,16 @@ impl DmqMessageBuilder { .kes_signer .sign(&dmq_message_payload.bytes_to_sign()?, kes_period) .with_context(|| "Failed to KES sign message while building DMQ message")?; + let operational_certificate_without_cold_verification_key = + operational_certificate.get_opcert_without_cold_verification_key(); + let cold_verification_key = operational_certificate.get_cold_verification_key(); let dmq_message = DmqMsg { - msg_payload: Self::enrich_msg_payload_with_id(DmqMsgPayload { - msg_id: vec![], - msg_body: message_bytes.to_vec(), - kes_period: kes_period as u64, - expires_at, - }), + msg_payload: dmq_message_payload, kes_signature: kes_signature.to_bytes_vec()?, - operational_certificate: operational_certificate.to_bytes_vec()?, // TODO: remove the cold verification key in the op cert - cold_verification_key: vec![], // TODO: fix + operational_certificate: operational_certificate_without_cold_verification_key + .to_cbor_bytes()?, + cold_verification_key: cold_verification_key.to_bytes().to_vec(), }; Ok(dmq_message.into()) @@ -178,8 +177,14 @@ mod tests { DmqMsg { msg_payload: expected_msg_payload.clone(), kes_signature: kes_signature.to_bytes_vec().unwrap(), - operational_certificate: operational_certificate.to_bytes_vec().unwrap(), - cold_verification_key: vec![], + operational_certificate: operational_certificate + .get_opcert_without_cold_verification_key() + .to_cbor_bytes() + .unwrap(), + cold_verification_key: operational_certificate + .get_cold_verification_key() + .to_bytes() + .to_vec(), }, dmq_message.clone().into() ); From f62ef035f2816a64ac9b3b0477b6f560dff1d72a Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Fri, 5 Sep 2025 12:37:35 +0200 Subject: [PATCH 19/24] feat(dmq): make consumer server queue drain expired messages --- .../mithril-dmq/src/consumer/server/queue.rs | 136 +++++++++++++++--- 1 file changed, 114 insertions(+), 22 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs index dc20fb97cc8..ed88db989c9 100644 --- a/internal/mithril-dmq/src/consumer/server/queue.rs +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -1,13 +1,18 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, sync::Arc}; +use mithril_common::StdResult; use tokio::sync::{Mutex, Notify}; -use crate::DmqMessage; +use crate::{ + DmqMessage, + model::{SystemUnixTimestampProvider, UnixTimestampProvider}, +}; /// A queue for storing DMQ messages. pub(crate) struct MessageQueue { messages: Mutex>, new_message_notify: Notify, + timestamp_provider: Arc, max_size: usize, } @@ -16,33 +21,62 @@ impl MessageQueue { const MAX_SIZE_DEFAULT: usize = 10000; /// Creates a new instance of [BlockingNonBlockingQueue]. - pub fn new(max_size: usize) -> Self { + pub fn new(max_size: usize, timestamp_provider: Arc) -> Self { Self { messages: Mutex::new(VecDeque::new()), new_message_notify: Notify::new(), + timestamp_provider, max_size, } } - /// Enqueues a new message into the queue. - pub async fn enqueue(&self, message: DmqMessage) { + /// Cleans the queue + /// + /// Removes expired messages and ensures the queue does not exceed the maximum size. + async fn clean_queue(&self) { let mut message_queue_guard = self.messages.lock().await; - (*message_queue_guard).push_back(message); + // Remove expired messages from the front of the queue + // There may be other expired messages in the queue, but they will be removed on dequeue + // This avoids full scan of the queue. + while let Some(message) = message_queue_guard.front() + && self.has_message_expired(message).unwrap_or(false) + { + message_queue_guard.pop_front(); + } while message_queue_guard.len() > self.max_size { message_queue_guard.pop_front(); } + } + + /// Checks if a message has expired. + fn has_message_expired(&self, message: &DmqMessage) -> StdResult { + let current_timestamp: u32 = self.timestamp_provider.current_timestamp()?.try_into()?; + Ok(message.msg_payload.expires_at < current_timestamp) + } + + /// Enqueues a new message into the queue. + pub async fn enqueue(&self, message: DmqMessage) { + { + // Run in a block to avoid Mutex deadlock in clean_queue + let mut message_queue_guard = self.messages.lock().await; + (*message_queue_guard).push_back(message); + } + self.clean_queue().await; self.new_message_notify.notify_waiters(); } /// Returns the messages from the queue in a non blocking way, if available. pub async fn dequeue_non_blocking(&self, limit: Option) -> Vec { + self.clean_queue().await; let mut message_queue_guard = self.messages.lock().await; let limit = limit.unwrap_or((*message_queue_guard).len()); let mut messages = Vec::new(); for _ in 0..limit { - if let Some(message) = (*message_queue_guard).pop_front() { + if let Some(message) = (*message_queue_guard).pop_front() + && !self.has_message_expired(&message).unwrap_or(false) + { messages.push(message); } } @@ -76,7 +110,10 @@ impl MessageQueue { impl Default for MessageQueue { fn default() -> Self { - Self::new(Self::MAX_SIZE_DEFAULT) + Self::new( + Self::MAX_SIZE_DEFAULT, + Arc::new(SystemUnixTimestampProvider), + ) } } @@ -88,6 +125,8 @@ mod tests { use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload}; use tokio::time::sleep; + use crate::model::MockUnixTimestampProvider; + use super::*; fn fake_msg() -> DmqMsg { @@ -104,20 +143,38 @@ mod tests { } } - fn fake_messages(range: RangeInclusive) -> Vec { + fn fake_messages(range: RangeInclusive, expires_at: u32) -> Vec { range .map(|i| { let mut message = fake_msg(); message.msg_payload.msg_id = vec![i]; + message.msg_payload.expires_at = expires_at; message.into() }) .collect::>() } + fn create_queue(max_size: usize, current_timestamp: u64) -> MessageQueue { + MessageQueue::new( + max_size, + Arc::new({ + let mut mock_timestamp_provider = MockUnixTimestampProvider::new(); + mock_timestamp_provider + .expect_current_timestamp() + .returning(move || Ok(current_timestamp)); + + mock_timestamp_provider + }), + ) + } + #[tokio::test] async fn enqueue_and_dequeue_non_blocking_no_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -130,8 +187,11 @@ mod tests { #[tokio::test] async fn enqueue_and_dequeue_non_blocking_with_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -144,8 +204,11 @@ mod tests { #[tokio::test] async fn enqueue_and_dequeue_blocking_no_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -158,8 +221,11 @@ mod tests { #[tokio::test] async fn enqueue_and_dequeue_blocking_with_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -172,7 +238,9 @@ mod tests { #[tokio::test] async fn dequeue_blocking_blocks_when_no_message_available() { - let queue = MessageQueue::default(); + let max_size = 100; + let current_timestamp = 10; + let queue = create_queue(max_size, current_timestamp); let result = tokio::select!( _res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}, @@ -183,10 +251,12 @@ mod tests { } #[tokio::test] - async fn enqueue_blocks_over_max_size_drains_oldest_messages() { - let max_queue_size = 3; - let queue = MessageQueue::new(max_queue_size); - let messages = fake_messages(1..=5); + async fn queue_drains_oldest_messages_when_full() { + let max_size = 3; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -196,4 +266,26 @@ mod tests { assert_eq!(messages[2..=4].to_vec(), dequeued_messages); } + + #[tokio::test] + async fn queue_drains_expired_message() { + let max_size = 3; + let current_timestamp = 10; + let expires_at_expired = 1; + let expires_at_non_expired = 100; + let queue = create_queue(max_size, current_timestamp); + let mut messages = fake_messages(1..=10, expires_at_expired); + for (index, mut message) in messages.clone().into_iter().enumerate() { + if index >= 5 { + message.msg_payload.expires_at = expires_at_non_expired; + messages[index] = message.clone(); + } + queue.enqueue(message).await; + } + let limit = None; + + let dequeued_messages = queue.dequeue_blocking(limit).await; + + assert_eq!(messages[7..=9].to_vec(), dequeued_messages); + } } From aac2bb6499a9e017522452995ddfb4207afca692 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Fri, 5 Sep 2025 17:52:42 +0200 Subject: [PATCH 20/24] tests(dmq): provide tooling for creating signed fake DMQ messages --- internal/mithril-dmq/src/test/fake_message.rs | 26 +++++++++++++++++++ internal/mithril-dmq/src/test/mod.rs | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 internal/mithril-dmq/src/test/fake_message.rs diff --git a/internal/mithril-dmq/src/test/fake_message.rs b/internal/mithril-dmq/src/test/fake_message.rs new file mode 100644 index 00000000000..91cc5e28360 --- /dev/null +++ b/internal/mithril-dmq/src/test/fake_message.rs @@ -0,0 +1,26 @@ +//! Fake message computation for testing purposes. + +use std::sync::Arc; + +use mithril_cardano_node_chain::test::double::FakeChainObserver; +use mithril_common::{crypto_helper::TryToBytes, test::crypto_helper::KesSignerFake}; + +use crate::{DmqMessage, DmqMessageBuilder, test::payload::DmqMessageTestPayload}; + +/// Computes a fake DMQ message for testing purposes. +pub async fn compute_fake_msg(bytes: &[u8], test_directory: &str) -> DmqMessage { + let dmq_builder = DmqMessageBuilder::new( + { + let (kes_signature, operational_certificate) = + KesSignerFake::dummy_signature(test_directory); + let kes_signer = + KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]); + + Arc::new(kes_signer) + }, + Arc::new(FakeChainObserver::default()), + ) + .set_ttl(100); + let message = DmqMessageTestPayload::new(bytes); + dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap() +} diff --git a/internal/mithril-dmq/src/test/mod.rs b/internal/mithril-dmq/src/test/mod.rs index 97121e33c28..4670dd1b5e5 100644 --- a/internal/mithril-dmq/src/test/mod.rs +++ b/internal/mithril-dmq/src/test/mod.rs @@ -5,7 +5,7 @@ //! This module provides in particular test doubles for the traits defined in this crate. pub mod double; - +pub mod fake_message; pub mod payload; #[cfg(test)] From 8bdc5ad6ed60a42445c8c2580cd0d6e81421a5b3 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Fri, 5 Sep 2025 17:54:37 +0200 Subject: [PATCH 21/24] fix(dmq): blocking tests missing KES signed DMQ messages --- .../mithril-dmq/src/consumer/server/pallas.rs | 31 +++++----------- .../tests/consumer_client_server.rs | 35 ++++--------------- .../tests/publisher_client_server.rs | 30 ++++------------ 3 files changed, 20 insertions(+), 76 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/server/pallas.rs b/internal/mithril-dmq/src/consumer/server/pallas.rs index 7d418bb7b59..4888e2f083a 100644 --- a/internal/mithril-dmq/src/consumer/server/pallas.rs +++ b/internal/mithril-dmq/src/consumer/server/pallas.rs @@ -282,17 +282,14 @@ mod tests { use pallas_network::{ facades::DmqClient, - miniprotocols::{ - localmsgnotification, - localmsgsubmission::{DmqMsg, DmqMsgPayload}, - }, + miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg}, }; use tokio::sync::{mpsc::unbounded_channel, watch}; use tokio::time::sleep; use mithril_common::{current_function, test::TempDir}; - use crate::test_tools::TestLogger; + use crate::{test::fake_message::compute_fake_msg, test_tools::TestLogger}; use super::*; @@ -300,25 +297,12 @@ mod tests { TempDir::create_with_short_path("dmq_consumer_server", folder_name) } - fn fake_msg() -> DmqMsg { - DmqMsg { - msg_payload: DmqMsgPayload { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], - kes_period: 10, - expires_at: 100, - }, - kes_signature: vec![0, 1, 2, 3], - operational_certificate: vec![0, 1, 2, 3, 4], - cold_verification_key: vec![0, 1, 2, 3, 4, 5], - } - } - #[tokio::test(flavor = "multi_thread")] async fn pallas_dmq_consumer_server_non_blocking_success() { + let current_function_name = current_function!(); let (stop_tx, stop_rx) = watch::channel(()); let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); - let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let socket_path = create_temp_dir(current_function_name).join("node.socket"); let cardano_network = CardanoNetwork::TestNet(0); let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( socket_path.to_path_buf(), @@ -327,7 +311,7 @@ mod tests { TestLogger::stdout(), )); dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); - let message = fake_msg(); + let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into(); let client = tokio::spawn({ async move { // sleep to avoid refused connection from the server @@ -375,9 +359,10 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn pallas_dmq_consumer_server_blocking_success() { + let current_function_name = current_function!(); let (stop_tx, stop_rx) = watch::channel(()); let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); - let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let socket_path = create_temp_dir(current_function_name).join("node.socket"); let cardano_network = CardanoNetwork::TestNet(0); let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( socket_path.to_path_buf(), @@ -386,7 +371,7 @@ mod tests { TestLogger::stdout(), )); dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); - let message = fake_msg(); + let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into(); let client = tokio::spawn({ async move { // sleep to avoid refused connection from the server diff --git a/internal/mithril-dmq/tests/consumer_client_server.rs b/internal/mithril-dmq/tests/consumer_client_server.rs index d3b35317f55..1cd453cf210 100644 --- a/internal/mithril-dmq/tests/consumer_client_server.rs +++ b/internal/mithril-dmq/tests/consumer_client_server.rs @@ -3,35 +3,12 @@ use std::sync::Arc; use tokio::sync::{mpsc::unbounded_channel, watch}; -use mithril_cardano_node_chain::test::double::FakeChainObserver; -use mithril_common::{ - CardanoNetwork, - crypto_helper::TryToBytes, - current_function, - test::{TempDir, crypto_helper::KesSignerFake}, -}; +use mithril_common::{CardanoNetwork, current_function, test::TempDir}; use mithril_dmq::{ DmqConsumerClient, DmqConsumerClientPallas, DmqConsumerServer, DmqConsumerServerPallas, - DmqMessage, DmqMessageBuilder, test::payload::DmqMessageTestPayload, + DmqMessage, test::fake_message::compute_fake_msg, test::payload::DmqMessageTestPayload, }; -async fn create_fake_msg(bytes: &[u8], test_directory: &str) -> DmqMessage { - let dmq_builder = DmqMessageBuilder::new( - { - let (kes_signature, operational_certificate) = - KesSignerFake::dummy_signature(test_directory); - let kes_signer = - KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]); - - Arc::new(kes_signer) - }, - Arc::new(FakeChainObserver::default()), - ) - .set_ttl(100); - let message = DmqMessageTestPayload::new(bytes); - dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap() -} - #[tokio::test(flavor = "multi_thread")] async fn dmq_consumer_client_server() { let current_function_name = current_function!(); @@ -69,14 +46,14 @@ async fn dmq_consumer_client_server() { ); let mut messages = vec![]; signature_dmq_tx - .send(create_fake_msg(b"msg_1", current_function_name).await) + .send(compute_fake_msg(b"msg_1", current_function_name).await) .unwrap(); signature_dmq_tx - .send(create_fake_msg(b"msg_2", current_function_name).await) + .send(compute_fake_msg(b"msg_2", current_function_name).await) .unwrap(); messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap()); signature_dmq_tx - .send(create_fake_msg(b"msg_3", current_function_name).await) + .send(compute_fake_msg(b"msg_3", current_function_name).await) .unwrap(); messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap()); @@ -109,7 +86,7 @@ async fn dmq_consumer_client_server() { ); let mut messages = vec![]; signature_dmq_tx - .send(create_fake_msg(b"msg_4", current_function_name).await) + .send(compute_fake_msg(b"msg_4", current_function_name).await) .unwrap(); messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap()); stop_tx.send(()).unwrap(); diff --git a/internal/mithril-dmq/tests/publisher_client_server.rs b/internal/mithril-dmq/tests/publisher_client_server.rs index 8aebd9c6f6d..b90728780be 100644 --- a/internal/mithril-dmq/tests/publisher_client_server.rs +++ b/internal/mithril-dmq/tests/publisher_client_server.rs @@ -5,33 +5,15 @@ use tokio::sync::{mpsc::unbounded_channel, watch}; use mithril_cardano_node_chain::test::double::FakeChainObserver; use mithril_common::{ - CardanoNetwork, - crypto_helper::TryToBytes, - current_function, + CardanoNetwork, current_function, test::{TempDir, crypto_helper::KesSignerFake}, }; use mithril_dmq::{ DmqMessage, DmqMessageBuilder, DmqPublisherClient, DmqPublisherClientPallas, - DmqPublisherServer, DmqPublisherServerPallas, test::payload::DmqMessageTestPayload, + DmqPublisherServer, DmqPublisherServerPallas, + test::{fake_message::compute_fake_msg, payload::DmqMessageTestPayload}, }; -async fn create_fake_msg(bytes: &[u8], test_directory: &str) -> DmqMessage { - let dmq_builder = DmqMessageBuilder::new( - { - let (kes_signature, operational_certificate) = - KesSignerFake::dummy_signature(test_directory); - let kes_signer = - KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]); - - Arc::new(kes_signer) - }, - Arc::new(FakeChainObserver::default()), - ) - .set_ttl(100); - let message = DmqMessageTestPayload::new(bytes); - dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap() -} - #[tokio::test] async fn dmq_publisher_client_server() { let current_function_name = current_function!(); @@ -157,9 +139,9 @@ async fn dmq_publisher_client_server() { let (_, _, messages) = tokio::try_join!(server, client, recorder).unwrap(); assert_eq!( vec![ - create_fake_msg(b"msg_1", current_function_name).await, - create_fake_msg(b"msg_2", current_function_name).await, - create_fake_msg(b"msg_3", current_function_name).await, + compute_fake_msg(b"msg_1", current_function_name).await, + compute_fake_msg(b"msg_2", current_function_name).await, + compute_fake_msg(b"msg_3", current_function_name).await, ], messages ); From 99e81e1dba204de00147ad8128ae113e0ea393b4 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Mon, 8 Sep 2025 18:05:02 +0200 Subject: [PATCH 22/24] chore(dmq): apply review comments --- .../mithril-dmq/src/consumer/server/queue.rs | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs index ed88db989c9..614cde640ce 100644 --- a/internal/mithril-dmq/src/consumer/server/queue.rs +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -270,22 +270,32 @@ mod tests { #[tokio::test] async fn queue_drains_expired_message() { let max_size = 3; + let total_expired_messages = 4; + let total_non_expired_messages = 6; let current_timestamp = 10; let expires_at_expired = 1; let expires_at_non_expired = 100; let queue = create_queue(max_size, current_timestamp); - let mut messages = fake_messages(1..=10, expires_at_expired); - for (index, mut message) in messages.clone().into_iter().enumerate() { - if index >= 5 { - message.msg_payload.expires_at = expires_at_non_expired; - messages[index] = message.clone(); - } + let expired_messages = fake_messages(1..=total_expired_messages, expires_at_expired); + let non_expired_messages = fake_messages( + total_expired_messages + 1..=total_non_expired_messages + total_expired_messages, + expires_at_non_expired, + ); + for message in expired_messages.clone() { + queue.enqueue(message).await; + } + for message in non_expired_messages.clone() { queue.enqueue(message).await; } let limit = None; let dequeued_messages = queue.dequeue_blocking(limit).await; - assert_eq!(messages[7..=9].to_vec(), dequeued_messages); + let expected_non_expired_messages_range = + total_non_expired_messages as usize - max_size..total_non_expired_messages as usize; + assert_eq!( + non_expired_messages[expected_non_expired_messages_range].to_vec(), + dequeued_messages + ); } } From 3570008ba8606724b089fdbcb880c77f631f2b2d Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Mon, 22 Sep 2025 14:38:26 +0200 Subject: [PATCH 23/24] chore: update pallas revision --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f6eb0a5a09..0debd2bda1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5072,7 +5072,7 @@ dependencies = [ [[package]] name = "pallas-codec" version = "1.0.0-alpha.2" -source = "git+https://github.com/txpipe/pallas.git?branch=main#b9175a7e5b54b1b593f69bfdadf0652867143905" +source = "git+https://github.com/txpipe/pallas.git?branch=main#d34b143f838c16e33759d63cd3aec8eaab539111" dependencies = [ "hex", "minicbor 0.26.5", @@ -5098,12 +5098,12 @@ dependencies = [ [[package]] name = "pallas-crypto" version = "1.0.0-alpha.2" -source = "git+https://github.com/txpipe/pallas.git?branch=main#b9175a7e5b54b1b593f69bfdadf0652867143905" +source = "git+https://github.com/txpipe/pallas.git?branch=main#d34b143f838c16e33759d63cd3aec8eaab539111" dependencies = [ "cryptoxide", "hex", "pallas-codec 1.0.0-alpha.2", - "rand_core 0.6.4", + "rand_core 0.9.3", "serde", "thiserror 1.0.69", ] @@ -5129,7 +5129,7 @@ dependencies = [ [[package]] name = "pallas-network" version = "1.0.0-alpha.2" -source = "git+https://github.com/txpipe/pallas.git?branch=main#b9175a7e5b54b1b593f69bfdadf0652867143905" +source = "git+https://github.com/txpipe/pallas.git?branch=main#d34b143f838c16e33759d63cd3aec8eaab539111" dependencies = [ "byteorder", "hex", From fee0ce151a6b719e347946608c766c175dc314e9 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Mon, 22 Sep 2025 14:38:40 +0200 Subject: [PATCH 24/24] chore: upgrade crate versions * mithril-dmq from `0.1.10` to `0.1.11` * mithril-aggregator from `0.7.85` to `0.7.86` * mithril-common from `0.6.18` to `0.6.19` * mithril-signer from `0.2.269` to `0.2.270` --- Cargo.lock | 8 ++++---- internal/mithril-dmq/Cargo.toml | 2 +- mithril-aggregator/Cargo.toml | 2 +- mithril-common/Cargo.toml | 2 +- mithril-signer/Cargo.toml | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0debd2bda1a..6b314cb93af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3989,7 +3989,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.7.85" +version = "0.7.86" dependencies = [ "anyhow", "async-trait", @@ -4262,7 +4262,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.6.18" +version = "0.6.19" dependencies = [ "anyhow", "async-trait", @@ -4304,7 +4304,7 @@ dependencies = [ [[package]] name = "mithril-dmq" -version = "0.1.10" +version = "0.1.11" dependencies = [ "anyhow", "async-trait", @@ -4479,7 +4479,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.269" +version = "0.2.270" dependencies = [ "anyhow", "async-trait", diff --git a/internal/mithril-dmq/Cargo.toml b/internal/mithril-dmq/Cargo.toml index ca6af881dfd..7a5b53cc1c0 100644 --- a/internal/mithril-dmq/Cargo.toml +++ b/internal/mithril-dmq/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mithril-dmq" description = "Mechanisms to publish and consume messages of a 'Decentralized Message Queue network' through a DMQ node" -version = "0.1.10" +version = "0.1.11" authors.workspace = true documentation.workspace = true edition.workspace = true diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index d9c3b92803b..13eb7606775 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.7.85" +version = "0.7.86" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 8f5f401570d..c45e7cdb72e 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.6.18" +version = "0.6.19" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index e7c1f9b25d7..6beed3eb73c 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.269" +version = "0.2.270" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true }