diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index fd890b1a18..edc6d42580 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -11,13 +11,14 @@ use futures_util::stream::StreamExt; use std::sync::Arc; use tokio::sync::Semaphore; use tracing::info; +use url::Url; use crate::dist::component::{ Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction, }; use crate::dist::config::Config; use crate::dist::download::{DownloadCfg, File}; -use crate::dist::manifest::{Component, CompressionKind, Manifest, TargetedPackage}; +use crate::dist::manifest::{Component, CompressionKind, HashedBinary, Manifest, TargetedPackage}; use crate::dist::notifications::*; use crate::dist::prefix::InstallPrefix; use crate::dist::temp; @@ -153,8 +154,8 @@ impl Manifestation { let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER; // Download component packages and validate hashes - let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new(); - let mut things_downloaded: Vec = Vec::new(); + let mut things_to_install = Vec::new(); + let mut things_downloaded = Vec::new(); let components = update.components_urls_and_hashes(new_manifest)?; let components_len = components.len(); @@ -173,44 +174,44 @@ impl Manifestation { .unwrap_or(DEFAULT_MAX_RETRIES); info!("downloading component(s)"); - for (component, _, url, _) in components.clone() { + for bin in &components { (download_cfg.notify_handler)(Notification::DownloadingComponent( - &component.short_name(new_manifest), + &bin.component.short_name(new_manifest), &self.target_triple, - component.target.as_ref(), - &url, + bin.component.target.as_ref(), + &bin.binary.url, )); } let semaphore = Arc::new(Semaphore::new(concurrent_downloads)); - let component_stream = - tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| { - let sem = semaphore.clone(); - async move { - let _permit = sem.acquire().await.unwrap(); - self.download_component( - component, - format, - url, - hash, - altered, - tmp_cx, - download_cfg, - max_retries, - new_manifest, - ) + let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| { + let sem = semaphore.clone(); + async move { + let _permit = sem.acquire().await.unwrap(); + let url = if altered { + utils::parse_url( + &bin.binary + .url + .replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()), + )? + } else { + utils::parse_url(&bin.binary.url)? + }; + + bin.download(&url, download_cfg, max_retries, new_manifest) .await - } - }); + .map(|downloaded| (bin, downloaded)) + } + }); if components_len > 0 { let results = component_stream .buffered(components_len) .collect::>() .await; for result in results { - let (component, format, downloaded_file, hash) = result?; - things_downloaded.push(hash); - things_to_install.push((component, format, downloaded_file)); + let (bin, downloaded_file) = result?; + things_downloaded.push(bin.binary.hash.clone()); + things_to_install.push((bin.component, bin.binary.compression, downloaded_file)); } } @@ -547,50 +548,6 @@ impl Manifestation { Ok(tx) } - - #[allow(clippy::too_many_arguments)] - async fn download_component( - &self, - component: Component, - format: CompressionKind, - url: String, - hash: String, - altered: bool, - tmp_cx: &temp::Context, - download_cfg: &DownloadCfg<'_>, - max_retries: usize, - new_manifest: &Manifest, - ) -> Result<(Component, CompressionKind, File, String)> { - use tokio_retry::{RetryIf, strategy::FixedInterval}; - - let url = if altered { - url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()) - } else { - url - }; - - let url_url = utils::parse_url(&url)?; - - let downloaded_file = RetryIf::spawn( - FixedInterval::from_millis(0).take(max_retries), - || download_cfg.download(&url_url, &hash), - |e: &anyhow::Error| { - // retry only known retriable cases - match e.downcast_ref::() { - Some(RustupError::BrokenPartialFile) - | Some(RustupError::DownloadingFile { .. }) => { - (download_cfg.notify_handler)(Notification::RetryingDownload(&url)); - true - } - _ => false, - } - }, - ) - .await - .with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?; - - Ok((component, format, downloaded_file, hash)) - } } #[derive(Debug)] @@ -782,10 +739,10 @@ impl Update { } /// Map components to urls and hashes - fn components_urls_and_hashes( - &self, - new_manifest: &Manifest, - ) -> Result> { + fn components_urls_and_hashes<'a>( + &'a self, + new_manifest: &'a Manifest, + ) -> Result>> { let mut components_urls_and_hashes = Vec::new(); for component in &self.components_to_install { let package = new_manifest.get_package(component.short_name_in_manifest())?; @@ -797,14 +754,49 @@ impl Update { } // We prefer the first format in the list, since the parsing of the // manifest leaves us with the files/hash pairs in preference order. - components_urls_and_hashes.push(( - component.clone(), - target_package.bins[0].compression, - target_package.bins[0].url.clone(), - target_package.bins[0].hash.clone(), - )); + components_urls_and_hashes.push(ComponentBinary { + component, + binary: &target_package.bins[0], + }); } Ok(components_urls_and_hashes) } } + +struct ComponentBinary<'a> { + component: &'a Component, + binary: &'a HashedBinary, +} + +impl<'a> ComponentBinary<'a> { + async fn download( + &self, + url: &Url, + download_cfg: &DownloadCfg<'_>, + max_retries: usize, + new_manifest: &Manifest, + ) -> Result { + use tokio_retry::{RetryIf, strategy::FixedInterval}; + + let downloaded_file = RetryIf::spawn( + FixedInterval::from_millis(0).take(max_retries), + || download_cfg.download(url, &self.binary.hash), + |e: &anyhow::Error| { + // retry only known retriable cases + match e.downcast_ref::() { + Some(RustupError::BrokenPartialFile) + | Some(RustupError::DownloadingFile { .. }) => { + (download_cfg.notify_handler)(Notification::RetryingDownload(url.as_str())); + true + } + _ => false, + } + }, + ) + .await + .with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?; + + Ok(downloaded_file) + } +}