Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 74 additions & 82 deletions src/dist/manifestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use futures_util::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::info;
use url::Url;

use crate::dist::component::{
Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction,
};
use crate::dist::config::Config;
use crate::dist::download::{DownloadCfg, File};
use crate::dist::manifest::{Component, CompressionKind, Manifest, TargetedPackage};
use crate::dist::manifest::{Component, CompressionKind, HashedBinary, Manifest, TargetedPackage};
use crate::dist::notifications::*;
use crate::dist::prefix::InstallPrefix;
use crate::dist::temp;
Expand Down Expand Up @@ -153,8 +154,8 @@ impl Manifestation {
let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;

// Download component packages and validate hashes
let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new();
let mut things_downloaded: Vec<String> = Vec::new();
let mut things_to_install = Vec::new();
let mut things_downloaded = Vec::new();
let components = update.components_urls_and_hashes(new_manifest)?;
let components_len = components.len();

Expand All @@ -173,44 +174,44 @@ impl Manifestation {
.unwrap_or(DEFAULT_MAX_RETRIES);

info!("downloading component(s)");
for (component, _, url, _) in components.clone() {
for bin in &components {
(download_cfg.notify_handler)(Notification::DownloadingComponent(
&component.short_name(new_manifest),
&bin.component.short_name(new_manifest),
&self.target_triple,
component.target.as_ref(),
&url,
bin.component.target.as_ref(),
&bin.binary.url,
));
}

let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream =
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
self.download_component(
component,
format,
url,
hash,
altered,
tmp_cx,
download_cfg,
max_retries,
new_manifest,
)
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

bin.download(&url, download_cfg, max_retries, new_manifest)
.await
}
});
.map(|downloaded| (bin, downloaded))
}
});
if components_len > 0 {
let results = component_stream
.buffered(components_len)
.collect::<Vec<_>>()
.await;
for result in results {
let (component, format, downloaded_file, hash) = result?;
things_downloaded.push(hash);
things_to_install.push((component, format, downloaded_file));
let (bin, downloaded_file) = result?;
things_downloaded.push(bin.binary.hash.clone());
things_to_install.push((bin.component, bin.binary.compression, downloaded_file));
}
}

Expand Down Expand Up @@ -547,50 +548,6 @@ impl Manifestation {

Ok(tx)
}

#[allow(clippy::too_many_arguments)]
async fn download_component(
&self,
component: Component,
format: CompressionKind,
url: String,
hash: String,
altered: bool,
tmp_cx: &temp::Context,
download_cfg: &DownloadCfg<'_>,
max_retries: usize,
new_manifest: &Manifest,
) -> Result<(Component, CompressionKind, File, String)> {
use tokio_retry::{RetryIf, strategy::FixedInterval};

let url = if altered {
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
} else {
url
};

let url_url = utils::parse_url(&url)?;

let downloaded_file = RetryIf::spawn(
FixedInterval::from_millis(0).take(max_retries),
|| download_cfg.download(&url_url, &hash),
|e: &anyhow::Error| {
// retry only known retriable cases
match e.downcast_ref::<RustupError>() {
Some(RustupError::BrokenPartialFile)
| Some(RustupError::DownloadingFile { .. }) => {
(download_cfg.notify_handler)(Notification::RetryingDownload(&url));
true
}
_ => false,
}
},
)
.await
.with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?;

Ok((component, format, downloaded_file, hash))
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -782,10 +739,10 @@ impl Update {
}

/// Map components to urls and hashes
fn components_urls_and_hashes(
&self,
new_manifest: &Manifest,
) -> Result<Vec<(Component, CompressionKind, String, String)>> {
fn components_urls_and_hashes<'a>(
&'a self,
new_manifest: &'a Manifest,
) -> Result<Vec<ComponentBinary<'a>>> {
let mut components_urls_and_hashes = Vec::new();
for component in &self.components_to_install {
let package = new_manifest.get_package(component.short_name_in_manifest())?;
Expand All @@ -797,14 +754,49 @@ impl Update {
}
// We prefer the first format in the list, since the parsing of the
// manifest leaves us with the files/hash pairs in preference order.
components_urls_and_hashes.push((
component.clone(),
target_package.bins[0].compression,
target_package.bins[0].url.clone(),
target_package.bins[0].hash.clone(),
));
components_urls_and_hashes.push(ComponentBinary {
component,
binary: &target_package.bins[0],
});
}

Ok(components_urls_and_hashes)
}
}

struct ComponentBinary<'a> {
component: &'a Component,
binary: &'a HashedBinary,
}

impl<'a> ComponentBinary<'a> {
async fn download(
&self,
url: &Url,
download_cfg: &DownloadCfg<'_>,
max_retries: usize,
new_manifest: &Manifest,
) -> Result<File> {
use tokio_retry::{RetryIf, strategy::FixedInterval};

let downloaded_file = RetryIf::spawn(
FixedInterval::from_millis(0).take(max_retries),
|| download_cfg.download(url, &self.binary.hash),
|e: &anyhow::Error| {
// retry only known retriable cases
match e.downcast_ref::<RustupError>() {
Some(RustupError::BrokenPartialFile)
| Some(RustupError::DownloadingFile { .. }) => {
(download_cfg.notify_handler)(Notification::RetryingDownload(url.as_str()));
true
}
_ => false,
}
},
)
.await
.with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?;

Ok(downloaded_file)
}
}
Loading