Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cd3d8e2
feat(aggregator): make leader url withstand trailing slash
Alenar Jul 4, 2025
be30da5
refactor(aggregator): invert dependency between aggregator client and…
Alenar Jul 4, 2025
7163579
refactor(aggregator-test): extract leader follower test http server t…
Alenar Jul 4, 2025
a22eedc
refactor(aggregator-test): use axum test for leader aggregator http s…
Alenar Jul 4, 2025
f31bbb8
feat(aggregator): scaffold `CertificateChainSynchronizer`
Alenar Jun 24, 2025
d97c731
feat(aggregator): impl logic that check if sync is needed
Alenar Jun 24, 2025
053ba8c
test(aggregator): rework synchroniser test tools & prepare remote cha…
Alenar Jun 26, 2025
34b578b
feat(aggregator): implement logic to retrieve/validate remote chain
Alenar Jun 26, 2025
ee585f7
feat(aggregator): implement logic to store retrieved chain
Alenar Jul 1, 2025
8be4f82
feat(aggregator): add logs & error context to `CertificateChainSynchr…
Alenar Jul 1, 2025
4ec7378
refactor(aggregator): add a enum to make sync reason explicit in sync…
Alenar Jul 1, 2025
f48f558
refactor(aggregator): make storage of sync certificates works on a batch
Alenar Jul 1, 2025
7ccd88c
test(common): add `latest_certificate` to `CertificateChainFixture`
Alenar Jul 1, 2025
6a4c9c0
test(aggregator): add 'e2e' tests to synchroniser
Alenar Jul 1, 2025
b855ab3
feat(aggregator): add `RemoteCertificateRetriever` to `AggregatorHTTP…
Alenar Jul 1, 2025
d63b82b
feat(aggregator): ensure synced data is stored from genesis to latest
Alenar Jul 3, 2025
70dcce9
feat(aggregator): make synchroniser add an open message at process end
Alenar Jul 3, 2025
c45076c
feat(aggregator): make synchroniser only persist first cert of each e…
Alenar Jul 8, 2025
55d7d32
feat(aggregator): implement persistence of synchronised certificates
Alenar Jul 3, 2025
8931cb8
feat(aggregator): implement persistence of openmessage created post sync
Alenar Jul 4, 2025
930ee6e
feat(aggregator): add follower sync in state machine + wire synchroniser
Alenar Jul 4, 2025
c5fa040
test(aggregator): update tooling to check cert have associated signed…
Alenar Jul 7, 2025
3d4734d
test(aggregator): update `create_certificate_follower` integration test
Alenar Jul 4, 2025
9bebed2
test: update e2e to follower auto-sync
Alenar Jul 8, 2025
1796fdb
fix: address PR reviews comments
Alenar Jul 9, 2025
a54c3be
test(aggregato): in `create_certificate_follower` check open messages…
Alenar Jul 9, 2025
a9d24d8
chore: update changelog
Alenar Jul 9, 2025
11c8aec
chore: upgrade crate versions
Alenar Jul 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ As a minor extension, we have adopted a slightly different versioning convention
- **UNSTABLE** :
- Support for DMQ signature publisher in the signer and signature consumer in the aggregator.

- Implement automatic certificates chain synchronization between leader/follower aggregators.

- Crates versions:

| Crate | Version |
Expand Down
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.7.72"
version = "0.7.73"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down Expand Up @@ -76,6 +76,8 @@ zstd = { version = "0.13.3", features = ["zstdmt"] }
tikv-jemallocator = { version = "0.6.0", optional = true }

[dev-dependencies]
axum = { version = "0.8.4", features = ["json"] }
axum-test = "17.3.0"
criterion = { version = "0.6.0", features = ["html_reports", "async_tokio"] }
http = "1.3.1"
httpmock = "0.7.0"
Expand Down
65 changes: 65 additions & 0 deletions mithril-aggregator/src/database/query/certificate/conditions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//! Shared `WhereCondition` across certificates queries

use sqlite::Value;
use std::iter::repeat_n;

use mithril_persistence::sqlite::WhereCondition;

use crate::database::record::CertificateRecord;

pub(super) fn insert_many(certificates_records: Vec<CertificateRecord>) -> WhereCondition {
let columns = "(\
certificate_id, \
parent_certificate_id, \
message, \
signature, \
aggregate_verification_key, \
epoch, \
network, \
signed_entity_type_id, \
signed_entity_beacon, \
protocol_version, \
protocol_parameters, \
protocol_message, \
signers, \
initiated_at, \
sealed_at)";
let values_columns: Vec<&str> = repeat_n(
"(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)",
certificates_records.len(),
)
.collect();

let values: Vec<Value> = certificates_records
.into_iter()
.flat_map(|certificate_record| {
vec![
Value::String(certificate_record.certificate_id),
match certificate_record.parent_certificate_id {
Some(parent_certificate_id) => Value::String(parent_certificate_id),
None => Value::Null,
},
Value::String(certificate_record.message),
Value::String(certificate_record.signature),
Value::String(certificate_record.aggregate_verification_key),
Value::Integer(certificate_record.epoch.try_into().unwrap()),
Value::String(certificate_record.network),
Value::Integer(certificate_record.signed_entity_type.index() as i64),
Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()),
Value::String(certificate_record.protocol_version),
Value::String(
serde_json::to_string(&certificate_record.protocol_parameters).unwrap(),
),
Value::String(serde_json::to_string(&certificate_record.protocol_message).unwrap()),
Value::String(serde_json::to_string(&certificate_record.signers).unwrap()),
Value::String(certificate_record.initiated_at.to_rfc3339()),
Value::String(certificate_record.sealed_at.to_rfc3339()),
]
})
.collect();

WhereCondition::new(
format!("{columns} values {}", values_columns.join(", ")).as_str(),
values,
)
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::iter::repeat_n;

use sqlite::Value;

use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};

use crate::database::record::CertificateRecord;

use super::conditions;

/// Query to insert [CertificateRecord] in the sqlite database
pub struct InsertCertificateRecordQuery {
condition: WhereCondition,
Expand All @@ -17,64 +15,9 @@ impl InsertCertificateRecordQuery {
}

pub fn many(certificates_records: Vec<CertificateRecord>) -> Self {
let columns = "(\
certificate_id, \
parent_certificate_id, \
message, \
signature, \
aggregate_verification_key, \
epoch, \
network, \
signed_entity_type_id, \
signed_entity_beacon, \
protocol_version, \
protocol_parameters, \
protocol_message, \
signers, \
initiated_at, \
sealed_at)";
let values_columns: Vec<&str> = repeat_n(
"(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)",
certificates_records.len(),
)
.collect();

let values: Vec<Value> = certificates_records
.into_iter()
.flat_map(|certificate_record| {
vec![
Value::String(certificate_record.certificate_id),
match certificate_record.parent_certificate_id {
Some(parent_certificate_id) => Value::String(parent_certificate_id),
None => Value::Null,
},
Value::String(certificate_record.message),
Value::String(certificate_record.signature),
Value::String(certificate_record.aggregate_verification_key),
Value::Integer(certificate_record.epoch.try_into().unwrap()),
Value::String(certificate_record.network),
Value::Integer(certificate_record.signed_entity_type.index() as i64),
Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()),
Value::String(certificate_record.protocol_version),
Value::String(
serde_json::to_string(&certificate_record.protocol_parameters).unwrap(),
),
Value::String(
serde_json::to_string(&certificate_record.protocol_message).unwrap(),
),
Value::String(serde_json::to_string(&certificate_record.signers).unwrap()),
Value::String(certificate_record.initiated_at.to_rfc3339()),
Value::String(certificate_record.sealed_at.to_rfc3339()),
]
})
.collect();

let condition = WhereCondition::new(
format!("{columns} values {}", values_columns.join(", ")).as_str(),
values,
);

Self { condition }
Self {
condition: conditions::insert_many(certificates_records),
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};

use crate::database::record::CertificateRecord;

use super::conditions;

/// Query to insert or replace [CertificateRecord] in the sqlite database
pub struct InsertOrReplaceCertificateRecordQuery {
condition: WhereCondition,
}

impl InsertOrReplaceCertificateRecordQuery {
pub fn many(certificates_records: Vec<CertificateRecord>) -> Self {
Self {
condition: conditions::insert_many(certificates_records),
}
}
}

impl Query for InsertOrReplaceCertificateRecordQuery {
type Entity = CertificateRecord;

fn filters(&self) -> WhereCondition {
self.condition.clone()
}

fn get_definition(&self, condition: &str) -> String {
// it is important to alias the fields with the same name as the table
// since the table cannot be aliased in a RETURNING statement in SQLite.
let projection = Self::Entity::get_projection()
.expand(SourceAlias::new(&[("{:certificate:}", "certificate")]));

format!("insert or replace into certificate {condition} returning {projection}")
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use mithril_common::crypto_helper::tests_setup::setup_certificate_chain;
use mithril_common::entities::Epoch;
use mithril_common::test_utils::fake_data;
use mithril_persistence::sqlite::ConnectionExtensions;

use crate::database::query::{GetCertificateRecordQuery, InsertCertificateRecordQuery};
use crate::database::test_helper::main_db_connection;

use super::*;

#[test]
fn test_insert_many_certificates_records_in_empty_db() {
let certificates = setup_certificate_chain(5, 2);
let certificates_records: Vec<CertificateRecord> = certificates.into();

let connection = main_db_connection().unwrap();

let certificates_records_saved: Vec<CertificateRecord> = connection
.fetch_collect(InsertOrReplaceCertificateRecordQuery::many(
certificates_records.clone(),
))
.expect("saving many records should not fail");

assert_eq!(certificates_records, certificates_records_saved);

// Check insertion order
let all_records: Vec<CertificateRecord> =
connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap();
assert_eq!(
certificates_records.into_iter().rev().collect::<Vec<_>>(),
all_records
);
}

#[test]
fn test_replace_one_certificate_record() {
let certificate_record = CertificateRecord {
epoch: Epoch(12),
..fake_data::certificate("hash").into()
};

let connection = main_db_connection().unwrap();
let certificate_record_saved = connection
.fetch_first(InsertCertificateRecordQuery::one(
certificate_record.clone(),
))
.unwrap();
assert_eq!(Some(Epoch(12)), certificate_record_saved.map(|r| r.epoch));

let modified_certificate_record = CertificateRecord {
epoch: Epoch(23),
..certificate_record
};
let certificate_record_saved = connection
.fetch_first(InsertOrReplaceCertificateRecordQuery::many(vec![
modified_certificate_record.clone(),
]))
.unwrap();
assert_eq!(Some(Epoch(23)), certificate_record_saved.map(|r| r.epoch));

let all_records_cursor = connection.fetch(GetCertificateRecordQuery::all()).unwrap();
assert_eq!(1, all_records_cursor.count());
}

#[test]
fn test_insert_and_replace_many_certificate_record() {
let tested_records: HashMap<_, CertificateRecord> = HashMap::from([
(
"cert1-genesis",
fake_data::genesis_certificate("genesis").into(),
),
("cert2", fake_data::certificate("cert2").into()),
(
"cert2-modified",
CertificateRecord {
epoch: Epoch(14),
..fake_data::certificate("cert2").into()
},
),
("cert3", fake_data::certificate("cert3").into()),
("cert4", fake_data::certificate("cert4").into()),
(
"cert4-modified",
CertificateRecord {
epoch: Epoch(32),
..fake_data::certificate("cert4").into()
},
),
("cert5", fake_data::certificate("cert5").into()),
]);
let connection = main_db_connection().unwrap();

let cursor = connection
.fetch(InsertCertificateRecordQuery::many(vec![
tested_records["cert1-genesis"].clone(),
tested_records["cert2"].clone(),
tested_records["cert3"].clone(),
tested_records["cert4"].clone(),
tested_records["cert5"].clone(),
]))
.unwrap();
assert_eq!(5, cursor.count());

let cursor = connection
.fetch(InsertOrReplaceCertificateRecordQuery::many(vec![
tested_records["cert1-genesis"].clone(),
tested_records["cert2-modified"].clone(),
tested_records["cert3"].clone(),
tested_records["cert4-modified"].clone(),
]))
.unwrap();
assert_eq!(4, cursor.count());

let all_records: Vec<CertificateRecord> =
connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap();
assert_eq!(5, all_records.len());
assert_eq!(
all_records,
vec![
tested_records["cert4-modified"].clone(),
tested_records["cert3"].clone(),
tested_records["cert2-modified"].clone(),
tested_records["cert1-genesis"].clone(),
// Since the cert5 was not in the Insert/replace query, it now has a lower rowid and shows first
tested_records["cert5"].clone(),
]
);
}
}
3 changes: 3 additions & 0 deletions mithril-aggregator/src/database/query/certificate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
mod conditions;
mod delete_certificate;
mod get_certificate;
mod get_master_certificate;
mod insert_certificate;
mod insert_or_replace_certificate;

pub use delete_certificate::*;
pub use get_certificate::*;
pub use get_master_certificate::*;
pub use insert_certificate::*;
pub use insert_or_replace_certificate::*;
Loading