Skip to content

Commit 9f685b4

Browse files
committed
test artifact pruning in the integration test
1 parent 9502b4d commit 9f685b4

File tree

4 files changed

+127
-24
lines changed

4 files changed

+127
-24
lines changed

nexus/db-queries/src/db/datastore/target_release.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ impl DataStore {
208208
}
209209

210210
/// Represents information fetched about recent target releases
211+
#[derive(Debug)]
211212
pub struct RecentTargetReleases {
212213
/// distinct target releases found
213214
pub releases: BTreeSet<TufRepoUuid>,

nexus/tests/integration_tests/updates.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ async fn test_repo_upload() -> Result<()> {
426426

427427
// Upload a new repository with a different system version but no other
428428
// changes. This should be accepted.
429-
{
429+
let initial_installinator_doc = {
430430
let tweaks = &[ManifestTweak::SystemVersion("2.0.0".parse().unwrap())];
431431
let response = trust_root
432432
.assemble_repo(&logctx.log, tweaks)
@@ -507,7 +507,9 @@ async fn test_repo_upload() -> Result<()> {
507507
description, get_description,
508508
"initial description matches fetched description"
509509
);
510-
}
510+
511+
installinator_doc_1
512+
};
511513
// The installinator document changed, so the generation number is bumped to
512514
// 3.
513515
assert_eq!(
@@ -523,6 +525,75 @@ async fn test_repo_upload() -> Result<()> {
523525
assert_eq!(status.last_run_counters.put_ok, 3);
524526
assert_eq!(status.last_run_counters.copy_ok, 1);
525527
assert_eq!(status.local_repos, 1);
528+
// Run the replication background task again; the local repos should be
529+
// dropped.
530+
let status =
531+
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
532+
eprintln!("{status:?}");
533+
assert_eq!(status.last_run_counters.put_config_ok, 4);
534+
assert_eq!(status.last_run_counters.list_ok, 4);
535+
assert_eq!(status.last_run_counters.sum(), 8);
536+
assert_eq!(status.local_repos, 0);
537+
538+
// Verify the initial installinator document is present on all sled-agents.
539+
let installinator_doc_hash = initial_installinator_doc.hash.to_string();
540+
for sled_agent in &cptestctx.sled_agents {
541+
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
542+
let path = dir.join(&installinator_doc_hash);
543+
assert!(path.exists(), "{path} does not exist");
544+
}
545+
}
546+
// Collect watchers for all of the sled-agent artifact delete reconcilers.
547+
let mut delete_watchers = cptestctx
548+
.sled_agents
549+
.iter()
550+
.map(|sled_agent| {
551+
sled_agent.sled_agent().artifact_store().create_delete_watcher()
552+
})
553+
.collect::<Vec<_>>();
554+
// Manually prune the first repo.
555+
let initial_repo = datastore
556+
.tuf_repo_get_by_version(
557+
&opctx,
558+
"1.0.0".parse::<Version>().unwrap().into(),
559+
)
560+
.await?;
561+
let recent_releases =
562+
datastore.target_release_fetch_recent_distinct(&opctx, 3).await?;
563+
datastore
564+
.tuf_repo_mark_pruned(
565+
&opctx,
566+
status.generation,
567+
&recent_releases,
568+
initial_repo.repo.id(),
569+
)
570+
.await
571+
.unwrap();
572+
// Marking a repository as pruned bumps the generation number.
573+
assert_eq!(
574+
datastore.tuf_get_generation(&opctx).await.unwrap(),
575+
4u32.into()
576+
);
577+
// Run the replication background task; we should see new configs be put.
578+
let status =
579+
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
580+
eprintln!("{status:?}");
581+
assert_eq!(status.last_run_counters.put_config_ok, 4);
582+
assert_eq!(status.last_run_counters.list_ok, 4);
583+
assert_eq!(status.last_run_counters.sum(), 8);
584+
assert_eq!(status.generation, 4u32.into());
585+
// Wait for the delete reconciler to finish on all sled agents.
586+
futures::future::join_all(
587+
delete_watchers.iter_mut().map(|watcher| watcher.changed()),
588+
)
589+
.await;
590+
// Verify the installinator document from the initial repo is deleted.
591+
for sled_agent in &cptestctx.sled_agents {
592+
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
593+
let path = dir.join(&installinator_doc_hash);
594+
assert!(!path.exists(), "{path} was not deleted");
595+
}
596+
}
526597

527598
cptestctx.teardown().await;
528599
Ok(())

sled-agent/src/artifact_store.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ pub struct ArtifactStore<T: DatasetsManager> {
8181
ledger_tx: mpsc::Sender<LedgerManagerRequest>,
8282
config: watch::Receiver<Option<ArtifactConfig>>,
8383
pub(crate) storage: T,
84-
85-
/// Used for synchronization in unit tests.
86-
#[cfg(test)]
87-
delete_done: watch::Receiver<Generation>,
8884
}
8985

9086
impl<T: DatasetsManager> ArtifactStore<T> {
@@ -136,14 +132,10 @@ impl<T: DatasetsManager> ArtifactStore<T> {
136132
config_tx,
137133
));
138134

139-
#[cfg(test)]
140-
let (done_signal, delete_done) = watch::channel(0u32.into());
141135
tokio::task::spawn(delete_reconciler(
142136
log.clone(),
143137
storage.clone(),
144138
config.clone(),
145-
#[cfg(test)]
146-
done_signal,
147139
));
148140

149141
ArtifactStore {
@@ -155,9 +147,6 @@ impl<T: DatasetsManager> ArtifactStore<T> {
155147
ledger_tx,
156148
config,
157149
storage,
158-
159-
#[cfg(test)]
160-
delete_done,
161150
}
162151
}
163152
}
@@ -506,7 +495,6 @@ async fn delete_reconciler<T: DatasetsManager>(
506495
log: Logger,
507496
storage: T,
508497
mut receiver: watch::Receiver<Option<ArtifactConfig>>,
509-
#[cfg(test)] done_signal: watch::Sender<Generation>,
510498
) {
511499
while let Ok(()) = receiver.changed().await {
512500
let generation = match receiver.borrow_and_update().as_ref() {
@@ -580,12 +568,7 @@ async fn delete_reconciler<T: DatasetsManager>(
580568
}
581569
}
582570
}
583-
#[cfg(test)]
584-
done_signal.send_if_modified(|old| {
585-
let modified = *old != generation;
586-
*old = generation;
587-
modified
588-
});
571+
storage.signal_delete_done(generation);
589572
}
590573
warn!(log, "Delete reconciler sender dropped");
591574
}
@@ -602,6 +585,8 @@ pub trait DatasetsManager: Clone + Send + Sync + 'static {
602585
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
603586
None
604587
}
588+
589+
fn signal_delete_done(&self, _generation: Generation) {}
605590
}
606591

607592
impl DatasetsManager for InternalDisksReceiver {
@@ -908,16 +893,20 @@ mod test {
908893
use camino_tempfile::Utf8TempDir;
909894
use futures::stream::{self, StreamExt};
910895
use hex_literal::hex;
896+
use omicron_common::api::external::Generation;
911897
use omicron_test_utils::dev::test_setup_log;
912898
use sled_agent_api::ArtifactConfig;
913899
use tokio::io::AsyncReadExt;
914900
use tokio::sync::oneshot;
901+
use tokio::sync::watch;
915902
use tufaceous_artifact::ArtifactHash;
916903

917904
use super::{ArtifactStore, DatasetsManager, Error};
918905

919906
#[derive(Clone)]
920907
struct TestBackend {
908+
delete_done_tx: watch::Sender<Generation>,
909+
delete_done_rx: watch::Receiver<Generation>,
921910
datasets: Vec<Utf8PathBuf>,
922911
_tempdir: Arc<Utf8TempDir>,
923912
}
@@ -934,7 +923,13 @@ mod test {
934923
datasets.push(dataset)
935924
}
936925

937-
TestBackend { datasets, _tempdir: tempdir }
926+
let (delete_done_tx, delete_done_rx) = watch::channel(0u32.into());
927+
TestBackend {
928+
delete_done_tx,
929+
delete_done_rx,
930+
datasets,
931+
_tempdir: tempdir,
932+
}
938933
}
939934
}
940935

@@ -944,6 +939,14 @@ mod test {
944939
) -> impl Iterator<Item = camino::Utf8PathBuf> + '_ {
945940
self.datasets.iter().cloned()
946941
}
942+
943+
fn signal_delete_done(&self, generation: Generation) {
944+
self.delete_done_tx.send_if_modified(|old| {
945+
let modified = *old != generation;
946+
*old = generation;
947+
modified
948+
});
949+
}
947950
}
948951

949952
const TEST_ARTIFACT: Bytes = Bytes::from_static(b"I'm an artifact!\n");
@@ -1121,7 +1124,7 @@ mod test {
11211124
}
11221125

11231126
// clear `delete_done` so we can synchronize with the delete reconciler
1124-
store.delete_done.mark_unchanged();
1127+
store.storage.delete_done_rx.mark_unchanged();
11251128
// put a new config that says we don't want the artifact anymore.
11261129
config.generation = config.generation.next();
11271130
config.artifacts.remove(&TEST_HASH);
@@ -1130,7 +1133,7 @@ mod test {
11301133
// has actually occurred yet
11311134
assert!(store.list().await.unwrap().list.is_empty());
11321135
// wait for deletion to actually complete
1133-
store.delete_done.changed().await.unwrap();
1136+
store.storage.delete_done_rx.changed().await.unwrap();
11341137
// get fails, because it has been deleted
11351138
assert!(matches!(
11361139
store.get(TEST_HASH).await,

sled-agent/src/sim/artifact_store.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
88
use std::sync::Arc;
99

10+
use camino::Utf8Path;
1011
use camino_tempfile::Utf8TempDir;
1112
use dropshot::{
1213
Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, HttpServer,
1314
Path, RequestContext, ServerBuilder,
1415
};
16+
use omicron_common::api::external::Generation;
1517
use repo_depot_api::*;
16-
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
18+
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
1719

1820
use crate::artifact_store::{ArtifactStore, DatasetsManager};
1921

@@ -28,10 +30,16 @@ pub struct SimArtifactStorage {
2830
// Semaphore to keep track of how many copy requests are in flight, and to
2931
// be able to await on their completion. Used in integration tests.
3032
copy_semaphore: Arc<Semaphore>,
33+
34+
// Watch channel to be able to await on the delete reconciler completing in
35+
// integration tests.
36+
delete_done_tx: watch::Sender<Generation>,
37+
delete_done_rx: watch::Receiver<Generation>,
3138
}
3239

3340
impl SimArtifactStorage {
3441
pub(super) fn new() -> SimArtifactStorage {
42+
let (delete_done_tx, delete_done_rx) = watch::channel(0u32.into());
3543
SimArtifactStorage {
3644
dirs: Arc::new([
3745
camino_tempfile::tempdir().unwrap(),
@@ -40,6 +48,8 @@ impl SimArtifactStorage {
4048
copy_semaphore: Arc::new(
4149
const { Semaphore::const_new(MAX_PERMITS as usize) },
4250
),
51+
delete_done_tx,
52+
delete_done_rx,
4353
}
4454
}
4555
}
@@ -54,6 +64,14 @@ impl DatasetsManager for SimArtifactStorage {
5464
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
5565
Some(self.copy_semaphore.clone().acquire_owned().await.unwrap())
5666
}
67+
68+
fn signal_delete_done(&self, generation: Generation) {
69+
self.delete_done_tx.send_if_modified(|old| {
70+
let modified = *old != generation;
71+
*old = generation;
72+
modified
73+
});
74+
}
5775
}
5876

5977
impl ArtifactStore<SimArtifactStorage> {
@@ -73,6 +91,10 @@ impl ArtifactStore<SimArtifactStorage> {
7391
.unwrap()
7492
}
7593

94+
pub fn storage_paths(&self) -> impl Iterator<Item = &Utf8Path> {
95+
self.storage.dirs.iter().map(|p| p.path())
96+
}
97+
7698
pub async fn wait_for_copy_tasks(&self) {
7799
// Acquire a permit for MAX_PERMITS, which requires that all copy tasks
78100
// have dropped their permits. Then immediately drop it.
@@ -83,6 +105,12 @@ impl ArtifactStore<SimArtifactStorage> {
83105
.await
84106
.unwrap();
85107
}
108+
109+
pub fn create_delete_watcher(&self) -> watch::Receiver<Generation> {
110+
let mut watcher = self.storage.delete_done_rx.clone();
111+
watcher.mark_unchanged();
112+
watcher
113+
}
86114
}
87115

88116
/// Implementation of the Repo Depot API backed by an

0 commit comments

Comments
 (0)