From 0048814b4b7e204b9ba9e01666f697d047fb84af Mon Sep 17 00:00:00 2001
From: krishung5
Date: Thu, 6 Nov 2025 17:02:09 -0800
Subject: [PATCH 1/2] WIP: Fix GPU block leak when finishing requests

Signed-off-by: krishung5
---
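Note: the sketch below is not part of the diff. It is a minimal, self-contained
model of the lifecycle this patch enforces: only the SlotState variant names and
the true/false contract of request_finished() come from the real code, while
Slot, the pending-operation counter, and main() are simplified stand-ins (the
Finishing-vs-Finished split in mark_as_finished() lands in PATCH 2/2):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SlotState {
    Active,
    Finishing, // finish requested, transfers still in flight
    Finished,  // every outstanding transfer has completed
}

struct Slot {
    state: SlotState,
    pending_operations: usize,
}

impl Slot {
    fn mark_as_finished(&mut self) {
        // Go straight to Finished when nothing is outstanding; otherwise
        // park in Finishing until the worker drains the transfers.
        self.state = if self.pending_operations == 0 {
            SlotState::Finished
        } else {
            SlotState::Finishing
        };
    }
}

/// Returns false when vLLM may reuse the request's GPU blocks immediately,
/// true when the worker side will report completion later. The slot is only
/// removed on the Finished path - deferring that removal is the fix.
fn request_finished(slot: &mut Slot) -> bool {
    slot.mark_as_finished();
    match slot.state {
        SlotState::Finished => false, // remove_slot() happens here
        _ => true,                    // keep the slot; the worker removes it
    }
}

fn main() {
    let mut idle = Slot { state: SlotState::Active, pending_operations: 0 };
    assert!(!request_finished(&mut idle)); // blocks reusable right away

    let mut busy = Slot { state: SlotState::Active, pending_operations: 2 };
    assert!(request_finished(&mut busy)); // worker must finish first
    assert_eq!(busy.state, SlotState::Finishing);
}

Previously the slot was removed unconditionally before this check, so a slot
still in Finishing could no longer be driven to completion by the worker and
its GPU blocks were never returned.
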
 .../block_manager/vllm/connector/leader.rs    | 20 +++++-----
 .../vllm/connector/leader/slot.rs             | 39 ++++++++++++++++++-
 2 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/lib/kvbm/src/block_manager/vllm/connector/leader.rs b/lib/kvbm/src/block_manager/vllm/connector/leader.rs
index 363be2ace5..95207b5bb5 100644
--- a/lib/kvbm/src/block_manager/vllm/connector/leader.rs
+++ b/lib/kvbm/src/block_manager/vllm/connector/leader.rs
@@ -507,31 +507,33 @@ impl Leader for KvConnectorLeader {
         // grab the slot
         let shared_slot = self.slot_manager().get_slot(&request_id)?;
 
-        // mark the slot as finished
+        // Acquire the lock BEFORE marking the slot as finished.
+        // This ensures we check the state and prevents new operations from being created.
         let mut slot = shared_slot
             .lock()
             .map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
-        slot.mark_as_finished(self.iteration_counter)?;
-
-        // todo: allow the request to resolve when it should exit
-        // the request may have some outstanding operations
-        // we would like to inform it to shutdown, then have it signal to the work that is officially gone,
-        // then we can remove the slot and trigger the worker to clean up as well.
+        // Mark the slot as finished (sets state to Finishing if there are operations,
+        // or Finished if all operations are complete).
+        slot.mark_as_finished(self.iteration_counter)?;
 
         // remove the request from the inflight requests
        self.inflight_requests.remove(&request_id);
 
-        // remove it from the manager as we will never use it again
-        self.slot_manager().remove_slot(&request_id)?;
-
         // if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
         // otherwise, we return true, which means there are still outstanding operations on gpu blocks which
         // must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
         // of the connector api which will be used to inform vllm that the request is finished.
         if let SlotState::Finished = slot.state() {
+            // All operations complete - safe to remove the slot and tell vLLM the blocks are free.
+            self.slot_manager().remove_slot(&request_id)?;
             Ok(false)
         } else {
             debug_assert!(matches!(slot.state(), SlotState::Finishing));
+            // Still has pending operations - keep the slot alive for the worker to process.
+            // Don't remove the slot here: the worker needs it to process the finish event,
+            // and will remove it after verifying all operations are complete.
+            // The lock on the slot prevents new operations from being created in offload_blocks().
             Ok(true)
         }
     }
diff --git a/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
index c633f74d7c..2504e7a073 100644
--- a/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
+++ b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
@@ -989,6 +989,12 @@ impl VllmConnectorSlot {
         block_ids: &[BlockId],
         token_blocks: &[TokenBlock],
     ) -> Result<(), SlotError> {
+        // Check the slot state before creating operations: if the slot is
+        // already Finishing or Finished, don't create new ones.
+        if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
+            return Ok(());
+        }
+
         assert!(block_ids.len() == token_blocks.len());
 
         let operation_id = uuid::Uuid::new_v4();
@@ -1173,8 +1179,8 @@ impl LocalTransferEngine {
         task_token: CancellationToken,
         kvbm_metrics: KvbmMetrics,
     ) -> anyhow::Result<()> {
-        let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
-        let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::();
+        let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel::();
+        let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::();
 
         // Clone resources needed for tasks
         let block_manager_offload = self.block_manager.clone();
@@ -1212,6 +1218,10 @@ impl LocalTransferEngine {
                     tracing::debug!("LocalOffloadTask: received cancellation signal");
                     break;
                 }
+
+                let request_id = req.request_id.clone();
+                let operation_id = req.operation_id;
+
                 if let Err(e) = process_offload_request(
                     req,
                     &block_manager_offload,
                 )
                 .await
                 {
                     tracing::error!("LocalOffloadTask: error processing request: {:?}", e);
+
+                    // Notify scheduler that this operation is "complete" (even though it failed)
+                    // Create a fake/immediate transfer request that completes instantly
+                    // This increments the workers' completed counter so they can progress
+                    let fake_xfer = BlockTransferRequest {
+                        from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
+                        to_pool: BlockTransferPool::Host,     // (offload path, but no blocks)
+                        blocks: vec![],                       // Empty - nothing to transfer
+                        connector_req: Some(LeaderTransferRequest {
+                            request_id: request_id.clone(),
+                            uuid: operation_id,
+                            requirement: None,
+                            request_type: RequestType::Immediate, // Immediate = completes instantly
+                        }),
+                    };
+
+                    match leader_offload.transfer_blocks_request(fake_xfer).await {
+                        Ok(notify_receiver) => {
+                            // Wait for the fake transfer to "complete" (should be instant)
+                            let _ = notify_receiver.await;
+                        }
+                        Err(_xfer_err) => {
+                            // Failed to create completion notification - error already logged above
+                        }
+                    }
                 }
             }
             Ok(())

From dff6fcce513e2d1917f5802e2b5a101a9249f285 Mon Sep 17 00:00:00 2001
From: krishung5
Date: Fri, 7 Nov 2025 11:17:45 -0800
Subject: [PATCH 2/2] Address review comments

---
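Note: not part of the diff - a dependency-free sketch of the state decision that
mark_as_finished() now makes. The Option-based pending check mirrors the hunk
below; u64 merely stands in for the real operation-id type, which this sketch
does not assume:

#[derive(Debug, PartialEq)]
enum SlotState {
    Finishing,
    Finished,
}

fn state_after_finish(pending_operations: &Option<Vec<u64>>) -> SlotState {
    // Same check as the patch: treat both None and Some(empty) as
    // "nothing outstanding", so the slot can finish immediately.
    let has_pending_ops = pending_operations
        .as_ref()
        .map(|ops| !ops.is_empty())
        .unwrap_or(false);
    if has_pending_ops {
        SlotState::Finishing // leader returns true; worker completes the request
    } else {
        SlotState::Finished // leader returns false; GPU blocks reusable now
    }
}

fn main() {
    assert_eq!(state_after_finish(&None), SlotState::Finished);
    assert_eq!(state_after_finish(&Some(vec![])), SlotState::Finished);
    assert_eq!(state_after_finish(&Some(vec![7])), SlotState::Finishing);
}

Without this change every finishing request entered Finishing, so the leader
always returned true and vLLM kept the GPU blocks reserved until the worker
side reported completion, even when no transfers were outstanding.
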
 .../vllm/connector/leader/slot.rs             | 42 ++++++++++++++-----
 1 file changed, 31 insertions(+), 11 deletions(-)

diff --git a/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
index 2504e7a073..1ddc70584a 100644
--- a/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
+++ b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
@@ -712,14 +712,35 @@ impl Slot for VllmConnectorSlot {
     }
 
     fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> {
-        self.state = SlotState::Finishing;
-        tracing::info!(
-            request_id = %self.request_id,
-            "request set to finish: cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
-            self.tokens_cached_from_device,
-            self.tokens_cached_from_host,
-            self.tokens_cached_from_disk
-        );
+        // Check whether there are any pending operations.
+        let has_pending_ops = self
+            .pending_operations
+            .as_ref()
+            .map(|ops| !ops.is_empty())
+            .unwrap_or(false);
+
+        if has_pending_ops {
+            // There are pending operations - need to wait for them to complete.
+            self.state = SlotState::Finishing;
+            tracing::debug!(
+                request_id = %self.request_id,
+                pending_operations = self.pending_operations.as_ref().unwrap().len(),
+                "request set to finish (with pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
+                self.tokens_cached_from_device,
+                self.tokens_cached_from_host,
+                self.tokens_cached_from_disk
+            );
+        } else {
+            // No pending operations - can immediately mark as finished.
+            self.state = SlotState::Finished;
+            tracing::debug!(
+                request_id = %self.request_id,
+                "request set to finished (no pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
+                self.tokens_cached_from_device,
+                self.tokens_cached_from_host,
+                self.tokens_cached_from_disk
+            );
+        }
         Ok(())
     }
 
@@ -1232,9 +1253,8 @@ impl LocalTransferEngine {
                 {
                     tracing::error!("LocalOffloadTask: error processing request: {:?}", e);
 
-                    // Notify scheduler that this operation is "complete" (even though it failed)
-                    // Create a fake/immediate transfer request that completes instantly
-                    // This increments the workers' completed counter so they can progress
+                    // Create a fake/immediate transfer request that completes instantly;
+                    // otherwise the worker side might get stuck and cause a memory leak.
                     let fake_xfer = BlockTransferRequest {
                         from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
                         to_pool: BlockTransferPool::Host,     // (offload path, but no blocks)
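
Note: the hunk above only rewords a comment, but the reasoning it compresses is
worth spelling out. Per the comments in PATCH 1/2, the worker only makes
progress once its completed counter matches the operations that were issued, so
even a failed offload must still produce a completion event. Below is a
dependency-free sketch of that accounting, not part of the diff; the channel,
counters, and block numbers are stand-ins for the real transfer plumbing:

use std::sync::mpsc;

// Stand-in for the completion events behind transfer_blocks_request();
// blocks_moved is 0 for the immediate no-op sent on the failure path.
struct Completion {
    operation_id: usize,
    blocks_moved: usize,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Completion>();
    let issued: usize = 3; // offload operations created for one request

    // Operation 1 fails, but it still sends an empty, immediate completion.
    for op in 0..issued {
        let failed = op == 1;
        tx.send(Completion {
            operation_id: op,
            blocks_moved: if failed { 0 } else { 16 },
        })
        .unwrap();
    }
    drop(tx);

    // The worker releases the request's GPU blocks only once every issued
    // operation has reported back; without the no-op completion it would
    // stall at 2 of 3 and the blocks would leak.
    let (mut completed, mut blocks) = (0usize, 0usize);
    for c in rx {
        println!("operation {} done ({} blocks)", c.operation_id, c.blocks_moved);
        completed += 1;
        blocks += c.blocks_moved;
    }
    assert_eq!(completed, issued);
    println!("{completed} operations, {blocks} blocks moved; GPU blocks can be freed");
}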