From 3997b46fd18bcdf3207aa3b0dbd6f5b3107c0e7d Mon Sep 17 00:00:00 2001 From: Titi Wangsa Damhore <111611+twdamhore@users.noreply.github.com> Date: Tue, 20 Jan 2026 10:28:29 -0800 Subject: [PATCH 1/3] Implement streaming for ISO file reads to reduce memory usage Previously, files read from ISOs were loaded entirely into memory, causing 500MB+ peak memory usage for large files like initrd. Changes: - Add stream_from_iso() method that reads in 32MB chunks via mpsc channel - Add stream_initrd_with_firmware() for streaming initrd + firmware concatenation - Update route handlers to use ReceiverStream for streaming responses - Use spawn_blocking for synchronous ISO reads (fixes async runtime blocking) - Remove old read_from_iso() and read_initrd_with_firmware() methods Benefits: - Peak memory reduced from file size to ~64MB (2 chunks in flight) - Better time-to-first-byte (streaming starts after first chunk) - Improved concurrency (doesn't block async runtime) - Content-Length header still provided for all responses Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 13 ++ Cargo.toml | 2 + src/routes/iso.rs | 20 +-- src/services/iso.rs | 297 ++++++++++++++++++++++++++++++++------------ 4 files changed, 248 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc5a8af..4e93f35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,6 +698,7 @@ version = "0.2.0" dependencies = [ "axum", "base64", + "bytes", "chrono", "fs2", "gpt_disk_io", @@ -708,6 +709,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-appender", @@ -939,6 +941,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" diff --git a/Cargo.toml b/Cargo.toml index d39501e..ee7b880 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,8 @@ license = "MIT" axum = "0.8" tokio = { version = "1", features = ["full", "signal"] } tokio-util = { version = "0.7", features = ["io"] } +tokio-stream = "0.1" +bytes = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-appender = "0.2" diff --git a/src/routes/iso.rs b/src/routes/iso.rs index 03a1854..2eebd0b 100644 --- a/src/routes/iso.rs +++ b/src/routes/iso.rs @@ -14,6 +14,7 @@ use axum::http::{HeaderMap, StatusCode, header}; use axum::response::Response; use serde::Deserialize; use tokio::fs::File; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; /// Query parameters for the ISO endpoint. @@ -180,10 +181,11 @@ async fn serve_template( .unwrap()) } -/// Serve a file from within the ISO. +/// Serve a file from within the ISO using streaming. fn serve_from_iso(iso_service: &IsoService, iso_name: &str, path: &str) -> AppResult { - let content = iso_service.read_from_iso(iso_name, path)?; - let content_length = content.len(); + let (content_length, receiver) = iso_service.stream_from_iso(iso_name, path)?; + let stream = ReceiverStream::new(receiver); + let body = Body::from_stream(stream); // Determine content type based on file extension let content_type = guess_content_type(path); @@ -192,11 +194,11 @@ fn serve_from_iso(iso_service: &IsoService, iso_name: &str, path: &str) -> AppRe .status(StatusCode::OK) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, content_length) - .body(Body::from(content)) + .body(body) .unwrap()) } -/// Serve initrd with firmware concatenated. +/// Serve initrd with firmware concatenated using streaming. /// /// Used for Debian netboot where firmware.cpio.gz needs to be appended to initrd. fn serve_initrd_with_firmware( @@ -205,14 +207,16 @@ fn serve_initrd_with_firmware( initrd_path: &str, firmware: &str, ) -> AppResult { - let content = iso_service.read_initrd_with_firmware(iso_name, initrd_path, firmware)?; - let content_length = content.len(); + let (content_length, receiver) = + iso_service.stream_initrd_with_firmware(iso_name, initrd_path, firmware)?; + let stream = ReceiverStream::new(receiver); + let body = Body::from_stream(stream); Ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_LENGTH, content_length) - .body(Body::from(content)) + .body(body) .unwrap()) } diff --git a/src/services/iso.rs b/src/services/iso.rs index 4b1b109..68688ed 100644 --- a/src/services/iso.rs +++ b/src/services/iso.rs @@ -3,14 +3,18 @@ //! Handles iso.cfg parsing, ISO9660 reading, and template detection. use crate::error::{AppError, AppResult}; +use bytes::Bytes; use gpt_disk_io::BlockIo; use gpt_disk_types::{BlockSize, Lba}; -use iso9660::{find_file, mount, read_file_vec}; +use iso9660::{find_file, mount}; use std::fs::File; use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; use std::path::PathBuf; +use tokio::sync::mpsc; const ISO_BLOCK_SIZE: u64 = 2048; +/// Chunk size for streaming (32MB) +const CHUNK_SIZE: usize = 32 * 1024 * 1024; /// Wrapper to implement BlockIo for std::fs::File. struct FileBlockIo { @@ -353,10 +357,69 @@ impl IsoService { None } - /// Read a file from within an ISO. - pub fn read_from_iso(&self, iso_name: &str, file_path: &str) -> AppResult> { + /// Check if firmware concatenation is configured and path matches initrd_path. + /// + /// Returns Some((initrd_path, firmware)) if the requested path matches initrd_path + /// and firmware is configured. Returns None otherwise. + pub fn should_concat_firmware(&self, iso_name: &str, path: &str) -> AppResult> { + let config = self.load_config(iso_name)?; + + // Normalize path for comparison (handle leading slash variations) + let normalized_path = path.trim_start_matches('/'); + + if let (Some(initrd_path), Some(firmware)) = (config.initrd_path, config.firmware) { + let normalized_initrd = initrd_path.trim_start_matches('/'); + if normalized_path == normalized_initrd { + return Ok(Some((initrd_path, firmware))); + } + } + + Ok(None) + } + + /// Get the boot template path for an ISO. + /// + /// Checks automation profile first, then falls back to ISO-level template. + /// Order: iso/{iso}/automation/{profile}/boot.ipxe.j2 -> iso/{iso}/boot.ipxe.j2 + pub fn boot_template_path(&self, iso_name: &str, automation: Option<&str>) -> AppResult { + // Check automation profile specific template first + if let Some(profile) = automation { + let profile_path = self + .iso_dir(iso_name) + .join("automation") + .join(profile) + .join("boot.ipxe.j2"); + if profile_path.exists() { + tracing::info!( + "Using profile-specific boot template: {:?}", + profile_path + ); + return Ok(profile_path); + } + } + + // Fall back to ISO-level template + let iso_path = self.iso_dir(iso_name).join("boot.ipxe.j2"); + if iso_path.exists() { + tracing::info!("Using ISO-level boot template: {:?}", iso_path); + return Ok(iso_path); + } + + Err(AppError::TemplateNotFound { path: iso_path }) + } + + /// Stream a file from within an ISO. + /// + /// Returns the file size and a receiver that yields chunks. + /// Uses spawn_blocking for the synchronous ISO reads. + pub fn stream_from_iso( + &self, + iso_name: &str, + file_path: &str, + ) -> AppResult<(u64, mpsc::Receiver>)> { let iso_path = self.iso_file_path(iso_name)?; + // Open ISO and find file entry to get size let file = File::open(&iso_path).map_err(|e| AppError::FileRead { path: iso_path.clone(), source: e, @@ -379,108 +442,190 @@ impl IsoService { format!("/{}", file_path) }; - tracing::debug!("Looking for file in ISO: {}", normalized_path); - - let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|e| { - tracing::debug!("File not found: {}", e); + let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|_| { AppError::FileNotFoundInIso { iso: iso_name.to_string(), path: file_path.to_string(), } })?; - read_file_vec(&mut block_io, &entry).map_err(|e| AppError::IsoRead { - path: iso_path, - message: format!("Failed to read file from ISO: {}", e), - }) - } + let file_size = entry.size; + let extent_lba = entry.extent_lba; - /// Check if firmware concatenation is configured and path matches initrd_path. - /// - /// Returns Some((initrd_path, firmware)) if the requested path matches initrd_path - /// and firmware is configured. Returns None otherwise. - pub fn should_concat_firmware(&self, iso_name: &str, path: &str) -> AppResult> { - let config = self.load_config(iso_name)?; + // Create bounded channel for backpressure (2 chunks max in flight) + let (tx, rx) = mpsc::channel(2); - // Normalize path for comparison (handle leading slash variations) - let normalized_path = path.trim_start_matches('/'); + // Clone path for the spawned task + let iso_path_clone = iso_path.clone(); - if let (Some(initrd_path), Some(firmware)) = (config.initrd_path, config.firmware) { - let normalized_initrd = initrd_path.trim_start_matches('/'); - if normalized_path == normalized_initrd { - return Ok(Some((initrd_path, firmware))); + // Spawn blocking task to read chunks + tokio::task::spawn_blocking(move || { + let result = (|| -> Result<(), std::io::Error> { + let file = File::open(&iso_path_clone)?; + let mut block_io = FileBlockIo::new(file)?; + + let mut offset: u64 = 0; + let total_size = file_size; + + while offset < total_size { + let remaining = total_size - offset; + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + // Calculate sector-aligned read + let start_lba = extent_lba as u64 + (offset / ISO_BLOCK_SIZE); + let sectors_needed = (chunk_size as u64).div_ceil(ISO_BLOCK_SIZE); + let read_size = (sectors_needed * ISO_BLOCK_SIZE) as usize; + + let mut buffer = vec![0u8; read_size]; + block_io.read_blocks(Lba(start_lba), &mut buffer)?; + + // Truncate to actual chunk size (handle last partial chunk) + buffer.truncate(chunk_size); + + let bytes = Bytes::from(buffer); + if tx.blocking_send(Ok(bytes)).is_err() { + // Receiver dropped, stop sending + break; + } + + offset += chunk_size as u64; + } + + Ok(()) + })(); + + if let Err(e) = result { + let _ = tx.blocking_send(Err(e)); } - } + }); - Ok(None) + Ok((file_size, rx)) } - /// Read initrd from ISO and concatenate firmware file. + /// Stream initrd from ISO with firmware file concatenated. /// - /// This is used for Debian netboot where firmware needs to be appended to initrd. - /// See: https://wiki.debian.org/DebianInstaller/NetbootFirmware - pub fn read_initrd_with_firmware( + /// Returns the combined size and a receiver that yields chunks. + /// First streams all initrd chunks, then firmware chunks. + pub fn stream_initrd_with_firmware( &self, iso_name: &str, initrd_path: &str, firmware_filename: &str, - ) -> AppResult> { - // Read initrd from ISO - let mut content = self.read_from_iso(iso_name, initrd_path)?; - - // Read firmware file from the iso config directory + ) -> AppResult<(u64, mpsc::Receiver>)> { + let iso_path = self.iso_file_path(iso_name)?; let firmware_path = self.iso_dir(iso_name).join(firmware_filename); - tracing::info!( - "Concatenating firmware {:?} to initrd {}", - firmware_path, - initrd_path - ); + // Get initrd file entry for size + let file = File::open(&iso_path).map_err(|e| AppError::FileRead { + path: iso_path.clone(), + source: e, + })?; - let mut firmware_file = File::open(&firmware_path).map_err(|e| AppError::FileRead { - path: firmware_path.clone(), + let mut block_io = FileBlockIo::new(file).map_err(|e| AppError::FileRead { + path: iso_path.clone(), source: e, })?; - firmware_file - .read_to_end(&mut content) - .map_err(|e| AppError::FileRead { - path: firmware_path, - source: e, - })?; + let volume = mount(&mut block_io, 0).map_err(|e| AppError::IsoRead { + path: iso_path.clone(), + message: format!("Failed to mount ISO: {}", e), + })?; - Ok(content) - } + // Normalize path - ensure leading slash + let normalized_path = if initrd_path.starts_with('/') { + initrd_path.to_string() + } else { + format!("/{}", initrd_path) + }; - /// Get the boot template path for an ISO. - /// - /// Checks automation profile first, then falls back to ISO-level template. - /// Order: iso/{iso}/automation/{profile}/boot.ipxe.j2 -> iso/{iso}/boot.ipxe.j2 - pub fn boot_template_path(&self, iso_name: &str, automation: Option<&str>) -> AppResult { - // Check automation profile specific template first - if let Some(profile) = automation { - let profile_path = self - .iso_dir(iso_name) - .join("automation") - .join(profile) - .join("boot.ipxe.j2"); - if profile_path.exists() { - tracing::info!( - "Using profile-specific boot template: {:?}", - profile_path - ); - return Ok(profile_path); + let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|_| { + AppError::FileNotFoundInIso { + iso: iso_name.to_string(), + path: initrd_path.to_string(), } - } + })?; - // Fall back to ISO-level template - let iso_path = self.iso_dir(iso_name).join("boot.ipxe.j2"); - if iso_path.exists() { - tracing::info!("Using ISO-level boot template: {:?}", iso_path); - return Ok(iso_path); - } + let initrd_size = entry.size; + let extent_lba = entry.extent_lba; - Err(AppError::TemplateNotFound { path: iso_path }) + // Get firmware size + let firmware_metadata = std::fs::metadata(&firmware_path).map_err(|e| AppError::FileRead { + path: firmware_path.clone(), + source: e, + })?; + let firmware_size = firmware_metadata.len(); + + let total_size = initrd_size + firmware_size; + + tracing::info!( + "Streaming initrd ({} bytes) + firmware ({} bytes) = {} bytes total", + initrd_size, + firmware_size, + total_size + ); + + // Create bounded channel for backpressure (2 chunks max in flight) + let (tx, rx) = mpsc::channel(2); + + // Clone paths for the spawned task + let iso_path_clone = iso_path.clone(); + let firmware_path_clone = firmware_path.clone(); + + // Spawn blocking task to read chunks + tokio::task::spawn_blocking(move || { + let result = (|| -> Result<(), std::io::Error> { + // Phase 1: Stream initrd from ISO + let file = File::open(&iso_path_clone)?; + let mut block_io = FileBlockIo::new(file)?; + + let mut offset: u64 = 0; + while offset < initrd_size { + let remaining = initrd_size - offset; + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + let start_lba = extent_lba as u64 + (offset / ISO_BLOCK_SIZE); + let sectors_needed = (chunk_size as u64).div_ceil(ISO_BLOCK_SIZE); + let read_size = (sectors_needed * ISO_BLOCK_SIZE) as usize; + + let mut buffer = vec![0u8; read_size]; + block_io.read_blocks(Lba(start_lba), &mut buffer)?; + buffer.truncate(chunk_size); + + let bytes = Bytes::from(buffer); + if tx.blocking_send(Ok(bytes)).is_err() { + return Ok(()); + } + + offset += chunk_size as u64; + } + + // Phase 2: Stream firmware from disk + let mut firmware_file = File::open(&firmware_path_clone)?; + let mut offset: u64 = 0; + while offset < firmware_size { + let remaining = firmware_size - offset; + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + let mut buffer = vec![0u8; chunk_size]; + firmware_file.read_exact(&mut buffer)?; + + let bytes = Bytes::from(buffer); + if tx.blocking_send(Ok(bytes)).is_err() { + return Ok(()); + } + + offset += chunk_size as u64; + } + + Ok(()) + })(); + + if let Err(e) = result { + let _ = tx.blocking_send(Err(e)); + } + }); + + Ok((total_size, rx)) } } From c082f2852c6fc3b9d206f38ae961d6a09639a79e Mon Sep 17 00:00:00 2001 From: Titi Wangsa Damhore <111611+twdamhore@users.noreply.github.com> Date: Tue, 20 Jan 2026 10:32:40 -0800 Subject: [PATCH 2/3] Add debug logging and comments for clarity - Restore debug logging for file-not-found errors in ISO reads - Add comments explaining why ISO is opened twice (FileBlockIo is not Send) Co-Authored-By: Claude Opus 4.5 --- src/services/iso.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/services/iso.rs b/src/services/iso.rs index 68688ed..1926a93 100644 --- a/src/services/iso.rs +++ b/src/services/iso.rs @@ -442,7 +442,10 @@ impl IsoService { format!("/{}", file_path) }; - let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|_| { + tracing::debug!("Looking for file in ISO: {}", normalized_path); + + let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|e| { + tracing::debug!("File not found in ISO: {}", e); AppError::FileNotFoundInIso { iso: iso_name.to_string(), path: file_path.to_string(), @@ -455,10 +458,11 @@ impl IsoService { // Create bounded channel for backpressure (2 chunks max in flight) let (tx, rx) = mpsc::channel(2); - // Clone path for the spawned task let iso_path_clone = iso_path.clone(); - // Spawn blocking task to read chunks + // Spawn blocking task to read chunks. + // We re-open the ISO here because FileBlockIo contains a File handle + // which is not Send and cannot be moved into the spawned task. tokio::task::spawn_blocking(move || { let result = (|| -> Result<(), std::io::Error> { let file = File::open(&iso_path_clone)?; @@ -538,7 +542,10 @@ impl IsoService { format!("/{}", initrd_path) }; - let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|_| { + tracing::debug!("Looking for initrd in ISO: {}", normalized_path); + + let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|e| { + tracing::debug!("Initrd not found in ISO: {}", e); AppError::FileNotFoundInIso { iso: iso_name.to_string(), path: initrd_path.to_string(), @@ -567,11 +574,12 @@ impl IsoService { // Create bounded channel for backpressure (2 chunks max in flight) let (tx, rx) = mpsc::channel(2); - // Clone paths for the spawned task let iso_path_clone = iso_path.clone(); let firmware_path_clone = firmware_path.clone(); - // Spawn blocking task to read chunks + // Spawn blocking task to read chunks. + // We re-open the ISO here because FileBlockIo contains a File handle + // which is not Send and cannot be moved into the spawned task. tokio::task::spawn_blocking(move || { let result = (|| -> Result<(), std::io::Error> { // Phase 1: Stream initrd from ISO From 053804b7b959a35cfbc1a7842c51dc7869f804a8 Mon Sep 17 00:00:00 2001 From: Titi Wangsa Damhore <111611+twdamhore@users.noreply.github.com> Date: Tue, 20 Jan 2026 10:35:01 -0800 Subject: [PATCH 3/3] Remove duplicate logging for MAC completion The "Marked MAC ... as completed" message was logged in both the route handler and the service layer. Removed the route handler copy since the service layer is the appropriate location for this log. Co-Authored-By: Claude Opus 4.5 --- src/routes/action.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/routes/action.rs b/src/routes/action.rs index c0e2f7d..00ba94d 100644 --- a/src/routes/action.rs +++ b/src/routes/action.rs @@ -27,7 +27,6 @@ pub async fn handle_remove( let removed = action_service.mark_completed(&mac)?; if removed { - tracing::info!("Marked MAC {} as completed", mac); Ok((StatusCode::OK, "OK").into_response()) } else { tracing::warn!("MAC {} not found in action.cfg", mac);