fix: pool write lock held during entire prompt streaming causes cross-session deadlock #58

@ruan330

Description

Problem

SessionPool::with_connection() acquires a write lock on the entire connections HashMap and holds it for the full duration of stream_prompt() — which can run for minutes during long tool calls (e.g., flutter build, git clone, test suites).

pub async fn with_connection<F, Fut, R>(&self, thread_id: &str, f: F) -> Result<R>
where F: FnOnce(&mut AcpConnection) -> Fut, Fut: Future<Output = Result<R>> {
    let mut conns = self.connections.write().await;  // ← global write lock
    let conn = conns.get_mut(thread_id)
        .ok_or_else(|| anyhow!("no connection for thread {thread_id}"))?;
    f(conn).await  // ← runs for minutes, lock held the entire time
}

This means one busy session blocks ALL other sessions, the status/management API, cleanup tasks, and even new session creation. The entire broker becomes unresponsive until the busy session's prompt completes.

Reproduction

  1. Start agent-broker with claude-agent-acp backend
  2. Open Thread A, ask the agent to run a long command (e.g., flutter build linux)
  3. While Thread A is busy, open Thread B and send any message
  4. Thread B hangs indefinitely — get_or_create waits for the write lock that Thread A holds

Impact

  • Cross-session deadlock: one slow session blocks all others
  • Status API unresponsive: status() needs a read lock, blocked by the write lock
  • Cleanup blocked: cleanup_idle() needs a write lock, blocked
  • New sessions blocked: get_or_create() needs a write lock, blocked
  • Same-thread follow-up blocked: user sends a message while agent is busy → infinite wait

This affects any ACP backend that runs long tool calls. Claude Code is particularly affected because it frequently spawns subagents, runs build commands, and executes test suites that take minutes.

Root Cause

The lock granularity is wrong. with_connection needs mutable access to one connection, but locks the entire HashMap. This is because HashMap::get_mut requires &mut self on the map, which requires a write lock.

Proposed Solution

Replace HashMap<String, AcpConnection> with HashMap<String, Arc<Mutex<AcpConnection>>>.

The pool RwLock is now only held briefly for lookups (read lock, milliseconds). Each connection has its own Mutex, so sessions operate independently.

// Before: global write lock for the entire streaming duration
pub async fn with_connection(&self, thread_id: &str, f: F) -> Result<R> {
    let mut conns = self.connections.write().await;  // blocks everything
    let conn = conns.get_mut(thread_id)
        .ok_or_else(|| anyhow!("no connection"))?;
    f(conn).await  // minutes
}

// After: brief read lock for lookup, per-connection lock for use
pub async fn get_connection(&self, thread_id: &str) -> Result<Arc<Mutex<AcpConnection>>> {
    let conns = self.connections.read().await;  // milliseconds, shared
    conns.get(thread_id).cloned()
        .ok_or_else(|| anyhow!("no connection"))
}
// Caller locks only their own connection:
// let conn = pool.get_connection(thread_key).await?;
// let mut guard = conn.lock().await;  // only blocks this thread

Additional fix: replace idle timeout with alive check

The current notification loop has no timeout — if the prompt response is lost, it blocks forever. Adding an idle timeout is fragile because legitimate long tool calls (5+ minutes) send no ACP notifications during execution, causing false timeouts.

Better approach: periodically check if the agent process is still alive:

loop {
    tokio::select! {
        msg = rx.recv() => match msg {
            Some(notification) => { /* process notification */ },
            None => break,  // channel closed: sender side dropped
        },
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            if !conn.alive() {
                warn!("agent process died");
                break;
            }
            // alive → keep waiting, no matter how long
        }
    }
}

This correctly handles:

  • Long tool calls (process alive → wait) ✅
  • Process crash (process dead → break in 30s) ✅
  • No false timeouts ✅

Why this is the right fix

| Alternative | Assessment |
| --- | --- |
| Increase timeout | Doesn't fix cross-session blocking |
| Checkout pattern (remove from map, use, put back) | Doesn't support future prompt queueing; connection is lost on panic |
| RwLock<HashMap<String, RwLock<AcpConnection>>> | Nested RwLocks add complexity; Mutex is simpler for exclusive access |
| RwLock<HashMap<String, Arc<Mutex<AcpConnection>>>> (chosen) | Correct granularity, standard connection-pool pattern, enables future prompt queueing |

The Arc<Mutex> approach also naturally supports future enhancements like ACP prompt queueing: when a connection's Mutex is held, callers can detect "busy" via try_lock() and queue the prompt instead of blocking.

Changes

| File | Change |
| --- | --- |
| pool.rs | HashMap<String, Arc<Mutex<AcpConnection>>>; replace with_connection with get_connection returning an Arc clone |
| discord.rs | stream_prompt uses get_connection + per-connection lock; alive-check loop |

Labels: bug, p1, session
