From 54af374d0a87498c8a55f184f9a60bd63ac0d77a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:56:10 +0000 Subject: [PATCH 1/2] Initial plan From c6d9b9a7240c42881e2315e65f1cf650f3e05231 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:08:28 +0000 Subject: [PATCH 2/2] Add thread accounting for CDN download threads Co-authored-by: isaaclins <104733575+isaaclins@users.noreply.github.com> --- psst-core/src/player/file.rs | 40 ++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/psst-core/src/player/file.rs b/psst-core/src/player/file.rs index bf0a1b97..a1049712 100644 --- a/psst-core/src/player/file.rs +++ b/psst-core/src/player/file.rs @@ -8,6 +8,8 @@ use std::{ time::Duration, }; +use parking_lot::Mutex; + use symphonia::core::codecs::CodecType; use crate::{ @@ -201,6 +203,7 @@ pub struct StreamedFile { url: CdnUrl, cdn: CdnHandle, cache: CacheHandle, + download_threads: Arc>>>, } impl StreamedFile { @@ -228,6 +231,7 @@ impl StreamedFile { url, cdn, cache, + download_threads: Arc::new(Mutex::new(Vec::new())), }) } @@ -243,14 +247,16 @@ impl StreamedFile { Ok(last_url.clone()) }; let mut download_range = |offset, length| -> Result<(), Error> { + // Clean up completed download threads to prevent unbounded growth + self.cleanup_finished_threads(); + let thread_name = format!( "cdn-{}-{}..{}", self.path.file_id.to_base16(), offset, offset + length ); - // TODO: We spawn threads here without any accounting. Seems wrong. - thread::Builder::new().name(thread_name).spawn({ + let handle = thread::Builder::new().name(thread_name).spawn({ let url = fresh_url()?.url; let cdn = self.cdn.clone(); let cache = self.cache.clone(); @@ -288,6 +294,9 @@ impl StreamedFile { } })?; + // Store the handle for tracking + self.download_threads.lock().push(handle); + Ok(()) }; @@ -305,6 +314,33 @@ impl StreamedFile { } Ok(()) } + + /// Clean up finished download threads to prevent unbounded accumulation. + fn cleanup_finished_threads(&self) { + self.download_threads + .lock() + .retain(|handle| !handle.is_finished()); + } +} + +impl Drop for StreamedFile { + fn drop(&mut self) { + // Wait for all download threads to complete when the file is dropped. + // This ensures graceful cleanup of resources. + let mut threads = self.download_threads.lock(); + let thread_count = threads.len(); + if thread_count > 0 { + log::debug!( + "waiting for {} download thread(s) to finish for file {}", + thread_count, + self.path.file_id.to_base16() + ); + } + while let Some(handle) = threads.pop() { + // Join each thread, ignoring any errors (thread may have already finished) + let _ = handle.join(); + } + } } pub struct CachedFile {