Skip to content

Commit df6ad26

Browse files
committed
dist: call DownloadTracker methods directly
1 parent b42b6b2 commit df6ad26

File tree

8 files changed

+43
-147
lines changed

8 files changed

+43
-147
lines changed

src/diskio/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ use std::{fmt::Debug, fs::OpenOptions};
6666

6767
use anyhow::Result;
6868

69-
use crate::dist::download::DownloadTracker;
7069
use crate::process::Process;
7170

7271
/// Carries the implementation specific data for complete file transfers into the executor.
@@ -443,13 +442,12 @@ pub(crate) fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
443442

444443
/// Get the executor for disk IO.
445444
pub(crate) fn get_executor<'a>(
446-
tracker: Option<&'a DownloadTracker>,
447445
ram_budget: usize,
448446
process: &Process,
449447
) -> anyhow::Result<Box<dyn Executor + 'a>> {
450448
// If this gets lots of use, consider exposing via the config file.
451449
Ok(match process.io_thread_count()? {
452450
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
453-
n => Box::new(threaded::Threaded::new(tracker, n, ram_budget)),
451+
n => Box::new(threaded::Threaded::new(n, ram_budget)),
454452
})
455453
}

src/diskio/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn test_incremental_file(io_threads: &str) -> Result<()> {
2424

2525
let mut written = 0;
2626
let mut file_finished = false;
27-
let mut io_executor: Box<dyn Executor> = get_executor(None, 32 * 1024 * 1024, &tp.process)?;
27+
let mut io_executor: Box<dyn Executor> = get_executor(32 * 1024 * 1024, &tp.process)?;
2828
let (item, mut sender) = Item::write_file_segmented(
2929
work_dir.path().join("scratch"),
3030
0o666,
@@ -90,7 +90,7 @@ fn test_complete_file(io_threads: &str) -> Result<()> {
9090
vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string());
9191
let tp = TestProcess::with_vars(vars);
9292

93-
let mut io_executor: Box<dyn Executor> = get_executor(None, 32 * 1024 * 1024, &tp.process)?;
93+
let mut io_executor: Box<dyn Executor> = get_executor(32 * 1024 * 1024, &tp.process)?;
9494
let mut chunk = io_executor.get_buffer(10);
9595
chunk.extend(b"0123456789");
9696
assert_eq!(chunk.len(), 10);

src/diskio/threaded.rs

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use sharded_slab::pool::{OwnedRef, OwnedRefMut};
1515
use tracing::debug;
1616

1717
use super::{CompletedIo, Executor, Item, perform};
18-
use crate::dist::download::{DownloadTracker, Notification};
1918

2019
#[derive(Copy, Clone, Debug, Enum)]
2120
pub(crate) enum Bucket {
@@ -96,23 +95,18 @@ impl fmt::Debug for Pool {
9695
}
9796
}
9897

99-
pub(crate) struct Threaded<'a> {
98+
pub(crate) struct Threaded {
10099
n_files: Arc<AtomicUsize>,
101100
pool: threadpool::ThreadPool,
102-
tracker: Option<&'a DownloadTracker>,
103101
rx: Receiver<Task>,
104102
tx: Sender<Task>,
105103
vec_pools: EnumMap<Bucket, Pool>,
106104
ram_budget: usize,
107105
}
108106

109-
impl<'a> Threaded<'a> {
107+
impl Threaded {
110108
/// Construct a new Threaded executor.
111-
pub(crate) fn new(
112-
tracker: Option<&'a DownloadTracker>,
113-
thread_count: usize,
114-
ram_budget: usize,
115-
) -> Self {
109+
pub(crate) fn new(thread_count: usize, ram_budget: usize) -> Self {
116110
// Defaults to hardware thread count threads; this is suitable for
117111
// our needs as IO bound operations tend to show up as write latencies
118112
// rather than close latencies, so we don't need to look at
@@ -168,7 +162,6 @@ impl<'a> Threaded<'a> {
168162
Self {
169163
n_files: Arc::new(AtomicUsize::new(0)),
170164
pool,
171-
tracker,
172165
rx,
173166
tx,
174167
vec_pools,
@@ -233,7 +226,7 @@ impl<'a> Threaded<'a> {
233226
}
234227
}
235228

236-
impl Executor for Threaded<'_> {
229+
impl Executor for Threaded {
237230
fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
238231
// Yield any completed work before accepting new work - keep memory
239232
// pressure under control
@@ -260,39 +253,22 @@ impl Executor for Threaded<'_> {
260253
// items, and the download tracker's progress is confounded with
261254
// actual handling of data today, we synthesis a data buffer and
262255
// pretend to have bytes to deliver.
263-
let mut prev_files = self.n_files.load(Ordering::Relaxed);
264-
if let Some(tracker) = self.tracker {
265-
tracker.handle(Notification::DownloadFinished(None));
266-
tracker.handle(Notification::DownloadContentLengthReceived(
267-
prev_files as u64,
268-
None,
269-
));
270-
}
256+
let prev_files = self.n_files.load(Ordering::Relaxed);
271257
if prev_files > 50 {
272258
debug!("{prev_files} deferred IO operations");
273259
}
274-
let buf: Vec<u8> = vec![0; prev_files];
260+
275261
// Cheap wrap-around correctness check - we have 20k files, more than
276262
// 32K means we subtracted from 0 somewhere.
277263
assert!(32767 > prev_files);
278264
let mut current_files = prev_files;
279265
while current_files != 0 {
280266
use std::thread::sleep;
281267
sleep(std::time::Duration::from_millis(100));
282-
prev_files = current_files;
283268
current_files = self.n_files.load(Ordering::Relaxed);
284-
let step_count = prev_files - current_files;
285-
if let Some(tracker) = self.tracker {
286-
tracker.handle(Notification::DownloadDataReceived(
287-
&buf[0..step_count],
288-
None,
289-
));
290-
}
291269
}
292270
self.pool.join();
293-
if let Some(tracker) = self.tracker {
294-
tracker.handle(Notification::DownloadFinished(None));
295-
}
271+
296272
// close the feedback channel so that blocking reads on it can
297273
// complete. send is atomic, and we know the threads completed from the
298274
// pool join, so this is race-free. It is possible that try_iter is safe
@@ -352,19 +328,19 @@ impl Executor for Threaded<'_> {
352328
}
353329
}
354330

355-
impl Drop for Threaded<'_> {
331+
impl Drop for Threaded {
356332
fn drop(&mut self) {
357333
// We are not permitted to fail - consume but do not handle the items.
358334
self.join().for_each(drop);
359335
}
360336
}
361337

362-
struct JoinIterator<'a, 'b> {
363-
executor: &'a Threaded<'b>,
338+
struct JoinIterator<'a> {
339+
executor: &'a Threaded,
364340
consume_sentinel: bool,
365341
}
366342

367-
impl JoinIterator<'_, '_> {
343+
impl JoinIterator<'_> {
368344
fn inner<T: Iterator<Item = Task>>(&self, mut iter: T) -> Option<CompletedIo> {
369345
loop {
370346
let task_o = iter.next();
@@ -388,7 +364,7 @@ impl JoinIterator<'_, '_> {
388364
}
389365
}
390366

391-
impl Iterator for JoinIterator<'_, '_> {
367+
impl Iterator for JoinIterator<'_> {
392368
type Item = CompletedIo;
393369

394370
fn next(&mut self) -> Option<CompletedIo> {
@@ -400,12 +376,12 @@ impl Iterator for JoinIterator<'_, '_> {
400376
}
401377
}
402378

403-
struct SubmitIterator<'a, 'b> {
404-
executor: &'a Threaded<'b>,
379+
struct SubmitIterator<'a> {
380+
executor: &'a Threaded,
405381
item: Cell<Option<Item>>,
406382
}
407383

408-
impl Iterator for SubmitIterator<'_, '_> {
384+
impl Iterator for SubmitIterator<'_> {
409385
type Item = CompletedIo;
410386

411387
fn next(&mut self) -> Option<CompletedIo> {

src/dist/component/package.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,7 @@ fn unpack_without_first_dir<R: Read>(
299299
}
300300
};
301301
let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, dl_cfg);
302-
let mut io_executor: Box<dyn Executor> =
303-
get_executor(Some(&dl_cfg.tracker), unpack_ram, dl_cfg.process)?;
302+
let mut io_executor: Box<dyn Executor> = get_executor(unpack_ram, dl_cfg.process)?;
304303

305304
let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
306305
// Path is presumed to exist. Call it a precondition.

src/dist/download.rs

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl<'a> DownloadCfg<'a> {
4747
let target_file = self.download_dir.join(Path::new(hash));
4848

4949
if target_file.exists() {
50-
let cached_result = file_hash(&target_file, &self.tracker)?;
50+
let cached_result = file_hash(&target_file)?;
5151
if hash == cached_result {
5252
debug!("reusing previously downloaded file");
5353
debug!(url = url.as_ref(), "checksum passed");
@@ -216,36 +216,6 @@ impl DownloadTracker {
216216
}
217217
}
218218

219-
pub(crate) fn handle(&self, n: Notification<'_>) {
220-
match n {
221-
Notification::DownloadContentLengthReceived(content_len, url) => {
222-
if let Some(url) = url {
223-
self.content_length_received(content_len, url);
224-
}
225-
}
226-
Notification::DownloadDataReceived(data, url) => {
227-
if let Some(url) = url {
228-
self.data_received(data.len(), url);
229-
}
230-
}
231-
Notification::DownloadFinished(url) => {
232-
if let Some(url) = url {
233-
self.download_finished(url);
234-
}
235-
}
236-
Notification::DownloadFailed(url) => {
237-
self.download_failed(url);
238-
debug!("download failed");
239-
}
240-
Notification::DownloadingComponent(component, url) => {
241-
self.create_progress_bar(component.to_owned(), url.to_owned());
242-
}
243-
Notification::RetryingDownload(url) => {
244-
self.retrying_download(url);
245-
}
246-
}
247-
}
248-
249219
/// Creates a new ProgressBar for the given component.
250220
pub(crate) fn create_progress_bar(&self, component: String, url: String) {
251221
let pb = ProgressBar::hidden();
@@ -329,25 +299,9 @@ impl DownloadTracker {
329299
}
330300
}
331301

332-
#[derive(Debug)]
333-
pub(crate) enum Notification<'a> {
334-
/// The URL of the download is passed as the last argument, to allow us to track concurrent downloads.
335-
DownloadingComponent(&'a str, &'a str),
336-
RetryingDownload(&'a str),
337-
/// Received the Content-Length of the to-be downloaded data with
338-
/// the respective URL of the download (for tracking concurrent downloads).
339-
DownloadContentLengthReceived(u64, Option<&'a str>),
340-
/// Received some data.
341-
DownloadDataReceived(&'a [u8], Option<&'a str>),
342-
/// Download has finished.
343-
DownloadFinished(Option<&'a str>),
344-
/// Download has failed.
345-
DownloadFailed(&'a str),
346-
}
347-
348-
fn file_hash(path: &Path, tracker: &DownloadTracker) -> Result<String> {
302+
fn file_hash(path: &Path) -> Result<String> {
349303
let mut hasher = Sha256::new();
350-
let mut downloaded = utils::FileReaderWithProgress::new_file(path, tracker)?;
304+
let mut downloaded = utils::FileReaderWithProgress::new_file(path)?;
351305
use std::io::Read;
352306
let mut buf = vec![0; 32768];
353307
while let Ok(n) = downloaded.read(&mut buf) {

src/dist/manifestation.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::dist::component::{
1717
Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction,
1818
};
1919
use crate::dist::config::Config;
20-
use crate::dist::download::{DownloadCfg, File, Notification};
20+
use crate::dist::download::{DownloadCfg, File};
2121
use crate::dist::manifest::{Component, CompressionKind, HashedBinary, Manifest, TargetedPackage};
2222
use crate::dist::prefix::InstallPrefix;
2323
#[cfg(test)]
@@ -177,12 +177,10 @@ impl Manifestation {
177177

178178
info!("downloading component(s)");
179179
for bin in &components {
180-
download_cfg
181-
.tracker
182-
.handle(Notification::DownloadingComponent(
183-
&bin.component.short_name(new_manifest),
184-
&bin.binary.url,
185-
));
180+
download_cfg.tracker.create_progress_bar(
181+
bin.component.short_name(new_manifest),
182+
bin.binary.url.clone(),
183+
);
186184
}
187185

188186
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
@@ -277,8 +275,7 @@ impl Manifestation {
277275
}
278276
}
279277

280-
let reader =
281-
utils::FileReaderWithProgress::new_file(&installer_file, &download_cfg.tracker)?;
278+
let reader = utils::FileReaderWithProgress::new_file(&installer_file)?;
282279
let package = match format {
283280
CompressionKind::GZip => &TarGzPackage::new(reader, download_cfg)? as &dyn Package,
284281
CompressionKind::XZ => &TarXzPackage::new(reader, download_cfg)?,
@@ -445,7 +442,7 @@ impl Manifestation {
445442

446443
dl_cfg
447444
.tracker
448-
.handle(Notification::DownloadingComponent("rust", &url));
445+
.create_progress_bar("rust".to_owned(), url.clone());
449446

450447
let dl = dl_cfg
451448
.download_and_check(&url, update_hash, ".tar.gz")
@@ -468,7 +465,7 @@ impl Manifestation {
468465
}
469466

470467
// Install all the components in the installer
471-
let reader = utils::FileReaderWithProgress::new_file(&installer_file, &dl_cfg.tracker)?;
468+
let reader = utils::FileReaderWithProgress::new_file(&installer_file)?;
472469
let package: &dyn Package = &TarGzPackage::new(reader, dl_cfg)?;
473470
for component in package.components() {
474471
tx = package.install(&self.installation, &component, None, tx)?;
@@ -749,9 +746,7 @@ impl<'a> ComponentBinary<'a> {
749746
match e.downcast_ref::<RustupError>() {
750747
Some(RustupError::BrokenPartialFile)
751748
| Some(RustupError::DownloadingFile { .. }) => {
752-
download_cfg
753-
.tracker
754-
.handle(Notification::RetryingDownload(url.as_str()));
749+
download_cfg.tracker.retrying_download(url.as_str());
755750
true
756751
}
757752
_ => false,

src/download/mod.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,7 @@ use tracing::info;
2626
use tracing::warn;
2727
use url::Url;
2828

29-
use crate::{
30-
dist::download::{DownloadTracker, Notification},
31-
errors::RustupError,
32-
process::Process,
33-
};
29+
use crate::{dist::download::DownloadTracker, errors::RustupError, process::Process};
3430

3531
#[cfg(test)]
3632
mod tests;
@@ -111,14 +107,9 @@ async fn download_file_(
111107

112108
match msg {
113109
Event::DownloadContentLengthReceived(len) => {
114-
tracker.handle(Notification::DownloadContentLengthReceived(
115-
len,
116-
Some(url.as_str()),
117-
));
118-
}
119-
Event::DownloadDataReceived(data) => {
120-
tracker.handle(Notification::DownloadDataReceived(data, Some(url.as_str())));
110+
tracker.content_length_received(len, url.as_str())
121111
}
112+
Event::DownloadDataReceived(data) => tracker.data_received(data.len(), url.as_str()),
122113
Event::ResumingPartialDownload => debug!("resuming partial download"),
123114
}
124115

@@ -219,10 +210,10 @@ async fn download_file_(
219210
.await;
220211

221212
// The notification should only be sent if the download was successful (i.e. didn't timeout)
222-
tracker.handle(match &res {
223-
Ok(_) => Notification::DownloadFinished(Some(url.as_str())),
224-
Err(_) => Notification::DownloadFailed(url.as_str()),
225-
});
213+
match &res {
214+
Ok(_) => tracker.download_finished(url.as_str()),
215+
Err(_) => tracker.download_failed(url.as_str()),
216+
};
226217

227218
res
228219
}

0 commit comments

Comments
 (0)