13 changes: 13 additions & 0 deletions nexus/db-queries/src/db/datastore/update.rs
@@ -414,6 +414,19 @@ impl DataStore {
})
}

/// List the artifacts present in a TUF repo.
pub async fn tuf_list_repo_artifacts(
&self,
opctx: &OpContext,
repo_id: TufRepoUuid,
) -> ListResultVec<TufArtifact> {
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
Collaborator:
I'd do two things to try to avoid people accidentally using this in API endpoints (since we're making multiple queries here):

  • add a call to opctx.check_complex_operations_allowed()?;
  • add _batched() to the name to convey that (we do this in a few other places in the datastore)

Really, I'd be tempted to apply this to artifacts_for_repo, but that may currently break some callers that might be using it from the API. Those should probably be made paginated but we can do that when the dust settles on these APIs.
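
For concreteness, a minimal sketch of the datastore method with both of those suggestions applied — hypothetical, since the PR currently contains only the version shown above; opctx.check_complex_operations_allowed() is the existing OpContext guard used elsewhere in the datastore:

    /// List the artifacts present in a TUF repo.
    ///
    /// This makes multiple queries, so it must not be called from API
    /// endpoint handlers.
    pub async fn tuf_list_repo_artifacts_batched(
        &self,
        opctx: &OpContext,
        repo_id: TufRepoUuid,
    ) -> ListResultVec<TufArtifact> {
        opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
        // Refuse to run in contexts (e.g. API handlers) where expensive
        // multi-query operations are disallowed.
        opctx.check_complex_operations_allowed()?;
        let conn = self.pool_connection_authorized(opctx).await?;
        artifacts_for_repo(repo_id, &conn)
            .await
            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
    }

Callers such as the replication background task would then see the _batched suffix, making the cost explicit at the call site.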

Contributor Author:
After #9106 lands we may be able to refactor things a little and apply this to artifacts_for_repo, since it removes the list of artifacts from the public APIs.

let conn = self.pool_connection_authorized(opctx).await?;
artifacts_for_repo(repo_id, &conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Returns the current TUF repo generation number.
pub async fn tuf_get_generation(
&self,
39 changes: 23 additions & 16 deletions nexus/src/app/background/tasks/tuf_artifact_replication.rs
@@ -62,15 +62,13 @@ use std::ops::ControlFlow;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{Context, Result};
use anyhow::{Context, Result, ensure};
use chrono::Utc;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use http::StatusCode;
use nexus_auth::context::OpContext;
use nexus_db_queries::db::{
DataStore, datastore::SQL_BATCH_SIZE, pagination::Paginator,
};
use nexus_db_queries::db::DataStore;
use nexus_networking::sled_client_from_address;
use nexus_types::deployment::SledFilter;
use nexus_types::identity::Asset;
@@ -79,7 +77,7 @@ use nexus_types::internal_api::background::{
TufArtifactReplicationRequest, TufArtifactReplicationStatus,
};
use omicron_common::api::external::Generation;
use omicron_uuid_kinds::{GenericUuid, SledUuid};
use omicron_uuid_kinds::SledUuid;
use rand::seq::{IndexedRandom, SliceRandom};
use serde_json::json;
use sled_agent_client::types::ArtifactConfig;
@@ -593,18 +591,26 @@ impl ArtifactReplication {
opctx: &OpContext,
) -> Result<(ArtifactConfig, Inventory)> {
let generation = self.datastore.tuf_get_generation(opctx).await?;
let repos =
self.datastore.tuf_list_repos_unpruned_batched(opctx).await?;
// `tuf_list_repos_unpruned_batched` paginates internally, so re-check
// that the generation hasn't changed across that call to make sure we
// got a consistent read.
{
let generation_now =
self.datastore.tuf_get_generation(opctx).await?;
ensure!(
generation == generation_now,
"generation changed from {generation} \
to {generation_now}, bailing"
);
}
Comment on lines +599 to +607
Contributor:
Making sure I understand: we have to do this check because if the generation changed, the config we'd build from repos would be inconsistent with other Nexuses building a config at generation_now, right? This seems pretty critical and maybe easy to miss - maybe worth a comment? (My very first reaction reading this was "why do we need to check this? if a new repo has been pruned or added that's fine and we'll just pick it up the next time we run")

Alternatively: should tuf_list_repos_unpruned_batched() take a generation argument and fail if it changes at any point during the listing?

Contributor Author:
Initially I had this implemented so that we read the generation and then paginated through the repositories in a single transaction, but I replaced it with the datastore methods from #9107. The comment for tuf_list_repos_unpruned_batched() reads:

    /// Since this involves pagination, this is not a consistent snapshot.
    /// Consider using `tuf_get_generation()` before calling this function and
    /// then making any subsequent queries conditional on the generation not
    /// having changed.

So the second check is my interpretation of making subsequent queries conditional on the generation not having changed. I will add a comment to this effect.

Contributor:
Hah, yeah, one conditional check after does seem better than reasserting the condition as we page through a table. Thanks.
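
For illustration, a rough sketch of the alternative floated earlier in this thread — a listing method that takes the expected generation and fails if it changes at any point during pagination. This is hypothetical code, not part of the PR: the page helper tuf_list_repos_unpruned_page(), the repo key extractor, and the error construction are assumptions; the Paginator scaffolding mirrors the code this PR removes from the background task.

    // Hypothetical sketch (inside `impl DataStore`); not code in this PR.
    pub async fn tuf_list_repos_unpruned_batched(
        &self,
        opctx: &OpContext,
        expected_generation: Generation,
    ) -> ListResultVec<TufRepo> {
        opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
        // Multiple queries; keep this out of API endpoint handlers.
        opctx.check_complex_operations_allowed()?;

        let mut repos = Vec::new();
        let mut paginator = Paginator::new(
            SQL_BATCH_SIZE,
            dropshot::PaginationOrder::Ascending,
        );
        while let Some(p) = paginator.next() {
            // Assumed single-page helper that lists unpruned repos.
            let batch = self
                .tuf_list_repos_unpruned_page(opctx, &p.current_pagparams())
                .await?;
            paginator = p.found_batch(&batch, &|r| r.id.into_untyped_uuid());

            // Fail the whole listing if the TUF generation moved while we
            // were paginating, so callers always get a snapshot consistent
            // with `expected_generation`.
            let generation_now = self.tuf_get_generation(opctx).await?;
            if generation_now != expected_generation {
                return Err(Error::internal_error(&format!(
                    "TUF repo generation changed from {expected_generation} \
                     to {generation_now} during listing"
                )));
            }
            repos.extend(batch);
        }
        Ok(repos)
    }

The caller-side ensure! in this diff achieves the same guarantee with a single check after the listing, which is the simpler option the thread settles on.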


let mut inventory = Inventory::default();
let mut paginator = Paginator::new(
SQL_BATCH_SIZE,
dropshot::PaginationOrder::Ascending,
);
while let Some(p) = paginator.next() {
let batch = self
.datastore
.tuf_list_repos(opctx, generation, &p.current_pagparams())
.await?;
paginator = p.found_batch(&batch, &|a| a.id.into_untyped_uuid());
for artifact in batch {
for repo in repos {
for artifact in
self.datastore.tuf_list_repo_artifacts(opctx, repo.id()).await?
{
inventory.0.entry(artifact.sha256.0).or_insert_with(|| {
ArtifactPresence { sleds: BTreeMap::new(), local: None }
});
@@ -785,6 +791,7 @@ mod tests {
use std::fmt::Write;

use expectorate::assert_contents;
use omicron_uuid_kinds::GenericUuid;
use rand::{Rng, SeedableRng, rngs::StdRng};

use super::*;
75 changes: 73 additions & 2 deletions nexus/tests/integration_tests/updates.rs
@@ -428,7 +428,7 @@ async fn test_repo_upload() -> Result<()> {

// Upload a new repository with a different system version but no other
// changes. This should be accepted.
{
let initial_installinator_doc = {
let tweaks = &[ManifestTweak::SystemVersion("2.0.0".parse().unwrap())];
let response = trust_root
.assemble_repo(&logctx.log, tweaks)
@@ -509,7 +509,9 @@ async fn test_repo_upload() -> Result<()> {
description, get_description,
"initial description matches fetched description"
);
}

installinator_doc_1
};
// The installinator document changed, so the generation number is bumped to
// 3.
assert_eq!(
@@ -525,6 +527,75 @@ async fn test_repo_upload() -> Result<()> {
assert_eq!(status.last_run_counters.put_ok, 3);
assert_eq!(status.last_run_counters.copy_ok, 1);
assert_eq!(status.local_repos, 1);
// Run the replication background task again; the local repos should be
// dropped.
let status =
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
eprintln!("{status:?}");
assert_eq!(status.last_run_counters.put_config_ok, 4);
assert_eq!(status.last_run_counters.list_ok, 4);
assert_eq!(status.last_run_counters.sum(), 8);
assert_eq!(status.local_repos, 0);

// Verify the initial installinator document is present on all sled-agents.
let installinator_doc_hash = initial_installinator_doc.hash.to_string();
for sled_agent in &cptestctx.sled_agents {
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
let path = dir.join(&installinator_doc_hash);
assert!(path.exists(), "{path} does not exist");
}
}
// Collect watchers for all of the sled-agent artifact delete reconcilers.
let mut delete_watchers = cptestctx
.sled_agents
.iter()
.map(|sled_agent| {
sled_agent.sled_agent().artifact_store().subscribe_delete_done()
})
.collect::<Vec<_>>();
// Manually prune the first repo.
let initial_repo = datastore
.tuf_repo_get_by_version(
&opctx,
"1.0.0".parse::<Version>().unwrap().into(),
)
.await?;
let recent_releases =
datastore.target_release_fetch_recent_distinct(&opctx, 3).await?;
datastore
.tuf_repo_mark_pruned(
&opctx,
status.generation,
&recent_releases,
initial_repo.repo.id(),
)
.await
.unwrap();
// Marking a repository as pruned bumps the generation number.
assert_eq!(
datastore.tuf_get_generation(&opctx).await.unwrap(),
4u32.into()
);
// Run the replication background task; we should see new configs be put.
let status =
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
eprintln!("{status:?}");
assert_eq!(status.last_run_counters.put_config_ok, 4);
assert_eq!(status.last_run_counters.list_ok, 4);
assert_eq!(status.last_run_counters.sum(), 8);
assert_eq!(status.generation, 4u32.into());
// Wait for the delete reconciler to finish on all sled agents.
futures::future::join_all(
delete_watchers.iter_mut().map(|watcher| watcher.changed()),
)
.await;
// Verify the installinator document from the initial repo is deleted.
for sled_agent in &cptestctx.sled_agents {
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
let path = dir.join(&installinator_doc_hash);
assert!(!path.exists(), "{path} was not deleted");
}
}

cptestctx.teardown().await;
Ok(())
45 changes: 24 additions & 21 deletions sled-agent/src/artifact_store.rs
@@ -81,10 +81,6 @@ pub struct ArtifactStore<T: DatasetsManager> {
ledger_tx: mpsc::Sender<LedgerManagerRequest>,
config: watch::Receiver<Option<ArtifactConfig>>,
pub(crate) storage: T,

/// Used for synchronization in unit tests.
#[cfg(test)]
delete_done: watch::Receiver<Generation>,
}

impl<T: DatasetsManager> ArtifactStore<T> {
@@ -136,14 +132,10 @@ impl<T: DatasetsManager> ArtifactStore<T> {
config_tx,
));

#[cfg(test)]
let (done_signal, delete_done) = watch::channel(0u32.into());
tokio::task::spawn(delete_reconciler(
log.clone(),
storage.clone(),
config.clone(),
#[cfg(test)]
done_signal,
));

ArtifactStore {
@@ -155,9 +147,6 @@ impl<T: DatasetsManager> ArtifactStore<T> {
ledger_tx,
config,
storage,

#[cfg(test)]
delete_done,
}
}
}
@@ -506,7 +495,6 @@ async fn delete_reconciler<T: DatasetsManager>(
log: Logger,
storage: T,
mut receiver: watch::Receiver<Option<ArtifactConfig>>,
#[cfg(test)] done_signal: watch::Sender<Generation>,
) {
while let Ok(()) = receiver.changed().await {
let generation = match receiver.borrow_and_update().as_ref() {
@@ -580,12 +568,7 @@
}
}
}
#[cfg(test)]
done_signal.send_if_modified(|old| {
let modified = *old != generation;
*old = generation;
modified
});
storage.signal_delete_done(generation);
}
warn!(log, "Delete reconciler sender dropped");
}
@@ -602,6 +585,8 @@ pub trait DatasetsManager: Clone + Send + Sync + 'static {
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
None
}

fn signal_delete_done(&self, _generation: Generation) {}
}

impl DatasetsManager for InternalDisksReceiver {
Expand Down Expand Up @@ -908,16 +893,20 @@ mod test {
use camino_tempfile::Utf8TempDir;
use futures::stream::{self, StreamExt};
use hex_literal::hex;
use omicron_common::api::external::Generation;
use omicron_test_utils::dev::test_setup_log;
use sled_agent_api::ArtifactConfig;
use tokio::io::AsyncReadExt;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tufaceous_artifact::ArtifactHash;

use super::{ArtifactStore, DatasetsManager, Error};

#[derive(Clone)]
struct TestBackend {
delete_done_tx: watch::Sender<Generation>,
delete_done_rx: watch::Receiver<Generation>,
datasets: Vec<Utf8PathBuf>,
_tempdir: Arc<Utf8TempDir>,
}
@@ -934,7 +923,13 @@ mod test {
datasets.push(dataset)
}

TestBackend { datasets, _tempdir: tempdir }
let (delete_done_tx, delete_done_rx) = watch::channel(0u32.into());
TestBackend {
delete_done_tx,
delete_done_rx,
datasets,
_tempdir: tempdir,
}
}
}

@@ -944,6 +939,14 @@ mod test {
) -> impl Iterator<Item = camino::Utf8PathBuf> + '_ {
self.datasets.iter().cloned()
}

fn signal_delete_done(&self, generation: Generation) {
self.delete_done_tx.send_if_modified(|old| {
let modified = *old != generation;
*old = generation;
modified
});
}
}

const TEST_ARTIFACT: Bytes = Bytes::from_static(b"I'm an artifact!\n");
@@ -1121,7 +1124,7 @@
}

// clear `delete_done` so we can synchronize with the delete reconciler
store.delete_done.mark_unchanged();
store.storage.delete_done_rx.mark_unchanged();
// put a new config that says we don't want the artifact anymore.
config.generation = config.generation.next();
config.artifacts.remove(&TEST_HASH);
@@ -1130,7 +1133,7 @@
// has actually occurred yet
assert!(store.list().await.unwrap().list.is_empty());
// wait for deletion to actually complete
store.delete_done.changed().await.unwrap();
store.storage.delete_done_rx.changed().await.unwrap();
// get fails, because it has been deleted
assert!(matches!(
store.get(TEST_HASH).await,
25 changes: 24 additions & 1 deletion sled-agent/src/sim/artifact_store.rs
@@ -7,13 +7,15 @@

use std::sync::Arc;

use camino::Utf8Path;
use camino_tempfile::Utf8TempDir;
use dropshot::{
Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, HttpServer,
Path, RequestContext, ServerBuilder,
};
use omicron_common::api::external::Generation;
use repo_depot_api::*;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};

use crate::artifact_store::{ArtifactStore, DatasetsManager};

@@ -28,6 +30,10 @@ pub struct SimArtifactStorage {
// Semaphore to keep track of how many copy requests are in flight, and to
// be able to await on their completion. Used in integration tests.
copy_semaphore: Arc<Semaphore>,

// Watch channel to be able to await on the delete reconciler completing in
// integration tests.
delete_done: watch::Sender<Generation>,
}

impl SimArtifactStorage {
@@ -40,6 +46,7 @@ impl SimArtifactStorage {
copy_semaphore: Arc::new(
const { Semaphore::const_new(MAX_PERMITS as usize) },
),
delete_done: watch::Sender::new(0u32.into()),
}
}
}
@@ -54,6 +61,14 @@ impl DatasetsManager for SimArtifactStorage {
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
Some(self.copy_semaphore.clone().acquire_owned().await.unwrap())
}

fn signal_delete_done(&self, generation: Generation) {
self.delete_done.send_if_modified(|old| {
let modified = *old != generation;
*old = generation;
modified
});
}
}

impl ArtifactStore<SimArtifactStorage> {
@@ -73,6 +88,10 @@ impl ArtifactStore<SimArtifactStorage> {
.unwrap()
}

pub fn storage_paths(&self) -> impl Iterator<Item = &Utf8Path> {
self.storage.dirs.iter().map(|p| p.path())
}

pub async fn wait_for_copy_tasks(&self) {
// Acquire a permit for MAX_PERMITS, which requires that all copy tasks
// have dropped their permits. Then immediately drop it.
@@ -83,6 +102,10 @@ impl ArtifactStore<SimArtifactStorage> {
.await
.unwrap();
}

pub fn subscribe_delete_done(&self) -> watch::Receiver<Generation> {
self.storage.delete_done.subscribe()
}
}

/// Implementation of the Repo Depot API backed by an