Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,102 @@ describe("multi-user mode isolation", () => {
expect(tasksB[0].prompt).toContain("grumpy");
});
});

describe("reply routing to waiting sessions", () => {
it("routes reply to waiting session instead of creating new task", async () => {
const sentMessages: string[] = [];
const clearedTaskIds: string[] = [];
const mockSessionManager = {
getWaitingForUser: (userId: string) => userId === "slack:U123"
? { taskId: "task-waiting", session: { sendMessage: (t: string) => sentMessages.push(t) } }
: null,
clearWaiting: (taskId: string) => { clearedTaskIds.push(taskId); },
};

const deps = makeDeps({
sessionManager: mockSessionManager as any,
});

const handler = createMessageHandler(deps);
const msg = makeMessage("yes, fix them all");
await handler(msg);

// Should have sent to session, not created a new task
expect(sentMessages).toEqual(["yes, fix them all"]);
expect(clearedTaskIds).toEqual(["task-waiting"]);
});

it("resumes the queue task when routing reply to waiting session", async () => {
const mockSessionManager = {
getWaitingForUser: (userId: string) => userId === "slack:U123"
? { taskId: "task-waiting", session: { sendMessage: () => {} } }
: null,
clearWaiting: () => {},
};

const deps = makeDeps({
sessionManager: mockSessionManager as any,
});

// Create a task in waiting_user state
const id = deps.queue.enqueue({ userId: "slack:U123", repo: "my-app", prompt: "original" });
deps.queue.dequeue(); // running

// Override the mock to return the real task ID
(mockSessionManager as any).getWaitingForUser = (userId: string) => userId === "slack:U123"
? { taskId: id, session: { sendMessage: () => {} } }
: null;

deps.queue.setWaiting(id);
expect(deps.queue.get(id)?.status).toBe("waiting_user");

const handler = createMessageHandler(deps);
const msg = makeMessage("yes do it");
await handler(msg);

// Queue task should be resumed (back to running)
expect(deps.queue.get(id)?.status).toBe("running");
});

it("adds user message to session history even when routing to waiting session", async () => {
const mockSessionManager = {
getWaitingForUser: () => ({ taskId: "t1", session: { sendMessage: () => {} } }),
clearWaiting: () => {},
};

const deps = makeDeps({
sessionManager: mockSessionManager as any,
});

const handler = createMessageHandler(deps);
const msg = makeMessage("my reply");
await handler(msg);

const history = deps.sessions.getHistory("slack:U123");
expect(history.some(h => h.content === "my reply")).toBe(true);
});

it("falls through to normal handling when no waiting session", async () => {
const mockSessionManager = {
getWaitingForUser: () => null,
clearWaiting: () => {},
};

const deps = makeDeps({
config: makeConfig({
repos: {},
users: { "slack:U123": { name: "testuser", repos: [] } },
}),
sessionManager: mockSessionManager as any,
});

const handler = createMessageHandler(deps);
const msg = makeMessage("discuss something");
await handler(msg);

// Should have created a discuss task (normal flow)
const tasks = deps.queue.listByUser("slack:U123", 1);
expect(tasks.length).toBe(1);
expect(tasks[0].taskType).toBe("discuss");
});
});
21 changes: 21 additions & 0 deletions src/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { RepoRegistry } from "./repo-registry";
import type { IncomingMessage, EventAdapter, IncomingEvent } from "./adapters/types";
import type { AgentRunner } from "./runner";
import type { TraceStore } from "./trace";
import type { SessionManager } from "./session-manager";

export interface HandlerDeps {
config: Config;
Expand All @@ -19,6 +20,7 @@ export interface HandlerDeps {
schedules: ScheduleStore;
repoRegistry: RepoRegistry;
trace: TraceStore;
sessionManager?: SessionManager;
pendingReplies: Map<string, IncomingMessage>;
pendingEventReplies: Map<string, { adapter: EventAdapter; event: IncomingEvent }>;
runningProcesses: Map<string, { abort: AbortController; task: Task }>;
Expand Down Expand Up @@ -235,13 +237,20 @@ async function handleListTasks(msg: IncomingMessage, deps: HandlerDeps) {
}
const running = tasks.filter((t) => t.status === "running");
const pending = tasks.filter((t) => t.status === "pending");
const waiting = tasks.filter((t) => t.status === "waiting_user");
const lines: string[] = [];
if (running.length > 0) {
lines.push("Running:");
for (const t of running) {
lines.push(` ${t.id.slice(0, 7)} — "${t.prompt.slice(0, 60)}" on ${t.repo} (${formatDuration(t.createdAt)})`);
}
}
if (waiting.length > 0) {
lines.push("Waiting for input:");
for (const t of waiting) {
lines.push(` ${t.id.slice(0, 7)} — "${t.prompt.slice(0, 60)}" on ${t.repo} (awaiting reply)`);
}
}
if (pending.length > 0) {
lines.push("Pending:");
for (const t of pending) {
Expand Down Expand Up @@ -482,6 +491,18 @@ export function createMessageHandler(deps: HandlerDeps): (msg: IncomingMessage)
return async (msg: IncomingMessage) => {
deps.sessions.addMessage(msg.userId, "user", msg.text);

// Route reply to waiting streaming session if one exists
if (deps.sessionManager) {
const waiting = deps.sessionManager.getWaitingForUser(msg.userId);
if (waiting) {
waiting.session.sendMessage(msg.text);
deps.sessionManager.clearWaiting(waiting.taskId);
deps.queue.resume(waiting.taskId);
deps.trace.append(waiting.taskId, "lifecycle", "User reply received", msg.text.slice(0, 200));
return;
}
}

const parsed = parseMessage(msg.text);

const handlers: Record<string, () => Promise<void>> = {
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { startCronLoop } from "./cron";
import { ScheduleStore } from "./schedules";
import { createMessageHandler, createEventHandler } from "./handlers";
import { createWorker } from "./worker";
import { SessionManager } from "./session-manager";

const config = loadConfig();
const db = new Database(process.env.DB_PATH || "./ove.db");
Expand All @@ -32,6 +33,7 @@ const sessions = new SessionStore(db);
const trace = new TraceStore(db);
const schedules = new ScheduleStore(db);
const repoRegistry = new RepoRegistry(db);
const sessionManager = new SessionManager();

repoRegistry.migrateFromConfig(
Object.fromEntries(
Expand Down Expand Up @@ -166,6 +168,7 @@ async function main() {
getRunner,
getRunnerForRepo,
getRepoInfo,
sessionManager,
};

const handleMessage = createMessageHandler(handlerDeps);
Expand Down Expand Up @@ -219,6 +222,7 @@ async function main() {
getRunnerOptsForRepo,
getRepoInfo,
trace,
sessionManager,
});
worker.start();

Expand All @@ -244,6 +248,7 @@ async function main() {
for (const ea of eventAdapters) {
await ea.stop();
}
sessionManager.killAll();
process.exit(0);
}

Expand Down
92 changes: 91 additions & 1 deletion src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ describe("TaskQueue", () => {
describe("metrics()", () => {
it("returns zeroes on empty queue", () => {
const m = queue.metrics();
expect(m.counts).toEqual({ pending: 0, running: 0, completed: 0, failed: 0 });
expect(m.counts).toEqual({ pending: 0, running: 0, completed: 0, failed: 0, waiting: 0 });
expect(m.avgDurationByRepo).toEqual([]);
expect(m.throughput.lastHour).toBe(0);
expect(m.throughput.last24h).toBe(0);
Expand Down Expand Up @@ -285,4 +285,94 @@ describe("TaskQueue", () => {
expect(m.errorRate).toBe(0);
});
});

describe("waiting_user status", () => {
it("setWaiting transitions running task to waiting_user", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue(); // running
queue.setWaiting(id);
const task = queue.get(id);
expect(task?.status).toBe("waiting_user");
});

it("resume transitions waiting_user back to running", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
queue.resume(id);
const task = queue.get(id);
expect(task?.status).toBe("running");
});

it("dequeue skips waiting_user tasks", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
queue.enqueue({ userId: "u1", repo: "r1", prompt: "test2" });
const next = queue.dequeue();
expect(next).toBeNull();
});

it("resetStale also resets waiting_user tasks", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
const count = queue.resetStale();
expect(count).toBe(1);
expect(queue.get(id)?.status).toBe("failed");
});

it("listActive includes waiting_user tasks", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
const active = queue.listActive();
expect(active.some(t => t.id === id)).toBe(true);
});

it("getWaitingForUser returns waiting_user task for a user", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
const waiting = queue.getWaitingForUser("u1");
expect(waiting?.id).toBe(id);
});

it("getWaitingForUser returns null when no waiting tasks", () => {
const waiting = queue.getWaitingForUser("u1");
expect(waiting).toBeNull();
});
});

it("cancel works on waiting_user tasks", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
const cancelled = queue.cancel(id);
expect(cancelled).toBe(true);
expect(queue.get(id)?.status).toBe("failed");
});

it("stats counts waiting_user tasks", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.dequeue();
queue.setWaiting(id);
const stats = queue.stats();
expect(stats.waiting).toBe(1);
});

describe("sessionId tracking", () => {
it("stores sessionId via setSessionId", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
queue.setSessionId(id, "ses-abc-123");
const task = queue.get(id);
expect(task?.sessionId).toBe("ses-abc-123");
});

it("sessionId is null by default", () => {
const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" });
const task = queue.get(id);
expect(task?.sessionId).toBeNull();
});
});
});
Loading