diff --git a/Makefile b/Makefile index 703f9599..97a705e9 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,14 @@ fmt: node_modules jarmuz-static: node_modules ./jarmuz-static.mjs +.PHONY: build +build: jarmuz-static + cargo build -p paddler --features web_admin_panel + +.PHONY: build.cuda +build.cuda: jarmuz-static + cargo build -p paddler --features cuda,web_admin_panel + .PHONY: release release: jarmuz-static cargo build --release -p paddler --features web_admin_panel @@ -51,10 +59,6 @@ release.cuda: jarmuz-static release.vulkan: jarmuz-static cargo build --release -p paddler --features web_admin_panel,vulkan -.PHONY: build -build: jarmuz-static - cargo build -p paddler --features web_admin_panel - .PHONY: test test: test.unit test.models test.integration diff --git a/paddler/src/agent/continuous_batch_embedding_processor.rs b/paddler/src/agent/continuous_batch_embedding_processor.rs index 93657f78..573b796e 100644 --- a/paddler/src/agent/continuous_batch_embedding_processor.rs +++ b/paddler/src/agent/continuous_batch_embedding_processor.rs @@ -57,8 +57,6 @@ impl<'context> ContinuousBatchEmbeddingProcessor<'context> { return Err(anyhow!("Embeddings are not enabled")); } - self.llama_context.clear_kv_cache(); - let tokens_lines_list = input_batch .into_iter() .map(|input| { diff --git a/paddler/src/agent/continuous_batch_scheduler.rs b/paddler/src/agent/continuous_batch_scheduler.rs index 9cab4ff6..838e534b 100644 --- a/paddler/src/agent/continuous_batch_scheduler.rs +++ b/paddler/src/agent/continuous_batch_scheduler.rs @@ -43,6 +43,18 @@ use crate::decoded_image::DecodedImage; use crate::dispenses_slots::DispensesSlots; use crate::slot_aggregated_status::SlotAggregatedStatus; +struct GeneratingContribution { + request_index: usize, + batch_position: i32, +} + +struct IngestingContribution { + request_index: usize, + chunk_size: usize, + is_last_chunk: bool, + last_batch_position: i32, +} + pub struct ContinuousBatchScheduler { active_requests: Vec, command_rx: Receiver, @@ -729,66 +741,75 @@ impl ContinuousBatchScheduler { } fn execute_one_iteration(&mut self) -> Result<()> { + self.advance_generating_requests(); + let batch_n_tokens = self.scheduler_context.inference_parameters.batch_n_tokens; - let max_sequences = self.active_requests.len(); - #[expect( - clippy::cast_possible_truncation, - clippy::cast_possible_wrap, - reason = "token counts and positions fit in i32 for llama.cpp FFI" - )] - let mut batch = LlamaBatch::new(batch_n_tokens, max_sequences.max(1) as i32)?; + loop { + let max_sequences = self.active_requests.len(); - let mut current_batch_token_count: usize = 0; + #[expect( + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + reason = "token counts and positions fit in i32 for llama.cpp FFI" + )] + let mut batch = LlamaBatch::new(batch_n_tokens, max_sequences.max(1) as i32)?; - current_batch_token_count += - self.sample_generating_requests_into_batch(&mut batch, batch_n_tokens)?; + let mut generating_contributions: Vec<GeneratingContribution> = Vec::new(); + let mut ingesting_contributions: Vec<IngestingContribution> = Vec::new(); - self.ingest_prompt_tokens_into_batch( - &mut batch, - batch_n_tokens, - current_batch_token_count, - )?; + let mut current_batch_token_count: usize = 0; - if batch.n_tokens() == 0 { - return Ok(()); - } + current_batch_token_count += self.add_generating_pending_tokens_to_batch( + &mut batch, + batch_n_tokens, + &mut generating_contributions, + )?; - debug!( - "{:?}: decoding batch with {} tokens for {} active requests", - self.scheduler_context.agent_name, 
batch.n_tokens(), - self.active_requests.len() - ); + self.add_ingesting_prompt_chunks_to_batch( + &mut batch, + batch_n_tokens, + current_batch_token_count, + &mut ingesting_contributions, + )?; + + if batch.n_tokens() == 0 { + return Ok(()); + } + + debug!( + "{:?}: decoding batch with {} tokens for {} active requests", + self.scheduler_context.agent_name, + batch.n_tokens(), + self.active_requests.len() + ); - if let Err(err) = self.llama_context.decode(&mut batch) { - match err { - DecodeError::NoKvCacheSlot => { + match self.llama_context.decode(&mut batch) { + Ok(()) => { + self.commit_contributions(&generating_contributions, &ingesting_contributions); + + return Ok(()); + } + Err(DecodeError::NoKvCacheSlot) => { self.evict_largest_sequence(); - return self.execute_one_iteration(); + if self.active_requests.is_empty() { + return Ok(()); + } } - DecodeError::Aborted | DecodeError::NTokensZero => { + Err(DecodeError::Aborted | DecodeError::NTokensZero) => { return Ok(()); } - DecodeError::Unknown(error_code) => { + Err(DecodeError::Unknown(error_code)) => { return Err(anyhow!( "Decode failed with unknown error code: {error_code}" )); } } } - - Ok(()) } - fn sample_generating_requests_into_batch( - &mut self, - batch: &mut LlamaBatch, - batch_n_tokens: usize, - ) -> Result<usize> { - let mut tokens_added: usize = 0; - + fn advance_generating_requests(&mut self) { for active_request in &mut self.active_requests { if !matches!( active_request.phase, @@ -797,44 +818,38 @@ impl ContinuousBatchScheduler { continue; } - if tokens_added >= batch_n_tokens { - break; + if active_request.pending_sampled_token.is_some() { + continue; } - let sampled_token = if let Some(harvested) = - active_request.pending_sampled_token.take() - { - harvested - } else { - let Some(batch_index) = active_request.i_batch else { - continue; - }; - - match sample_token_at_batch_index( - &self.llama_context, - batch_index, - &mut active_request.chain, - &mut active_request.grammar_sampler, - ) { - Ok(token) => token, - Err(result) => { - error!( - "{:?}: sequence {} sampling error: {result:?}", - self.scheduler_context.agent_name, active_request.sequence_id - ); - - if active_request.generated_tokens_tx.send(result).is_err() { - warn!( - "{:?}: failed to send result to client (receiver dropped)", - self.scheduler_context.agent_name - ); - } + let Some(batch_index) = active_request.i_batch else { + continue; + }; - active_request.i_batch = None; - active_request.phase = ContinuousBatchRequestPhase::Completed; + let sampled_token = match sample_token_at_batch_index( + &self.llama_context, + batch_index, + &mut active_request.chain, + &mut active_request.grammar_sampler, + ) { + Ok(token) => token, + Err(result) => { + error!( + "{:?}: sequence {} sampling error: {result:?}", + self.scheduler_context.agent_name, active_request.sequence_id + ); - continue; + if active_request.generated_tokens_tx.send(result).is_err() { + warn!( + "{:?}: failed to send result to client (receiver dropped)", + self.scheduler_context.agent_name + ); } + + active_request.i_batch = None; + active_request.phase = ContinuousBatchRequestPhase::Completed; + + continue; } }; @@ -925,16 +940,48 @@ impl ContinuousBatchScheduler { continue; } - active_request.i_batch = Some(batch.n_tokens()); + active_request.pending_sampled_token = Some(sampled_token); + } + } + + fn add_generating_pending_tokens_to_batch( + &self, + batch: &mut LlamaBatch, + batch_n_tokens: usize, + contributions: &mut Vec<GeneratingContribution>, + ) -> Result<usize> { + let mut tokens_added: usize = 0; + + for 
(request_index, active_request) in self.active_requests.iter().enumerate() { + if !matches!( + active_request.phase, + ContinuousBatchRequestPhase::Generating + ) { + continue; + } + + let Some(pending_token) = active_request.pending_sampled_token else { + continue; + }; + + if tokens_added >= batch_n_tokens { + break; + } + + let batch_position = batch.n_tokens(); batch.add( - sampled_token, + pending_token, active_request.current_token_position, &[active_request.sequence_id], true, )?; - active_request.current_token_position += 1; + contributions.push(GeneratingContribution { + request_index, + batch_position, + }); + tokens_added += 1; } @@ -946,13 +993,14 @@ clippy::cast_possible_wrap, reason = "token counts and positions fit in i32 for llama.cpp FFI" )] - fn ingest_prompt_tokens_into_batch( - &mut self, + fn add_ingesting_prompt_chunks_to_batch( + &self, batch: &mut LlamaBatch, batch_n_tokens: usize, mut current_batch_token_count: usize, + contributions: &mut Vec<IngestingContribution>, ) -> Result<()> { - for active_request in &mut self.active_requests { + for (request_index, active_request) in self.active_requests.iter().enumerate() { if !matches!(active_request.phase, ContinuousBatchRequestPhase::Ingesting) { continue; } @@ -982,19 +1030,50 @@ )?; } - if is_last_chunk { - active_request.i_batch = Some(batch.n_tokens() - 1); - active_request.phase = ContinuousBatchRequestPhase::Generating; - } + contributions.push(IngestingContribution { + request_index, + chunk_size, + is_last_chunk, + last_batch_position: batch.n_tokens() - 1, + }); - active_request.prompt_tokens_ingested += chunk_size; - active_request.current_token_position += chunk_size as i32; current_batch_token_count += chunk_size; } Ok(()) } + #[expect( + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + reason = "chunk sizes fit in i32 for llama.cpp position arithmetic" + )] + fn commit_contributions( + &mut self, + generating_contributions: &[GeneratingContribution], + ingesting_contributions: &[IngestingContribution], + ) { + for contribution in generating_contributions { + let request = &mut self.active_requests[contribution.request_index]; + + request.pending_sampled_token = None; + request.i_batch = Some(contribution.batch_position); + request.current_token_position += 1; + } + + for contribution in ingesting_contributions { + let request = &mut self.active_requests[contribution.request_index]; + + request.prompt_tokens_ingested += contribution.chunk_size; + request.current_token_position += contribution.chunk_size as i32; + + if contribution.is_last_chunk { + request.i_batch = Some(contribution.last_batch_position); + request.phase = ContinuousBatchRequestPhase::Generating; + } + } + } + fn evict_largest_sequence(&mut self) { let mut largest_seq_index: Option<usize> = None; let mut largest_position: i32 = -1; diff --git a/paddler_integration_tests/tests/agent_cuda_clean_shutdown.rs b/paddler_integration_tests/tests/agent_cuda_clean_shutdown.rs index c0fdbd20..feab0a3f 100644 --- a/paddler_integration_tests/tests/agent_cuda_clean_shutdown.rs +++ b/paddler_integration_tests/tests/agent_cuda_clean_shutdown.rs @@ -124,7 +124,10 @@ async fn test_cuda_agent_exits_cleanly_on_sigterm_during_multimodal_inference() } } - assert!(received_token, "agent never produced a token before SIGTERM"); + assert!( + received_token, + "agent never produced a token before SIGTERM" + ); let exit_status = cluster .agent diff --git a/paddler_model_tests/tests/continuous_batch_clean_shutdown.rs 
b/paddler_model_tests/tests/continuous_batch_clean_shutdown.rs deleted file mode 100644 index 6ccb8ae8..00000000 --- a/paddler_model_tests/tests/continuous_batch_clean_shutdown.rs +++ /dev/null @@ -1,59 +0,0 @@ -#![cfg(feature = "tests_that_use_llms")] - -use llama_cpp_bindings::LogOptions; -use llama_cpp_bindings::send_logs_to_tracing; -use paddler::agent::continue_from_raw_prompt_request::ContinueFromRawPromptRequest; -use paddler::agent::continuous_batch_scheduler_command::ContinuousBatchSchedulerCommand; -use paddler_model_tests::collect_generated_tokens::collect_generated_tokens; -use paddler_model_tests::device_test; -use paddler_model_tests::managed_model::ManagedModel; -use paddler_model_tests::managed_model_params::ManagedModelParams; -use paddler_types::generated_token_result::GeneratedTokenResult; -use paddler_types::huggingface_model_reference::HuggingFaceModelReference; -use paddler_types::request_params::ContinueFromRawPromptParams; -use tokio::sync::mpsc; - -const QWEN3_0_6B_LAYER_COUNT: u32 = 28; - -device_test!(continuous_batch_shutdown_after_inference_does_not_abort, |device| { - send_logs_to_tracing(LogOptions::default()); - - let managed_model = ManagedModel::from_huggingface(ManagedModelParams { - inference_parameters: device.inference_parameters_for_full_offload(QWEN3_0_6B_LAYER_COUNT), - model: HuggingFaceModelReference { - filename: "Qwen3-0.6B-Q8_0.gguf".to_owned(), - repo_id: "Qwen/Qwen3-0.6B-GGUF".to_owned(), - revision: "main".to_owned(), - }, - multimodal_projection: None, - slots: 1, - }) - .await?; - - let (generated_tokens_tx, generated_tokens_rx) = mpsc::unbounded_channel(); - let (_stop_tx, generate_tokens_stop_rx) = mpsc::unbounded_channel::<()>(); - - managed_model - .handle() - .command_tx - .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( - ContinueFromRawPromptRequest { - generated_tokens_tx, - generate_tokens_stop_rx, - params: ContinueFromRawPromptParams { - grammar: None, - max_tokens: 16, - raw_prompt: "Count from 1 to 5:".to_owned(), - }, - }, - )) - .map_err(|err| anyhow::anyhow!("Failed to send command: {err}"))?; - - let results = collect_generated_tokens(generated_tokens_rx).await?; - - assert!(matches!(results.last(), Some(GeneratedTokenResult::Done))); - - managed_model.shutdown()?; - - Ok(()) -}); diff --git a/paddler_model_tests/tests/continuous_batch_concurrent_multimodal.rs b/paddler_model_tests/tests/continuous_batch_concurrent_multimodal.rs index 2ecbab17..83c301f8 100644 --- a/paddler_model_tests/tests/continuous_batch_concurrent_multimodal.rs +++ b/paddler_model_tests/tests/continuous_batch_concurrent_multimodal.rs @@ -22,126 +22,130 @@ use tokio::sync::mpsc; const QWEN3_5_0_8B_LAYER_COUNT: u32 = 999; -device_test!(two_concurrent_multimodal_requests_both_produce_tokens, |device| { - send_logs_to_tracing(LogOptions::default()); - - let managed_model = ManagedModel::from_huggingface(ManagedModelParams { - inference_parameters: device.inference_parameters_for_full_offload(QWEN3_5_0_8B_LAYER_COUNT), - model: HuggingFaceModelReference { - filename: "Qwen3.5-0.8B-Q4_K_M.gguf".to_owned(), - repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), - revision: "main".to_owned(), - }, - multimodal_projection: Some(HuggingFaceModelReference { - filename: "mmproj-F16.gguf".to_owned(), - repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), - revision: "main".to_owned(), - }), - slots: 4, - }) - .await?; - - let test_image_data_uri = load_test_image_as_data_uri(); - - let build_conversation = || { - ConversationHistory::new(vec![ - 
ConversationMessage { - content: ConversationMessageContent::Text( - "You are a helpful assistant. Give engaging, short, precise answers." - .to_owned(), - ), - role: "system".to_owned(), +device_test!( + two_concurrent_multimodal_requests_both_produce_tokens, + |device| { + send_logs_to_tracing(LogOptions::default()); + + let managed_model = ManagedModel::from_huggingface(ManagedModelParams { + inference_parameters: device + .inference_parameters_for_full_offload(QWEN3_5_0_8B_LAYER_COUNT), + model: HuggingFaceModelReference { + filename: "Qwen3.5-0.8B-Q4_K_M.gguf".to_owned(), + repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), + revision: "main".to_owned(), }, - ConversationMessage { - content: ConversationMessageContent::Text( - "Hello! How can I help you today?".to_owned(), - ), - role: "assistant".to_owned(), - }, - ConversationMessage { - content: ConversationMessageContent::Parts(vec![ - ConversationMessageContentPart::ImageUrl { - image_url: ImageUrl { - url: test_image_data_uri.clone(), + multimodal_projection: Some(HuggingFaceModelReference { + filename: "mmproj-F16.gguf".to_owned(), + repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), + revision: "main".to_owned(), + }), + slots: 4, + }) + .await?; + + let test_image_data_uri = load_test_image_as_data_uri(); + + let build_conversation = || { + ConversationHistory::new(vec![ + ConversationMessage { + content: ConversationMessageContent::Text( + "You are a helpful assistant. Give engaging, short, precise answers." + .to_owned(), + ), + role: "system".to_owned(), + }, + ConversationMessage { + content: ConversationMessageContent::Text( + "Hello! How can I help you today?".to_owned(), + ), + role: "assistant".to_owned(), + }, + ConversationMessage { + content: ConversationMessageContent::Parts(vec![ + ConversationMessageContentPart::ImageUrl { + image_url: ImageUrl { + url: test_image_data_uri.clone(), + }, }, - }, - ConversationMessageContentPart::Text { - text: "Describe what you see in this image.".to_owned(), - }, - ]), - role: "user".to_owned(), - }, - ]) - }; - - let mut receivers = Vec::new(); - let mut stop_senders = Vec::new(); - - for _ in 0..2u8 { - let (generated_tokens_tx, generated_tokens_rx) = mpsc::unbounded_channel(); - let (stop_tx, generate_tokens_stop_rx) = mpsc::unbounded_channel::<()>(); - - managed_model - .handle() - .command_tx - .send( - ContinuousBatchSchedulerCommand::ContinueFromConversationHistory( - ContinueFromConversationHistoryRequest { - generated_tokens_tx, - generate_tokens_stop_rx, - params: ContinueFromConversationHistoryParams { - add_generation_prompt: true, - conversation_history: build_conversation(), - enable_thinking: false, - grammar: None, - max_tokens: 32, - tools: vec![], + ConversationMessageContentPart::Text { + text: "Describe what you see in this image.".to_owned(), }, - }, - ), - ) - .map_err(|err| anyhow::anyhow!("Failed to send command: {err}"))?; - - receivers.push(generated_tokens_rx); - stop_senders.push(stop_tx); - } + ]), + role: "user".to_owned(), + }, + ]) + }; + + let mut receivers = Vec::new(); + let mut stop_senders = Vec::new(); + + for _ in 0..2u8 { + let (generated_tokens_tx, generated_tokens_rx) = mpsc::unbounded_channel(); + let (stop_tx, generate_tokens_stop_rx) = mpsc::unbounded_channel::<()>(); + + managed_model + .handle() + .command_tx + .send( + ContinuousBatchSchedulerCommand::ContinueFromConversationHistory( + ContinueFromConversationHistoryRequest { + generated_tokens_tx, + generate_tokens_stop_rx, + params: ContinueFromConversationHistoryParams { + 
add_generation_prompt: true, + conversation_history: build_conversation(), + enable_thinking: false, + grammar: None, + max_tokens: 32, + tools: vec![], + }, + }, + ), + ) + .map_err(|err| anyhow::anyhow!("Failed to send command: {err}"))?; - let (results_0, results_1) = tokio::join!( - collect_generated_tokens(receivers.remove(0)), - collect_generated_tokens(receivers.remove(0)), - ); + receivers.push(generated_tokens_rx); + stop_senders.push(stop_tx); + } - let results_0 = results_0?; - let results_1 = results_1?; + let (results_0, results_1) = tokio::join!( + collect_generated_tokens(receivers.remove(0)), + collect_generated_tokens(receivers.remove(0)), + ); - log_generated_response(&results_0); - log_generated_response(&results_1); + let results_0 = results_0?; + let results_1 = results_1?; - for (index, results) in [&results_0, &results_1].iter().enumerate() { - let token_count = results - .iter() - .filter(|result| matches!(result, GeneratedTokenResult::Token(_))) - .count(); + log_generated_response(&results_0); + log_generated_response(&results_1); - assert!( - token_count > 0, - "Concurrent multimodal request {index} should produce at least one token, got: {results:?}" - ); - assert!( - !results + for (index, results) in [&results_0, &results_1].iter().enumerate() { + let token_count = results .iter() - .any(|result| matches!(result, GeneratedTokenResult::SamplerError(_))), - "Concurrent multimodal request {index} should not produce SamplerError, got: {results:?}" - ); - assert!( - matches!(results.last(), Some(GeneratedTokenResult::Done)), - "Concurrent multimodal request {index} should end with Done, got: {results:?}" - ); + .filter(|result| matches!(result, GeneratedTokenResult::Token(_))) + .count(); + + assert!( + token_count > 0, + "Concurrent multimodal request {index} should produce at least one token, got: {results:?}" + ); + assert!( + !results + .iter() + .any(|result| matches!(result, GeneratedTokenResult::SamplerError(_))), + "Concurrent multimodal request {index} should not produce SamplerError, got: {results:?}" + ); + assert!( + matches!(results.last(), Some(GeneratedTokenResult::Done)), + "Concurrent multimodal request {index} should end with Done, got: {results:?}" + ); + } + + drop(stop_senders); + + managed_model.shutdown()?; + + Ok(()) } - - drop(stop_senders); - - managed_model.shutdown()?; - - Ok(()) -}); +); diff --git a/paddler_model_tests/tests/continuous_batch_kv_cache_dtypes.rs b/paddler_model_tests/tests/continuous_batch_kv_cache_dtypes.rs index 4e52ba7c..d2e48277 100644 --- a/paddler_model_tests/tests/continuous_batch_kv_cache_dtypes.rs +++ b/paddler_model_tests/tests/continuous_batch_kv_cache_dtypes.rs @@ -1,8 +1,4 @@ #![cfg(feature = "tests_that_use_llms")] -#![expect( - non_snake_case, - reason = "test function names embed ggml dtype identifiers (e.g. 
IQ4_NL) for readable test output and parity with llama.cpp naming" -)] use anyhow::Result; use llama_cpp_bindings::LogOptions; @@ -19,16 +15,14 @@ use paddler_types::kv_cache_dtype::KvCacheDtype; use paddler_types::request_params::ContinueFromRawPromptParams; use tokio::sync::mpsc; -async fn assert_generates_tokens_with_kv_cache_dtypes( - k_cache_dtype: KvCacheDtype, - v_cache_dtype: KvCacheDtype, -) -> Result<()> { +#[actix_web::test] +async fn generates_tokens_with_distinct_k_and_v_cache_dtypes() -> Result<()> { send_logs_to_tracing(LogOptions::default()); let managed_model = ManagedModel::from_huggingface(ManagedModelParams { inference_parameters: InferenceParameters { - k_cache_dtype, - v_cache_dtype, + k_cache_dtype: KvCacheDtype::Q8_0, + v_cache_dtype: KvCacheDtype::Q4_0, ..InferenceParameters::default() }, model: HuggingFaceModelReference { @@ -74,53 +68,3 @@ async fn assert_generates_tokens_with_kv_cache_dtypes( Ok(()) } - -#[actix_web::test] -async fn generates_tokens_with_F32_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::F32, KvCacheDtype::F32).await -} - -#[actix_web::test] -async fn generates_tokens_with_F16_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::F16, KvCacheDtype::F16).await -} - -#[actix_web::test] -async fn generates_tokens_with_BF16_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::BF16, KvCacheDtype::BF16).await -} - -#[actix_web::test] -async fn generates_tokens_with_Q8_0_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::Q8_0, KvCacheDtype::Q8_0).await -} - -#[actix_web::test] -async fn generates_tokens_with_Q4_0_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::Q4_0, KvCacheDtype::Q4_0).await -} - -#[actix_web::test] -async fn generates_tokens_with_Q4_1_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::Q4_1, KvCacheDtype::Q4_1).await -} - -#[actix_web::test] -async fn generates_tokens_with_IQ4_NL_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::IQ4_NL, KvCacheDtype::IQ4_NL).await -} - -#[actix_web::test] -async fn generates_tokens_with_Q5_0_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::Q5_0, KvCacheDtype::Q5_0).await -} - -#[actix_web::test] -async fn generates_tokens_with_Q5_1_kv_cache() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::Q5_1, KvCacheDtype::Q5_1).await -} - -#[actix_web::test] -async fn generates_tokens_with_distinct_k_and_v_cache_dtypes() -> Result<()> { - assert_generates_tokens_with_kv_cache_dtypes(KvCacheDtype::Q8_0, KvCacheDtype::Q4_0).await -} diff --git a/paddler_model_tests/tests/continuous_batch_kv_pressure_eviction.rs b/paddler_model_tests/tests/continuous_batch_kv_pressure_eviction.rs new file mode 100644 index 00000000..20b85ffb --- /dev/null +++ b/paddler_model_tests/tests/continuous_batch_kv_pressure_eviction.rs @@ -0,0 +1,127 @@ +#![cfg(feature = "tests_that_use_llms")] + +use anyhow::Result; +use llama_cpp_bindings::LogOptions; +use llama_cpp_bindings::send_logs_to_tracing; +use paddler::agent::continue_from_raw_prompt_request::ContinueFromRawPromptRequest; +use paddler::agent::continuous_batch_scheduler_command::ContinuousBatchSchedulerCommand; +use paddler_model_tests::collect_generated_tokens::collect_generated_tokens; +use paddler_model_tests::managed_model::ManagedModel; +use 
paddler_model_tests::managed_model_params::ManagedModelParams; +use paddler_types::generated_token_result::GeneratedTokenResult; +use paddler_types::huggingface_model_reference::HuggingFaceModelReference; +use paddler_types::inference_parameters::InferenceParameters; +use paddler_types::request_params::ContinueFromRawPromptParams; +use tokio::sync::mpsc; + +/// Two concurrent generations with combined KV footprint larger than the +/// allocated context force the scheduler down the `DecodeError::NoKvCacheSlot` +/// path. Previously, the scheduler recursed into `execute_one_iteration` +/// unboundedly on each eviction attempt, risking a stack overflow if the +/// remaining sequence still couldn't fit. The scheduler must now terminate +/// deterministically: at least one request completes with `Done`, and the +/// test returns within the timeout rather than hanging or crashing. +#[actix_web::test] +async fn test_eviction_terminates_and_survivor_completes() -> Result<()> { + send_logs_to_tracing(LogOptions::default()); + + // Per llama.cpp's non-unified KV cache layout, `n_ctx_seq = n_ctx / n_seq_max`: + // with context_size=256 and 2 slots, each sequence has a 128-token KV budget. + // A long prompt plus enough max_tokens pushes the larger sequence past that + // budget and forces `NoKvCacheSlot` during decode. The short sequence stays + // within budget and completes after the large one is evicted. + // + // Sampling is pinned to greedy (temperature=0) so the long sequence's decoded + // continuation is deterministic across runs — whether it hits EOS before + // exceeding the per-sequence KV budget is then a property of the model, not + // of the sampler's random draws. + let managed_model = ManagedModel::from_huggingface(ManagedModelParams { + inference_parameters: InferenceParameters { + batch_n_tokens: 256, + context_size: 256, + temperature: 0.0, + ..InferenceParameters::default() + }, + model: HuggingFaceModelReference { + filename: "Qwen3-0.6B-Q8_0.gguf".to_owned(), + repo_id: "Qwen/Qwen3-0.6B-GGUF".to_owned(), + revision: "main".to_owned(), + }, + multimodal_projection: None, + slots: 2, + }) + .await?; + + let long_prompt = "Describe in great detail how the process of photosynthesis \ + works in plants. Cover the light-dependent reactions, the Calvin cycle, \ + the role of chlorophyll, the thylakoid membrane, and the stroma. \ + Explain how water and carbon dioxide are converted to glucose and oxygen. 
\ + Discuss the evolutionary history of this process and its importance \ + throughout the biosphere, and then give a long essay response."; + let short_prompt = "Hi"; + + let (tx_a, rx_a) = mpsc::unbounded_channel(); + let (_stop_tx_a, stop_rx_a) = mpsc::unbounded_channel::<()>(); + let (tx_b, rx_b) = mpsc::unbounded_channel(); + let (_stop_tx_b, stop_rx_b) = mpsc::unbounded_channel::<()>(); + + managed_model + .handle() + .command_tx + .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( + ContinueFromRawPromptRequest { + generated_tokens_tx: tx_a, + generate_tokens_stop_rx: stop_rx_a, + params: ContinueFromRawPromptParams { + grammar: None, + max_tokens: 200, + raw_prompt: long_prompt.to_owned(), + }, + }, + )) + .map_err(|err| anyhow::anyhow!("Failed to send first command: {err}"))?; + + managed_model + .handle() + .command_tx + .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( + ContinueFromRawPromptRequest { + generated_tokens_tx: tx_b, + generate_tokens_stop_rx: stop_rx_b, + params: ContinueFromRawPromptParams { + grammar: None, + max_tokens: 20, + raw_prompt: short_prompt.to_owned(), + }, + }, + )) + .map_err(|err| anyhow::anyhow!("Failed to send second command: {err}"))?; + + let (results_a, results_b) = tokio::join!( + collect_generated_tokens(rx_a), + collect_generated_tokens(rx_b) + ); + + let results_a = results_a?; + let results_b = results_b?; + + let long_was_evicted = results_a.iter().any(|result| { + matches!(result, GeneratedTokenResult::SamplerError(message) if message.contains("evicted")) + }); + let short_completed_with_done = matches!(results_b.last(), Some(GeneratedTokenResult::Done)); + + assert!( + long_was_evicted, + "The long prompt must be evicted (it exhausts its per-sequence KV budget \ + first); got results_a={results_a:?}" + ); + assert!( + short_completed_with_done, + "The short prompt must complete with Done after the long one is evicted; \ + got results_b={results_b:?}" + ); + + managed_model.shutdown()?; + + Ok(()) +} diff --git a/paddler_model_tests/tests/continuous_batch_mixed_multimodal_plain.rs b/paddler_model_tests/tests/continuous_batch_mixed_multimodal_plain.rs index a008984e..56edb5e6 100644 --- a/paddler_model_tests/tests/continuous_batch_mixed_multimodal_plain.rs +++ b/paddler_model_tests/tests/continuous_batch_mixed_multimodal_plain.rs @@ -24,151 +24,159 @@ use tokio::sync::mpsc; const QWEN3_5_0_8B_LAYER_COUNT: u32 = 999; -device_test!(plain_then_multimodal_midflight_both_produce_tokens, |device| { - send_logs_to_tracing(LogOptions::default()); - - let managed_model = ManagedModel::from_huggingface(ManagedModelParams { - inference_parameters: device.inference_parameters_for_full_offload(QWEN3_5_0_8B_LAYER_COUNT), - model: HuggingFaceModelReference { - filename: "Qwen3.5-0.8B-Q4_K_M.gguf".to_owned(), - repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), - revision: "main".to_owned(), - }, - multimodal_projection: Some(HuggingFaceModelReference { - filename: "mmproj-F16.gguf".to_owned(), - repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), - revision: "main".to_owned(), - }), - slots: 4, - }) - .await?; - - let test_image_data_uri = load_test_image_as_data_uri(); - - let (plain_tokens_tx, mut plain_tokens_rx) = mpsc::unbounded_channel(); - let (plain_stop_tx, plain_stop_rx) = mpsc::unbounded_channel::<()>(); - - managed_model - .handle() - .command_tx - .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( - ContinueFromRawPromptRequest { - generated_tokens_tx: plain_tokens_tx, - generate_tokens_stop_rx: plain_stop_rx, - 
params: ContinueFromRawPromptParams { - grammar: None, - max_tokens: 256, - raw_prompt: "Write a long poem about the sea.".to_owned(), - }, +device_test!( + plain_then_multimodal_midflight_both_produce_tokens, + |device| { + send_logs_to_tracing(LogOptions::default()); + + let managed_model = ManagedModel::from_huggingface(ManagedModelParams { + inference_parameters: device + .inference_parameters_for_full_offload(QWEN3_5_0_8B_LAYER_COUNT), + model: HuggingFaceModelReference { + filename: "Qwen3.5-0.8B-Q4_K_M.gguf".to_owned(), + repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), + revision: "main".to_owned(), }, - )) - .map_err(|err| anyhow::anyhow!("Failed to send plain command: {err}"))?; + multimodal_projection: Some(HuggingFaceModelReference { + filename: "mmproj-F16.gguf".to_owned(), + repo_id: "unsloth/Qwen3.5-0.8B-GGUF".to_owned(), + revision: "main".to_owned(), + }), + slots: 4, + }) + .await?; + + let test_image_data_uri = load_test_image_as_data_uri(); + + let (plain_tokens_tx, mut plain_tokens_rx) = mpsc::unbounded_channel(); + let (plain_stop_tx, plain_stop_rx) = mpsc::unbounded_channel::<()>(); + + managed_model + .handle() + .command_tx + .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( + ContinueFromRawPromptRequest { + generated_tokens_tx: plain_tokens_tx, + generate_tokens_stop_rx: plain_stop_rx, + params: ContinueFromRawPromptParams { + grammar: None, + max_tokens: 256, + raw_prompt: "Write a long poem about the sea.".to_owned(), + }, + }, + )) + .map_err(|err| anyhow::anyhow!("Failed to send plain command: {err}"))?; - let mut plain_results: Vec<GeneratedTokenResult> = Vec::new(); - let mut plain_tokens_seen: usize = 0; + let mut plain_results: Vec<GeneratedTokenResult> = Vec::new(); + let mut plain_tokens_seen: usize = 0; - while plain_tokens_seen < 4 { - let result = plain_tokens_rx - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("plain channel closed before first tokens"))?; + while plain_tokens_seen < 4 { + let result = plain_tokens_rx + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("plain channel closed before first tokens"))?; - if matches!(result, GeneratedTokenResult::Token(_)) { - plain_tokens_seen += 1; - } + if matches!(result, GeneratedTokenResult::Token(_)) { + plain_tokens_seen += 1; + } - plain_results.push(result); - } + plain_results.push(result); + } - let multimodal_conversation = ConversationHistory::new(vec![ - ConversationMessage { - content: ConversationMessageContent::Text( - "You are a helpful assistant. Give engaging, short, precise answers.".to_owned(), - ), - role: "system".to_owned(), - }, - ConversationMessage { - content: ConversationMessageContent::Text( - "Hello! How can I help you today?".to_owned(), - ), - role: "assistant".to_owned(), - }, - ConversationMessage { - content: ConversationMessageContent::Parts(vec![ - ConversationMessageContentPart::ImageUrl { - image_url: ImageUrl { - url: test_image_data_uri, + let multimodal_conversation = ConversationHistory::new(vec![ + ConversationMessage { + content: ConversationMessageContent::Text( + "You are a helpful assistant. Give engaging, short, precise answers." + .to_owned(), + ), + role: "system".to_owned(), + }, + ConversationMessage { + content: ConversationMessageContent::Text( + "Hello! 
How can I help you today?".to_owned(), + ), + role: "assistant".to_owned(), + }, + ConversationMessage { + content: ConversationMessageContent::Parts(vec![ + ConversationMessageContentPart::ImageUrl { + image_url: ImageUrl { + url: test_image_data_uri, + }, }, - }, - ConversationMessageContentPart::Text { - text: "Describe what you see in this image.".to_owned(), - }, - ]), - role: "user".to_owned(), - }, - ]); - - let (multimodal_tokens_tx, multimodal_tokens_rx) = mpsc::unbounded_channel(); - let (multimodal_stop_tx, multimodal_stop_rx) = mpsc::unbounded_channel::<()>(); - - managed_model - .handle() - .command_tx - .send( - ContinuousBatchSchedulerCommand::ContinueFromConversationHistory( - ContinueFromConversationHistoryRequest { - generated_tokens_tx: multimodal_tokens_tx, - generate_tokens_stop_rx: multimodal_stop_rx, - params: ContinueFromConversationHistoryParams { - add_generation_prompt: true, - conversation_history: multimodal_conversation, - enable_thinking: false, - grammar: None, - max_tokens: 32, - tools: vec![], + ConversationMessageContentPart::Text { + text: "Describe what you see in this image.".to_owned(), }, - }, - ), - ) - .map_err(|err| anyhow::anyhow!("Failed to send multimodal command: {err}"))?; - - let (plain_remaining, multimodal_results) = tokio::join!( - collect_generated_tokens(plain_tokens_rx), - collect_generated_tokens(multimodal_tokens_rx), - ); - - plain_results.extend(plain_remaining?); - let multimodal_results = multimodal_results?; - - log_generated_response(&plain_results); - log_generated_response(&multimodal_results); - - for (label, results) in [("plain", &plain_results), ("multimodal", &multimodal_results)] { - let token_count = results - .iter() - .filter(|result| matches!(result, GeneratedTokenResult::Token(_))) - .count(); - - assert!( - token_count > 0, - "Concurrent {label} request should produce at least one token, got: {results:?}" + ]), + role: "user".to_owned(), + }, + ]); + + let (multimodal_tokens_tx, multimodal_tokens_rx) = mpsc::unbounded_channel(); + let (multimodal_stop_tx, multimodal_stop_rx) = mpsc::unbounded_channel::<()>(); + + managed_model + .handle() + .command_tx + .send( + ContinuousBatchSchedulerCommand::ContinueFromConversationHistory( + ContinueFromConversationHistoryRequest { + generated_tokens_tx: multimodal_tokens_tx, + generate_tokens_stop_rx: multimodal_stop_rx, + params: ContinueFromConversationHistoryParams { + add_generation_prompt: true, + conversation_history: multimodal_conversation, + enable_thinking: false, + grammar: None, + max_tokens: 32, + tools: vec![], + }, + }, + ), + ) + .map_err(|err| anyhow::anyhow!("Failed to send multimodal command: {err}"))?; + + let (plain_remaining, multimodal_results) = tokio::join!( + collect_generated_tokens(plain_tokens_rx), + collect_generated_tokens(multimodal_tokens_rx), ); - assert!( - !results + + plain_results.extend(plain_remaining?); + let multimodal_results = multimodal_results?; + + log_generated_response(&plain_results); + log_generated_response(&multimodal_results); + + for (label, results) in [ + ("plain", &plain_results), + ("multimodal", &multimodal_results), + ] { + let token_count = results .iter() - .any(|result| matches!(result, GeneratedTokenResult::SamplerError(_))), - "Concurrent {label} request should not produce SamplerError, got: {results:?}" - ); - assert!( - matches!(results.last(), Some(GeneratedTokenResult::Done)), - "Concurrent {label} request should end with Done, got: {results:?}" - ); - } + .filter(|result| matches!(result, 
GeneratedTokenResult::Token(_))) + .count(); + + assert!( + token_count > 0, + "Concurrent {label} request should produce at least one token, got: {results:?}" + ); + assert!( + !results + .iter() + .any(|result| matches!(result, GeneratedTokenResult::SamplerError(_))), + "Concurrent {label} request should not produce SamplerError, got: {results:?}" + ); + assert!( + matches!(results.last(), Some(GeneratedTokenResult::Done)), + "Concurrent {label} request should end with Done, got: {results:?}" + ); + } - drop(plain_stop_tx); - drop(multimodal_stop_tx); + drop(plain_stop_tx); + drop(multimodal_stop_tx); - managed_model.shutdown()?; + managed_model.shutdown()?; - Ok(()) -}); + Ok(()) + } +); diff --git a/paddler_model_tests/tests/continuous_batch_partial_offload.rs b/paddler_model_tests/tests/continuous_batch_partial_offload.rs index 21dae8d9..2d82aa88 100644 --- a/paddler_model_tests/tests/continuous_batch_partial_offload.rs +++ b/paddler_model_tests/tests/continuous_batch_partial_offload.rs @@ -19,57 +19,60 @@ use tokio::sync::mpsc; const QWEN3_0_6B_PARTIAL_GPU_LAYER_COUNT: u32 = 14; -gpu_device_test!(continuous_batch_partial_offload_generates_tokens, |device| { - send_logs_to_tracing(LogOptions::default()); +gpu_device_test!( + continuous_batch_partial_offload_generates_tokens, + |device| { + send_logs_to_tracing(LogOptions::default()); - let managed_model = ManagedModel::from_huggingface(ManagedModelParams { - inference_parameters: InferenceParameters { - n_gpu_layers: QWEN3_0_6B_PARTIAL_GPU_LAYER_COUNT, - ..InferenceParameters::default() - }, - model: HuggingFaceModelReference { - filename: "Qwen3-0.6B-Q8_0.gguf".to_owned(), - repo_id: "Qwen/Qwen3-0.6B-GGUF".to_owned(), - revision: "main".to_owned(), - }, - multimodal_projection: None, - slots: 1, - }) - .await?; + let managed_model = ManagedModel::from_huggingface(ManagedModelParams { + inference_parameters: InferenceParameters { + n_gpu_layers: QWEN3_0_6B_PARTIAL_GPU_LAYER_COUNT, + ..InferenceParameters::default() + }, + model: HuggingFaceModelReference { + filename: "Qwen3-0.6B-Q8_0.gguf".to_owned(), + repo_id: "Qwen/Qwen3-0.6B-GGUF".to_owned(), + revision: "main".to_owned(), + }, + multimodal_projection: None, + slots: 1, + }) + .await?; - let (generated_tokens_tx, generated_tokens_rx) = mpsc::unbounded_channel(); - let (_stop_tx, generate_tokens_stop_rx) = mpsc::unbounded_channel::<()>(); + let (generated_tokens_tx, generated_tokens_rx) = mpsc::unbounded_channel(); + let (_stop_tx, generate_tokens_stop_rx) = mpsc::unbounded_channel::<()>(); - managed_model - .handle() - .command_tx - .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( - ContinueFromRawPromptRequest { - generated_tokens_tx, - generate_tokens_stop_rx, - params: ContinueFromRawPromptParams { - grammar: None, - max_tokens: 16, - raw_prompt: "Count from 1 to 5:".to_owned(), + managed_model + .handle() + .command_tx + .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( + ContinueFromRawPromptRequest { + generated_tokens_tx, + generate_tokens_stop_rx, + params: ContinueFromRawPromptParams { + grammar: None, + max_tokens: 16, + raw_prompt: "Count from 1 to 5:".to_owned(), + }, }, - }, - )) - .map_err(|err| anyhow::anyhow!("Failed to send command: {err}"))?; + )) + .map_err(|err| anyhow::anyhow!("Failed to send command: {err}"))?; - let results = collect_generated_tokens(generated_tokens_rx).await?; + let results = collect_generated_tokens(generated_tokens_rx).await?; - let token_count = results - .iter() - .filter(|result| matches!(result, 
GeneratedTokenResult::Token(_))) - .count(); + let token_count = results + .iter() + .filter(|result| matches!(result, GeneratedTokenResult::Token(_))) + .count(); - assert!( - token_count > 0, - "partial-offload test produced no tokens with n_gpu_layers={QWEN3_0_6B_PARTIAL_GPU_LAYER_COUNT}" - ); - assert!(matches!(results.last(), Some(GeneratedTokenResult::Done))); + assert!( + token_count > 0, + "partial-offload test produced no tokens with n_gpu_layers={QWEN3_0_6B_PARTIAL_GPU_LAYER_COUNT}" + ); + assert!(matches!(results.last(), Some(GeneratedTokenResult::Done))); - managed_model.shutdown()?; + managed_model.shutdown()?; - Ok(()) -}); + Ok(()) + } +); diff --git a/paddler_model_tests/tests/continuous_batch_shutdown_during_generation.rs b/paddler_model_tests/tests/continuous_batch_shutdown_during_generation.rs deleted file mode 100644 index 005aafc0..00000000 --- a/paddler_model_tests/tests/continuous_batch_shutdown_during_generation.rs +++ /dev/null @@ -1,62 +0,0 @@ -#![cfg(feature = "tests_that_use_llms")] - -use anyhow::Result; -use llama_cpp_bindings::LogOptions; -use llama_cpp_bindings::send_logs_to_tracing; -use paddler::agent::continue_from_raw_prompt_request::ContinueFromRawPromptRequest; -use paddler::agent::continuous_batch_scheduler_command::ContinuousBatchSchedulerCommand; -use paddler_model_tests::managed_model::ManagedModel; -use paddler_model_tests::managed_model_params::ManagedModelParams; -use paddler_types::generated_token_result::GeneratedTokenResult; -use paddler_types::huggingface_model_reference::HuggingFaceModelReference; -use paddler_types::inference_parameters::InferenceParameters; -use paddler_types::request_params::ContinueFromRawPromptParams; -use tokio::sync::mpsc; - -#[actix_web::test] -async fn test_shutdown_during_active_generation_does_not_hang() -> Result<()> { - send_logs_to_tracing(LogOptions::default()); - - let managed_model = ManagedModel::from_huggingface(ManagedModelParams { - inference_parameters: InferenceParameters::default(), - model: HuggingFaceModelReference { - filename: "Qwen3-0.6B-Q8_0.gguf".to_owned(), - repo_id: "Qwen/Qwen3-0.6B-GGUF".to_owned(), - revision: "main".to_owned(), - }, - multimodal_projection: None, - slots: 1, - }) - .await?; - - let (generated_tokens_tx, mut generated_tokens_rx) = mpsc::unbounded_channel(); - let (_stop_tx, generate_tokens_stop_rx) = mpsc::unbounded_channel::<()>(); - - managed_model - .handle() - .command_tx - .send(ContinuousBatchSchedulerCommand::ContinueFromRawPrompt( - ContinueFromRawPromptRequest { - generated_tokens_tx, - generate_tokens_stop_rx, - params: ContinueFromRawPromptParams { - grammar: None, - max_tokens: 1000, - raw_prompt: "Write a very long essay about the history of computing".to_owned(), - }, - }, - )) - .map_err(|err| anyhow::anyhow!("Failed to send command: {err}"))?; - - // Wait for first token to confirm generation is active - let first = generated_tokens_rx.recv().await; - assert!( - matches!(first, Some(GeneratedTokenResult::Token(_))), - "Expected first result to be a token" - ); - - // Shutdown while generation is active — should not hang - managed_model.shutdown()?; - - Ok(()) -}