Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/dist/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{debug, warn};
use url::Url;

use crate::config::Cfg;
use crate::dist::DEFAULT_DIST_SERVER;
use crate::dist::temp;
use crate::download::{download_file, download_file_with_resume};
use crate::errors::RustupError;
Expand Down Expand Up @@ -116,7 +117,7 @@ impl<'a> DownloadCfg<'a> {
}
}

pub(crate) fn clean(&self, hashes: &[String]) -> Result<()> {
pub(crate) fn clean(&self, hashes: &[impl AsRef<Path>]) -> Result<()> {
for hash in hashes.iter() {
let used_file = self.download_dir.join(hash);
if self.download_dir.join(&used_file).exists() {
Expand Down Expand Up @@ -207,6 +208,15 @@ impl<'a> DownloadCfg<'a> {
retry_time: Mutex::new(None),
}
}

pub(crate) fn url(&self, url: &str) -> Result<Url> {
match &*self.tmp_cx.dist_server {
server if server != DEFAULT_DIST_SERVER => utils::parse_url(
&url.replace(DEFAULT_DIST_SERVER, self.tmp_cx.dist_server.as_str()),
),
_ => utils::parse_url(url),
}
}
}

/// Tracks download progress and displays information about it to a terminal.
Expand Down
111 changes: 48 additions & 63 deletions src/dist/manifestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ mod tests;
use std::path::Path;

use anyhow::{Context, Result, anyhow, bail};
use futures_util::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Semaphore;
use futures_util::stream::{FuturesUnordered, StreamExt};
use tracing::{info, warn};
use url::Url;

use crate::dist::component::{
Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction,
Expand Down Expand Up @@ -153,23 +150,20 @@ impl Manifestation {
}
}

let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;

// Download component packages and validate hashes
let mut things_to_install = Vec::new();
let mut things_downloaded = Vec::new();
let components = update
.components_urls_and_hashes(new_manifest)
.map(|res| {
res.map(|(component, binary)| ComponentBinary {
component,
binary,
status: download_cfg.status_for(component.short_name(new_manifest)),
manifest: new_manifest,
download_cfg,
})
})
.collect::<Result<Vec<_>>>()?;

let components_len = components.len();
const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2;
let concurrent_downloads = download_cfg
.process
Expand All @@ -184,39 +178,6 @@ impl Manifestation {
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_RETRIES);

info!("downloading component(s)");
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

bin.download(&url, download_cfg, max_retries, new_manifest)
.await
.map(|downloaded| (bin, downloaded))
}
});
if components_len > 0 {
let results = component_stream
.buffered(components_len)
.collect::<Vec<_>>()
.await;
for result in results {
let (bin, downloaded_file) = result?;
things_downloaded.push(bin.binary.hash.clone());
things_to_install.push((bin, downloaded_file));
}
}

// Begin transaction
let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.process);

Expand Down Expand Up @@ -255,15 +216,41 @@ impl Manifestation {
tx = self.uninstall_component(component, new_manifest, tx, download_cfg.process)?;
}

// Install components
for (component_bin, installer_file) in things_to_install {
tx = component_bin.install(installer_file, tx, new_manifest, self, download_cfg)?;
let mut downloads = FuturesUnordered::new();
let mut component_iter = components.iter();
let mut cleanup_downloads = vec![];
loop {
Copy link
Member

@rami3l rami3l Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djc Thanks for the testbed!

After careful comparisons, I think this is equivalent to b5e9e4d (#4471) once we squash 5800cba (#4471) back onto it to eliminate the use of channels, modulo some edge cases such as empty todo list and download failures.

The core of this approach includes directly issuing a blocking call to what we have pulled from a FuturesUnordered. When that blocking call is running, there will be no progress updates and no new installations being initiated. So this is the same as what we've had after the very first step of #4471.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into. Do you think it's useful to merge (something like) this and proceed from there?

When that blocking call is running, there will be no progress updates and no new installations being initiated.

If this is the main issue that's left, it feels like there might be easier (and better) ways to solve this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djc I think that is THE remaining issue, yes.

However I still have concerns regarding the aforementioned edge cases that I and @FranciscoTGouveia have found out when working on #4471 so more careful checks have to be carried out.

I think it'd be nice if both of us can have another look at this PR to ensure the semantics are right, it might take some time though while I try to coordinate 🙇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However I still have concerns regarding the aforementioned edge cases that I and @FranciscoTGouveia have found out when working on #4471 so more careful checks have to be carried out.

I think it'd be nice if both of us can have another look at this PR to ensure the semantics are right, it might take some time though while I try to coordinate 🙇

Sounds great!

if downloads.is_empty() && component_iter.len() == 0 {
break;
}

let (bin, downloaded) = match downloads.next().await {
Some(Ok((bin, downloaded))) => (bin, downloaded),
Some(Err(e)) => return Err(e),
None => {
if let Some(bin) = component_iter.next() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems inconsistent with the while section below in the number of tasks being added to the todo list.

downloads.push(bin.download(max_retries));
}
continue;
}
};

while component_iter.len() > 0 && downloads.len() < concurrent_downloads {
if let Some(bin) = component_iter.next() {
downloads.push(bin.download(max_retries));
}
}

cleanup_downloads.push(&bin.binary.hash);
tx = bin.install(downloaded, tx, self)?;
}

// Install new distribution manifest
let new_manifest_str = new_manifest.clone().stringify()?;
tx.modify_file(rel_installed_manifest_path)?;
utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?;
download_cfg.clean(&cleanup_downloads)?;
drop(downloads);

// Write configuration.
//
Expand All @@ -284,8 +271,6 @@ impl Manifestation {
// End transaction
tx.commit();

download_cfg.clean(&things_downloaded)?;

Ok(UpdateStatus::Changed)
}

Expand Down Expand Up @@ -694,21 +679,21 @@ struct ComponentBinary<'a> {
component: &'a Component,
binary: &'a HashedBinary,
status: DownloadStatus,
manifest: &'a Manifest,
download_cfg: &'a DownloadCfg<'a>,
}

impl<'a> ComponentBinary<'a> {
async fn download(
&self,
url: &Url,
download_cfg: &DownloadCfg<'_>,
max_retries: usize,
new_manifest: &Manifest,
) -> Result<File> {
async fn download(&self, max_retries: usize) -> Result<(&Self, File)> {
use tokio_retry::{RetryIf, strategy::FixedInterval};

let url = self.download_cfg.url(&self.binary.url)?;
let downloaded_file = RetryIf::spawn(
FixedInterval::from_millis(0).take(max_retries),
|| download_cfg.download(url, &self.binary.hash, &self.status),
|| {
self.download_cfg
.download(&url, &self.binary.hash, &self.status)
},
|e: &anyhow::Error| {
// retry only known retriable cases
match e.downcast_ref::<RustupError>() {
Expand All @@ -722,18 +707,18 @@ impl<'a> ComponentBinary<'a> {
},
)
.await
.with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?;
.with_context(|| {
RustupError::ComponentDownloadFailed(self.component.name(self.manifest))
})?;

Ok(downloaded_file)
Ok((self, downloaded_file))
}

fn install<'t>(
&self,
installer_file: File,
tx: Transaction<'t>,
new_manifest: &Manifest,
manifestation: &Manifestation,
download_cfg: &DownloadCfg<'_>,
) -> Result<Transaction<'t>> {
// For historical reasons, the rust-installer component
// names are not the same as the dist manifest component
Expand All @@ -742,15 +727,15 @@ impl<'a> ComponentBinary<'a> {
let component = self.component;
let pkg_name = component.name_in_manifest();
let short_pkg_name = component.short_name_in_manifest();
let short_name = component.short_name(new_manifest);
let short_name = component.short_name(self.manifest);

self.status.installing();

let reader = utils::FileReaderWithProgress::new_file(&installer_file)?;
let package = match self.binary.compression {
CompressionKind::GZip => &TarGzPackage::new(reader, download_cfg)? as &dyn Package,
CompressionKind::XZ => &TarXzPackage::new(reader, download_cfg)?,
CompressionKind::ZStd => &TarZStdPackage::new(reader, download_cfg)?,
CompressionKind::GZip => &TarGzPackage::new(reader, self.download_cfg)? as &dyn Package,
CompressionKind::XZ => &TarXzPackage::new(reader, self.download_cfg)?,
CompressionKind::ZStd => &TarZStdPackage::new(reader, self.download_cfg)?,
};

// If the package doesn't contain the component that the
Expand Down
Loading