From cfee423f4865d4aa570f247df337a9af361acadd Mon Sep 17 00:00:00 2001 From: Alexandre Milesi Date: Mon, 10 Nov 2025 14:18:44 -0800 Subject: [PATCH 1/7] feat: decoded media via NIXL Signed-off-by: Alexandre Milesi --- Cargo.lock | 2 + lib/bindings/python/Cargo.lock | 1 + lib/llm/Cargo.toml | 5 +- lib/llm/src/preprocessor.rs | 11 +- lib/llm/src/preprocessor/media.rs | 2 + lib/llm/src/preprocessor/media/decoders.rs | 50 ++------ .../src/preprocessor/media/decoders/image.rs | 40 +++--- lib/llm/src/preprocessor/media/loader.rs | 49 +++++--- lib/llm/src/preprocessor/media/rdma.rs | 119 ++++++++++++++++++ lib/llm/src/protocols/common/preprocessor.rs | 4 +- 10 files changed, 208 insertions(+), 75 deletions(-) create mode 100644 lib/llm/src/preprocessor/media/rdma.rs diff --git a/Cargo.lock b/Cargo.lock index 215d06903b..821fbf40c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2257,6 +2257,7 @@ dependencies = [ "derive_builder", "dialoguer", "dynamo-async-openai", + "dynamo-memory", "dynamo-parsers", "dynamo-runtime", "either", @@ -3981,6 +3982,7 @@ dependencies = [ "ravif", "rayon", "rgb", + "serde", "tiff", "zune-core", "zune-jpeg", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 8ac7b954aa..b77ce016d8 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -2903,6 +2903,7 @@ dependencies = [ "ravif", "rayon", "rgb", + "serde", "tiff", "zune-core", "zune-jpeg", diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index ecc4a20d82..dd7c80a1f0 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -24,6 +24,7 @@ testing-etcd = [] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"] cuda = ["dep:cudarc"] integration = ["dynamo-runtime/integration"] +media-nixl = ["dep:dynamo-memory", "dep:nixl-sys"] [[bench]] name = "tokenizer" @@ -33,9 +34,11 @@ harness = false name = "transfer_context_v2" harness = false required-features = ["block-manager", "testing-cuda"] + [dependencies] # repo dynamo-runtime = { workspace = true } +dynamo-memory = { path = "../memory", version = "0.7.0", optional = true } # workspace aho-corasick = "1.1" @@ -142,7 +145,7 @@ json-five = { version = "0.3" } # media loading in the preprocessor reqwest = { workspace = true } base64 = { version = "0.22" } -image = { version = "0.25" } +image = { version = "0.25", features = ["default", "serde"] } tokio-rayon = {version = "2" } ndarray = { version = "0.16" } diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs index e7374d8a3a..f583644e45 100644 --- a/lib/llm/src/preprocessor.rs +++ b/lib/llm/src/preprocessor.rs @@ -327,14 +327,21 @@ impl OpenAIPreprocessor { // Execute all fetch tasks if !fetch_tasks.is_empty() { let loader = self.media_loader.as_ref().unwrap(); - let _results = futures::future::join_all( + let results = futures::future::join_all( fetch_tasks .iter() .map(|(_, content_part)| loader.fetch_and_decode_media_part(content_part)), ) .await; - // TODO: decode and pass NIXL descriptors to the media map + for ((type_str, _), result) in fetch_tasks.into_iter().zip(results.into_iter()) { + // if one item fails, errors the whole request, other items will be cleaned up by Drop + let rdma_descriptor = result?; + media_map + .entry(type_str) + .or_default() + .push(MultimodalData::Decoded(rdma_descriptor)); + } } if !media_map.is_empty() { diff --git a/lib/llm/src/preprocessor/media.rs b/lib/llm/src/preprocessor/media.rs index 0c0e3e6b12..34ac68772f 100644 --- a/lib/llm/src/preprocessor/media.rs +++ b/lib/llm/src/preprocessor/media.rs @@ -4,7 +4,9 @@ mod common; mod decoders; mod loader; +mod rdma; pub use common::EncodedMediaData; pub use decoders::{Decoder, ImageDecoder, MediaDecoder}; pub use loader::{MediaFetcher, MediaLoader}; +pub use rdma::{DecodedMediaData, RdmaMediaDataDescriptor, get_nixl_agent, get_nixl_metadata}; diff --git a/lib/llm/src/preprocessor/media/decoders.rs b/lib/llm/src/preprocessor/media/decoders.rs index aa546915ec..230095975e 100644 --- a/lib/llm/src/preprocessor/media/decoders.rs +++ b/lib/llm/src/preprocessor/media/decoders.rs @@ -2,52 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; +use serde::{Deserialize, Serialize}; use super::common::EncodedMediaData; -use ndarray::{ArrayBase, Dimension, OwnedRepr}; -mod image; +use super::rdma::DecodedMediaData; +pub mod image; pub use image::{ImageDecoder, ImageMetadata}; -#[derive(Debug)] -pub enum DecodedMediaMetadata { - #[allow(dead_code)] // used in followup MR - Image(ImageMetadata), -} - -#[derive(Debug, PartialEq, Eq)] -pub enum DataType { - UINT8, -} - -// Decoded media data (image RGB, video frames pixels, ...) -#[derive(Debug)] -pub struct DecodedMediaData { - #[allow(dead_code)] // used in followup MR - pub(crate) data: Vec, - #[allow(dead_code)] // used in followup MR - pub(crate) shape: Vec, - #[allow(dead_code)] // used in followup MR - pub(crate) dtype: DataType, - #[allow(dead_code)] // used in followup MR - pub(crate) metadata: Option, -} - -// convert Array{N} to DecodedMediaData -// TODO: Array1 for audio -impl From, D>> for DecodedMediaData { - fn from(array: ArrayBase, D>) -> Self { - let shape = array.shape().to_vec(); - let (data, _) = array.into_raw_vec_and_offset(); - Self { - data, - shape, - dtype: DataType::UINT8, - metadata: None, - } - } -} - #[async_trait::async_trait] pub trait Decoder: Clone + Send + 'static { fn decode(&self, data: EncodedMediaData) -> Result; @@ -67,3 +29,9 @@ pub struct MediaDecoder { pub image_decoder: ImageDecoder, // TODO: video, audio decoders } + +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +pub enum DecodedMediaMetadata { + #[allow(dead_code)] // used in followup MR + Image(ImageMetadata), +} diff --git a/lib/llm/src/preprocessor/media/decoders/image.rs b/lib/llm/src/preprocessor/media/decoders/image.rs index e6c857d33b..4042490025 100644 --- a/lib/llm/src/preprocessor/media/decoders/image.rs +++ b/lib/llm/src/preprocessor/media/decoders/image.rs @@ -6,14 +6,15 @@ use std::io::Cursor; use anyhow::Result; use image::{ColorType, GenericImageView, ImageFormat, ImageReader}; use ndarray::Array3; +use serde::{Deserialize, Serialize}; use super::super::common::EncodedMediaData; -use super::super::decoders::{DecodedMediaData, DecodedMediaMetadata}; -use super::Decoder; +use super::super::rdma::DecodedMediaData; +use super::{DecodedMediaMetadata, Decoder}; const DEFAULT_MAX_ALLOC: u64 = 128 * 1024 * 1024; // 128 MB -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct ImageDecoder { #[serde(default)] @@ -36,12 +37,12 @@ impl Default for ImageDecoder { } #[allow(clippy::upper_case_acronyms)] -#[derive(Debug)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub enum ImageLayout { HWC, } -#[derive(Debug)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct ImageMetadata { #[allow(dead_code)] // used in followup MR pub(crate) format: Option, @@ -78,8 +79,8 @@ impl Decoder for ImageDecoder { let (width, height) = img.dimensions(); let shape = (height as usize, width as usize, n_channels as usize); let array = Array3::from_shape_vec(shape, data)?; - let mut decoded: DecodedMediaData = array.into(); - decoded.metadata = Some(DecodedMediaMetadata::Image(ImageMetadata { + let mut decoded: DecodedMediaData = array.try_into()?; + decoded.tensor_info.metadata = Some(DecodedMediaMetadata::Image(ImageMetadata { format, color_type, layout: ImageLayout::HWC, @@ -90,7 +91,7 @@ impl Decoder for ImageDecoder { #[cfg(test)] mod tests { - use super::super::super::decoders::DataType; + use super::super::super::rdma::DataType; use super::*; use image::{DynamicImage, ImageBuffer}; use rstest::rstest; @@ -156,10 +157,10 @@ mod tests { let decoded = result.unwrap(); assert_eq!( - decoded.shape, + decoded.tensor_info.shape, vec![height as usize, width as usize, expected_channels as usize] ); - assert_eq!(decoded.dtype, DataType::UINT8); + assert_eq!(decoded.tensor_info.dtype, DataType::UINT8); } #[rstest] @@ -196,9 +197,12 @@ mod tests { format ); let decoded = result.unwrap(); - assert_eq!(decoded.shape, vec![height as usize, width as usize, 3]); assert_eq!( - decoded.dtype, + decoded.tensor_info.shape, + vec![height as usize, width as usize, 3] + ); + assert_eq!( + decoded.tensor_info.dtype, DataType::UINT8, "dtype should be uint8 for case: {}", test_case @@ -236,11 +240,15 @@ mod tests { ); let decoded = result.unwrap(); - assert_eq!(decoded.shape.len(), 3, "Should have 3 dimensions"); - assert_eq!(decoded.shape[0], 1, "Height should be 1"); - assert_eq!(decoded.shape[1], 1, "Width should be 1"); assert_eq!( - decoded.dtype, + decoded.tensor_info.shape.len(), + 3, + "Should have 3 dimensions" + ); + assert_eq!(decoded.tensor_info.shape[0], 1, "Height should be 1"); + assert_eq!(decoded.tensor_info.shape[1], 1, "Width should be 1"); + assert_eq!( + decoded.tensor_info.dtype, DataType::UINT8, "dtype should be uint8 for {} channels {:?}", input_channels, diff --git a/lib/llm/src/preprocessor/media/loader.rs b/lib/llm/src/preprocessor/media/loader.rs index 91fc65d9bc..b2d96b59fe 100644 --- a/lib/llm/src/preprocessor/media/loader.rs +++ b/lib/llm/src/preprocessor/media/loader.rs @@ -9,7 +9,9 @@ use anyhow::Result; use dynamo_async_openai::types::ChatCompletionRequestUserMessageContentPart; use super::common::EncodedMediaData; -use super::decoders::{DecodedMediaData, Decoder, MediaDecoder}; +use super::decoders::{Decoder, MediaDecoder}; +use super::rdma::{RdmaMediaDataDescriptor, get_nixl_agent}; +use dynamo_memory::nixl::NixlAgent; const DEFAULT_HTTP_USER_AGENT: &str = "dynamo-ai/dynamo"; const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(30); @@ -39,7 +41,7 @@ pub struct MediaLoader { media_decoder: MediaDecoder, http_client: reqwest::Client, media_fetcher: MediaFetcher, - // TODO: NIXL agent + nixl_agent: NixlAgent, } impl MediaLoader { @@ -53,10 +55,13 @@ impl MediaLoader { let http_client = http_client_builder.build()?; + let nixl_agent = get_nixl_agent()?; + Ok(Self { media_decoder, http_client, media_fetcher, + nixl_agent, }) } @@ -90,9 +95,8 @@ impl MediaLoader { &self, oai_content_part: &ChatCompletionRequestUserMessageContentPart, // TODO: request-level options - ) -> Result { - // fetch the media - // TODO: decode and NIXL-register + ) -> Result { + // fetch the media, decode and NIXL-register let decoded = match oai_content_part { ChatCompletionRequestUserMessageContentPart::ImageUrl(image_part) => { let url = &image_part.image_url.url; @@ -112,13 +116,14 @@ impl MediaLoader { _ => anyhow::bail!("Unsupported media type"), }; - Ok(decoded) + let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?; + Ok(rdma_descriptor) } } #[cfg(test)] mod tests { - use super::super::decoders::DataType; + use super::super::rdma::DataType; use super::*; use dynamo_async_openai::types::{ChatCompletionRequestMessageContentPartImage, ImageUrl}; @@ -157,17 +162,33 @@ mod tests { result.err() ); - let data = result.unwrap(); - assert_eq!(data.dtype, DataType::UINT8); + let descriptor = result.unwrap(); + assert_eq!(descriptor.tensor_info.dtype, DataType::UINT8); // Verify image dimensions: 1,999px × 1,125px (width × height) // Shape format is [height, width, channels] - assert_eq!(data.shape.len(), 3); - assert_eq!(data.shape[0], 1125, "Height should be 1125"); - assert_eq!(data.shape[1], 1999, "Width should be 1999"); - assert_eq!(data.shape[2], 4, "RGBA channels should be 4"); + assert_eq!(descriptor.tensor_info.shape.len(), 3); + assert_eq!( + descriptor.tensor_info.shape[0], 1125, + "Height should be 1125" + ); + assert_eq!( + descriptor.tensor_info.shape[1], 1999, + "Width should be 1999" + ); + assert_eq!( + descriptor.tensor_info.shape[2], 4, + "RGBA channels should be 4" + ); - mock.assert_async().await; + assert!( + descriptor.source_storage.is_some(), + "Source storage should be present" + ); + assert!( + descriptor.source_storage.unwrap().is_registered(), + "Source storage should be registered with NIXL" + ); } #[test] diff --git a/lib/llm/src/preprocessor/media/rdma.rs b/lib/llm/src/preprocessor/media/rdma.rs new file mode 100644 index 0000000000..7b39e45111 --- /dev/null +++ b/lib/llm/src/preprocessor/media/rdma.rs @@ -0,0 +1,119 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use base64::{Engine as _, engine::general_purpose}; +use ndarray::{ArrayBase, Dimension, OwnedRepr}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use dynamo_memory::{ + StorageError, SystemStorage, + nixl::{self, NixlAgent, NixlDescriptor, RegisteredView}, +}; + +use super::decoders::DecodedMediaMetadata; + +#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] +pub enum DataType { + UINT8, +} + +// Common tensor metadata shared between decoded and RDMA descriptors +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct MediaTensorInfo { + pub(crate) shape: Vec, + pub(crate) dtype: DataType, + pub(crate) metadata: Option, +} + +// Decoded media data (image RGB, video frames pixels, ...) +#[derive(Debug)] +pub struct DecodedMediaData { + pub(crate) data: SystemStorage, + pub(crate) tensor_info: MediaTensorInfo, +} + +// Decoded media data NIXL descriptor (sent to the next step in the pipeline / NATS) +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RdmaMediaDataDescriptor { + // b64 agent metadata + pub(crate) nixl_metadata: String, + // tensor descriptor + pub(crate) nixl_descriptor: NixlDescriptor, + + #[serde(flatten)] + pub(crate) tensor_info: MediaTensorInfo, + + // reference to the actual data, kept alive while the rdma descriptor is alive + #[serde(skip, default)] + #[allow(dead_code)] + pub(crate) source_storage: Option>>, +} + +impl DecodedMediaData { + pub fn into_rdma_descriptor(self, nixl_agent: &NixlAgent) -> Result { + // Register storage with NIXL + let source_storage = self.data; + let registered = nixl::register_with_nixl(source_storage, nixl_agent, None) + .map_err(|_| anyhow::anyhow!("Failed to register storage with NIXL"))?; + + let nixl_descriptor = registered.descriptor(); + let nixl_metadata = get_nixl_metadata(nixl_agent, registered.storage())?; + + Ok(RdmaMediaDataDescriptor { + nixl_metadata, + nixl_descriptor, + tensor_info: self.tensor_info, + // Keep registered storage alive + source_storage: Some(Arc::new(registered)), + }) + } +} + +// convert Array{N} to DecodedMediaData +// TODO: Array1 for audio +impl TryFrom, D>> for DecodedMediaData { + type Error = StorageError; + + fn try_from(array: ArrayBase, D>) -> Result { + let shape = array.shape().to_vec(); + let (data_vec, _) = array.into_raw_vec_and_offset(); + + // Allocate new system storage and copy data + // TODO: use arena allocator and avoid copies + let mut storage = SystemStorage::new(data_vec.len())?; + unsafe { + std::ptr::copy_nonoverlapping(data_vec.as_ptr(), storage.as_mut_ptr(), data_vec.len()); + } + + Ok(Self { + data: storage, + tensor_info: MediaTensorInfo { + shape, + dtype: DataType::UINT8, + metadata: None, + }, + }) + } +} + +// Get NIXL metadata for a descriptor +// Avoids cross-request leak possibility and reduces metadata size +// TODO: pre-allocate a fixed NIXL-registered RAM pool so metadata can be cached on the target? +pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result { + // WAR: Until https://github.com/ai-dynamo/nixl/pull/970 is merged, can't use get_local_partial_md + let nixl_md = agent.raw_agent().get_local_md()?; + // let mut reg_desc_list = RegDescList::new(MemType::Dram)?; + // reg_desc_list.add_storage_desc(storage)?; + // let nixl_partial_md = agent.raw_agent().get_local_partial_md(®_desc_list, None)?; + + let b64_encoded = general_purpose::STANDARD.encode(&nixl_md); + Ok(format!("b64:{}", b64_encoded)) +} + +pub fn get_nixl_agent() -> Result { + let name = format!("media-loader-{}", uuid::Uuid::new_v4()); + let nixl_agent = NixlAgent::with_backends(&name, &["UCX"])?; + Ok(nixl_agent) +} diff --git a/lib/llm/src/protocols/common/preprocessor.rs b/lib/llm/src/protocols/common/preprocessor.rs index 71260da5d2..7d5abf4c50 100644 --- a/lib/llm/src/protocols/common/preprocessor.rs +++ b/lib/llm/src/protocols/common/preprocessor.rs @@ -6,12 +6,13 @@ use serde::{Deserialize, Serialize}; use super::{OutputOptions, SamplingOptions, StopConditions}; use crate::kv_router::RouterConfigOverride; +use crate::preprocessor::media::RdmaMediaDataDescriptor; use crate::protocols::TokenIdType; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum MultimodalData { Url(url::Url), - // TODO: Decoded(DecodedMediaData), + Decoded(RdmaMediaDataDescriptor), } // multimodal map containing {mm_part_type: [data...]} @@ -31,6 +32,7 @@ pub struct PreprocessedRequest { #[builder(default)] #[serde(default, skip_serializing_if = "Option::is_none")] pub multi_modal_data: Option, + /// StopConditions are conditions that the inference engine will use to stop generation. pub stop_conditions: StopConditions, From 43a3f0c91d73710f7cc2a4cf84b78a0e75a794aa Mon Sep 17 00:00:00 2001 From: Alexandre Milesi Date: Fri, 7 Nov 2025 16:57:30 -0800 Subject: [PATCH 2/7] feat: NIXL stub support Signed-off-by: Alexandre Milesi --- Cargo.lock | 4 +- lib/bindings/python/Cargo.lock | 110 +++++++++++++++++- lib/llm/Cargo.toml | 2 +- .../src/preprocessor/media/decoders/image.rs | 5 +- lib/llm/src/preprocessor/media/loader.rs | 35 ++++-- lib/memory/Cargo.toml | 2 +- lib/memory/src/nixl/agent.rs | 4 + 7 files changed, 141 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 821fbf40c5..f883f35791 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5557,9 +5557,9 @@ dependencies = [ [[package]] name = "nixl-sys" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a73b92494c94b2ff2d004cd9274d966863089e867dc9cd98bc640aefe7622036" +source = "git+https://github.com/ai-dynamo/nixl?rev=ae3f8af#ae3f8af9508a1e1f8aeb687ae3ae66644d3ba5e8" dependencies = [ + "anyhow", "bindgen 0.71.1", "cc", "libc", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index b77ce016d8..dc3bd7c419 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -376,7 +376,7 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" dependencies = [ - "bindgen", + "bindgen 0.69.5", "cc", "cmake", "dunce", @@ -587,6 +587,26 @@ dependencies = [ "which", ] +[[package]] +name = "bindgen" +version = "0.71.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" +dependencies = [ + "bitflags 2.9.3", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash 2.1.1", + "shlex", + "syn 2.0.106", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -1527,6 +1547,13 @@ dependencies = [ "uuid", ] +[[package]] +name = "dynamo-config" +version = "0.6.1" +dependencies = [ + "anyhow", +] + [[package]] name = "dynamo-llm" version = "0.7.0" @@ -1555,6 +1582,7 @@ dependencies = [ "derive_builder", "dialoguer", "dynamo-async-openai", + "dynamo-memory", "dynamo-parsers", "dynamo-runtime", "either", @@ -1611,6 +1639,22 @@ dependencies = [ "zeromq", ] +[[package]] +name = "dynamo-memory" +version = "0.6.1" +dependencies = [ + "anyhow", + "cudarc", + "dynamo-config", + "libc", + "nix 0.30.1", + "nixl-sys", + "offset-allocator", + "serde", + "thiserror 2.0.16", + "tracing", +] + [[package]] name = "dynamo-parsers" version = "0.7.0" @@ -1696,7 +1740,7 @@ dependencies = [ "local-ip-address", "log", "nid", - "nix", + "nix 0.29.0", "nuid", "once_cell", "opentelemetry", @@ -3897,6 +3941,34 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.9.3", + "cfg-if 1.0.3", + "cfg_aliases", + "libc", +] + +[[package]] +name = "nixl-sys" +version = "0.7.0" +source = "git+https://github.com/ai-dynamo/nixl?rev=ae3f8af#ae3f8af9508a1e1f8aeb687ae3ae66644d3ba5e8" +dependencies = [ + "anyhow", + "bindgen 0.71.1", + "cc", + "libc", + "os_info", + "pkg-config", + "serde", + "thiserror 2.0.16", + "tracing", +] + [[package]] name = "nkeys" version = "0.4.5" @@ -4282,6 +4354,18 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "os_info" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3" +dependencies = [ + "log", + "plist", + "serde", + "windows-sys 0.52.0", +] + [[package]] name = "overload" version = "0.1.1" @@ -4517,6 +4601,19 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plist" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07" +dependencies = [ + "base64 0.22.1", + "indexmap 2.11.0", + "quick-xml", + "serde", + "time", +] + [[package]] name = "png" version = "0.17.16" @@ -4903,6 +5000,15 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.38.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" +dependencies = [ + "memchr", +] + [[package]] name = "quinn" version = "0.11.9" diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index dd7c80a1f0..d560288c51 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -98,7 +98,7 @@ dialoguer = { version = "0.11", default-features = false, features = [ # block_manager aligned-vec = { version = "0.6.4", optional = true } -nixl-sys = { version = "=0.7.0", optional = true } +nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "ae3f8af", optional = true } cudarc = { workspace = true, optional = true } nix = { version = "0.26", optional = true } diff --git a/lib/llm/src/preprocessor/media/decoders/image.rs b/lib/llm/src/preprocessor/media/decoders/image.rs index 4042490025..10afda4bc2 100644 --- a/lib/llm/src/preprocessor/media/decoders/image.rs +++ b/lib/llm/src/preprocessor/media/decoders/image.rs @@ -47,8 +47,6 @@ pub struct ImageMetadata { #[allow(dead_code)] // used in followup MR pub(crate) format: Option, #[allow(dead_code)] // used in followup MR - pub(crate) color_type: ColorType, - #[allow(dead_code)] // used in followup MR pub(crate) layout: ImageLayout, } @@ -68,7 +66,7 @@ impl Decoder for ImageDecoder { let img = reader.decode()?; let n_channels = img.color().channel_count(); - let (data, color_type) = match n_channels { + let (data, _color_type) = match n_channels { 1 => (img.to_luma8().into_raw(), ColorType::L8), 2 => (img.to_luma_alpha8().into_raw(), ColorType::La8), 3 => (img.to_rgb8().into_raw(), ColorType::Rgb8), @@ -82,7 +80,6 @@ impl Decoder for ImageDecoder { let mut decoded: DecodedMediaData = array.try_into()?; decoded.tensor_info.metadata = Some(DecodedMediaMetadata::Image(ImageMetadata { format, - color_type, layout: ImageLayout::HWC, })); Ok(decoded) diff --git a/lib/llm/src/preprocessor/media/loader.rs b/lib/llm/src/preprocessor/media/loader.rs index b2d96b59fe..7ddb7b28c5 100644 --- a/lib/llm/src/preprocessor/media/loader.rs +++ b/lib/llm/src/preprocessor/media/loader.rs @@ -41,7 +41,7 @@ pub struct MediaLoader { media_decoder: MediaDecoder, http_client: reqwest::Client, media_fetcher: MediaFetcher, - nixl_agent: NixlAgent, + nixl_agent: Option, } impl MediaLoader { @@ -55,7 +55,15 @@ impl MediaLoader { let http_client = http_client_builder.build()?; - let nixl_agent = get_nixl_agent()?; + let nixl_agent = match get_nixl_agent() { + Ok(agent) => Some(agent), + Err(e) => { + tracing::warn!( + "Error when creating NIXL agent (will not be able to register media data): {e}" + ); + None + } + }; Ok(Self { media_decoder, @@ -96,6 +104,10 @@ impl MediaLoader { oai_content_part: &ChatCompletionRequestUserMessageContentPart, // TODO: request-level options ) -> Result { + if self.nixl_agent.is_none() { + anyhow::bail!("NIXL agent is not available, cannot decode and register media data"); + } + // fetch the media, decode and NIXL-register let decoded = match oai_content_part { ChatCompletionRequestUserMessageContentPart::ImageUrl(image_part) => { @@ -116,7 +128,7 @@ impl MediaLoader { _ => anyhow::bail!("Unsupported media type"), }; - let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?; + let rdma_descriptor = decoded.into_rdma_descriptor(self.nixl_agent.as_ref().unwrap())?; Ok(rdma_descriptor) } } @@ -148,7 +160,7 @@ mod tests { ..Default::default() }; - let loader = MediaLoader::new(media_decoder, fetcher).unwrap(); + let loader: MediaLoader = MediaLoader::new(media_decoder, fetcher).unwrap(); let image_url = ImageUrl::from(format!("{}/llm-optimize-deploy-graphic.png", server.url())); let content_part = ChatCompletionRequestUserMessageContentPart::ImageUrl( @@ -156,13 +168,14 @@ mod tests { ); let result = loader.fetch_and_decode_media_part(&content_part).await; - assert!( - result.is_ok(), - "Failed to fetch and decode image: {:?}", - result.err() - ); - - let descriptor = result.unwrap(); + let descriptor = match result { + Ok(descriptor) => descriptor, + Err(e) if e.to_string().contains("NIXL agent is not available") => { + eprintln!("Skipping test: NIXL agent not available"); + return; + } + Err(e) => panic!("Failed to fetch and decode image: {}", e), + }; assert_eq!(descriptor.tensor_info.dtype, DataType::UINT8); // Verify image dimensions: 1,999px × 1,125px (width × height) diff --git a/lib/memory/Cargo.toml b/lib/memory/Cargo.toml index c435b2a278..7b7c513d42 100644 --- a/lib/memory/Cargo.toml +++ b/lib/memory/Cargo.toml @@ -26,7 +26,7 @@ dynamo-config = { workspace = true } anyhow = { workspace = true } cudarc = { workspace = true } -nixl-sys = { version = "0.7" } +nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "ae3f8af" } serde = { workspace = true} thiserror = { workspace = true } tracing = { workspace = true } diff --git a/lib/memory/src/nixl/agent.rs b/lib/memory/src/nixl/agent.rs index a1cf4343fb..f5361674ff 100644 --- a/lib/memory/src/nixl/agent.rs +++ b/lib/memory/src/nixl/agent.rs @@ -32,6 +32,10 @@ pub struct NixlAgent { impl NixlAgent { /// Create a NIXL agent without any backends. pub fn new(name: &str) -> Result { + if nixl_sys::is_stub() { + return Err(anyhow::anyhow!("NIXL is stubbed, cannot create agent")); + } + let agent = Agent::new(name)?; Ok(Self { From 01f94d6785d50539e01a13533a4073890039203d Mon Sep 17 00:00:00 2001 From: Alexandre Milesi Date: Mon, 10 Nov 2025 09:22:02 -0800 Subject: [PATCH 3/7] chore: cleanups Signed-off-by: Alexandre Milesi --- Cargo.lock | 2 +- lib/bindings/python/Cargo.lock | 2 +- lib/llm/Cargo.toml | 2 +- lib/llm/src/preprocessor/media/README.md | 29 +++++++++++++++++++ lib/llm/src/preprocessor/media/decoders.rs | 1 - .../src/preprocessor/media/decoders/image.rs | 6 ++-- lib/llm/src/preprocessor/media/loader.rs | 4 ++- lib/memory/Cargo.toml | 2 +- 8 files changed, 39 insertions(+), 9 deletions(-) create mode 100644 lib/llm/src/preprocessor/media/README.md diff --git a/Cargo.lock b/Cargo.lock index f883f35791..224f16aa55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5557,7 +5557,7 @@ dependencies = [ [[package]] name = "nixl-sys" version = "0.7.0" -source = "git+https://github.com/ai-dynamo/nixl?rev=ae3f8af#ae3f8af9508a1e1f8aeb687ae3ae66644d3ba5e8" +source = "git+https://github.com/ai-dynamo/nixl?rev=00bac00#00bac003950fd00cccc05dc880f58c17ae46ebd1" dependencies = [ "anyhow", "bindgen 0.71.1", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index dc3bd7c419..958774f465 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -3956,7 +3956,7 @@ dependencies = [ [[package]] name = "nixl-sys" version = "0.7.0" -source = "git+https://github.com/ai-dynamo/nixl?rev=ae3f8af#ae3f8af9508a1e1f8aeb687ae3ae66644d3ba5e8" +source = "git+https://github.com/ai-dynamo/nixl?rev=00bac00#00bac003950fd00cccc05dc880f58c17ae46ebd1" dependencies = [ "anyhow", "bindgen 0.71.1", diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index d560288c51..105541f263 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -98,7 +98,7 @@ dialoguer = { version = "0.11", default-features = false, features = [ # block_manager aligned-vec = { version = "0.6.4", optional = true } -nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "ae3f8af", optional = true } +nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "00bac00", optional = true } cudarc = { workspace = true, optional = true } nix = { version = "0.26", optional = true } diff --git a/lib/llm/src/preprocessor/media/README.md b/lib/llm/src/preprocessor/media/README.md new file mode 100644 index 0000000000..9bc983a9be --- /dev/null +++ b/lib/llm/src/preprocessor/media/README.md @@ -0,0 +1,29 @@ +# Media decoding in the frontend + + +This component performs media download, base64 decoding, media decoding and NIXL registration. Today, this is used in the OpenAI preprocessor, to transform multimodal inputs (image_url, video_url, audio_url) into fully decoded data (pixel values, ...) accessible to the backends via NIXL. + + + +## TODOs + +### Modalities + +- [x] Image decoding +- [ ] Video decoding +- [ ] Audio decoding + +### Performance + +- [x] Image SW decoding +- [ ] Video HW decoding (NVDEC) +- [ ] JPEG HW decoding (nvJPEG) +- [ ] Sparse video sampling (seek-forward) +- [ ] Memory slab pre-allocation/registration + +### Memory management +- [ ] Memory spilling to lower storage tiers +- [ ] Early-free memory on client notifications + +### Observability +- [ ] Observability on performance, memory usage and input distributions diff --git a/lib/llm/src/preprocessor/media/decoders.rs b/lib/llm/src/preprocessor/media/decoders.rs index 230095975e..984aab1ea2 100644 --- a/lib/llm/src/preprocessor/media/decoders.rs +++ b/lib/llm/src/preprocessor/media/decoders.rs @@ -32,6 +32,5 @@ pub struct MediaDecoder { #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub enum DecodedMediaMetadata { - #[allow(dead_code)] // used in followup MR Image(ImageMetadata), } diff --git a/lib/llm/src/preprocessor/media/decoders/image.rs b/lib/llm/src/preprocessor/media/decoders/image.rs index 10afda4bc2..09de4db8d2 100644 --- a/lib/llm/src/preprocessor/media/decoders/image.rs +++ b/lib/llm/src/preprocessor/media/decoders/image.rs @@ -44,9 +44,8 @@ pub enum ImageLayout { #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct ImageMetadata { - #[allow(dead_code)] // used in followup MR pub(crate) format: Option, - #[allow(dead_code)] // used in followup MR + pub(crate) color_type: ColorType, pub(crate) layout: ImageLayout, } @@ -66,7 +65,7 @@ impl Decoder for ImageDecoder { let img = reader.decode()?; let n_channels = img.color().channel_count(); - let (data, _color_type) = match n_channels { + let (data, color_type) = match n_channels { 1 => (img.to_luma8().into_raw(), ColorType::L8), 2 => (img.to_luma_alpha8().into_raw(), ColorType::La8), 3 => (img.to_rgb8().into_raw(), ColorType::Rgb8), @@ -80,6 +79,7 @@ impl Decoder for ImageDecoder { let mut decoded: DecodedMediaData = array.try_into()?; decoded.tensor_info.metadata = Some(DecodedMediaMetadata::Image(ImageMetadata { format, + color_type, layout: ImageLayout::HWC, })); Ok(decoded) diff --git a/lib/llm/src/preprocessor/media/loader.rs b/lib/llm/src/preprocessor/media/loader.rs index 7ddb7b28c5..1d63a36575 100644 --- a/lib/llm/src/preprocessor/media/loader.rs +++ b/lib/llm/src/preprocessor/media/loader.rs @@ -168,14 +168,16 @@ mod tests { ); let result = loader.fetch_and_decode_media_part(&content_part).await; + let descriptor = match result { Ok(descriptor) => descriptor, Err(e) if e.to_string().contains("NIXL agent is not available") => { - eprintln!("Skipping test: NIXL agent not available"); + println!("test test_fetch_and_decode ... ignored (NIXL agent not available)"); return; } Err(e) => panic!("Failed to fetch and decode image: {}", e), }; + mock.assert_async().await; assert_eq!(descriptor.tensor_info.dtype, DataType::UINT8); // Verify image dimensions: 1,999px × 1,125px (width × height) diff --git a/lib/memory/Cargo.toml b/lib/memory/Cargo.toml index 7b7c513d42..43bd04c82b 100644 --- a/lib/memory/Cargo.toml +++ b/lib/memory/Cargo.toml @@ -26,7 +26,7 @@ dynamo-config = { workspace = true } anyhow = { workspace = true } cudarc = { workspace = true } -nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "ae3f8af" } +nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "00bac00" } serde = { workspace = true} thiserror = { workspace = true } tracing = { workspace = true } From 0aabc4126e4ce911991a62707313c40ec42c94b2 Mon Sep 17 00:00:00 2001 From: Alexandre Milesi Date: Mon, 10 Nov 2025 17:20:05 -0800 Subject: [PATCH 4/7] fix: image-rs serde Signed-off-by: Alexandre Milesi --- Cargo.lock | 12 +- lib/bindings/python/Cargo.lock | 406 +++++---------------------------- lib/llm/Cargo.toml | 2 +- 3 files changed, 66 insertions(+), 354 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 224f16aa55..69f2a1c124 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7967,9 +7967,9 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" [[package]] name = "serde" -version = "1.0.226" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dca6411025b24b60bfa7ec1fe1f8e710ac09782dca409ee8237ba74b51295fd" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ "serde_core", "serde_derive", @@ -8018,18 +8018,18 @@ dependencies = [ [[package]] name = "serde_core" -version = "1.0.226" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba2ba63999edb9dac981fb34b3e5c0d111a69b0924e253ed29d83f7c99e966a4" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.226" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8db53ae22f34573731bafa1db20f04027b2d25e02d8205921b569171699cdb33" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 958774f465..1fd36a7281 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -55,12 +55,6 @@ dependencies = [ "equator", ] -[[package]] -name = "allocator-api2" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" - [[package]] name = "android-tzdata" version = "0.1.1" @@ -179,18 +173,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" -[[package]] -name = "async-broadcast" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" -dependencies = [ - "event-listener", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-channel" version = "2.5.0" @@ -485,17 +467,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "backon" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" -dependencies = [ - "fastrand", - "gloo-timers", - "tokio", -] - [[package]] name = "backtrace" version = "0.3.75" @@ -1193,18 +1164,8 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core 0.20.11", - "darling_macro 0.20.11", -] - -[[package]] -name = "darling" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" -dependencies = [ - "darling_core 0.21.3", - "darling_macro 0.21.3", + "darling_core", + "darling_macro", ] [[package]] @@ -1221,38 +1182,13 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "darling_core" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim", - "syn 2.0.106", -] - [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core 0.20.11", - "quote", - "syn 2.0.106", -] - -[[package]] -name = "darling_macro" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" -dependencies = [ - "darling_core 0.21.3", + "darling_core", "quote", "syn 2.0.106", ] @@ -1349,7 +1285,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling 0.20.11", + "darling", "proc-macro2", "quote", "syn 2.0.106", @@ -1371,16 +1307,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" dependencies = [ - "derive_more-impl 1.0.0", -] - -[[package]] -name = "derive_more" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" -dependencies = [ - "derive_more-impl 2.0.1", + "derive_more-impl", ] [[package]] @@ -1395,17 +1322,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "derive_more-impl" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] - [[package]] name = "dialoguer" version = "0.11.0" @@ -1735,8 +1651,6 @@ dependencies = [ "futures", "humantime", "inotify", - "k8s-openapi", - "kube", "local-ip-address", "log", "nid", @@ -1751,7 +1665,6 @@ dependencies = [ "rand 0.9.2", "rayon", "regex", - "reqwest", "serde", "serde_json", "socket2 0.5.10", @@ -2006,6 +1919,26 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fax" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f05de7d48f37cd6730705cbca900770cab77a89f413d23e100ad7fad7795a0ab" +dependencies = [ + "fax_derive", +] + +[[package]] +name = "fax_derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0aca10fb742cb43f9e7bb8467c91aa9bcb8e3ffbc6a6f7389bb93ffc920577d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "fdeflate" version = "0.3.7" @@ -2518,18 +2451,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" -[[package]] -name = "gloo-timers" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "h2" version = "0.4.12" @@ -2584,8 +2505,6 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "allocator-api2", - "equivalent", "foldhash", ] @@ -2648,17 +2567,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "hostname" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" -dependencies = [ - "cfg-if 1.0.3", - "libc", - "windows-link 0.1.3", -] - [[package]] name = "http" version = "1.3.1" @@ -2743,7 +2651,6 @@ dependencies = [ "http", "hyper", "hyper-util", - "log", "rustls", "rustls-native-certs 0.8.1", "rustls-pki-types", @@ -2931,9 +2838,9 @@ dependencies = [ [[package]] name = "image" -version = "0.25.6" +version = "0.25.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db35664ce6b9810857a38a906215e75a9c879f0696556a39f59c62829710251a" +checksum = "529feb3e6769d234375c4cf1ee2ce713682b8e76538cb13f9fc23e1400a591e7" dependencies = [ "bytemuck", "byteorder-lite", @@ -2941,6 +2848,7 @@ dependencies = [ "exr", "gif", "image-webp", + "moxcms", "num-traits", "png", "qoi", @@ -3205,12 +3113,6 @@ dependencies = [ "libc", ] -[[package]] -name = "jpeg-decoder" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00810f1d8b74be64b13dbf3db89ac67740615d6c891f0e7b6179326533011a07" - [[package]] name = "js-sys" version = "0.3.77" @@ -3231,18 +3133,6 @@ dependencies = [ "unicode-general-category", ] -[[package]] -name = "json-patch" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" -dependencies = [ - "jsonptr", - "serde", - "serde_json", - "thiserror 1.0.69", -] - [[package]] name = "json5" version = "0.4.1" @@ -3254,29 +3144,6 @@ dependencies = [ "serde", ] -[[package]] -name = "jsonpath-rust" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" -dependencies = [ - "pest", - "pest_derive", - "regex", - "serde_json", - "thiserror 2.0.16", -] - -[[package]] -name = "jsonptr" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" -dependencies = [ - "serde", - "serde_json", -] - [[package]] name = "jwalk" version = "0.8.1" @@ -3287,18 +3154,6 @@ dependencies = [ "rayon", ] -[[package]] -name = "k8s-openapi" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692" -dependencies = [ - "base64 0.22.1", - "chrono", - "serde", - "serde_json", -] - [[package]] name = "kernel32-sys" version = "0.2.2" @@ -3309,115 +3164,6 @@ dependencies = [ "winapi-build", ] -[[package]] -name = "kube" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e" -dependencies = [ - "k8s-openapi", - "kube-client", - "kube-core", - "kube-derive", - "kube-runtime", -] - -[[package]] -name = "kube-client" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36" -dependencies = [ - "base64 0.22.1", - "bytes", - "chrono", - "either", - "futures", - "home", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-rustls", - "hyper-timeout", - "hyper-util", - "jsonpath-rust", - "k8s-openapi", - "kube-core", - "pem", - "rustls", - "secrecy", - "serde", - "serde_json", - "serde_yaml", - "thiserror 2.0.16", - "tokio", - "tokio-util", - "tower", - "tower-http", - "tracing", -] - -[[package]] -name = "kube-core" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73" -dependencies = [ - "chrono", - "derive_more 2.0.1", - "form_urlencoded", - "http", - "json-patch", - "k8s-openapi", - "schemars 1.0.4", - "serde", - "serde-value", - "serde_json", - "thiserror 2.0.16", -] - -[[package]] -name = "kube-derive" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2" -dependencies = [ - "darling 0.21.3", - "proc-macro2", - "quote", - "serde", - "serde_json", - "syn 2.0.106", -] - -[[package]] -name = "kube-runtime" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd" -dependencies = [ - "ahash", - "async-broadcast", - "async-stream", - "backon", - "educe", - "futures", - "hashbrown 0.15.5", - "hostname", - "json-patch", - "k8s-openapi", - "kube-client", - "parking_lot", - "pin-project", - "serde", - "serde_json", - "thiserror 2.0.16", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "lalrpop-util" version = "0.20.2" @@ -3598,7 +3344,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8" dependencies = [ - "derive_more 1.0.0", + "derive_more", "malachite", "num-integer", "num-traits", @@ -3855,6 +3601,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "moxcms" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fbdd3d7436f8b5e892b8b7ea114271ff0fa00bc5acae845d53b07d498616ef6" +dependencies = [ + "num-traits", + "pxfm", +] + [[package]] name = "multimap" version = "0.10.1" @@ -4335,15 +4091,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "ordered-float" -version = "2.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" -dependencies = [ - "num-traits", -] - [[package]] name = "ordered-multimap" version = "0.7.3" @@ -4436,16 +4183,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "pem" -version = "3.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" -dependencies = [ - "base64 0.22.1", - "serde_core", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -4616,11 +4353,11 @@ dependencies = [ [[package]] name = "png" -version = "0.17.16" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82151a2fc869e011c153adc57cf2789ccb8d9906ce52c0b39a6b5697749d7526" +checksum = "97baced388464909d42d89643fe4361939af9b7ce7a31ee32a168f832a70f2a0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.9.3", "crc32fast", "fdeflate", "flate2", @@ -4884,6 +4621,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "pxfm" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3cbdf373972bf78df4d3b518d07003938e2c7d1fb5891e55f9cb6df57009d84" +dependencies = [ + "num-traits", +] + [[package]] name = "pyo3" version = "0.23.5" @@ -5792,23 +5538,10 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ "dyn-clone", "ref-cast", - "schemars_derive", "serde", "serde_json", ] -[[package]] -name = "schemars_derive" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" -dependencies = [ - "proc-macro2", - "quote", - "serde_derive_internals", - "syn 2.0.106", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -5900,16 +5633,6 @@ dependencies = [ "typeid", ] -[[package]] -name = "serde-value" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" -dependencies = [ - "ordered-float", - "serde", -] - [[package]] name = "serde_core" version = "1.0.228" @@ -5930,17 +5653,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] - [[package]] name = "serde_json" version = "1.0.143" @@ -6040,7 +5752,7 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" dependencies = [ - "darling 0.20.11", + "darling", "proc-macro2", "quote", "syn 2.0.106", @@ -6438,13 +6150,16 @@ dependencies = [ [[package]] name = "tiff" -version = "0.9.1" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e" +checksum = "af9605de7fee8d9551863fd692cce7637f548dbd9db9180fcc07ccc6d26c336f" dependencies = [ + "fax", "flate2", - "jpeg-decoder", + "half", + "quick-error", "weezl", + "zune-jpeg", ] [[package]] @@ -6636,7 +6351,6 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", - "slab", "tokio", ] @@ -6866,14 +6580,12 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "base64 0.22.1", "bitflags 2.9.3", "bytes", "futures-util", "http", "http-body", "iri-string", - "mime", "pin-project-lite", "tower", "tower-layer", @@ -7337,7 +7049,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca" dependencies = [ - "darling 0.20.11", + "darling", "once_cell", "proc-macro-error2", "proc-macro2", diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 105541f263..b575a7e090 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -145,7 +145,7 @@ json-five = { version = "0.3" } # media loading in the preprocessor reqwest = { workspace = true } base64 = { version = "0.22" } -image = { version = "0.25", features = ["default", "serde"] } +image = { version = "0.25", features = ["serde"] } tokio-rayon = {version = "2" } ndarray = { version = "0.16" } From d33d718dba8d604e2430f83760ed0721d48c9706 Mon Sep 17 00:00:00 2001 From: Alexandre Milesi Date: Mon, 10 Nov 2025 14:23:36 -0800 Subject: [PATCH 5/7] feat: switch to media-nixl feature flag Signed-off-by: Alexandre Milesi --- Cargo.lock | 6 +- lib/bindings/python/Cargo.lock | 110 +------------------ lib/llm/Cargo.toml | 4 +- lib/llm/src/mocker/engine.rs | 1 + lib/llm/src/preprocessor.rs | 26 +++-- lib/llm/src/preprocessor/media.rs | 5 +- lib/llm/src/preprocessor/media/loader.rs | 91 ++++++++------- lib/llm/src/preprocessor/media/rdma.rs | 31 ++++-- lib/llm/src/protocols/common/preprocessor.rs | 2 + lib/memory/Cargo.toml | 2 +- lib/memory/src/nixl/agent.rs | 4 - 11 files changed, 105 insertions(+), 177 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69f2a1c124..f00ff9a981 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5556,10 +5556,10 @@ dependencies = [ [[package]] name = "nixl-sys" -version = "0.7.0" -source = "git+https://github.com/ai-dynamo/nixl?rev=00bac00#00bac003950fd00cccc05dc880f58c17ae46ebd1" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d80bd4b5b8363cfd933000a8757a453e58ee10ee6e400c38ae31db512444a31" dependencies = [ - "anyhow", "bindgen 0.71.1", "cc", "libc", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 1fd36a7281..2125a1bb83 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -358,7 +358,7 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" dependencies = [ - "bindgen 0.69.5", + "bindgen", "cc", "cmake", "dunce", @@ -558,26 +558,6 @@ dependencies = [ "which", ] -[[package]] -name = "bindgen" -version = "0.71.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" -dependencies = [ - "bitflags 2.9.3", - "cexpr", - "clang-sys", - "itertools 0.12.1", - "log", - "prettyplease", - "proc-macro2", - "quote", - "regex", - "rustc-hash 2.1.1", - "shlex", - "syn 2.0.106", -] - [[package]] name = "bit-set" version = "0.5.3" @@ -1463,13 +1443,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "dynamo-config" -version = "0.6.1" -dependencies = [ - "anyhow", -] - [[package]] name = "dynamo-llm" version = "0.7.0" @@ -1498,7 +1471,6 @@ dependencies = [ "derive_builder", "dialoguer", "dynamo-async-openai", - "dynamo-memory", "dynamo-parsers", "dynamo-runtime", "either", @@ -1555,22 +1527,6 @@ dependencies = [ "zeromq", ] -[[package]] -name = "dynamo-memory" -version = "0.6.1" -dependencies = [ - "anyhow", - "cudarc", - "dynamo-config", - "libc", - "nix 0.30.1", - "nixl-sys", - "offset-allocator", - "serde", - "thiserror 2.0.16", - "tracing", -] - [[package]] name = "dynamo-parsers" version = "0.7.0" @@ -1654,7 +1610,7 @@ dependencies = [ "local-ip-address", "log", "nid", - "nix 0.29.0", + "nix", "nuid", "once_cell", "opentelemetry", @@ -3697,34 +3653,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nix" -version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" -dependencies = [ - "bitflags 2.9.3", - "cfg-if 1.0.3", - "cfg_aliases", - "libc", -] - -[[package]] -name = "nixl-sys" -version = "0.7.0" -source = "git+https://github.com/ai-dynamo/nixl?rev=00bac00#00bac003950fd00cccc05dc880f58c17ae46ebd1" -dependencies = [ - "anyhow", - "bindgen 0.71.1", - "cc", - "libc", - "os_info", - "pkg-config", - "serde", - "thiserror 2.0.16", - "tracing", -] - [[package]] name = "nkeys" version = "0.4.5" @@ -4101,18 +4029,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "os_info" -version = "3.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3" -dependencies = [ - "log", - "plist", - "serde", - "windows-sys 0.52.0", -] - [[package]] name = "overload" version = "0.1.1" @@ -4338,19 +4254,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "plist" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07" -dependencies = [ - "base64 0.22.1", - "indexmap 2.11.0", - "quick-xml", - "serde", - "time", -] - [[package]] name = "png" version = "0.18.0" @@ -4746,15 +4649,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" -[[package]] -name = "quick-xml" -version = "0.38.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" -dependencies = [ - "memchr", -] - [[package]] name = "quinn" version = "0.11.9" diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index b575a7e090..9e871cad98 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -24,7 +24,7 @@ testing-etcd = [] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"] cuda = ["dep:cudarc"] integration = ["dynamo-runtime/integration"] -media-nixl = ["dep:dynamo-memory", "dep:nixl-sys"] +media-nixl = ["dep:nixl-sys", "dep:dynamo-memory"] [[bench]] name = "tokenizer" @@ -98,7 +98,7 @@ dialoguer = { version = "0.11", default-features = false, features = [ # block_manager aligned-vec = { version = "0.6.4", optional = true } -nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "00bac00", optional = true } +nixl-sys = { version = "0.7", optional = true } cudarc = { workspace = true, optional = true } nix = { version = "0.26", optional = true } diff --git a/lib/llm/src/mocker/engine.rs b/lib/llm/src/mocker/engine.rs index bb0b28ddc2..5858f2fa58 100644 --- a/lib/llm/src/mocker/engine.rs +++ b/lib/llm/src/mocker/engine.rs @@ -228,6 +228,7 @@ impl AsyncEngine, ManyOut, Error> input: SingleIn, ) -> Result, Error> { let (request, ctx) = input.into_parts(); + println!("request: {request:?}"); // Extract dp_rank from request field (defaults to 0 if not set) let dp_rank = request.dp_rank.unwrap_or(0); diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs index f583644e45..74963f6be5 100644 --- a/lib/llm/src/preprocessor.rs +++ b/lib/llm/src/preprocessor.rs @@ -27,7 +27,8 @@ use std::{collections::HashMap, pin::Pin, sync::Arc}; use tracing; use crate::model_card::{ModelDeploymentCard, ModelInfo}; -use crate::preprocessor::media::MediaLoader; +#[cfg(feature = "media-nixl")] +use crate::preprocessor::media::{MediaDecoder, MediaLoader, MediaFetcher}; use crate::preprocessor::prompt::OAIChatLikeRequest; use crate::protocols::common::preprocessor::{ MultimodalData, MultimodalDataMap, PreprocessedRequestBuilder, @@ -114,6 +115,7 @@ pub struct OpenAIPreprocessor { /// Per-model runtime configuration propagated to response generator (e.g., reasoning/tool parser) runtime_config: crate::local_model::runtime_config::ModelRuntimeConfig, tool_call_parser: Option, + #[cfg(feature = "media-nixl")] media_loader: Option, } @@ -143,7 +145,8 @@ impl OpenAIPreprocessor { // // Initialize runtime config from the ModelDeploymentCard let runtime_config = mdc.runtime_config.clone(); - let media_loader = None; // TODO: enable with decoder config from MDC + #[cfg(feature = "media-nixl")] + let media_loader = Some(MediaLoader::new(MediaDecoder::default(), MediaFetcher::default())?); Ok(Arc::new(Self { formatter, tokenizer, @@ -151,6 +154,7 @@ impl OpenAIPreprocessor { mdcsum, runtime_config, tool_call_parser, + #[cfg(feature = "media-nixl")] media_loader, })) } @@ -279,7 +283,8 @@ impl OpenAIPreprocessor { let messages = request.messages(); let message_count = messages.len().unwrap_or(0); let mut media_map: MultimodalDataMap = HashMap::new(); - let mut fetch_tasks = Vec::new(); + #[cfg(feature = "media-nixl")] + let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> = Vec::new(); for idx in 0..message_count { let msg = messages @@ -312,19 +317,22 @@ impl OpenAIPreprocessor { _ => continue, }; + #[cfg(feature = "media-nixl")] if self.media_loader.is_some() { fetch_tasks.push((type_str, content_part.clone())); - } else { - // No loader, just pass the URL through - media_map - .entry(type_str) - .or_default() - .push(MultimodalData::Url(url)); + continue; } + + //Fallback: ust pass the URL through + media_map + .entry(type_str) + .or_default() + .push(MultimodalData::Url(url)); } } // Execute all fetch tasks + #[cfg(feature = "media-nixl")] if !fetch_tasks.is_empty() { let loader = self.media_loader.as_ref().unwrap(); let results = futures::future::join_all( diff --git a/lib/llm/src/preprocessor/media.rs b/lib/llm/src/preprocessor/media.rs index 34ac68772f..65566c9937 100644 --- a/lib/llm/src/preprocessor/media.rs +++ b/lib/llm/src/preprocessor/media.rs @@ -9,4 +9,7 @@ mod rdma; pub use common::EncodedMediaData; pub use decoders::{Decoder, ImageDecoder, MediaDecoder}; pub use loader::{MediaFetcher, MediaLoader}; -pub use rdma::{DecodedMediaData, RdmaMediaDataDescriptor, get_nixl_agent, get_nixl_metadata}; + +pub use rdma::{DecodedMediaData, RdmaMediaDataDescriptor}; +#[cfg(feature = "media-nixl")] +pub use rdma::{get_nixl_agent, get_nixl_metadata}; diff --git a/lib/llm/src/preprocessor/media/loader.rs b/lib/llm/src/preprocessor/media/loader.rs index 1d63a36575..b37e8e5e79 100644 --- a/lib/llm/src/preprocessor/media/loader.rs +++ b/lib/llm/src/preprocessor/media/loader.rs @@ -8,10 +8,17 @@ use anyhow::Result; use dynamo_async_openai::types::ChatCompletionRequestUserMessageContentPart; -use super::common::EncodedMediaData; -use super::decoders::{Decoder, MediaDecoder}; -use super::rdma::{RdmaMediaDataDescriptor, get_nixl_agent}; -use dynamo_memory::nixl::NixlAgent; +use super::decoders::{MediaDecoder}; +use super::rdma::RdmaMediaDataDescriptor; + +#[cfg(feature = "media-nixl")] +use { + super::rdma::get_nixl_agent, + dynamo_memory::nixl::NixlAgent, + super::common::EncodedMediaData, + super::decoders::Decoder +}; + const DEFAULT_HTTP_USER_AGENT: &str = "dynamo-ai/dynamo"; const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(30); @@ -38,10 +45,13 @@ impl Default for MediaFetcher { } pub struct MediaLoader { + #[allow(dead_code)] media_decoder: MediaDecoder, + #[allow(dead_code)] http_client: reqwest::Client, media_fetcher: MediaFetcher, - nixl_agent: Option, + #[cfg(feature = "media-nixl")] + nixl_agent: NixlAgent, } impl MediaLoader { @@ -55,20 +65,14 @@ impl MediaLoader { let http_client = http_client_builder.build()?; - let nixl_agent = match get_nixl_agent() { - Ok(agent) => Some(agent), - Err(e) => { - tracing::warn!( - "Error when creating NIXL agent (will not be able to register media data): {e}" - ); - None - } - }; + #[cfg(feature = "media-nixl")] + let nixl_agent = get_nixl_agent()?; Ok(Self { media_decoder, http_client, media_fetcher, + #[cfg(feature = "media-nixl")] nixl_agent, }) } @@ -104,36 +108,40 @@ impl MediaLoader { oai_content_part: &ChatCompletionRequestUserMessageContentPart, // TODO: request-level options ) -> Result { - if self.nixl_agent.is_none() { - anyhow::bail!("NIXL agent is not available, cannot decode and register media data"); - } + #[cfg(not(feature = "media-nixl"))] + anyhow::bail!("NIXL is not supported, cannot decode and register media data {oai_content_part:?}"); - // fetch the media, decode and NIXL-register - let decoded = match oai_content_part { - ChatCompletionRequestUserMessageContentPart::ImageUrl(image_part) => { - let url = &image_part.image_url.url; - self.check_if_url_allowed(url)?; - let data = EncodedMediaData::from_url(url, &self.http_client).await?; - self.media_decoder.image_decoder.decode_async(data).await? - } - ChatCompletionRequestUserMessageContentPart::VideoUrl(video_part) => { - let url = &video_part.video_url.url; - self.check_if_url_allowed(url)?; - EncodedMediaData::from_url(url, &self.http_client).await?; - anyhow::bail!("Video decoding is not supported yet"); - } - ChatCompletionRequestUserMessageContentPart::AudioUrl(_) => { - anyhow::bail!("Audio decoding is not supported yet"); - } - _ => anyhow::bail!("Unsupported media type"), - }; + #[cfg(feature = "media-nixl")] + { + // fetch the media, decode and NIXL-register + let decoded = match oai_content_part { + ChatCompletionRequestUserMessageContentPart::ImageUrl(image_part) => { + let url = &image_part.image_url.url; + self.check_if_url_allowed(url)?; + let data = EncodedMediaData::from_url(url, &self.http_client).await?; + self.media_decoder.image_decoder.decode_async(data).await? + } + ChatCompletionRequestUserMessageContentPart::VideoUrl(video_part) => { + let url = &video_part.video_url.url; + self.check_if_url_allowed(url)?; + EncodedMediaData::from_url(url, &self.http_client).await?; + anyhow::bail!("Video decoding is not supported yet"); + } + ChatCompletionRequestUserMessageContentPart::AudioUrl(_) => { + anyhow::bail!("Audio decoding is not supported yet"); + } + _ => anyhow::bail!("Unsupported media type"), + }; + + + let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?; + Ok(rdma_descriptor) + } - let rdma_descriptor = decoded.into_rdma_descriptor(self.nixl_agent.as_ref().unwrap())?; - Ok(rdma_descriptor) } } -#[cfg(test)] +#[cfg(all(test, feature = "media-nixl"))] mod tests { use super::super::rdma::DataType; use super::*; @@ -205,6 +213,11 @@ mod tests { "Source storage should be registered with NIXL" ); } +} + +#[cfg(test)] +mod tests_non_nixl { + use super::*; #[test] fn test_direct_ip_blocked() { diff --git a/lib/llm/src/preprocessor/media/rdma.rs b/lib/llm/src/preprocessor/media/rdma.rs index 7b39e45111..f250479743 100644 --- a/lib/llm/src/preprocessor/media/rdma.rs +++ b/lib/llm/src/preprocessor/media/rdma.rs @@ -2,14 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use base64::{Engine as _, engine::general_purpose}; use ndarray::{ArrayBase, Dimension, OwnedRepr}; use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use dynamo_memory::{ - StorageError, SystemStorage, - nixl::{self, NixlAgent, NixlDescriptor, RegisteredView}, +#[cfg(feature = "media-nixl")] +use { + dynamo_memory::SystemStorage, + dynamo_memory::nixl::{self, NixlAgent, NixlDescriptor, RegisteredView}, + base64::{Engine as _, engine::general_purpose}, + std::sync::Arc, }; use super::decoders::DecodedMediaMetadata; @@ -30,16 +31,20 @@ pub struct MediaTensorInfo { // Decoded media data (image RGB, video frames pixels, ...) #[derive(Debug)] pub struct DecodedMediaData { + #[cfg(feature = "media-nixl")] pub(crate) data: SystemStorage, pub(crate) tensor_info: MediaTensorInfo, } // Decoded media data NIXL descriptor (sent to the next step in the pipeline / NATS) + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RdmaMediaDataDescriptor { // b64 agent metadata + #[cfg(feature = "media-nixl")] pub(crate) nixl_metadata: String, // tensor descriptor + #[cfg(feature = "media-nixl")] pub(crate) nixl_descriptor: NixlDescriptor, #[serde(flatten)] @@ -48,12 +53,13 @@ pub struct RdmaMediaDataDescriptor { // reference to the actual data, kept alive while the rdma descriptor is alive #[serde(skip, default)] #[allow(dead_code)] + #[cfg(feature = "media-nixl")] pub(crate) source_storage: Option>>, } impl DecodedMediaData { + #[cfg(feature = "media-nixl")] pub fn into_rdma_descriptor(self, nixl_agent: &NixlAgent) -> Result { - // Register storage with NIXL let source_storage = self.data; let registered = nixl::register_with_nixl(source_storage, nixl_agent, None) .map_err(|_| anyhow::anyhow!("Failed to register storage with NIXL"))?; @@ -73,21 +79,24 @@ impl DecodedMediaData { // convert Array{N} to DecodedMediaData // TODO: Array1 for audio + impl TryFrom, D>> for DecodedMediaData { - type Error = StorageError; + type Error = anyhow::Error; fn try_from(array: ArrayBase, D>) -> Result { let shape = array.shape().to_vec(); - let (data_vec, _) = array.into_raw_vec_and_offset(); - // Allocate new system storage and copy data - // TODO: use arena allocator and avoid copies + #[cfg(feature = "media-nixl")] + let (data_vec, _) = array.into_raw_vec_and_offset(); + #[cfg(feature = "media-nixl")] let mut storage = SystemStorage::new(data_vec.len())?; + #[cfg(feature = "media-nixl")] unsafe { std::ptr::copy_nonoverlapping(data_vec.as_ptr(), storage.as_mut_ptr(), data_vec.len()); } Ok(Self { + #[cfg(feature = "media-nixl")] data: storage, tensor_info: MediaTensorInfo { shape, @@ -101,6 +110,7 @@ impl TryFrom, D>> for DecodedMediaData { // Get NIXL metadata for a descriptor // Avoids cross-request leak possibility and reduces metadata size // TODO: pre-allocate a fixed NIXL-registered RAM pool so metadata can be cached on the target? +#[cfg(feature = "media-nixl")] pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result { // WAR: Until https://github.com/ai-dynamo/nixl/pull/970 is merged, can't use get_local_partial_md let nixl_md = agent.raw_agent().get_local_md()?; @@ -112,6 +122,7 @@ pub fn get_nixl_metadata(agent: &NixlAgent, _storage: &SystemStorage) -> Result< Ok(format!("b64:{}", b64_encoded)) } +#[cfg(feature = "media-nixl")] pub fn get_nixl_agent() -> Result { let name = format!("media-loader-{}", uuid::Uuid::new_v4()); let nixl_agent = NixlAgent::with_backends(&name, &["UCX"])?; diff --git a/lib/llm/src/protocols/common/preprocessor.rs b/lib/llm/src/protocols/common/preprocessor.rs index 7d5abf4c50..fc8f01cac1 100644 --- a/lib/llm/src/protocols/common/preprocessor.rs +++ b/lib/llm/src/protocols/common/preprocessor.rs @@ -6,12 +6,14 @@ use serde::{Deserialize, Serialize}; use super::{OutputOptions, SamplingOptions, StopConditions}; use crate::kv_router::RouterConfigOverride; +#[cfg(feature = "media-nixl")] use crate::preprocessor::media::RdmaMediaDataDescriptor; use crate::protocols::TokenIdType; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum MultimodalData { Url(url::Url), + #[cfg(feature = "media-nixl")] Decoded(RdmaMediaDataDescriptor), } diff --git a/lib/memory/Cargo.toml b/lib/memory/Cargo.toml index 43bd04c82b..c435b2a278 100644 --- a/lib/memory/Cargo.toml +++ b/lib/memory/Cargo.toml @@ -26,7 +26,7 @@ dynamo-config = { workspace = true } anyhow = { workspace = true } cudarc = { workspace = true } -nixl-sys = { git = "https://github.com/ai-dynamo/nixl", rev = "00bac00" } +nixl-sys = { version = "0.7" } serde = { workspace = true} thiserror = { workspace = true } tracing = { workspace = true } diff --git a/lib/memory/src/nixl/agent.rs b/lib/memory/src/nixl/agent.rs index f5361674ff..a1cf4343fb 100644 --- a/lib/memory/src/nixl/agent.rs +++ b/lib/memory/src/nixl/agent.rs @@ -32,10 +32,6 @@ pub struct NixlAgent { impl NixlAgent { /// Create a NIXL agent without any backends. pub fn new(name: &str) -> Result { - if nixl_sys::is_stub() { - return Err(anyhow::anyhow!("NIXL is stubbed, cannot create agent")); - } - let agent = Agent::new(name)?; Ok(Self { From 94d027db29b97b6b64de2ce0e81c8d40cb7726b4 Mon Sep 17 00:00:00 2001 From: Alexandre Milesi Date: Mon, 10 Nov 2025 14:26:30 -0800 Subject: [PATCH 6/7] chore: cleanups Signed-off-by: Alexandre Milesi --- Cargo.lock | 4 +- lib/bindings/python/Cargo.lock | 341 ++++++++++++++++++++++- lib/llm/Cargo.toml | 2 +- lib/llm/src/mocker/engine.rs | 1 - lib/llm/src/preprocessor.rs | 12 +- lib/llm/src/preprocessor/media/README.md | 36 ++- lib/llm/src/preprocessor/media/loader.rs | 24 +- lib/llm/src/preprocessor/media/rdma.rs | 2 +- 8 files changed, 392 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f00ff9a981..0c2bd33158 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5556,9 +5556,9 @@ dependencies = [ [[package]] name = "nixl-sys" -version = "0.7.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d80bd4b5b8363cfd933000a8757a453e58ee10ee6e400c38ae31db512444a31" +checksum = "a73b92494c94b2ff2d004cd9274d966863089e867dc9cd98bc640aefe7622036" dependencies = [ "bindgen 0.71.1", "cc", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 2125a1bb83..55211a6341 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -55,6 +55,12 @@ dependencies = [ "equator", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -173,6 +179,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -467,6 +485,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1144,8 +1173,18 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", ] [[package]] @@ -1162,13 +1201,38 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.106", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", "quote", "syn 2.0.106", ] @@ -1265,7 +1329,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.106", @@ -1287,7 +1351,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" dependencies = [ - "derive_more-impl", + "derive_more-impl 1.0.0", +] + +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl 2.0.1", ] [[package]] @@ -1302,6 +1375,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "dialoguer" version = "0.11.0" @@ -1607,6 +1691,8 @@ dependencies = [ "futures", "humantime", "inotify", + "k8s-openapi", + "kube", "local-ip-address", "log", "nid", @@ -1621,6 +1707,7 @@ dependencies = [ "rand 0.9.2", "rayon", "regex", + "reqwest", "serde", "serde_json", "socket2 0.5.10", @@ -2407,6 +2494,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.4.12" @@ -2461,6 +2560,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", ] @@ -2523,6 +2624,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if 1.0.3", + "libc", + "windows-link 0.1.3", +] + [[package]] name = "http" version = "1.3.1" @@ -2607,6 +2719,7 @@ dependencies = [ "http", "hyper", "hyper-util", + "log", "rustls", "rustls-native-certs 0.8.1", "rustls-pki-types", @@ -3089,6 +3202,18 @@ dependencies = [ "unicode-general-category", ] +[[package]] +name = "json-patch" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json5" version = "0.4.1" @@ -3100,6 +3225,29 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonpath-rust" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" +dependencies = [ + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "jsonptr" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jwalk" version = "0.8.1" @@ -3110,6 +3258,18 @@ dependencies = [ "rayon", ] +[[package]] +name = "k8s-openapi" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692" +dependencies = [ + "base64 0.22.1", + "chrono", + "serde", + "serde_json", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -3120,6 +3280,115 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kube" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures", + "home", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rustls", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73" +dependencies = [ + "chrono", + "derive_more 2.0.1", + "form_urlencoded", + "http", + "json-patch", + "k8s-openapi", + "schemars 1.0.4", + "serde", + "serde-value", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "kube-derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.106", +] + +[[package]] +name = "kube-runtime" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "backon", + "educe", + "futures", + "hashbrown 0.15.5", + "hostname", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lalrpop-util" version = "0.20.2" @@ -3300,7 +3569,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8" dependencies = [ - "derive_more", + "derive_more 1.0.0", "malachite", "num-integer", "num-traits", @@ -4019,6 +4288,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -4099,6 +4377,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5432,10 +5720,23 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ "dyn-clone", "ref-cast", + "schemars_derive", "serde", "serde_json", ] +[[package]] +name = "schemars_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.106", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -5527,6 +5828,16 @@ dependencies = [ "typeid", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -5547,6 +5858,17 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "serde_json" version = "1.0.143" @@ -5646,7 +5968,7 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.106", @@ -6245,6 +6567,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] @@ -6474,12 +6797,14 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ + "base64 0.22.1", "bitflags 2.9.3", "bytes", "futures-util", "http", "http-body", "iri-string", + "mime", "pin-project-lite", "tower", "tower-layer", @@ -6943,7 +7268,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca" dependencies = [ - "darling", + "darling 0.20.11", "once_cell", "proc-macro-error2", "proc-macro2", diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 9e871cad98..19b96c5f73 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -98,7 +98,7 @@ dialoguer = { version = "0.11", default-features = false, features = [ # block_manager aligned-vec = { version = "0.6.4", optional = true } -nixl-sys = { version = "0.7", optional = true } +nixl-sys = { version = "=0.7.0", optional = true } cudarc = { workspace = true, optional = true } nix = { version = "0.26", optional = true } diff --git a/lib/llm/src/mocker/engine.rs b/lib/llm/src/mocker/engine.rs index 5858f2fa58..bb0b28ddc2 100644 --- a/lib/llm/src/mocker/engine.rs +++ b/lib/llm/src/mocker/engine.rs @@ -228,7 +228,6 @@ impl AsyncEngine, ManyOut, Error> input: SingleIn, ) -> Result, Error> { let (request, ctx) = input.into_parts(); - println!("request: {request:?}"); // Extract dp_rank from request field (defaults to 0 if not set) let dp_rank = request.dp_rank.unwrap_or(0); diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs index 74963f6be5..c3eead7691 100644 --- a/lib/llm/src/preprocessor.rs +++ b/lib/llm/src/preprocessor.rs @@ -28,7 +28,7 @@ use tracing; use crate::model_card::{ModelDeploymentCard, ModelInfo}; #[cfg(feature = "media-nixl")] -use crate::preprocessor::media::{MediaDecoder, MediaLoader, MediaFetcher}; +use crate::preprocessor::media::{MediaDecoder, MediaFetcher, MediaLoader}; use crate::preprocessor::prompt::OAIChatLikeRequest; use crate::protocols::common::preprocessor::{ MultimodalData, MultimodalDataMap, PreprocessedRequestBuilder, @@ -145,8 +145,13 @@ impl OpenAIPreprocessor { // // Initialize runtime config from the ModelDeploymentCard let runtime_config = mdc.runtime_config.clone(); + #[cfg(feature = "media-nixl")] - let media_loader = Some(MediaLoader::new(MediaDecoder::default(), MediaFetcher::default())?); + let media_loader = match mdc.media_decoder { + Some(media_decoder) => Some(MediaLoader::new(media_decoder, mdc.media_fetcher)?), + None => None, + }; + Ok(Arc::new(Self { formatter, tokenizer, @@ -284,7 +289,8 @@ impl OpenAIPreprocessor { let message_count = messages.len().unwrap_or(0); let mut media_map: MultimodalDataMap = HashMap::new(); #[cfg(feature = "media-nixl")] - let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> = Vec::new(); + let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> = + Vec::new(); for idx in 0..message_count { let msg = messages diff --git a/lib/llm/src/preprocessor/media/README.md b/lib/llm/src/preprocessor/media/README.md index 9bc983a9be..fede33bc9f 100644 --- a/lib/llm/src/preprocessor/media/README.md +++ b/lib/llm/src/preprocessor/media/README.md @@ -3,6 +3,39 @@ This component performs media download, base64 decoding, media decoding and NIXL registration. Today, this is used in the OpenAI preprocessor, to transform multimodal inputs (image_url, video_url, audio_url) into fully decoded data (pixel values, ...) accessible to the backends via NIXL. +## Usage + +Media decoding is enabled when registering the MDC: + +Set HTTP download options: + +```python +from dynamo.llm import MediaFetcher +fetcher = MediaFetcher() +fetcher.user_agent("dynamo") +fetcher.timeout_ms(15000) +fetcher.allow_direct_ip(True) +fetcher.allow_direct_port(False) +fetcher.allowed_media_domains(["google.com"]) +``` + +Set media decoding options: + +```python +from dynamo.llm import MediaDecoder +decoder = MediaDecoder() +decoder.image_decoder({"max_image_width": 4096, "max_image_height": 4096, "max_alloc": 16*1024*1024}) +``` + +And register the LLM as usual, adding the media configuration: + +```python +register_llm( + ..., + media_decoder=decoder, + media_fetcher=fetcher, +) +``` ## TODOs @@ -25,5 +58,6 @@ This component performs media download, base64 decoding, media decoding and NIXL - [ ] Memory spilling to lower storage tiers - [ ] Early-free memory on client notifications -### Observability +### Misc - [ ] Observability on performance, memory usage and input distributions +- [ ] Per-request decoding options diff --git a/lib/llm/src/preprocessor/media/loader.rs b/lib/llm/src/preprocessor/media/loader.rs index b37e8e5e79..0d229d7437 100644 --- a/lib/llm/src/preprocessor/media/loader.rs +++ b/lib/llm/src/preprocessor/media/loader.rs @@ -8,18 +8,15 @@ use anyhow::Result; use dynamo_async_openai::types::ChatCompletionRequestUserMessageContentPart; -use super::decoders::{MediaDecoder}; +use super::decoders::MediaDecoder; use super::rdma::RdmaMediaDataDescriptor; #[cfg(feature = "media-nixl")] use { - super::rdma::get_nixl_agent, + super::common::EncodedMediaData, super::decoders::Decoder, super::rdma::get_nixl_agent, dynamo_memory::nixl::NixlAgent, - super::common::EncodedMediaData, - super::decoders::Decoder }; - const DEFAULT_HTTP_USER_AGENT: &str = "dynamo-ai/dynamo"; const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(30); @@ -55,8 +52,9 @@ pub struct MediaLoader { } impl MediaLoader { - pub fn new(media_decoder: MediaDecoder, media_fetcher: MediaFetcher) -> Result { - let mut http_client_builder = + pub fn new(media_decoder: MediaDecoder, media_fetcher: Option) -> Result { + let media_fetcher = media_fetcher.unwrap_or_default(); + let mut http_client_builder: reqwest::ClientBuilder = reqwest::Client::builder().user_agent(&media_fetcher.user_agent); if let Some(timeout) = media_fetcher.timeout { @@ -109,7 +107,9 @@ impl MediaLoader { // TODO: request-level options ) -> Result { #[cfg(not(feature = "media-nixl"))] - anyhow::bail!("NIXL is not supported, cannot decode and register media data {oai_content_part:?}"); + anyhow::bail!( + "NIXL is not supported, cannot decode and register media data {oai_content_part:?}" + ); #[cfg(feature = "media-nixl")] { @@ -133,11 +133,9 @@ impl MediaLoader { _ => anyhow::bail!("Unsupported media type"), }; - let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?; Ok(rdma_descriptor) } - } } @@ -225,7 +223,7 @@ mod tests_non_nixl { allow_direct_ip: false, ..Default::default() }; - let loader = MediaLoader::new(MediaDecoder::default(), fetcher).unwrap(); + let loader = MediaLoader::new(MediaDecoder::default(), Some(fetcher)).unwrap(); let url = url::Url::parse("http://192.168.1.1/image.jpg").unwrap(); let result = loader.check_if_url_allowed(&url); @@ -245,7 +243,7 @@ mod tests_non_nixl { allow_direct_port: false, ..Default::default() }; - let loader = MediaLoader::new(MediaDecoder::default(), fetcher).unwrap(); + let loader = MediaLoader::new(MediaDecoder::default(), Some(fetcher)).unwrap(); let url = url::Url::parse("http://example.com:8080/image.jpg").unwrap(); let result = loader.check_if_url_allowed(&url); @@ -269,7 +267,7 @@ mod tests_non_nixl { allowed_media_domains: Some(allowed_domains), ..Default::default() }; - let loader = MediaLoader::new(MediaDecoder::default(), fetcher).unwrap(); + let loader = MediaLoader::new(MediaDecoder::default(), Some(fetcher)).unwrap(); // Allowed domain should pass let url = url::Url::parse("https://trusted.com/image.jpg").unwrap(); diff --git a/lib/llm/src/preprocessor/media/rdma.rs b/lib/llm/src/preprocessor/media/rdma.rs index f250479743..fc36c09d2f 100644 --- a/lib/llm/src/preprocessor/media/rdma.rs +++ b/lib/llm/src/preprocessor/media/rdma.rs @@ -7,9 +7,9 @@ use serde::{Deserialize, Serialize}; #[cfg(feature = "media-nixl")] use { + base64::{Engine as _, engine::general_purpose}, dynamo_memory::SystemStorage, dynamo_memory::nixl::{self, NixlAgent, NixlDescriptor, RegisteredView}, - base64::{Engine as _, engine::general_purpose}, std::sync::Arc, }; From 1d673355f3c278a58a8d332e1cf3c0dceda55843 Mon Sep 17 00:00:00 2001 From: Krishnan Prashanth Date: Mon, 24 Nov 2025 10:03:49 -0800 Subject: [PATCH 7/7] Removing duplicate moxcms in Cargo.lock Signed-off-by: Krishnan Prashanth --- lib/bindings/python/Cargo.lock | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 066360cf60..a74f021e03 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -3919,16 +3919,6 @@ dependencies = [ "pxfm", ] -[[package]] -name = "moxcms" -version = "0.7.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fbdd3d7436f8b5e892b8b7ea114271ff0fa00bc5acae845d53b07d498616ef6" -dependencies = [ - "num-traits", - "pxfm", -] - [[package]] name = "multimap" version = "0.10.1"