diff --git a/Cargo.lock b/Cargo.lock index cc5a8af..4e93f35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,6 +698,7 @@ version = "0.2.0" dependencies = [ "axum", "base64", + "bytes", "chrono", "fs2", "gpt_disk_io", @@ -708,6 +709,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-appender", @@ -939,6 +941,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" diff --git a/Cargo.toml b/Cargo.toml index d39501e..ee7b880 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,8 @@ license = "MIT" axum = "0.8" tokio = { version = "1", features = ["full", "signal"] } tokio-util = { version = "0.7", features = ["io"] } +tokio-stream = "0.1" +bytes = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-appender = "0.2" diff --git a/src/routes/action.rs b/src/routes/action.rs index c0e2f7d..00ba94d 100644 --- a/src/routes/action.rs +++ b/src/routes/action.rs @@ -27,7 +27,6 @@ pub async fn handle_remove( let removed = action_service.mark_completed(&mac)?; if removed { - tracing::info!("Marked MAC {} as completed", mac); Ok((StatusCode::OK, "OK").into_response()) } else { tracing::warn!("MAC {} not found in action.cfg", mac); diff --git a/src/routes/iso.rs b/src/routes/iso.rs index 03a1854..2eebd0b 100644 --- a/src/routes/iso.rs +++ b/src/routes/iso.rs @@ -14,6 +14,7 @@ use axum::http::{HeaderMap, StatusCode, header}; use axum::response::Response; use serde::Deserialize; use tokio::fs::File; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; /// Query parameters for the ISO endpoint. @@ -180,10 +181,11 @@ async fn serve_template( .unwrap()) } -/// Serve a file from within the ISO. +/// Serve a file from within the ISO using streaming. fn serve_from_iso(iso_service: &IsoService, iso_name: &str, path: &str) -> AppResult { - let content = iso_service.read_from_iso(iso_name, path)?; - let content_length = content.len(); + let (content_length, receiver) = iso_service.stream_from_iso(iso_name, path)?; + let stream = ReceiverStream::new(receiver); + let body = Body::from_stream(stream); // Determine content type based on file extension let content_type = guess_content_type(path); @@ -192,11 +194,11 @@ fn serve_from_iso(iso_service: &IsoService, iso_name: &str, path: &str) -> AppRe .status(StatusCode::OK) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, content_length) - .body(Body::from(content)) + .body(body) .unwrap()) } -/// Serve initrd with firmware concatenated. +/// Serve initrd with firmware concatenated using streaming. /// /// Used for Debian netboot where firmware.cpio.gz needs to be appended to initrd. fn serve_initrd_with_firmware( @@ -205,14 +207,16 @@ fn serve_initrd_with_firmware( initrd_path: &str, firmware: &str, ) -> AppResult { - let content = iso_service.read_initrd_with_firmware(iso_name, initrd_path, firmware)?; - let content_length = content.len(); + let (content_length, receiver) = + iso_service.stream_initrd_with_firmware(iso_name, initrd_path, firmware)?; + let stream = ReceiverStream::new(receiver); + let body = Body::from_stream(stream); Ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_LENGTH, content_length) - .body(Body::from(content)) + .body(body) .unwrap()) } diff --git a/src/services/iso.rs b/src/services/iso.rs index 4b1b109..1926a93 100644 --- a/src/services/iso.rs +++ b/src/services/iso.rs @@ -3,14 +3,18 @@ //! Handles iso.cfg parsing, ISO9660 reading, and template detection. use crate::error::{AppError, AppResult}; +use bytes::Bytes; use gpt_disk_io::BlockIo; use gpt_disk_types::{BlockSize, Lba}; -use iso9660::{find_file, mount, read_file_vec}; +use iso9660::{find_file, mount}; use std::fs::File; use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; use std::path::PathBuf; +use tokio::sync::mpsc; const ISO_BLOCK_SIZE: u64 = 2048; +/// Chunk size for streaming (32MB) +const CHUNK_SIZE: usize = 32 * 1024 * 1024; /// Wrapper to implement BlockIo for std::fs::File. struct FileBlockIo { @@ -353,10 +357,69 @@ impl IsoService { None } - /// Read a file from within an ISO. - pub fn read_from_iso(&self, iso_name: &str, file_path: &str) -> AppResult> { + /// Check if firmware concatenation is configured and path matches initrd_path. + /// + /// Returns Some((initrd_path, firmware)) if the requested path matches initrd_path + /// and firmware is configured. Returns None otherwise. + pub fn should_concat_firmware(&self, iso_name: &str, path: &str) -> AppResult> { + let config = self.load_config(iso_name)?; + + // Normalize path for comparison (handle leading slash variations) + let normalized_path = path.trim_start_matches('/'); + + if let (Some(initrd_path), Some(firmware)) = (config.initrd_path, config.firmware) { + let normalized_initrd = initrd_path.trim_start_matches('/'); + if normalized_path == normalized_initrd { + return Ok(Some((initrd_path, firmware))); + } + } + + Ok(None) + } + + /// Get the boot template path for an ISO. + /// + /// Checks automation profile first, then falls back to ISO-level template. + /// Order: iso/{iso}/automation/{profile}/boot.ipxe.j2 -> iso/{iso}/boot.ipxe.j2 + pub fn boot_template_path(&self, iso_name: &str, automation: Option<&str>) -> AppResult { + // Check automation profile specific template first + if let Some(profile) = automation { + let profile_path = self + .iso_dir(iso_name) + .join("automation") + .join(profile) + .join("boot.ipxe.j2"); + if profile_path.exists() { + tracing::info!( + "Using profile-specific boot template: {:?}", + profile_path + ); + return Ok(profile_path); + } + } + + // Fall back to ISO-level template + let iso_path = self.iso_dir(iso_name).join("boot.ipxe.j2"); + if iso_path.exists() { + tracing::info!("Using ISO-level boot template: {:?}", iso_path); + return Ok(iso_path); + } + + Err(AppError::TemplateNotFound { path: iso_path }) + } + + /// Stream a file from within an ISO. + /// + /// Returns the file size and a receiver that yields chunks. + /// Uses spawn_blocking for the synchronous ISO reads. + pub fn stream_from_iso( + &self, + iso_name: &str, + file_path: &str, + ) -> AppResult<(u64, mpsc::Receiver>)> { let iso_path = self.iso_file_path(iso_name)?; + // Open ISO and find file entry to get size let file = File::open(&iso_path).map_err(|e| AppError::FileRead { path: iso_path.clone(), source: e, @@ -382,105 +445,195 @@ impl IsoService { tracing::debug!("Looking for file in ISO: {}", normalized_path); let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|e| { - tracing::debug!("File not found: {}", e); + tracing::debug!("File not found in ISO: {}", e); AppError::FileNotFoundInIso { iso: iso_name.to_string(), path: file_path.to_string(), } })?; - read_file_vec(&mut block_io, &entry).map_err(|e| AppError::IsoRead { - path: iso_path, - message: format!("Failed to read file from ISO: {}", e), - }) - } + let file_size = entry.size; + let extent_lba = entry.extent_lba; - /// Check if firmware concatenation is configured and path matches initrd_path. - /// - /// Returns Some((initrd_path, firmware)) if the requested path matches initrd_path - /// and firmware is configured. Returns None otherwise. - pub fn should_concat_firmware(&self, iso_name: &str, path: &str) -> AppResult> { - let config = self.load_config(iso_name)?; + // Create bounded channel for backpressure (2 chunks max in flight) + let (tx, rx) = mpsc::channel(2); - // Normalize path for comparison (handle leading slash variations) - let normalized_path = path.trim_start_matches('/'); + let iso_path_clone = iso_path.clone(); - if let (Some(initrd_path), Some(firmware)) = (config.initrd_path, config.firmware) { - let normalized_initrd = initrd_path.trim_start_matches('/'); - if normalized_path == normalized_initrd { - return Ok(Some((initrd_path, firmware))); + // Spawn blocking task to read chunks. + // We re-open the ISO here because FileBlockIo contains a File handle + // which is not Send and cannot be moved into the spawned task. + tokio::task::spawn_blocking(move || { + let result = (|| -> Result<(), std::io::Error> { + let file = File::open(&iso_path_clone)?; + let mut block_io = FileBlockIo::new(file)?; + + let mut offset: u64 = 0; + let total_size = file_size; + + while offset < total_size { + let remaining = total_size - offset; + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + // Calculate sector-aligned read + let start_lba = extent_lba as u64 + (offset / ISO_BLOCK_SIZE); + let sectors_needed = (chunk_size as u64).div_ceil(ISO_BLOCK_SIZE); + let read_size = (sectors_needed * ISO_BLOCK_SIZE) as usize; + + let mut buffer = vec![0u8; read_size]; + block_io.read_blocks(Lba(start_lba), &mut buffer)?; + + // Truncate to actual chunk size (handle last partial chunk) + buffer.truncate(chunk_size); + + let bytes = Bytes::from(buffer); + if tx.blocking_send(Ok(bytes)).is_err() { + // Receiver dropped, stop sending + break; + } + + offset += chunk_size as u64; + } + + Ok(()) + })(); + + if let Err(e) = result { + let _ = tx.blocking_send(Err(e)); } - } + }); - Ok(None) + Ok((file_size, rx)) } - /// Read initrd from ISO and concatenate firmware file. + /// Stream initrd from ISO with firmware file concatenated. /// - /// This is used for Debian netboot where firmware needs to be appended to initrd. - /// See: https://wiki.debian.org/DebianInstaller/NetbootFirmware - pub fn read_initrd_with_firmware( + /// Returns the combined size and a receiver that yields chunks. + /// First streams all initrd chunks, then firmware chunks. + pub fn stream_initrd_with_firmware( &self, iso_name: &str, initrd_path: &str, firmware_filename: &str, - ) -> AppResult> { - // Read initrd from ISO - let mut content = self.read_from_iso(iso_name, initrd_path)?; - - // Read firmware file from the iso config directory + ) -> AppResult<(u64, mpsc::Receiver>)> { + let iso_path = self.iso_file_path(iso_name)?; let firmware_path = self.iso_dir(iso_name).join(firmware_filename); - tracing::info!( - "Concatenating firmware {:?} to initrd {}", - firmware_path, - initrd_path - ); + // Get initrd file entry for size + let file = File::open(&iso_path).map_err(|e| AppError::FileRead { + path: iso_path.clone(), + source: e, + })?; - let mut firmware_file = File::open(&firmware_path).map_err(|e| AppError::FileRead { - path: firmware_path.clone(), + let mut block_io = FileBlockIo::new(file).map_err(|e| AppError::FileRead { + path: iso_path.clone(), source: e, })?; - firmware_file - .read_to_end(&mut content) - .map_err(|e| AppError::FileRead { - path: firmware_path, - source: e, - })?; + let volume = mount(&mut block_io, 0).map_err(|e| AppError::IsoRead { + path: iso_path.clone(), + message: format!("Failed to mount ISO: {}", e), + })?; - Ok(content) - } + // Normalize path - ensure leading slash + let normalized_path = if initrd_path.starts_with('/') { + initrd_path.to_string() + } else { + format!("/{}", initrd_path) + }; - /// Get the boot template path for an ISO. - /// - /// Checks automation profile first, then falls back to ISO-level template. - /// Order: iso/{iso}/automation/{profile}/boot.ipxe.j2 -> iso/{iso}/boot.ipxe.j2 - pub fn boot_template_path(&self, iso_name: &str, automation: Option<&str>) -> AppResult { - // Check automation profile specific template first - if let Some(profile) = automation { - let profile_path = self - .iso_dir(iso_name) - .join("automation") - .join(profile) - .join("boot.ipxe.j2"); - if profile_path.exists() { - tracing::info!( - "Using profile-specific boot template: {:?}", - profile_path - ); - return Ok(profile_path); + tracing::debug!("Looking for initrd in ISO: {}", normalized_path); + + let entry = find_file(&mut block_io, &volume, &normalized_path).map_err(|e| { + tracing::debug!("Initrd not found in ISO: {}", e); + AppError::FileNotFoundInIso { + iso: iso_name.to_string(), + path: initrd_path.to_string(), } - } + })?; - // Fall back to ISO-level template - let iso_path = self.iso_dir(iso_name).join("boot.ipxe.j2"); - if iso_path.exists() { - tracing::info!("Using ISO-level boot template: {:?}", iso_path); - return Ok(iso_path); - } + let initrd_size = entry.size; + let extent_lba = entry.extent_lba; - Err(AppError::TemplateNotFound { path: iso_path }) + // Get firmware size + let firmware_metadata = std::fs::metadata(&firmware_path).map_err(|e| AppError::FileRead { + path: firmware_path.clone(), + source: e, + })?; + let firmware_size = firmware_metadata.len(); + + let total_size = initrd_size + firmware_size; + + tracing::info!( + "Streaming initrd ({} bytes) + firmware ({} bytes) = {} bytes total", + initrd_size, + firmware_size, + total_size + ); + + // Create bounded channel for backpressure (2 chunks max in flight) + let (tx, rx) = mpsc::channel(2); + + let iso_path_clone = iso_path.clone(); + let firmware_path_clone = firmware_path.clone(); + + // Spawn blocking task to read chunks. + // We re-open the ISO here because FileBlockIo contains a File handle + // which is not Send and cannot be moved into the spawned task. + tokio::task::spawn_blocking(move || { + let result = (|| -> Result<(), std::io::Error> { + // Phase 1: Stream initrd from ISO + let file = File::open(&iso_path_clone)?; + let mut block_io = FileBlockIo::new(file)?; + + let mut offset: u64 = 0; + while offset < initrd_size { + let remaining = initrd_size - offset; + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + let start_lba = extent_lba as u64 + (offset / ISO_BLOCK_SIZE); + let sectors_needed = (chunk_size as u64).div_ceil(ISO_BLOCK_SIZE); + let read_size = (sectors_needed * ISO_BLOCK_SIZE) as usize; + + let mut buffer = vec![0u8; read_size]; + block_io.read_blocks(Lba(start_lba), &mut buffer)?; + buffer.truncate(chunk_size); + + let bytes = Bytes::from(buffer); + if tx.blocking_send(Ok(bytes)).is_err() { + return Ok(()); + } + + offset += chunk_size as u64; + } + + // Phase 2: Stream firmware from disk + let mut firmware_file = File::open(&firmware_path_clone)?; + let mut offset: u64 = 0; + while offset < firmware_size { + let remaining = firmware_size - offset; + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + let mut buffer = vec![0u8; chunk_size]; + firmware_file.read_exact(&mut buffer)?; + + let bytes = Bytes::from(buffer); + if tx.blocking_send(Ok(bytes)).is_err() { + return Ok(()); + } + + offset += chunk_size as u64; + } + + Ok(()) + })(); + + if let Err(e) = result { + let _ = tx.blocking_send(Err(e)); + } + }); + + Ok((total_size, rx)) } }