Skip to content

Commit fdbed8b

Browse files
committed
skip pruned TUF repos when creating artifact config
1 parent a49aa1b commit fdbed8b

File tree

5 files changed

+159
-40
lines changed

5 files changed

+159
-40
lines changed

nexus/db-queries/src/db/datastore/update.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,19 @@ impl DataStore {
414414
})
415415
}
416416

417+
/// List the artifacts present in a TUF repo.
///
/// The caller must be authorized for `Read` on the fleet. The artifacts are
/// looked up with `artifacts_for_repo` for the given `repo_id`; any Diesel
/// error from that lookup is converted to a public error using
/// `ErrorHandler::Server`.
418+
pub async fn tuf_list_repo_artifacts(
419+
&self,
420+
opctx: &OpContext,
421+
repo_id: TufRepoUuid,
422+
) -> ListResultVec<TufArtifact> {
423+
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
424+
let conn = self.pool_connection_authorized(opctx).await?;
425+
artifacts_for_repo(repo_id, &conn)
426+
.await
427+
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
428+
}
429+
417430
/// Returns the current TUF repo generation number.
418431
pub async fn tuf_get_generation(
419432
&self,

nexus/src/app/background/tasks/tuf_artifact_replication.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,13 @@ use std::ops::ControlFlow;
6262
use std::str::FromStr;
6363
use std::sync::Arc;
6464

65-
use anyhow::{Context, Result};
65+
use anyhow::{Context, Result, ensure};
6666
use chrono::Utc;
6767
use futures::future::{BoxFuture, FutureExt};
6868
use futures::stream::{FuturesUnordered, Stream, StreamExt};
6969
use http::StatusCode;
7070
use nexus_auth::context::OpContext;
71-
use nexus_db_queries::db::{
72-
DataStore, datastore::SQL_BATCH_SIZE, pagination::Paginator,
73-
};
71+
use nexus_db_queries::db::DataStore;
7472
use nexus_networking::sled_client_from_address;
7573
use nexus_types::deployment::SledFilter;
7674
use nexus_types::identity::Asset;
@@ -79,7 +77,7 @@ use nexus_types::internal_api::background::{
7977
TufArtifactReplicationRequest, TufArtifactReplicationStatus,
8078
};
8179
use omicron_common::api::external::Generation;
82-
use omicron_uuid_kinds::{GenericUuid, SledUuid};
80+
use omicron_uuid_kinds::SledUuid;
8381
use rand::seq::{IndexedRandom, SliceRandom};
8482
use serde_json::json;
8583
use sled_agent_client::types::ArtifactConfig;
@@ -593,18 +591,23 @@ impl ArtifactReplication {
593591
opctx: &OpContext,
594592
) -> Result<(ArtifactConfig, Inventory)> {
595593
let generation = self.datastore.tuf_get_generation(opctx).await?;
594+
let repos =
595+
self.datastore.tuf_list_repos_unpruned_batched(opctx).await?;
596+
{
597+
let generation_now =
598+
self.datastore.tuf_get_generation(opctx).await?;
599+
ensure!(
600+
generation == generation_now,
601+
"generation changed from {generation} \
602+
to {generation_now}, bailing"
603+
);
604+
}
605+
596606
let mut inventory = Inventory::default();
597-
let mut paginator = Paginator::new(
598-
SQL_BATCH_SIZE,
599-
dropshot::PaginationOrder::Ascending,
600-
);
601-
while let Some(p) = paginator.next() {
602-
let batch = self
603-
.datastore
604-
.tuf_list_repos(opctx, generation, &p.current_pagparams())
605-
.await?;
606-
paginator = p.found_batch(&batch, &|a| a.id.into_untyped_uuid());
607-
for artifact in batch {
607+
for repo in repos {
608+
for artifact in
609+
self.datastore.tuf_list_repo_artifacts(opctx, repo.id()).await?
610+
{
608611
inventory.0.entry(artifact.sha256.0).or_insert_with(|| {
609612
ArtifactPresence { sleds: BTreeMap::new(), local: None }
610613
});
@@ -785,6 +788,7 @@ mod tests {
785788
use std::fmt::Write;
786789

787790
use expectorate::assert_contents;
791+
use omicron_uuid_kinds::GenericUuid;
788792
use rand::{Rng, SeedableRng, rngs::StdRng};
789793

790794
use super::*;

nexus/tests/integration_tests/updates.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ async fn test_repo_upload() -> Result<()> {
428428

429429
// Upload a new repository with a different system version but no other
430430
// changes. This should be accepted.
431-
{
431+
let initial_installinator_doc = {
432432
let tweaks = &[ManifestTweak::SystemVersion("2.0.0".parse().unwrap())];
433433
let response = trust_root
434434
.assemble_repo(&logctx.log, tweaks)
@@ -509,7 +509,9 @@ async fn test_repo_upload() -> Result<()> {
509509
description, get_description,
510510
"initial description matches fetched description"
511511
);
512-
}
512+
513+
installinator_doc_1
514+
};
513515
// The installinator document changed, so the generation number is bumped to
514516
// 3.
515517
assert_eq!(
@@ -525,6 +527,75 @@ async fn test_repo_upload() -> Result<()> {
525527
assert_eq!(status.last_run_counters.put_ok, 3);
526528
assert_eq!(status.last_run_counters.copy_ok, 1);
527529
assert_eq!(status.local_repos, 1);
530+
// Run the replication background task again; the local repos should be
531+
// dropped.
532+
let status =
533+
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
534+
eprintln!("{status:?}");
535+
assert_eq!(status.last_run_counters.put_config_ok, 4);
536+
assert_eq!(status.last_run_counters.list_ok, 4);
537+
assert_eq!(status.last_run_counters.sum(), 8);
538+
assert_eq!(status.local_repos, 0);
539+
540+
// Verify the initial installinator document is present on all sled-agents.
541+
let installinator_doc_hash = initial_installinator_doc.hash.to_string();
542+
for sled_agent in &cptestctx.sled_agents {
543+
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
544+
let path = dir.join(&installinator_doc_hash);
545+
assert!(path.exists(), "{path} does not exist");
546+
}
547+
}
548+
// Collect watchers for all of the sled-agent artifact delete reconcilers.
549+
let mut delete_watchers = cptestctx
550+
.sled_agents
551+
.iter()
552+
.map(|sled_agent| {
553+
sled_agent.sled_agent().artifact_store().create_delete_watcher()
554+
})
555+
.collect::<Vec<_>>();
556+
// Manually prune the first repo.
557+
let initial_repo = datastore
558+
.tuf_repo_get_by_version(
559+
&opctx,
560+
"1.0.0".parse::<Version>().unwrap().into(),
561+
)
562+
.await?;
563+
let recent_releases =
564+
datastore.target_release_fetch_recent_distinct(&opctx, 3).await?;
565+
datastore
566+
.tuf_repo_mark_pruned(
567+
&opctx,
568+
status.generation,
569+
&recent_releases,
570+
initial_repo.repo.id(),
571+
)
572+
.await
573+
.unwrap();
574+
// Marking a repository as pruned bumps the generation number.
575+
assert_eq!(
576+
datastore.tuf_get_generation(&opctx).await.unwrap(),
577+
4u32.into()
578+
);
579+
// Run the replication background task; we should see new configs be put.
580+
let status =
581+
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
582+
eprintln!("{status:?}");
583+
assert_eq!(status.last_run_counters.put_config_ok, 4);
584+
assert_eq!(status.last_run_counters.list_ok, 4);
585+
assert_eq!(status.last_run_counters.sum(), 8);
586+
assert_eq!(status.generation, 4u32.into());
587+
// Wait for the delete reconciler to finish on all sled agents.
588+
futures::future::join_all(
589+
delete_watchers.iter_mut().map(|watcher| watcher.changed()),
590+
)
591+
.await;
592+
// Verify the installinator document from the initial repo is deleted.
593+
for sled_agent in &cptestctx.sled_agents {
594+
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
595+
let path = dir.join(&installinator_doc_hash);
596+
assert!(!path.exists(), "{path} was not deleted");
597+
}
598+
}
528599

529600
cptestctx.teardown().await;
530601
Ok(())

sled-agent/src/artifact_store.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ pub struct ArtifactStore<T: DatasetsManager> {
8181
ledger_tx: mpsc::Sender<LedgerManagerRequest>,
8282
config: watch::Receiver<Option<ArtifactConfig>>,
8383
pub(crate) storage: T,
84-
85-
/// Used for synchronization in unit tests.
86-
#[cfg(test)]
87-
delete_done: watch::Receiver<Generation>,
8884
}
8985

9086
impl<T: DatasetsManager> ArtifactStore<T> {
@@ -136,14 +132,10 @@ impl<T: DatasetsManager> ArtifactStore<T> {
136132
config_tx,
137133
));
138134

139-
#[cfg(test)]
140-
let (done_signal, delete_done) = watch::channel(0u32.into());
141135
tokio::task::spawn(delete_reconciler(
142136
log.clone(),
143137
storage.clone(),
144138
config.clone(),
145-
#[cfg(test)]
146-
done_signal,
147139
));
148140

149141
ArtifactStore {
@@ -155,9 +147,6 @@ impl<T: DatasetsManager> ArtifactStore<T> {
155147
ledger_tx,
156148
config,
157149
storage,
158-
159-
#[cfg(test)]
160-
delete_done,
161150
}
162151
}
163152
}
@@ -506,7 +495,6 @@ async fn delete_reconciler<T: DatasetsManager>(
506495
log: Logger,
507496
storage: T,
508497
mut receiver: watch::Receiver<Option<ArtifactConfig>>,
509-
#[cfg(test)] done_signal: watch::Sender<Generation>,
510498
) {
511499
while let Ok(()) = receiver.changed().await {
512500
let generation = match receiver.borrow_and_update().as_ref() {
@@ -580,12 +568,7 @@ async fn delete_reconciler<T: DatasetsManager>(
580568
}
581569
}
582570
}
583-
#[cfg(test)]
584-
done_signal.send_if_modified(|old| {
585-
let modified = *old != generation;
586-
*old = generation;
587-
modified
588-
});
571+
storage.signal_delete_done(generation);
589572
}
590573
warn!(log, "Delete reconciler sender dropped");
591574
}
@@ -602,6 +585,8 @@ pub trait DatasetsManager: Clone + Send + Sync + 'static {
602585
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
603586
None
604587
}
588+
589+
/// Hook called by `delete_reconciler` with the generation it was working
/// on, once its pass over that config is done.
///
/// The default implementation does nothing; the test backend overrides it
/// so unit tests can wait for deletion to complete.
fn signal_delete_done(&self, _generation: Generation) {}
605590
}
606591

607592
impl DatasetsManager for InternalDisksReceiver {
@@ -908,16 +893,20 @@ mod test {
908893
use camino_tempfile::Utf8TempDir;
909894
use futures::stream::{self, StreamExt};
910895
use hex_literal::hex;
896+
use omicron_common::api::external::Generation;
911897
use omicron_test_utils::dev::test_setup_log;
912898
use sled_agent_api::ArtifactConfig;
913899
use tokio::io::AsyncReadExt;
914900
use tokio::sync::oneshot;
901+
use tokio::sync::watch;
915902
use tufaceous_artifact::ArtifactHash;
916903

917904
use super::{ArtifactStore, DatasetsManager, Error};
918905

919906
/// `DatasetsManager` implementation used by the unit tests in this module.
#[derive(Clone)]
920907
struct TestBackend {
908+
// Sending half of the delete-done channel; updated by
// `signal_delete_done`.
delete_done_tx: watch::Sender<Generation>,
909+
// Receiving half of the delete-done channel; tests call
// `mark_unchanged()` / `changed()` on it to synchronize with the delete
// reconciler.
delete_done_rx: watch::Receiver<Generation>,
921910
// Dataset paths handed out (cloned) by this backend's `DatasetsManager`
// implementation.
datasets: Vec<Utf8PathBuf>,
922911
// NOTE(review): presumably held only to keep the temporary directory
// alive for the lifetime of the backend — confirm.
_tempdir: Arc<Utf8TempDir>,
923912
}
@@ -934,7 +923,13 @@ mod test {
934923
datasets.push(dataset)
935924
}
936925

937-
TestBackend { datasets, _tempdir: tempdir }
926+
let (delete_done_tx, delete_done_rx) = watch::channel(0u32.into());
927+
TestBackend {
928+
delete_done_tx,
929+
delete_done_rx,
930+
datasets,
931+
_tempdir: tempdir,
932+
}
938933
}
939934
}
940935

@@ -944,6 +939,14 @@ mod test {
944939
) -> impl Iterator<Item = camino::Utf8PathBuf> + '_ {
945940
self.datasets.iter().cloned()
946941
}
942+
943+
/// Record `generation` on the `delete_done_tx` watch channel.
///
/// The closure reports whether the stored value actually changed, so
/// receivers are only notified when `generation` differs from the value
/// already held by the channel.
fn signal_delete_done(&self, generation: Generation) {
944+
self.delete_done_tx.send_if_modified(|old| {
945+
let modified = *old != generation;
946+
*old = generation;
947+
modified
948+
});
949+
}
947950
}
948951

949952
const TEST_ARTIFACT: Bytes = Bytes::from_static(b"I'm an artifact!\n");
@@ -1121,7 +1124,7 @@ mod test {
11211124
}
11221125

11231126
// mark `delete_done_rx` as seen so we can synchronize with the delete reconciler
1124-
store.delete_done.mark_unchanged();
1127+
store.storage.delete_done_rx.mark_unchanged();
11251128
// put a new config that says we don't want the artifact anymore.
11261129
config.generation = config.generation.next();
11271130
config.artifacts.remove(&TEST_HASH);
@@ -1130,7 +1133,7 @@ mod test {
11301133
// has actually occurred yet
11311134
assert!(store.list().await.unwrap().list.is_empty());
11321135
// wait for deletion to actually complete
1133-
store.delete_done.changed().await.unwrap();
1136+
store.storage.delete_done_rx.changed().await.unwrap();
11341137
// get fails, because it has been deleted
11351138
assert!(matches!(
11361139
store.get(TEST_HASH).await,

0 commit comments

Comments
 (0)