Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ fmt: node_modules
jarmuz-static: node_modules
./jarmuz-static.mjs

.PHONY: build
build: jarmuz-static
cargo build -p paddler --features web_admin_panel

.PHONY: build.cuda
build.cuda: jarmuz-static
cargo build -p paddler --features cuda,web_admin_panel

.PHONY: release
release: jarmuz-static
cargo build --release -p paddler --features web_admin_panel
Expand All @@ -51,10 +59,6 @@ release.cuda: jarmuz-static
release.vulkan: jarmuz-static
cargo build --release -p paddler --features web_admin_panel,vulkan

.PHONY: build
build: jarmuz-static
cargo build -p paddler --features web_admin_panel

.PHONY: test
test: test.unit test.models test.integration

Expand Down
2 changes: 0 additions & 2 deletions paddler/src/agent/continuous_batch_embedding_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ impl<'context> ContinuousBatchEmbeddingProcessor<'context> {
return Err(anyhow!("Embeddings are not enabled"));
}

self.llama_context.clear_kv_cache();

let tokens_lines_list = input_batch
.into_iter()
.map(|input| {
Expand Down
247 changes: 163 additions & 84 deletions paddler/src/agent/continuous_batch_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ use crate::decoded_image::DecodedImage;
use crate::dispenses_slots::DispensesSlots;
use crate::slot_aggregated_status::SlotAggregatedStatus;

/// Records one token that a Generating-phase request contributed to the
/// current `LlamaBatch`, so the request's state can be updated (in
/// `commit_contributions`) only after the batch decode succeeds.
struct GeneratingContribution {
/// Index into `active_requests` identifying the contributing request.
request_index: usize,
/// Position of the token inside the batch; becomes the request's
/// `i_batch` (the logit index to sample from) on commit.
batch_position: i32,
}

/// Records a chunk of prompt tokens that an Ingesting-phase request
/// contributed to the current `LlamaBatch`, so the request's state can be
/// updated (in `commit_contributions`) only after the decode succeeds.
struct IngestingContribution {
/// Index into `active_requests` identifying the contributing request.
request_index: usize,
/// Number of prompt tokens added to the batch for this chunk.
chunk_size: usize,
/// True when this chunk finishes the prompt; the request then moves to
/// the Generating phase on commit.
is_last_chunk: bool,
/// Batch position of the chunk's final token; used as the request's
/// `i_batch` when this was the last chunk.
last_batch_position: i32,
}

pub struct ContinuousBatchScheduler {
active_requests: Vec<ContinuousBatchActiveRequest>,
command_rx: Receiver<ContinuousBatchSchedulerCommand>,
Expand Down Expand Up @@ -729,66 +741,75 @@ impl ContinuousBatchScheduler {
}

fn execute_one_iteration(&mut self) -> Result<()> {
self.advance_generating_requests();

let batch_n_tokens = self.scheduler_context.inference_parameters.batch_n_tokens;
let max_sequences = self.active_requests.len();

#[expect(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
reason = "token counts and positions fit in i32 for llama.cpp FFI"
)]
let mut batch = LlamaBatch::new(batch_n_tokens, max_sequences.max(1) as i32)?;
loop {
let max_sequences = self.active_requests.len();

let mut current_batch_token_count: usize = 0;
#[expect(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
reason = "token counts and positions fit in i32 for llama.cpp FFI"
)]
let mut batch = LlamaBatch::new(batch_n_tokens, max_sequences.max(1) as i32)?;

current_batch_token_count +=
self.sample_generating_requests_into_batch(&mut batch, batch_n_tokens)?;
let mut generating_contributions: Vec<GeneratingContribution> = Vec::new();
let mut ingesting_contributions: Vec<IngestingContribution> = Vec::new();

self.ingest_prompt_tokens_into_batch(
&mut batch,
batch_n_tokens,
current_batch_token_count,
)?;
let mut current_batch_token_count: usize = 0;

if batch.n_tokens() == 0 {
return Ok(());
}
current_batch_token_count += self.add_generating_pending_tokens_to_batch(
&mut batch,
batch_n_tokens,
&mut generating_contributions,
)?;

debug!(
"{:?}: decoding batch with {} tokens for {} active requests",
self.scheduler_context.agent_name,
batch.n_tokens(),
self.active_requests.len()
);
self.add_ingesting_prompt_chunks_to_batch(
&mut batch,
batch_n_tokens,
current_batch_token_count,
&mut ingesting_contributions,
)?;

if batch.n_tokens() == 0 {
return Ok(());
}

debug!(
"{:?}: decoding batch with {} tokens for {} active requests",
self.scheduler_context.agent_name,
batch.n_tokens(),
self.active_requests.len()
);

if let Err(err) = self.llama_context.decode(&mut batch) {
match err {
DecodeError::NoKvCacheSlot => {
match self.llama_context.decode(&mut batch) {
Ok(()) => {
self.commit_contributions(&generating_contributions, &ingesting_contributions);

return Ok(());
}
Err(DecodeError::NoKvCacheSlot) => {
self.evict_largest_sequence();

return self.execute_one_iteration();
if self.active_requests.is_empty() {
return Ok(());
}
}
Comment thread
mcharytoniuk marked this conversation as resolved.
DecodeError::Aborted | DecodeError::NTokensZero => {
Err(DecodeError::Aborted | DecodeError::NTokensZero) => {
return Ok(());
}
DecodeError::Unknown(error_code) => {
Err(DecodeError::Unknown(error_code)) => {
return Err(anyhow!(
"Decode failed with unknown error code: {error_code}"
));
}
}
}

Ok(())
}

fn sample_generating_requests_into_batch(
&mut self,
batch: &mut LlamaBatch,
batch_n_tokens: usize,
) -> Result<usize> {
let mut tokens_added: usize = 0;

fn advance_generating_requests(&mut self) {
for active_request in &mut self.active_requests {
if !matches!(
active_request.phase,
Expand All @@ -797,44 +818,38 @@ impl ContinuousBatchScheduler {
continue;
}

if tokens_added >= batch_n_tokens {
break;
if active_request.pending_sampled_token.is_some() {
continue;
}

let sampled_token = if let Some(harvested) =
active_request.pending_sampled_token.take()
{
harvested
} else {
let Some(batch_index) = active_request.i_batch else {
continue;
};

match sample_token_at_batch_index(
&self.llama_context,
batch_index,
&mut active_request.chain,
&mut active_request.grammar_sampler,
) {
Ok(token) => token,
Err(result) => {
error!(
"{:?}: sequence {} sampling error: {result:?}",
self.scheduler_context.agent_name, active_request.sequence_id
);

if active_request.generated_tokens_tx.send(result).is_err() {
warn!(
"{:?}: failed to send result to client (receiver dropped)",
self.scheduler_context.agent_name
);
}
let Some(batch_index) = active_request.i_batch else {
continue;
};

active_request.i_batch = None;
active_request.phase = ContinuousBatchRequestPhase::Completed;
let sampled_token = match sample_token_at_batch_index(
&self.llama_context,
batch_index,
&mut active_request.chain,
&mut active_request.grammar_sampler,
) {
Ok(token) => token,
Err(result) => {
error!(
"{:?}: sequence {} sampling error: {result:?}",
self.scheduler_context.agent_name, active_request.sequence_id
);

continue;
if active_request.generated_tokens_tx.send(result).is_err() {
warn!(
"{:?}: failed to send result to client (receiver dropped)",
self.scheduler_context.agent_name
);
}

active_request.i_batch = None;
active_request.phase = ContinuousBatchRequestPhase::Completed;

continue;
}
};

Expand Down Expand Up @@ -925,16 +940,48 @@ impl ContinuousBatchScheduler {
continue;
}

active_request.i_batch = Some(batch.n_tokens());
active_request.pending_sampled_token = Some(sampled_token);
}
}

fn add_generating_pending_tokens_to_batch(
&self,
batch: &mut LlamaBatch,
batch_n_tokens: usize,
contributions: &mut Vec<GeneratingContribution>,
) -> Result<usize> {
let mut tokens_added: usize = 0;

for (request_index, active_request) in self.active_requests.iter().enumerate() {
if !matches!(
active_request.phase,
ContinuousBatchRequestPhase::Generating
) {
continue;
}

let Some(pending_token) = active_request.pending_sampled_token else {
continue;
};

if tokens_added >= batch_n_tokens {
break;
}

let batch_position = batch.n_tokens();

batch.add(
sampled_token,
pending_token,
active_request.current_token_position,
&[active_request.sequence_id],
true,
)?;

active_request.current_token_position += 1;
contributions.push(GeneratingContribution {
request_index,
batch_position,
});

tokens_added += 1;
}

Expand All @@ -946,13 +993,14 @@ impl ContinuousBatchScheduler {
clippy::cast_possible_wrap,
reason = "token counts and positions fit in i32 for llama.cpp FFI"
)]
fn ingest_prompt_tokens_into_batch(
&mut self,
fn add_ingesting_prompt_chunks_to_batch(
&self,
batch: &mut LlamaBatch,
batch_n_tokens: usize,
mut current_batch_token_count: usize,
contributions: &mut Vec<IngestingContribution>,
) -> Result<()> {
for active_request in &mut self.active_requests {
for (request_index, active_request) in self.active_requests.iter().enumerate() {
if !matches!(active_request.phase, ContinuousBatchRequestPhase::Ingesting) {
continue;
}
Expand Down Expand Up @@ -982,19 +1030,50 @@ impl ContinuousBatchScheduler {
)?;
}

if is_last_chunk {
active_request.i_batch = Some(batch.n_tokens() - 1);
active_request.phase = ContinuousBatchRequestPhase::Generating;
}
contributions.push(IngestingContribution {
request_index,
chunk_size,
is_last_chunk,
last_batch_position: batch.n_tokens() - 1,
});

active_request.prompt_tokens_ingested += chunk_size;
active_request.current_token_position += chunk_size as i32;
current_batch_token_count += chunk_size;
}

Ok(())
}

#[expect(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
reason = "chunk sizes fit in i32 for llama.cpp position arithmetic"
)]
/// Applies the per-request state changes recorded while building the
/// batch. Called only from the `Ok(())` arm of `llama_context.decode`,
/// so request state is never advanced for a batch that failed to decode
/// (e.g. on `NoKvCacheSlot` the batch is rebuilt and retried instead).
fn commit_contributions(
&mut self,
generating_contributions: &[GeneratingContribution],
ingesting_contributions: &[IngestingContribution],
) {
// Generating requests: the pending sampled token is now in the KV
// cache, so clear it, remember where its logits live in the batch,
// and advance the sequence position by one token.
for contribution in generating_contributions {
let request = &mut self.active_requests[contribution.request_index];

request.pending_sampled_token = None;
request.i_batch = Some(contribution.batch_position);
request.current_token_position += 1;
}

// Ingesting requests: account for the prompt chunk that was decoded
// and advance the position by the chunk length.
for contribution in ingesting_contributions {
let request = &mut self.active_requests[contribution.request_index];

request.prompt_tokens_ingested += contribution.chunk_size;
request.current_token_position += contribution.chunk_size as i32;

// Prompt fully ingested: sample from the chunk's last token's
// logits and switch the request into the Generating phase.
if contribution.is_last_chunk {
request.i_batch = Some(contribution.last_batch_position);
request.phase = ContinuousBatchRequestPhase::Generating;
}
}
}

fn evict_largest_sequence(&mut self) {
let mut largest_seq_index: Option<usize> = None;
let mut largest_position: i32 = -1;
Expand Down
5 changes: 4 additions & 1 deletion paddler_integration_tests/tests/agent_cuda_clean_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ async fn test_cuda_agent_exits_cleanly_on_sigterm_during_multimodal_inference()
}
}

assert!(received_token, "agent never produced a token before SIGTERM");
assert!(
received_token,
"agent never produced a token before SIGTERM"
);

let exit_status = cluster
.agent
Expand Down
Loading
Loading