diff --git a/nexus/db-queries/src/db/datastore/update.rs b/nexus/db-queries/src/db/datastore/update.rs index 1f45a2070e9..01afd6d9dbd 100644 --- a/nexus/db-queries/src/db/datastore/update.rs +++ b/nexus/db-queries/src/db/datastore/update.rs @@ -414,6 +414,19 @@ impl DataStore { }) } + /// List the artifacts present in a TUF repo. + pub async fn tuf_list_repo_artifacts( + &self, + opctx: &OpContext, + repo_id: TufRepoUuid, + ) -> ListResultVec { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + let conn = self.pool_connection_authorized(opctx).await?; + artifacts_for_repo(repo_id, &conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + /// Returns the current TUF repo generation number. pub async fn tuf_get_generation( &self, diff --git a/nexus/src/app/background/tasks/tuf_artifact_replication.rs b/nexus/src/app/background/tasks/tuf_artifact_replication.rs index 43f12703389..664095d9491 100644 --- a/nexus/src/app/background/tasks/tuf_artifact_replication.rs +++ b/nexus/src/app/background/tasks/tuf_artifact_replication.rs @@ -62,15 +62,13 @@ use std::ops::ControlFlow; use std::str::FromStr; use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, ensure}; use chrono::Utc; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, Stream, StreamExt}; use http::StatusCode; use nexus_auth::context::OpContext; -use nexus_db_queries::db::{ - DataStore, datastore::SQL_BATCH_SIZE, pagination::Paginator, -}; +use nexus_db_queries::db::DataStore; use nexus_networking::sled_client_from_address; use nexus_types::deployment::SledFilter; use nexus_types::identity::Asset; @@ -79,7 +77,7 @@ use nexus_types::internal_api::background::{ TufArtifactReplicationRequest, TufArtifactReplicationStatus, }; use omicron_common::api::external::Generation; -use omicron_uuid_kinds::{GenericUuid, SledUuid}; +use omicron_uuid_kinds::SledUuid; use rand::seq::{IndexedRandom, SliceRandom}; use serde_json::json; use sled_agent_client::types::ArtifactConfig; @@ -593,18 +591,26 @@ impl ArtifactReplication { opctx: &OpContext, ) -> Result<(ArtifactConfig, Inventory)> { let generation = self.datastore.tuf_get_generation(opctx).await?; + let repos = + self.datastore.tuf_list_repos_unpruned_batched(opctx).await?; + // `tuf_list_repos_unpruned_batched` performs pagination internally, + // so check that the generation hasn't changed during our pagination to + // ensure we got a consistent read. + { + let generation_now = + self.datastore.tuf_get_generation(opctx).await?; + ensure!( + generation == generation_now, + "generation changed from {generation} \ + to {generation_now}, bailing" + ); + } + let mut inventory = Inventory::default(); - let mut paginator = Paginator::new( - SQL_BATCH_SIZE, - dropshot::PaginationOrder::Ascending, - ); - while let Some(p) = paginator.next() { - let batch = self - .datastore - .tuf_list_repos(opctx, generation, &p.current_pagparams()) - .await?; - paginator = p.found_batch(&batch, &|a| a.id.into_untyped_uuid()); - for artifact in batch { + for repo in repos { + for artifact in + self.datastore.tuf_list_repo_artifacts(opctx, repo.id()).await? + { inventory.0.entry(artifact.sha256.0).or_insert_with(|| { ArtifactPresence { sleds: BTreeMap::new(), local: None } }); @@ -785,6 +791,7 @@ mod tests { use std::fmt::Write; use expectorate::assert_contents; + use omicron_uuid_kinds::GenericUuid; use rand::{Rng, SeedableRng, rngs::StdRng}; use super::*; diff --git a/nexus/tests/integration_tests/updates.rs b/nexus/tests/integration_tests/updates.rs index 45328c4b24b..6dd24a79891 100644 --- a/nexus/tests/integration_tests/updates.rs +++ b/nexus/tests/integration_tests/updates.rs @@ -428,7 +428,7 @@ async fn test_repo_upload() -> Result<()> { // Upload a new repository with a different system version but no other // changes. This should be accepted. - { + let initial_installinator_doc = { let tweaks = &[ManifestTweak::SystemVersion("2.0.0".parse().unwrap())]; let response = trust_root .assemble_repo(&logctx.log, tweaks) @@ -509,7 +509,9 @@ async fn test_repo_upload() -> Result<()> { description, get_description, "initial description matches fetched description" ); - } + + installinator_doc_1 + }; // The installinator document changed, so the generation number is bumped to // 3. assert_eq!( @@ -525,6 +527,75 @@ async fn test_repo_upload() -> Result<()> { assert_eq!(status.last_run_counters.put_ok, 3); assert_eq!(status.last_run_counters.copy_ok, 1); assert_eq!(status.local_repos, 1); + // Run the replication background task again; the local repos should be + // dropped. + let status = + run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await; + eprintln!("{status:?}"); + assert_eq!(status.last_run_counters.put_config_ok, 4); + assert_eq!(status.last_run_counters.list_ok, 4); + assert_eq!(status.last_run_counters.sum(), 8); + assert_eq!(status.local_repos, 0); + + // Verify the initial installinator document is present on all sled-agents. + let installinator_doc_hash = initial_installinator_doc.hash.to_string(); + for sled_agent in &cptestctx.sled_agents { + for dir in sled_agent.sled_agent().artifact_store().storage_paths() { + let path = dir.join(&installinator_doc_hash); + assert!(path.exists(), "{path} does not exist"); + } + } + // Collect watchers for all of the sled-agent artifact delete reconcilers. + let mut delete_watchers = cptestctx + .sled_agents + .iter() + .map(|sled_agent| { + sled_agent.sled_agent().artifact_store().subscribe_delete_done() + }) + .collect::>(); + // Manually prune the first repo. + let initial_repo = datastore + .tuf_repo_get_by_version( + &opctx, + "1.0.0".parse::().unwrap().into(), + ) + .await?; + let recent_releases = + datastore.target_release_fetch_recent_distinct(&opctx, 3).await?; + datastore + .tuf_repo_mark_pruned( + &opctx, + status.generation, + &recent_releases, + initial_repo.repo.id(), + ) + .await + .unwrap(); + // Marking a repository as pruned bumps the generation number. + assert_eq!( + datastore.tuf_get_generation(&opctx).await.unwrap(), + 4u32.into() + ); + // Run the replication background task; we should see new configs be put. + let status = + run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await; + eprintln!("{status:?}"); + assert_eq!(status.last_run_counters.put_config_ok, 4); + assert_eq!(status.last_run_counters.list_ok, 4); + assert_eq!(status.last_run_counters.sum(), 8); + assert_eq!(status.generation, 4u32.into()); + // Wait for the delete reconciler to finish on all sled agents. + futures::future::join_all( + delete_watchers.iter_mut().map(|watcher| watcher.changed()), + ) + .await; + // Verify the installinator document from the initial repo is deleted. + for sled_agent in &cptestctx.sled_agents { + for dir in sled_agent.sled_agent().artifact_store().storage_paths() { + let path = dir.join(&installinator_doc_hash); + assert!(!path.exists(), "{path} was not deleted"); + } + } cptestctx.teardown().await; Ok(()) diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs index 1865ca85373..c46e6617559 100644 --- a/sled-agent/src/artifact_store.rs +++ b/sled-agent/src/artifact_store.rs @@ -81,10 +81,6 @@ pub struct ArtifactStore { ledger_tx: mpsc::Sender, config: watch::Receiver>, pub(crate) storage: T, - - /// Used for synchronization in unit tests. - #[cfg(test)] - delete_done: watch::Receiver, } impl ArtifactStore { @@ -136,14 +132,10 @@ impl ArtifactStore { config_tx, )); - #[cfg(test)] - let (done_signal, delete_done) = watch::channel(0u32.into()); tokio::task::spawn(delete_reconciler( log.clone(), storage.clone(), config.clone(), - #[cfg(test)] - done_signal, )); ArtifactStore { @@ -155,9 +147,6 @@ impl ArtifactStore { ledger_tx, config, storage, - - #[cfg(test)] - delete_done, } } } @@ -506,7 +495,6 @@ async fn delete_reconciler( log: Logger, storage: T, mut receiver: watch::Receiver>, - #[cfg(test)] done_signal: watch::Sender, ) { while let Ok(()) = receiver.changed().await { let generation = match receiver.borrow_and_update().as_ref() { @@ -580,12 +568,7 @@ async fn delete_reconciler( } } } - #[cfg(test)] - done_signal.send_if_modified(|old| { - let modified = *old != generation; - *old = generation; - modified - }); + storage.signal_delete_done(generation); } warn!(log, "Delete reconciler sender dropped"); } @@ -602,6 +585,8 @@ pub trait DatasetsManager: Clone + Send + Sync + 'static { async fn copy_permit(&self) -> Option { None } + + fn signal_delete_done(&self, _generation: Generation) {} } impl DatasetsManager for InternalDisksReceiver { @@ -908,16 +893,20 @@ mod test { use camino_tempfile::Utf8TempDir; use futures::stream::{self, StreamExt}; use hex_literal::hex; + use omicron_common::api::external::Generation; use omicron_test_utils::dev::test_setup_log; use sled_agent_api::ArtifactConfig; use tokio::io::AsyncReadExt; use tokio::sync::oneshot; + use tokio::sync::watch; use tufaceous_artifact::ArtifactHash; use super::{ArtifactStore, DatasetsManager, Error}; #[derive(Clone)] struct TestBackend { + delete_done_tx: watch::Sender, + delete_done_rx: watch::Receiver, datasets: Vec, _tempdir: Arc, } @@ -934,7 +923,13 @@ mod test { datasets.push(dataset) } - TestBackend { datasets, _tempdir: tempdir } + let (delete_done_tx, delete_done_rx) = watch::channel(0u32.into()); + TestBackend { + delete_done_tx, + delete_done_rx, + datasets, + _tempdir: tempdir, + } } } @@ -944,6 +939,14 @@ mod test { ) -> impl Iterator + '_ { self.datasets.iter().cloned() } + + fn signal_delete_done(&self, generation: Generation) { + self.delete_done_tx.send_if_modified(|old| { + let modified = *old != generation; + *old = generation; + modified + }); + } } const TEST_ARTIFACT: Bytes = Bytes::from_static(b"I'm an artifact!\n"); @@ -1121,7 +1124,7 @@ mod test { } // clear `delete_done` so we can synchronize with the delete reconciler - store.delete_done.mark_unchanged(); + store.storage.delete_done_rx.mark_unchanged(); // put a new config that says we don't want the artifact anymore. config.generation = config.generation.next(); config.artifacts.remove(&TEST_HASH); @@ -1130,7 +1133,7 @@ mod test { // has actually occurred yet assert!(store.list().await.unwrap().list.is_empty()); // wait for deletion to actually complete - store.delete_done.changed().await.unwrap(); + store.storage.delete_done_rx.changed().await.unwrap(); // get fails, because it has been deleted assert!(matches!( store.get(TEST_HASH).await, diff --git a/sled-agent/src/sim/artifact_store.rs b/sled-agent/src/sim/artifact_store.rs index e20dae49ea5..00f6f475465 100644 --- a/sled-agent/src/sim/artifact_store.rs +++ b/sled-agent/src/sim/artifact_store.rs @@ -7,13 +7,15 @@ use std::sync::Arc; +use camino::Utf8Path; use camino_tempfile::Utf8TempDir; use dropshot::{ Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, HttpServer, Path, RequestContext, ServerBuilder, }; +use omicron_common::api::external::Generation; use repo_depot_api::*; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch}; use crate::artifact_store::{ArtifactStore, DatasetsManager}; @@ -28,6 +30,10 @@ pub struct SimArtifactStorage { // Semaphore to keep track of how many copy requests are in flight, and to // be able to await on their completion. Used in integration tests. copy_semaphore: Arc, + + // Watch channel to be able to await on the delete reconciler completing in + // integration tests. + delete_done: watch::Sender, } impl SimArtifactStorage { @@ -40,6 +46,7 @@ impl SimArtifactStorage { copy_semaphore: Arc::new( const { Semaphore::const_new(MAX_PERMITS as usize) }, ), + delete_done: watch::Sender::new(0u32.into()), } } } @@ -54,6 +61,14 @@ impl DatasetsManager for SimArtifactStorage { async fn copy_permit(&self) -> Option { Some(self.copy_semaphore.clone().acquire_owned().await.unwrap()) } + + fn signal_delete_done(&self, generation: Generation) { + self.delete_done.send_if_modified(|old| { + let modified = *old != generation; + *old = generation; + modified + }); + } } impl ArtifactStore { @@ -73,6 +88,10 @@ impl ArtifactStore { .unwrap() } + pub fn storage_paths(&self) -> impl Iterator { + self.storage.dirs.iter().map(|p| p.path()) + } + pub async fn wait_for_copy_tasks(&self) { // Acquire a permit for MAX_PERMITS, which requires that all copy tasks // have dropped their permits. Then immediately drop it. @@ -83,6 +102,10 @@ impl ArtifactStore { .await .unwrap(); } + + pub fn subscribe_delete_done(&self) -> watch::Receiver { + self.storage.delete_done.subscribe() + } } /// Implementation of the Repo Depot API backed by an