Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2ece329
refactor(dmq): update fake 'DMQMsg' used in tests
jpraynaud Sep 2, 2025
aeb811e
refactor(dmq): adapt consumer client with new 'DmqMsg' structure
jpraynaud Sep 2, 2025
fedd9f4
refactor(dmq): adapt consumer server with new 'DmqMsg' structure
jpraynaud Sep 2, 2025
841671c
refactor(dmq): adapt publisher server with new 'DmqMsg' structure
jpraynaud Sep 2, 2025
812c798
feat(dmq): add a Unix timestamp provider trait
jpraynaud Sep 2, 2025
22b5720
refactor(dmq): update the message builder and its KES signature
jpraynaud Sep 2, 2025
fdcb5d9
feat(common): add signed messages recording in 'FakeKesSigner'
jpraynaud Sep 3, 2025
5047597
refactor(dmq): update message builder signature
jpraynaud Sep 3, 2025
9e0baf6
refactor(dmq): adapt consumer client with new 'DmqMsg' structure
jpraynaud Sep 3, 2025
f8cd590
refactor(dmq): adapt consumer server with new 'DmqMsg' structure
jpraynaud Sep 3, 2025
4d85e0d
refactor(dmq): adapt publisher server with new 'DmqMsg' structure
jpraynaud Sep 3, 2025
76befa1
refactor(dmq): adapt 'DmqMessage' with new 'DmqMsg' structure
jpraynaud Sep 3, 2025
ec8d488
refactor(dmq): update the message builder and KES signature
jpraynaud Sep 3, 2025
e89e93f
feat(common): add 'into_inner' function to 'ProtocolKey'
jpraynaud Sep 4, 2025
1e9be5f
refactor(common): add support for operational certificate without col…
jpraynaud Sep 4, 2025
1cf06c5
refactor(aggregator): add support for operational certificate without…
jpraynaud Sep 4, 2025
fc6cd06
refactor(signer): add support for operational certificate without col…
jpraynaud Sep 4, 2025
68436e8
feat(dmq): add support for separate operational certificate and cold …
jpraynaud Sep 4, 2025
f62ef03
feat(dmq): make consumer server queue drain expired messages
jpraynaud Sep 5, 2025
aac2bb6
tests(dmq): provide tooling for creating signed fake DMQ messages
jpraynaud Sep 5, 2025
8bdc5ad
fix(dmq): blocking tests missing KES signed DMQ messages
jpraynaud Sep 5, 2025
99e81e1
chore(dmq): apply review comments
jpraynaud Sep 8, 2025
3570008
chore: update pallas revision
jpraynaud Sep 22, 2025
fee0ce1
chore: upgrade crate versions
jpraynaud Sep 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/mithril-dmq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "mithril-dmq"
description = "Mechanisms to publish and consume messages of a 'Decentralized Message Queue network' through a DMQ node"
version = "0.1.10"
version = "0.1.11"
authors.workspace = true
documentation.workspace = true
edition.workspace = true
Expand Down
79 changes: 49 additions & 30 deletions internal/mithril-dmq/src/consumer/client/pallas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use tokio::sync::{Mutex, MutexGuard};

use mithril_common::{
CardanoNetwork, StdResult,
crypto_helper::{OpCert, TryFromBytes},
crypto_helper::{
OpCert, OpCertWithoutColdVerificationKey, TryFromBytes, ed25519::Ed25519VerificationKey,
},
entities::PartyId,
logging::LoggerExtensions,
};
Expand Down Expand Up @@ -113,10 +115,19 @@ impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
.0
.into_iter()
.map(|dmq_message| {
let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate)
let opcert_without_verification_key =
OpCertWithoutColdVerificationKey::try_from_bytes(
&dmq_message.operational_certificate,
)
.with_context(|| "Failed to parse operational certificate")?;
let cold_verification_key =
Ed25519VerificationKey::from_bytes(&dmq_message.cold_verification_key)
.with_context(|| "Failed to parse cold verification key")?
.into_inner();
let opcert: OpCert =
(opcert_without_verification_key, cold_verification_key).into();
let party_id = opcert.compute_protocol_party_id()?;
let payload = M::try_from_bytes(&dmq_message.msg_body)
let payload = M::try_from_bytes(&dmq_message.msg_payload.msg_body)
.with_context(|| "Failed to parse DMQ message body")?;

Ok((payload, party_id))
Expand Down Expand Up @@ -158,7 +169,10 @@ mod tests {
use mithril_common::{crypto_helper::TryToBytes, current_function, test::TempDir};
use pallas_network::{
facades::DmqServer,
miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg},
miniprotocols::{
localmsgnotification,
localmsgsubmission::{DmqMsg, DmqMsgPayload},
},
};
use tokio::{net::UnixListener, task::JoinHandle, time::sleep};

Expand All @@ -173,41 +187,46 @@ mod tests {
fn fake_msgs() -> Vec<DmqMsg> {
vec![
DmqMsg {
msg_id: vec![0, 1],
msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(),
block_number: 10,
ttl: 100,
msg_payload: DmqMsgPayload {
msg_id: vec![0, 1],
msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(),
kes_period: 10,
expires_at: 100,
},
kes_signature: vec![0, 1, 2, 3],
operational_certificate: vec![
130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40,
198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125,
203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198,
171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9,
247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86,
37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88,
32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105,
132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198,
142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41,
0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19,
80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87,
170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137,
188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5,
],
cold_verification_key: vec![
32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105,
231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68,
],
kes_period: 10,
},
DmqMsg {
msg_id: vec![1, 2],
msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(),
block_number: 11,
ttl: 100,
msg_payload: DmqMsgPayload {
msg_id: vec![1, 2],
msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(),
kes_period: 11,
expires_at: 101,
},
kes_signature: vec![1, 2, 3, 4],
operational_certificate: vec![
130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40,
198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125,
203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62,
39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209,
77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172,
58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172,
43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105,
240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65,
245, 200,
132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198,
142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41,
0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19,
80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87,
170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137,
188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5,
],
cold_verification_key: vec![
77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, 240, 103, 245, 159,
147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, 245, 200,
],
kes_period: 11,
},
]
}
Expand Down
24 changes: 7 additions & 17 deletions internal/mithril-dmq/src/consumer/server/pallas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,31 +289,20 @@ mod tests {

use mithril_common::{current_function, test::TempDir};

use crate::test_tools::TestLogger;
use crate::{test::fake_message::compute_fake_msg, test_tools::TestLogger};

use super::*;

fn create_temp_dir(folder_name: &str) -> PathBuf {
TempDir::create_with_short_path("dmq_consumer_server", folder_name)
}

fn fake_msg() -> DmqMsg {
DmqMsg {
msg_id: vec![0, 1],
msg_body: vec![0, 1, 2],
block_number: 10,
ttl: 100,
kes_signature: vec![0, 1, 2, 3],
operational_certificate: vec![0, 1, 2, 3, 4],
kes_period: 10,
}
}

#[tokio::test(flavor = "multi_thread")]
async fn pallas_dmq_consumer_server_non_blocking_success() {
let current_function_name = current_function!();
let (stop_tx, stop_rx) = watch::channel(());
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
let socket_path = create_temp_dir(current_function!()).join("node.socket");
let socket_path = create_temp_dir(current_function_name).join("node.socket");
let cardano_network = CardanoNetwork::TestNet(0);
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
socket_path.to_path_buf(),
Expand All @@ -322,7 +311,7 @@ mod tests {
TestLogger::stdout(),
));
dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap();
let message = fake_msg();
let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into();
let client = tokio::spawn({
async move {
// sleep to avoid refused connection from the server
Expand Down Expand Up @@ -370,9 +359,10 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn pallas_dmq_consumer_server_blocking_success() {
let current_function_name = current_function!();
let (stop_tx, stop_rx) = watch::channel(());
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
let socket_path = create_temp_dir(current_function!()).join("node.socket");
let socket_path = create_temp_dir(current_function_name).join("node.socket");
let cardano_network = CardanoNetwork::TestNet(0);
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
socket_path.to_path_buf(),
Expand All @@ -381,7 +371,7 @@ mod tests {
TestLogger::stdout(),
));
dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap();
let message = fake_msg();
let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into();
let client = tokio::spawn({
async move {
// sleep to avoid refused connection from the server
Expand Down
Loading
Loading