diff --git a/Cargo.lock b/Cargo.lock index 0f6eb0a5a09..6b314cb93af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3989,7 +3989,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.7.85" +version = "0.7.86" dependencies = [ "anyhow", "async-trait", @@ -4262,7 +4262,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.6.18" +version = "0.6.19" dependencies = [ "anyhow", "async-trait", @@ -4304,7 +4304,7 @@ dependencies = [ [[package]] name = "mithril-dmq" -version = "0.1.10" +version = "0.1.11" dependencies = [ "anyhow", "async-trait", @@ -4479,7 +4479,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.269" +version = "0.2.270" dependencies = [ "anyhow", "async-trait", @@ -5072,7 +5072,7 @@ dependencies = [ [[package]] name = "pallas-codec" version = "1.0.0-alpha.2" -source = "git+https://github.com/txpipe/pallas.git?branch=main#b9175a7e5b54b1b593f69bfdadf0652867143905" +source = "git+https://github.com/txpipe/pallas.git?branch=main#d34b143f838c16e33759d63cd3aec8eaab539111" dependencies = [ "hex", "minicbor 0.26.5", @@ -5098,12 +5098,12 @@ dependencies = [ [[package]] name = "pallas-crypto" version = "1.0.0-alpha.2" -source = "git+https://github.com/txpipe/pallas.git?branch=main#b9175a7e5b54b1b593f69bfdadf0652867143905" +source = "git+https://github.com/txpipe/pallas.git?branch=main#d34b143f838c16e33759d63cd3aec8eaab539111" dependencies = [ "cryptoxide", "hex", "pallas-codec 1.0.0-alpha.2", - "rand_core 0.6.4", + "rand_core 0.9.3", "serde", "thiserror 1.0.69", ] @@ -5129,7 +5129,7 @@ dependencies = [ [[package]] name = "pallas-network" version = "1.0.0-alpha.2" -source = "git+https://github.com/txpipe/pallas.git?branch=main#b9175a7e5b54b1b593f69bfdadf0652867143905" +source = "git+https://github.com/txpipe/pallas.git?branch=main#d34b143f838c16e33759d63cd3aec8eaab539111" dependencies = [ "byteorder", "hex", diff --git a/internal/mithril-dmq/Cargo.toml b/internal/mithril-dmq/Cargo.toml index ca6af881dfd..7a5b53cc1c0 100644 --- a/internal/mithril-dmq/Cargo.toml +++ b/internal/mithril-dmq/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mithril-dmq" description = "Mechanisms to publish and consume messages of a 'Decentralized Message Queue network' through a DMQ node" -version = "0.1.10" +version = "0.1.11" authors.workspace = true documentation.workspace = true edition.workspace = true diff --git a/internal/mithril-dmq/src/consumer/client/pallas.rs b/internal/mithril-dmq/src/consumer/client/pallas.rs index 6030e7b9bcb..fa9ee422951 100644 --- a/internal/mithril-dmq/src/consumer/client/pallas.rs +++ b/internal/mithril-dmq/src/consumer/client/pallas.rs @@ -7,7 +7,9 @@ use tokio::sync::{Mutex, MutexGuard}; use mithril_common::{ CardanoNetwork, StdResult, - crypto_helper::{OpCert, TryFromBytes}, + crypto_helper::{ + OpCert, OpCertWithoutColdVerificationKey, TryFromBytes, ed25519::Ed25519VerificationKey, + }, entities::PartyId, logging::LoggerExtensions, }; @@ -113,10 +115,19 @@ impl DmqConsumerClientPallas { .0 .into_iter() .map(|dmq_message| { - let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate) + let opcert_without_verification_key = + OpCertWithoutColdVerificationKey::try_from_bytes( + &dmq_message.operational_certificate, + ) .with_context(|| "Failed to parse operational certificate")?; + let cold_verification_key = + Ed25519VerificationKey::from_bytes(&dmq_message.cold_verification_key) + .with_context(|| "Failed to parse cold verification key")? + .into_inner(); + let opcert: OpCert = + (opcert_without_verification_key, cold_verification_key).into(); let party_id = opcert.compute_protocol_party_id()?; - let payload = M::try_from_bytes(&dmq_message.msg_body) + let payload = M::try_from_bytes(&dmq_message.msg_payload.msg_body) .with_context(|| "Failed to parse DMQ message body")?; Ok((payload, party_id)) @@ -158,7 +169,10 @@ mod tests { use mithril_common::{crypto_helper::TryToBytes, current_function, test::TempDir}; use pallas_network::{ facades::DmqServer, - miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg}, + miniprotocols::{ + localmsgnotification, + localmsgsubmission::{DmqMsg, DmqMsgPayload}, + }, }; use tokio::{net::UnixListener, task::JoinHandle, time::sleep}; @@ -173,41 +187,46 @@ mod tests { fn fake_msgs() -> Vec { vec![ DmqMsg { - msg_id: vec![0, 1], - msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), - block_number: 10, - ttl: 100, + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(), + kes_period: 10, + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, - 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, - 203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, - 171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, - 247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, - 37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88, - 32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, + 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198, + 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41, + 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19, + 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87, + 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137, + 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, + ], + cold_verification_key: vec![ + 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105, 231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68, ], - kes_period: 10, }, DmqMsg { - msg_id: vec![1, 2], - msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), - block_number: 11, - ttl: 100, + msg_payload: DmqMsgPayload { + msg_id: vec![1, 2], + msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(), + kes_period: 11, + expires_at: 101, + }, kes_signature: vec![1, 2, 3, 4], operational_certificate: vec![ - 130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, - 198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, - 203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62, - 39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209, - 77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172, - 58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172, - 43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, - 240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, - 245, 200, + 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198, + 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41, + 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19, + 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87, + 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137, + 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, + ], + cold_verification_key: vec![ + 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, 240, 103, 245, 159, + 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, 245, 200, ], - kes_period: 11, }, ] } diff --git a/internal/mithril-dmq/src/consumer/server/pallas.rs b/internal/mithril-dmq/src/consumer/server/pallas.rs index 0a7ac3d0c3c..4888e2f083a 100644 --- a/internal/mithril-dmq/src/consumer/server/pallas.rs +++ b/internal/mithril-dmq/src/consumer/server/pallas.rs @@ -289,7 +289,7 @@ mod tests { use mithril_common::{current_function, test::TempDir}; - use crate::test_tools::TestLogger; + use crate::{test::fake_message::compute_fake_msg, test_tools::TestLogger}; use super::*; @@ -297,23 +297,12 @@ mod tests { TempDir::create_with_short_path("dmq_consumer_server", folder_name) } - fn fake_msg() -> DmqMsg { - DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], - block_number: 10, - ttl: 100, - kes_signature: vec![0, 1, 2, 3], - operational_certificate: vec![0, 1, 2, 3, 4], - kes_period: 10, - } - } - #[tokio::test(flavor = "multi_thread")] async fn pallas_dmq_consumer_server_non_blocking_success() { + let current_function_name = current_function!(); let (stop_tx, stop_rx) = watch::channel(()); let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); - let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let socket_path = create_temp_dir(current_function_name).join("node.socket"); let cardano_network = CardanoNetwork::TestNet(0); let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( socket_path.to_path_buf(), @@ -322,7 +311,7 @@ mod tests { TestLogger::stdout(), )); dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); - let message = fake_msg(); + let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into(); let client = tokio::spawn({ async move { // sleep to avoid refused connection from the server @@ -370,9 +359,10 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn pallas_dmq_consumer_server_blocking_success() { + let current_function_name = current_function!(); let (stop_tx, stop_rx) = watch::channel(()); let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); - let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let socket_path = create_temp_dir(current_function_name).join("node.socket"); let cardano_network = CardanoNetwork::TestNet(0); let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( socket_path.to_path_buf(), @@ -381,7 +371,7 @@ mod tests { TestLogger::stdout(), )); dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); - let message = fake_msg(); + let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into(); let client = tokio::spawn({ async move { // sleep to avoid refused connection from the server diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs index 5ee5c8190f0..614cde640ce 100644 --- a/internal/mithril-dmq/src/consumer/server/queue.rs +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -1,13 +1,18 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, sync::Arc}; +use mithril_common::StdResult; use tokio::sync::{Mutex, Notify}; -use crate::DmqMessage; +use crate::{ + DmqMessage, + model::{SystemUnixTimestampProvider, UnixTimestampProvider}, +}; /// A queue for storing DMQ messages. pub(crate) struct MessageQueue { messages: Mutex>, new_message_notify: Notify, + timestamp_provider: Arc, max_size: usize, } @@ -16,33 +21,62 @@ impl MessageQueue { const MAX_SIZE_DEFAULT: usize = 10000; /// Creates a new instance of [BlockingNonBlockingQueue]. - pub fn new(max_size: usize) -> Self { + pub fn new(max_size: usize, timestamp_provider: Arc) -> Self { Self { messages: Mutex::new(VecDeque::new()), new_message_notify: Notify::new(), + timestamp_provider, max_size, } } - /// Enqueues a new message into the queue. - pub async fn enqueue(&self, message: DmqMessage) { + /// Cleans the queue + /// + /// Removes expired messages and ensures the queue does not exceed the maximum size. + async fn clean_queue(&self) { let mut message_queue_guard = self.messages.lock().await; - (*message_queue_guard).push_back(message); + // Remove expired messages from the front of the queue + // There may be other expired messages in the queue, but they will be removed on dequeue + // This avoids full scan of the queue. + while let Some(message) = message_queue_guard.front() + && self.has_message_expired(message).unwrap_or(false) + { + message_queue_guard.pop_front(); + } while message_queue_guard.len() > self.max_size { message_queue_guard.pop_front(); } + } + + /// Checks if a message has expired. + fn has_message_expired(&self, message: &DmqMessage) -> StdResult { + let current_timestamp: u32 = self.timestamp_provider.current_timestamp()?.try_into()?; + Ok(message.msg_payload.expires_at < current_timestamp) + } + + /// Enqueues a new message into the queue. + pub async fn enqueue(&self, message: DmqMessage) { + { + // Run in a block to avoid Mutex deadlock in clean_queue + let mut message_queue_guard = self.messages.lock().await; + (*message_queue_guard).push_back(message); + } + self.clean_queue().await; self.new_message_notify.notify_waiters(); } /// Returns the messages from the queue in a non blocking way, if available. pub async fn dequeue_non_blocking(&self, limit: Option) -> Vec { + self.clean_queue().await; let mut message_queue_guard = self.messages.lock().await; let limit = limit.unwrap_or((*message_queue_guard).len()); let mut messages = Vec::new(); for _ in 0..limit { - if let Some(message) = (*message_queue_guard).pop_front() { + if let Some(message) = (*message_queue_guard).pop_front() + && !self.has_message_expired(&message).unwrap_or(false) + { messages.push(message); } } @@ -76,7 +110,10 @@ impl MessageQueue { impl Default for MessageQueue { fn default() -> Self { - Self::new(Self::MAX_SIZE_DEFAULT) + Self::new( + Self::MAX_SIZE_DEFAULT, + Arc::new(SystemUnixTimestampProvider), + ) } } @@ -85,39 +122,59 @@ mod tests { use std::{ops::RangeInclusive, time::Duration}; use anyhow::anyhow; - use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; + use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload}; use tokio::time::sleep; + use crate::model::MockUnixTimestampProvider; + use super::*; fn fake_msg() -> DmqMsg { DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![0, 1, 2], - block_number: 10, - ttl: 100, + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + kes_period: 10, + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], operational_certificate: vec![0, 1, 2, 3, 4], - kes_period: 10, + cold_verification_key: vec![0, 1, 2, 3, 4, 5], } } - fn fake_messages(range: RangeInclusive) -> Vec { + fn fake_messages(range: RangeInclusive, expires_at: u32) -> Vec { range .map(|i| { - DmqMsg { - msg_id: vec![i], - ..fake_msg() - } - .into() + let mut message = fake_msg(); + message.msg_payload.msg_id = vec![i]; + message.msg_payload.expires_at = expires_at; + message.into() }) .collect::>() } + fn create_queue(max_size: usize, current_timestamp: u64) -> MessageQueue { + MessageQueue::new( + max_size, + Arc::new({ + let mut mock_timestamp_provider = MockUnixTimestampProvider::new(); + mock_timestamp_provider + .expect_current_timestamp() + .returning(move || Ok(current_timestamp)); + + mock_timestamp_provider + }), + ) + } + #[tokio::test] async fn enqueue_and_dequeue_non_blocking_no_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -130,8 +187,11 @@ mod tests { #[tokio::test] async fn enqueue_and_dequeue_non_blocking_with_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -144,8 +204,11 @@ mod tests { #[tokio::test] async fn enqueue_and_dequeue_blocking_no_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -158,8 +221,11 @@ mod tests { #[tokio::test] async fn enqueue_and_dequeue_blocking_with_limit() { - let queue = MessageQueue::default(); - let messages = fake_messages(1..=5); + let max_size = 100; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -172,7 +238,9 @@ mod tests { #[tokio::test] async fn dequeue_blocking_blocks_when_no_message_available() { - let queue = MessageQueue::default(); + let max_size = 100; + let current_timestamp = 10; + let queue = create_queue(max_size, current_timestamp); let result = tokio::select!( _res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}, @@ -183,10 +251,12 @@ mod tests { } #[tokio::test] - async fn enqueue_blocks_over_max_size_drains_oldest_messages() { - let max_queue_size = 3; - let queue = MessageQueue::new(max_queue_size); - let messages = fake_messages(1..=5); + async fn queue_drains_oldest_messages_when_full() { + let max_size = 3; + let current_timestamp = 10; + let expires_at = 100; + let queue = create_queue(max_size, current_timestamp); + let messages = fake_messages(1..=5, expires_at); for message in messages.clone() { queue.enqueue(message).await; } @@ -196,4 +266,36 @@ mod tests { assert_eq!(messages[2..=4].to_vec(), dequeued_messages); } + + #[tokio::test] + async fn queue_drains_expired_message() { + let max_size = 3; + let total_expired_messages = 4; + let total_non_expired_messages = 6; + let current_timestamp = 10; + let expires_at_expired = 1; + let expires_at_non_expired = 100; + let queue = create_queue(max_size, current_timestamp); + let expired_messages = fake_messages(1..=total_expired_messages, expires_at_expired); + let non_expired_messages = fake_messages( + total_expired_messages + 1..=total_non_expired_messages + total_expired_messages, + expires_at_non_expired, + ); + for message in expired_messages.clone() { + queue.enqueue(message).await; + } + for message in non_expired_messages.clone() { + queue.enqueue(message).await; + } + let limit = None; + + let dequeued_messages = queue.dequeue_blocking(limit).await; + + let expected_non_expired_messages_range = + total_non_expired_messages as usize - max_size..total_non_expired_messages as usize; + assert_eq!( + non_expired_messages[expected_non_expired_messages_range].to_vec(), + dequeued_messages + ); + } } diff --git a/internal/mithril-dmq/src/model/builder.rs b/internal/mithril-dmq/src/model/builder.rs index 2d49ae5afcc..2a14c301312 100644 --- a/internal/mithril-dmq/src/model/builder.rs +++ b/internal/mithril-dmq/src/model/builder.rs @@ -1,25 +1,26 @@ use std::sync::Arc; -use anyhow::{Context, anyhow}; +use anyhow::Context; use blake2::{Blake2b, Digest, digest::consts::U64}; -use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; +use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload}; use mithril_cardano_node_chain::chain_observer::ChainObserver; use mithril_common::{ StdResult, - crypto_helper::{KesSigner, TryToBytes}, + crypto_helper::{KesSigner, SerDeShelleyFileFormat, TryToBytes}, }; -use crate::model::DmqMessage; +use crate::model::{DmqMessage, SystemUnixTimestampProvider, UnixTimestampProvider}; -/// The TTL (Time To Live) for DMQ messages in blocks. -const DMQ_MESSAGE_TTL_IN_BLOCKS: u16 = 100; +/// The TTL (Time To Live) for DMQ messages in seconds (default is 30 minutes). +const DMQ_MESSAGE_TTL_IN_SECONDS: u16 = 1800; /// A builder for creating DMQ messages. pub struct DmqMessageBuilder { kes_signer: Arc, chain_observer: Arc, - ttl_blocks: u16, + timestamp_provider: Arc, + ttl_seconds: u16, } impl DmqMessageBuilder { @@ -28,63 +29,80 @@ impl DmqMessageBuilder { Self { kes_signer, chain_observer, - ttl_blocks: DMQ_MESSAGE_TTL_IN_BLOCKS, + timestamp_provider: Arc::new(SystemUnixTimestampProvider), + ttl_seconds: DMQ_MESSAGE_TTL_IN_SECONDS, } } - /// Set the TTL (Time To Live) for DMQ messages in blocks. - pub fn set_ttl(mut self, ttl_blocks: u16) -> Self { - self.ttl_blocks = ttl_blocks; + /// Sets the timestamp provider for the DMQ message builder. + pub fn set_timestamp_provider( + mut self, + timestamp_provider: Arc, + ) -> Self { + self.timestamp_provider = timestamp_provider; + self + } + + /// Sets the TTL (Time To Live) for DMQ messages in seconds. + pub fn set_ttl(mut self, ttl_seconds: u16) -> Self { + self.ttl_seconds = ttl_seconds; self } + /// Computes a message id for a DMQ message payload. + fn compute_msg_id(dmq_message_payload: &DmqMsgPayload) -> Vec { + let mut hasher = Blake2b::::new(); + hasher.update(&dmq_message_payload.msg_body); + hasher.update(dmq_message_payload.kes_period.to_be_bytes()); + hasher.update(dmq_message_payload.expires_at.to_be_bytes()); + + hasher.finalize().to_vec() + } + + /// Enriches a DMQ message payload with a computed message ID. + fn enrich_msg_payload_with_id(dmq_message_payload: DmqMsgPayload) -> DmqMsgPayload { + let msg_id = Self::compute_msg_id(&dmq_message_payload); + let mut dmq_message_payload_with_id = dmq_message_payload; + dmq_message_payload_with_id.msg_id = msg_id; + + dmq_message_payload_with_id + } + /// Builds a DMQ message from the provided message bytes. pub async fn build(&self, message_bytes: &[u8]) -> StdResult { - fn compute_msg_id(dmq_message: &DmqMsg) -> Vec { - let mut hasher = Blake2b::::new(); - hasher.update(&dmq_message.msg_body); - hasher.update(dmq_message.block_number.to_be_bytes()); - hasher.update(dmq_message.ttl.to_be_bytes()); - hasher.update(&dmq_message.kes_signature); - hasher.update(&dmq_message.operational_certificate); - hasher.update(dmq_message.kes_period.to_be_bytes()); - - hasher.finalize().to_vec() - } - - let block_number = self - .chain_observer - .get_current_chain_point() - .await - .with_context(|| "Failed to get current chain point while building DMQ message")? - .ok_or(anyhow!( - "No current chain point available while building DMQ message" - ))? - .block_number; - let block_number = (*block_number) + let expires_at: u32 = (self.timestamp_provider.current_timestamp()? + + self.ttl_seconds as u64) .try_into() - .with_context(|| "Failed to convert block number to u32")?; + .with_context(|| "Failed to compute expires_at while building DMQ message")?; let kes_period = self .chain_observer .get_current_kes_period() .await .with_context(|| "Failed to get KES period while building DMQ message")? .unwrap_or_default(); + let dmq_message_payload = Self::enrich_msg_payload_with_id(DmqMsgPayload { + msg_id: vec![], + msg_body: message_bytes.to_vec(), + kes_period: kes_period as u64, + expires_at, + }); + let (kes_signature, operational_certificate) = self .kes_signer - .sign(message_bytes, kes_period) + .sign(&dmq_message_payload.bytes_to_sign()?, kes_period) .with_context(|| "Failed to KES sign message while building DMQ message")?; - let mut dmq_message = DmqMsg { - msg_id: vec![], - msg_body: message_bytes.to_vec(), - block_number, - ttl: self.ttl_blocks, + let operational_certificate_without_cold_verification_key = + operational_certificate.get_opcert_without_cold_verification_key(); + let cold_verification_key = operational_certificate.get_cold_verification_key(); + + let dmq_message = DmqMsg { + msg_payload: dmq_message_payload, kes_signature: kes_signature.to_bytes_vec()?, - operational_certificate: operational_certificate.to_bytes_vec()?, - kes_period, + operational_certificate: operational_certificate_without_cold_verification_key + .to_cbor_bytes()?, + cold_verification_key: cold_verification_key.to_bytes().to_vec(), }; - dmq_message.msg_id = compute_msg_id(&dmq_message); Ok(dmq_message.into()) } @@ -100,6 +118,8 @@ mod tests { test::{crypto_helper::KesSignerFake, double::Dummy}, }; + use crate::model::MockUnixTimestampProvider; + use super::*; mod test_utils { @@ -131,28 +151,48 @@ mod tests { }, ..TimePoint::dummy() }))); - let builder = DmqMessageBuilder::new(kes_signer, chain_observer).set_ttl(100); + let builder = DmqMessageBuilder::new(kes_signer.clone(), chain_observer) + .set_ttl(1000) + .set_timestamp_provider(Arc::new({ + let mut mock_timestamp_provider = MockUnixTimestampProvider::new(); + mock_timestamp_provider + .expect_current_timestamp() + .returning(|| Ok(234)); + + mock_timestamp_provider + })); let message = test_utils::TestMessage { content: b"test".to_vec(), }; let dmq_message = builder.build(&message.to_bytes_vec().unwrap()).await.unwrap(); - assert!(!dmq_message.msg_id.is_empty()); + let expected_msg_payload = DmqMessageBuilder::enrich_msg_payload_with_id(DmqMsgPayload { + msg_id: vec![], + msg_body: b"test".to_vec(), + kes_period: 0, + expires_at: 1234, + }); assert_eq!( DmqMsg { - msg_id: vec![], - msg_body: b"test".to_vec(), - block_number: 123, - ttl: 100, + msg_payload: expected_msg_payload.clone(), kes_signature: kes_signature.to_bytes_vec().unwrap(), - operational_certificate: operational_certificate.to_bytes_vec().unwrap(), - kes_period: 0, + operational_certificate: operational_certificate + .get_opcert_without_cold_verification_key() + .to_cbor_bytes() + .unwrap(), + cold_verification_key: operational_certificate + .get_cold_verification_key() + .to_bytes() + .to_vec(), }, - DmqMsg { - msg_id: vec![], - ..dmq_message.into() - } + dmq_message.clone().into() + ); + + let signed_messages = kes_signer.get_signed_messages(); + assert_eq!( + vec![expected_msg_payload.bytes_to_sign().unwrap()], + signed_messages ); } } diff --git a/internal/mithril-dmq/src/model/message.rs b/internal/mithril-dmq/src/model/message.rs index 848704bce99..516eb28bb8b 100644 --- a/internal/mithril-dmq/src/model/message.rs +++ b/internal/mithril-dmq/src/model/message.rs @@ -72,18 +72,22 @@ impl<'de> Deserialize<'de> for DmqMessage { #[cfg(test)] mod tests { + use pallas_network::miniprotocols::localmsgsubmission::DmqMsgPayload; + use super::*; #[test] fn test_dmq_message_serialize_deserialize() { let dmq_msg = DmqMsg { - msg_id: vec![1, 2, 3], - msg_body: vec![4, 5, 6], - block_number: 123, - ttl: 10, - kes_signature: vec![7, 8, 9], - operational_certificate: vec![10, 11, 12], - kes_period: 0, + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + kes_period: 10, + expires_at: 100, + }, + kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], }; let dmq_message = DmqMessage::from(dmq_msg.clone()); diff --git a/internal/mithril-dmq/src/model/mod.rs b/internal/mithril-dmq/src/model/mod.rs index 959c5f7ee05..d5e78173523 100644 --- a/internal/mithril-dmq/src/model/mod.rs +++ b/internal/mithril-dmq/src/model/mod.rs @@ -1,5 +1,7 @@ mod builder; mod message; +mod timestamp; pub use builder::*; pub use message::*; +pub use timestamp::*; diff --git a/internal/mithril-dmq/src/model/timestamp.rs b/internal/mithril-dmq/src/model/timestamp.rs new file mode 100644 index 00000000000..c1c7f8bfe34 --- /dev/null +++ b/internal/mithril-dmq/src/model/timestamp.rs @@ -0,0 +1,19 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use mithril_common::StdResult; + +/// Provides the current timestamp in seconds since the UNIX epoch. +#[cfg_attr(test, mockall::automock)] +pub trait UnixTimestampProvider: Send + Sync { + /// Returns the current timestamp in seconds since the UNIX epoch. + fn current_timestamp(&self) -> StdResult; +} + +/// Provides the current timestamp in seconds since the UNIX epoch from the system. +pub struct SystemUnixTimestampProvider; + +impl UnixTimestampProvider for SystemUnixTimestampProvider { + fn current_timestamp(&self) -> StdResult { + Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()) + } +} diff --git a/internal/mithril-dmq/src/publisher/server/pallas.rs b/internal/mithril-dmq/src/publisher/server/pallas.rs index deb0ea250f1..383cff1695f 100644 --- a/internal/mithril-dmq/src/publisher/server/pallas.rs +++ b/internal/mithril-dmq/src/publisher/server/pallas.rs @@ -242,7 +242,10 @@ mod tests { use pallas_network::{ facades::DmqClient, - miniprotocols::{localmsgsubmission::DmqMsg, localtxsubmission}, + miniprotocols::{ + localmsgsubmission::{DmqMsg, DmqMsgPayload}, + localtxsubmission, + }, }; use tokio::sync::{mpsc::unbounded_channel, watch}; @@ -258,13 +261,15 @@ mod tests { async fn fake_msg() -> DmqMsg { DmqMsg { - msg_id: vec![0, 1], - msg_body: vec![2, 3, 4, 5], - block_number: 10, - ttl: 100, + msg_payload: DmqMsgPayload { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + kes_period: 10, + expires_at: 100, + }, kes_signature: vec![0, 1, 2, 3], - operational_certificate: vec![0, 1, 2, 3, 4, 5], - kes_period: 10, + operational_certificate: vec![0, 1, 2, 3, 4], + cold_verification_key: vec![0, 1, 2, 3, 4, 5], } } diff --git a/internal/mithril-dmq/src/test/fake_message.rs b/internal/mithril-dmq/src/test/fake_message.rs new file mode 100644 index 00000000000..91cc5e28360 --- /dev/null +++ b/internal/mithril-dmq/src/test/fake_message.rs @@ -0,0 +1,26 @@ +//! Fake message computation for testing purposes. + +use std::sync::Arc; + +use mithril_cardano_node_chain::test::double::FakeChainObserver; +use mithril_common::{crypto_helper::TryToBytes, test::crypto_helper::KesSignerFake}; + +use crate::{DmqMessage, DmqMessageBuilder, test::payload::DmqMessageTestPayload}; + +/// Computes a fake DMQ message for testing purposes. +pub async fn compute_fake_msg(bytes: &[u8], test_directory: &str) -> DmqMessage { + let dmq_builder = DmqMessageBuilder::new( + { + let (kes_signature, operational_certificate) = + KesSignerFake::dummy_signature(test_directory); + let kes_signer = + KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]); + + Arc::new(kes_signer) + }, + Arc::new(FakeChainObserver::default()), + ) + .set_ttl(100); + let message = DmqMessageTestPayload::new(bytes); + dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap() +} diff --git a/internal/mithril-dmq/src/test/mod.rs b/internal/mithril-dmq/src/test/mod.rs index 97121e33c28..4670dd1b5e5 100644 --- a/internal/mithril-dmq/src/test/mod.rs +++ b/internal/mithril-dmq/src/test/mod.rs @@ -5,7 +5,7 @@ //! This module provides in particular test doubles for the traits defined in this crate. pub mod double; - +pub mod fake_message; pub mod payload; #[cfg(test)] diff --git a/internal/mithril-dmq/tests/consumer_client_server.rs b/internal/mithril-dmq/tests/consumer_client_server.rs index d3b35317f55..1cd453cf210 100644 --- a/internal/mithril-dmq/tests/consumer_client_server.rs +++ b/internal/mithril-dmq/tests/consumer_client_server.rs @@ -3,35 +3,12 @@ use std::sync::Arc; use tokio::sync::{mpsc::unbounded_channel, watch}; -use mithril_cardano_node_chain::test::double::FakeChainObserver; -use mithril_common::{ - CardanoNetwork, - crypto_helper::TryToBytes, - current_function, - test::{TempDir, crypto_helper::KesSignerFake}, -}; +use mithril_common::{CardanoNetwork, current_function, test::TempDir}; use mithril_dmq::{ DmqConsumerClient, DmqConsumerClientPallas, DmqConsumerServer, DmqConsumerServerPallas, - DmqMessage, DmqMessageBuilder, test::payload::DmqMessageTestPayload, + DmqMessage, test::fake_message::compute_fake_msg, test::payload::DmqMessageTestPayload, }; -async fn create_fake_msg(bytes: &[u8], test_directory: &str) -> DmqMessage { - let dmq_builder = DmqMessageBuilder::new( - { - let (kes_signature, operational_certificate) = - KesSignerFake::dummy_signature(test_directory); - let kes_signer = - KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]); - - Arc::new(kes_signer) - }, - Arc::new(FakeChainObserver::default()), - ) - .set_ttl(100); - let message = DmqMessageTestPayload::new(bytes); - dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap() -} - #[tokio::test(flavor = "multi_thread")] async fn dmq_consumer_client_server() { let current_function_name = current_function!(); @@ -69,14 +46,14 @@ async fn dmq_consumer_client_server() { ); let mut messages = vec![]; signature_dmq_tx - .send(create_fake_msg(b"msg_1", current_function_name).await) + .send(compute_fake_msg(b"msg_1", current_function_name).await) .unwrap(); signature_dmq_tx - .send(create_fake_msg(b"msg_2", current_function_name).await) + .send(compute_fake_msg(b"msg_2", current_function_name).await) .unwrap(); messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap()); signature_dmq_tx - .send(create_fake_msg(b"msg_3", current_function_name).await) + .send(compute_fake_msg(b"msg_3", current_function_name).await) .unwrap(); messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap()); @@ -109,7 +86,7 @@ async fn dmq_consumer_client_server() { ); let mut messages = vec![]; signature_dmq_tx - .send(create_fake_msg(b"msg_4", current_function_name).await) + .send(compute_fake_msg(b"msg_4", current_function_name).await) .unwrap(); messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap()); stop_tx.send(()).unwrap(); diff --git a/internal/mithril-dmq/tests/publisher_client_server.rs b/internal/mithril-dmq/tests/publisher_client_server.rs index 8aebd9c6f6d..b90728780be 100644 --- a/internal/mithril-dmq/tests/publisher_client_server.rs +++ b/internal/mithril-dmq/tests/publisher_client_server.rs @@ -5,33 +5,15 @@ use tokio::sync::{mpsc::unbounded_channel, watch}; use mithril_cardano_node_chain::test::double::FakeChainObserver; use mithril_common::{ - CardanoNetwork, - crypto_helper::TryToBytes, - current_function, + CardanoNetwork, current_function, test::{TempDir, crypto_helper::KesSignerFake}, }; use mithril_dmq::{ DmqMessage, DmqMessageBuilder, DmqPublisherClient, DmqPublisherClientPallas, - DmqPublisherServer, DmqPublisherServerPallas, test::payload::DmqMessageTestPayload, + DmqPublisherServer, DmqPublisherServerPallas, + test::{fake_message::compute_fake_msg, payload::DmqMessageTestPayload}, }; -async fn create_fake_msg(bytes: &[u8], test_directory: &str) -> DmqMessage { - let dmq_builder = DmqMessageBuilder::new( - { - let (kes_signature, operational_certificate) = - KesSignerFake::dummy_signature(test_directory); - let kes_signer = - KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]); - - Arc::new(kes_signer) - }, - Arc::new(FakeChainObserver::default()), - ) - .set_ttl(100); - let message = DmqMessageTestPayload::new(bytes); - dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap() -} - #[tokio::test] async fn dmq_publisher_client_server() { let current_function_name = current_function!(); @@ -157,9 +139,9 @@ async fn dmq_publisher_client_server() { let (_, _, messages) = tokio::try_join!(server, client, recorder).unwrap(); assert_eq!( vec![ - create_fake_msg(b"msg_1", current_function_name).await, - create_fake_msg(b"msg_2", current_function_name).await, - create_fake_msg(b"msg_3", current_function_name).await, + compute_fake_msg(b"msg_1", current_function_name).await, + compute_fake_msg(b"msg_2", current_function_name).await, + compute_fake_msg(b"msg_3", current_function_name).await, ], messages ); diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index d9c3b92803b..13eb7606775 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.7.85" +version = "0.7.86" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/services/signer_registration/verifier.rs b/mithril-aggregator/src/services/signer_registration/verifier.rs index bc5712bf498..edef6172934 100644 --- a/mithril-aggregator/src/services/signer_registration/verifier.rs +++ b/mithril-aggregator/src/services/signer_registration/verifier.rs @@ -48,7 +48,7 @@ impl SignerRegistrationVerifier for MithrilSignerRegistrationVerifier { .get_current_kes_period() .await? .unwrap_or_default() - - operational_certificate.start_kes_period as KesPeriod, + - operational_certificate.get_start_kes_period() as KesPeriod, ), None => None, }; diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 8f5f401570d..c45e7cdb72e 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.6.18" +version = "0.6.19" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs b/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs index da660bb22e9..5b3e28b78c3 100644 --- a/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs +++ b/mithril-common/src/crypto_helper/cardano/kes/verifier_standard.rs @@ -30,7 +30,11 @@ impl KesVerifier for KesVerifierStandard { let kes_period_try_max = std::cmp::min(64, kes_period.saturating_add(1)); for kes_period_try in kes_period_try_min..kes_period_try_max { if signature - .verify(kes_period_try, &operational_certificate.kes_vk, message) + .verify( + kes_period_try, + &operational_certificate.get_kes_verification_key(), + message, + ) .is_ok() { return Ok(()); @@ -39,7 +43,7 @@ impl KesVerifier for KesVerifierStandard { Err(KesVerifyError::SignatureInvalid( kes_period, - operational_certificate.start_kes_period as u32, + operational_certificate.get_start_kes_period() as u32, ) .into()) } diff --git a/mithril-common/src/crypto_helper/cardano/key_certification.rs b/mithril-common/src/crypto_helper/cardano/key_certification.rs index 3bf8b12095a..0a6449a78bb 100644 --- a/mithril-common/src/crypto_helper/cardano/key_certification.rs +++ b/mithril-common/src/crypto_helper/cardano/key_certification.rs @@ -266,7 +266,7 @@ impl KeyRegWrapper { } else { return Err(ProtocolRegistrationErrorWrapper::KesSignatureInvalid( kes_period, - opcert.start_kes_period, + opcert.get_start_kes_period(), )); } } else { diff --git a/mithril-common/src/crypto_helper/cardano/opcert.rs b/mithril-common/src/crypto_helper/cardano/opcert.rs index dd62c5c95e8..4d21e9f06aa 100644 --- a/mithril-common/src/crypto_helper/cardano/opcert.rs +++ b/mithril-common/src/crypto_helper/cardano/opcert.rs @@ -26,7 +26,7 @@ pub enum OpCertError { /// Raw Fields of the operational certificates (without including the cold VK) #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] -struct RawFields( +struct RawOpCertWithoutColdVerificationKey( #[serde(with = "serde_bytes")] Vec, u64, u64, @@ -35,16 +35,72 @@ struct RawFields( /// Raw Operational Certificate #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] -struct RawOpCert(RawFields, EdVerificationKey); +struct RawOpCert(RawOpCertWithoutColdVerificationKey, EdVerificationKey); -/// Parsed Operational Certificate +/// Parsed Operational Certificate without cold verification key #[derive(Clone, Debug, PartialEq, Eq)] -pub struct OpCert { +pub struct OpCertWithoutColdVerificationKey { pub(crate) kes_vk: KesPublicKey, pub(crate) issue_number: u64, /// KES period at which KES key is initalized pub start_kes_period: u64, pub(crate) cert_sig: EdSignature, +} + +impl SerDeShelleyFileFormat for OpCertWithoutColdVerificationKey { + const TYPE: &'static str = "NodeOperationalCertificateWithoutColdVerificationKey"; + const DESCRIPTION: &'static str = ""; +} + +impl Serialize for OpCertWithoutColdVerificationKey { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let raw_cert = RawOpCertWithoutColdVerificationKey( + self.kes_vk.as_bytes().to_vec(), + self.issue_number, + self.start_kes_period, + self.cert_sig.to_bytes().to_vec(), + ); + + raw_cert.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for OpCertWithoutColdVerificationKey { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let raw_cert = RawOpCertWithoutColdVerificationKey::deserialize(deserializer)?; + + Ok(Self { + kes_vk: KesPublicKey::from_bytes(&raw_cert.0) + .map_err(|_| Error::custom("KES vk serialisation error"))?, + issue_number: raw_cert.1, + start_kes_period: raw_cert.2, + cert_sig: EdSignature::from_slice(&raw_cert.3) + .map_err(|_| Error::custom("ed25519 signature serialisation error"))?, + }) + } +} + +impl From<&OpCertWithoutColdVerificationKey> for RawOpCertWithoutColdVerificationKey { + fn from(opcert: &OpCertWithoutColdVerificationKey) -> Self { + RawOpCertWithoutColdVerificationKey( + opcert.kes_vk.as_bytes().to_vec(), + opcert.issue_number, + opcert.start_kes_period, + opcert.cert_sig.to_bytes().to_vec(), + ) + } +} + +/// Parsed Operational Certificate +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct OpCert { + pub(crate) opcert_without_vk: OpCertWithoutColdVerificationKey, pub(crate) cold_vk: EdVerificationKey, } @@ -69,14 +125,46 @@ impl OpCert { )); Self { - kes_vk, - issue_number, - start_kes_period, - cert_sig, + opcert_without_vk: OpCertWithoutColdVerificationKey { + kes_vk, + issue_number, + start_kes_period, + cert_sig, + }, cold_vk, } } + /// Get the KES verification key + pub fn get_kes_verification_key(&self) -> KesPublicKey { + self.opcert_without_vk.kes_vk + } + + /// Get the issue number + pub fn get_issue_number(&self) -> u64 { + self.opcert_without_vk.issue_number + } + + /// Get the start KES period + pub fn get_start_kes_period(&self) -> u64 { + self.opcert_without_vk.start_kes_period + } + + /// Get the certificate signature + pub fn get_certificate_signature(&self) -> EdSignature { + self.opcert_without_vk.cert_sig + } + + /// Get the OpCert without cold verification key + pub fn get_opcert_without_cold_verification_key(&self) -> OpCertWithoutColdVerificationKey { + self.opcert_without_vk.clone() + } + + /// Get the cold verification key + pub fn get_cold_verification_key(&self) -> EdVerificationKey { + self.cold_vk + } + /// Compute message to sign pub(crate) fn compute_message_to_sign( kes_vk: &KesPublicKey, @@ -96,11 +184,11 @@ impl OpCert { .cold_vk .verify( &Self::compute_message_to_sign( - &self.kes_vk, - self.issue_number, - self.start_kes_period, + &self.opcert_without_vk.kes_vk, + self.opcert_without_vk.issue_number, + self.opcert_without_vk.start_kes_period, ), - &self.cert_sig, + &self.opcert_without_vk.cert_sig, ) .is_ok() { @@ -129,10 +217,10 @@ impl OpCert { /// Compute the hash of an OpCert pub fn compute_hash(&self) -> String { let mut hasher = Sha256::new(); - hasher.update(self.kes_vk.as_bytes()); - hasher.update(self.issue_number.to_be_bytes()); - hasher.update(self.start_kes_period.to_be_bytes()); - hasher.update(self.cert_sig.to_bytes()); + hasher.update(self.opcert_without_vk.kes_vk.as_bytes()); + hasher.update(self.opcert_without_vk.issue_number.to_be_bytes()); + hasher.update(self.opcert_without_vk.start_kes_period.to_be_bytes()); + hasher.update(self.opcert_without_vk.cert_sig.to_bytes()); hasher.update(self.cold_vk.as_bytes()); hex::encode(hasher.finalize()) } @@ -143,15 +231,9 @@ impl Serialize for OpCert { where S: Serializer, { - let raw_cert = RawOpCert( - RawFields( - self.kes_vk.as_bytes().to_vec(), - self.issue_number, - self.start_kes_period, - self.cert_sig.to_bytes().to_vec(), - ), - self.cold_vk, - ); + let raw_opcert_without_vk: RawOpCertWithoutColdVerificationKey = + (&self.opcert_without_vk).into(); + let raw_cert = RawOpCert(raw_opcert_without_vk, self.cold_vk); raw_cert.serialize(serializer) } @@ -163,18 +245,33 @@ impl<'de> Deserialize<'de> for OpCert { D: Deserializer<'de>, { let raw_cert = RawOpCert::deserialize(deserializer)?; + let raw_opcert_without_vk = &raw_cert.0; + Ok(Self { - kes_vk: KesPublicKey::from_bytes(&raw_cert.0.0) - .map_err(|_| Error::custom("KES vk serialisation error"))?, - issue_number: raw_cert.0.1, - start_kes_period: raw_cert.0.2, - cert_sig: EdSignature::from_slice(&raw_cert.0.3) - .map_err(|_| Error::custom("ed25519 signature serialisation error"))?, + opcert_without_vk: OpCertWithoutColdVerificationKey { + kes_vk: KesPublicKey::from_bytes(&raw_opcert_without_vk.0) + .map_err(|_| Error::custom("KES vk serialisation error"))?, + issue_number: raw_opcert_without_vk.1, + start_kes_period: raw_opcert_without_vk.2, + cert_sig: EdSignature::from_slice(&raw_opcert_without_vk.3) + .map_err(|_| Error::custom("ed25519 signature serialisation error"))?, + }, cold_vk: raw_cert.1, }) } } +impl From<(OpCertWithoutColdVerificationKey, EdVerificationKey)> for OpCert { + fn from( + (opcert_without_vk, cold_vk): (OpCertWithoutColdVerificationKey, EdVerificationKey), + ) -> Self { + Self { + opcert_without_vk, + cold_vk, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -220,5 +317,14 @@ mod tests { "d9899c574fd7a710732391706b59e878bfd416214c49d2b3841c5c8b".to_string(), party_id_as_hash ); + + let operational_certificate_bytes_without_cold_vk = operational_certificate + .get_opcert_without_cold_verification_key() + .to_cbor_bytes() + .expect("compute CBOR bytes should not fail"); + assert_eq!( + "845820e650d7531509bb6cffd7998c28c68e4ec8fa621a0952206ea11eb03fcd7dcb2900005840d4abce27da05ff03c1342cc6ab53135072e1babf9cc05492f59f1ff009f70457aaa862c7158b13be0cfb41d7a91a562589bc110eb2cdaf5d2756048abbea5f05", + hex::encode(operational_certificate_bytes_without_cold_vk) + ); } } diff --git a/mithril-common/src/crypto_helper/codec/binary.rs b/mithril-common/src/crypto_helper/codec/binary.rs index ab5f356d302..98d92a437af 100644 --- a/mithril-common/src/crypto_helper/codec/binary.rs +++ b/mithril-common/src/crypto_helper/codec/binary.rs @@ -210,7 +210,7 @@ mod binary_kes_sig { } mod binary_opcert { - use crate::crypto_helper::{OpCert, SerDeShelleyFileFormat}; + use crate::crypto_helper::{OpCert, OpCertWithoutColdVerificationKey, SerDeShelleyFileFormat}; use anyhow::anyhow; use super::*; @@ -226,6 +226,18 @@ mod binary_opcert { Self::from_cbor_bytes(bytes).map_err(|e| anyhow!(format!("{e:?}"))) } } + + impl TryToBytes for OpCertWithoutColdVerificationKey { + fn to_bytes_vec(&self) -> StdResult> { + self.to_cbor_bytes().map_err(|e| anyhow!(format!("{e:?}"))) + } + } + + impl TryFromBytes for OpCertWithoutColdVerificationKey { + fn try_from_bytes(bytes: &[u8]) -> StdResult { + Self::from_cbor_bytes(bytes).map_err(|e| anyhow!(format!("{e:?}"))) + } + } } mod binary_mk_proof { diff --git a/mithril-common/src/crypto_helper/mod.rs b/mithril-common/src/crypto_helper/mod.rs index 23b31c81e25..e6a44e116dc 100644 --- a/mithril-common/src/crypto_helper/mod.rs +++ b/mithril-common/src/crypto_helper/mod.rs @@ -13,8 +13,8 @@ pub use cardano::ColdKeyGenerator; pub use cardano::{ KesPeriod, KesSigner, KesSignerStandard, KesVerifier, KesVerifierStandard, KesVerifyError, - OpCert, ProtocolInitializerErrorWrapper, ProtocolRegistrationErrorWrapper, - SerDeShelleyFileFormat, Sum6KesBytes, + OpCert, OpCertWithoutColdVerificationKey, ProtocolInitializerErrorWrapper, + ProtocolRegistrationErrorWrapper, SerDeShelleyFileFormat, Sum6KesBytes, }; pub use codec::*; pub use ed25519_alias::{era::*, genesis::*, manifest::*}; diff --git a/mithril-common/src/crypto_helper/types/protocol_key.rs b/mithril-common/src/crypto_helper/types/protocol_key.rs index aa4fed3961e..923010949da 100644 --- a/mithril-common/src/crypto_helper/types/protocol_key.rs +++ b/mithril-common/src/crypto_helper/types/protocol_key.rs @@ -129,6 +129,11 @@ where pub fn key_to_bytes_hex(key: &T) -> StdResult { key.to_bytes_hex() } + + /// Consume self and return the inner key + pub fn into_inner(self) -> T { + self.key + } } impl Deref for ProtocolKey diff --git a/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs b/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs index 7eb9073ee3d..87b5cac4fcc 100644 --- a/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs +++ b/mithril-common/src/test/crypto_helper/cardano/kes/signer_fake.rs @@ -16,6 +16,7 @@ type KesSignatureResult = StdResult<(Sum6KesSig, OpCert)>; /// Fake KES Signer implementation. pub struct KesSignerFake { results: Mutex>, + signed_messages: Mutex>>, } impl KesSignerFake { @@ -23,9 +24,16 @@ impl KesSignerFake { pub fn new(results: Vec) -> Self { Self { results: Mutex::new(results.into()), + signed_messages: Mutex::new(VecDeque::new()), } } + /// Returns the messages that were requested to be signed + pub fn get_signed_messages(&self) -> Vec> { + let messages = self.signed_messages.lock().unwrap(); + messages.iter().cloned().collect() + } + /// Returns a dummy signature result that is always successful. pub fn dummy_signature(test_directory: &str) -> (Sum6KesSig, OpCert) { let KesCryptographicMaterialForTest { @@ -55,7 +63,10 @@ impl KesSignerFake { } impl KesSigner for KesSignerFake { - fn sign(&self, _message: &[u8], _kes_period: KesPeriod) -> KesSignatureResult { + fn sign(&self, message: &[u8], _kes_period: KesPeriod) -> KesSignatureResult { + let mut messages = self.signed_messages.lock().unwrap(); + messages.push_back(message.to_vec()); + let mut results = self.results.lock().unwrap(); results.pop_front().unwrap() @@ -77,11 +88,11 @@ mod tests { 0 as KesPeriod, "fake_kes_signer_returns_signature_batches_in_expected_order", ); - let message = b"Test message for KES signing"; + let message1 = b"Test message 1 for KES signing"; let kes_signer = KesSignerStandard::new(kes_secret_key_file, operational_certificate_file); let kes_signing_period = 1; let (kes_signature, op_cert) = kes_signer - .sign(message, kes_signing_period) + .sign(message1, kes_signing_period) .expect("Signing should not fail"); let fake_kes_signer = KesSignerFake::new(vec![ Ok((kes_signature, op_cert.clone())), @@ -89,13 +100,17 @@ mod tests { ]); let (kes_signature_1, op_cert_1) = fake_kes_signer - .sign(message, kes_signing_period) + .sign(message1, kes_signing_period) .expect("Signing should not fail"); assert_eq!(kes_signature, kes_signature_1); assert_eq!(op_cert, op_cert_1); + let message2 = b"Test message 2 for KES signing"; fake_kes_signer - .sign(message, kes_signing_period) + .sign(message2, kes_signing_period) .expect_err("Signing should fail"); + + let signed_messages = fake_kes_signer.get_signed_messages(); + assert_eq!(vec![message1.to_vec(), message2.to_vec()], signed_messages); } } diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index e7c1f9b25d7..6beed3eb73c 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.269" +version = "0.2.270" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs index fefa145ba31..d259f181757 100644 --- a/mithril-signer/src/runtime/runner.rs +++ b/mithril-signer/src/runtime/runner.rs @@ -178,7 +178,7 @@ impl Runner for SignerRunner { .get_current_kes_period() .await? .unwrap_or_default() - - operational_certificate.start_kes_period as KesPeriod, + - operational_certificate.get_start_kes_period() as KesPeriod, ), None => None, };