diff --git a/psst-core/src/player/file.rs b/psst-core/src/player/file.rs index bf0a1b97..a1049712 100644 --- a/psst-core/src/player/file.rs +++ b/psst-core/src/player/file.rs @@ -8,6 +8,8 @@ use std::{ time::Duration, }; +use parking_lot::Mutex; + use symphonia::core::codecs::CodecType; use crate::{ @@ -201,6 +203,7 @@ pub struct StreamedFile { url: CdnUrl, cdn: CdnHandle, cache: CacheHandle, + download_threads: Arc>>>, } impl StreamedFile { @@ -228,6 +231,7 @@ impl StreamedFile { url, cdn, cache, + download_threads: Arc::new(Mutex::new(Vec::new())), }) } @@ -243,14 +247,16 @@ impl StreamedFile { Ok(last_url.clone()) }; let mut download_range = |offset, length| -> Result<(), Error> { + // Clean up completed download threads to prevent unbounded growth + self.cleanup_finished_threads(); + let thread_name = format!( "cdn-{}-{}..{}", self.path.file_id.to_base16(), offset, offset + length ); - // TODO: We spawn threads here without any accounting. Seems wrong. - thread::Builder::new().name(thread_name).spawn({ + let handle = thread::Builder::new().name(thread_name).spawn({ let url = fresh_url()?.url; let cdn = self.cdn.clone(); let cache = self.cache.clone(); @@ -288,6 +294,9 @@ impl StreamedFile { } })?; + // Store the handle for tracking + self.download_threads.lock().push(handle); + Ok(()) }; @@ -305,6 +314,33 @@ impl StreamedFile { } Ok(()) } + + /// Clean up finished download threads to prevent unbounded accumulation. + fn cleanup_finished_threads(&self) { + self.download_threads + .lock() + .retain(|handle| !handle.is_finished()); + } +} + +impl Drop for StreamedFile { + fn drop(&mut self) { + // Wait for all download threads to complete when the file is dropped. + // This ensures graceful cleanup of resources. + let mut threads = self.download_threads.lock(); + let thread_count = threads.len(); + if thread_count > 0 { + log::debug!( + "waiting for {} download thread(s) to finish for file {}", + thread_count, + self.path.file_id.to_base16() + ); + } + while let Some(handle) = threads.pop() { + // Join each thread, ignoring any errors (thread may have already finished) + let _ = handle.join(); + } + } } pub struct CachedFile {