Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 96 additions & 108 deletions src/commands/agent.js
Original file line number Diff line number Diff line change
@@ -1,139 +1,127 @@
import { query } from "@anthropic-ai/claude-agent-sdk";
import { setupWorktree } from "../utils/git.js";

export const agent = async (deps) => {
import {
buildQueryOptions,
handleSessionInit,
saveMessageContent,
checkInterruption
} from "../utils/agent.js";

/**
* Process a single session with the Claude agent
*/
export const processSession = async (session, deps) => {
const { sessionService, configService } = deps;
const readySessions = await sessionService.getSessionsByStatus({ status: "ready" });

if (readySessions.length === 0) {
console.log("No sessions with status 'ready' found");
return;
}
console.log(`\nRunning agent for session: ${session.sessionId}`);
console.log(`Messages count: ${session.messages.length}`);

console.log(`Found ${readySessions.length} ready sessions`);
await sessionService.updateSessionStatus({
sessionId: session.sessionId,
status: "in-progress"
});

// Process all ready sessions
for (const session of readySessions) {
console.log(`\nRunning agent for session: ${session.sessionId}`);
console.log(`Messages count: ${session.messages.length}`);
// Get project repository
const project = sessionService.getProjectById({ projectId: session.project });
if (!project || !project.gitRepository) {
throw new Error(`No repository found for project ${session.project} in config file.`);
}

try {
await sessionService.updateSessionStatus({
sessionId: session.sessionId,
status: "in-progress"
});
// Setup git worktree
const worktreePath = await setupWorktree(session.sessionId, project.gitRepository);
console.log(`\nWorktree ready at: ${worktreePath}\n`);

// Get system prompt if preset specified
let systemPrompt = null;
if (session.promptPreset) {
systemPrompt = configService.getPrompt(session.promptPreset);
if (systemPrompt) {
console.log(`Using system prompt preset: ${session.promptPreset}`);
} else {
console.warn(`Prompt preset '${session.promptPreset}' not found in config. Using default.`);
}
}

// Get project repository
const project = sessionService.getProjectById({ projectId: session.project });
if (!project || !project.gitRepository) {
throw new Error(`No repository found for project ${session.project} in config file.`);
// Get existing claude session ID for resume
let claudeSessionId = await sessionService.getClaudeSessionIdBySessionId({
sessionId: session.sessionId
});

const queryOptions = buildQueryOptions({ worktreePath, claudeSessionId, systemPrompt });
const lastMessage = session.messages[session.messages.length - 1];

try {
const result = query({
prompt: lastMessage.content,
options: queryOptions,
});

for await (const message of result) {
// Check for user interruption
if (await checkInterruption({ sessionId: session.sessionId, sessionService })) {
console.warn(`Interruption detected for session ${session.sessionId}.`);
break;
}

// Setup git worktree using project repository
const worktreePath = await setupWorktree(session.sessionId, project.gitRepository);
console.log(`\nWorktree ready at: ${worktreePath}\n`);
// Handle session init
claudeSessionId = await handleSessionInit({
message,
sessionId: session.sessionId,
claudeSessionId,
sessionService
});

let claudeSessionId = await sessionService.getClaudeSessionIdBySessionId({
sessionId: session.sessionId
// Save message content
await saveMessageContent({
message,
sessionId: session.sessionId,
sessionService
});
}
} catch (error) {
console.warn(`Error processing session ${session.sessionId}:`, error);
}

const lastMessage = session.messages[session.messages.length - 1];
const userPrompt = `${lastMessage.content}`;

const queryOptions = {
model: "opus",
settingSources: ['project'],
canUseTool: (toolName, inputData) => {
return {
behavior: "allow",
updatedInput: inputData,
};
},
cwd: worktreePath,
};

if (session.promptPreset) {
const systemPrompt = configService.getPrompt(session.promptPreset);
if (systemPrompt) {
queryOptions.systemPrompt = systemPrompt;
console.log(`Using system prompt preset: ${session.promptPreset}`);
} else {
console.warn(`Prompt preset '${session.promptPreset}' not found in config. Using default.`,);
}
}
// Final interruption check
if (await checkInterruption({ sessionId: session.sessionId, sessionService })) {
console.warn(`Work on session ${session.sessionId} was interrupted by user. Discarding results.`);
return false;
}

if (claudeSessionId) {
queryOptions.resume = claudeSessionId;
}
await sessionService.updateSessionStatus({
sessionId: session.sessionId,
status: "review"
});

try {
const result = query({
prompt: userPrompt,
options: queryOptions,
});

//const assistantContent = [];

for await (const message of result) {

const currentSessionState = await sessionService.getViewBySessionId({ sessionId: session.sessionId });
if (currentSessionState.status !== "in-progress") {
console.warn(`Interruption detected for session ${session.sessionId}.`);
break;
}
if (!claudeSessionId && message.type === 'system' && message.subtype === 'init' && message.session_id) {
claudeSessionId = message.session_id;
await sessionService.addClaudeSessionRecord({
sessionId: session.sessionId,
claudeSessionId: message.session_id,
});
console.log(`Claude session started and saved for ${session.sessionId}`);
}

// Collect all content from the streaming response
if (message.message?.content) {
await sessionService.appendSessionMessages({
sessionId: session.sessionId,
messages: [{
// Doc: https://platform.claude.com/docs/en/agent-sdk/typescript#message-types
role: message.message.role,
content: message.message.content,
timestamp: Date.now()
}]
});
}
}

} catch (error) {
console.warn(`Error processing session ${session.sessionId}:`, error);
}
console.log(`\nSession ${session.sessionId} moved to review`);
return true;
};

const currentSessionState = await sessionService.getViewBySessionId({ sessionId: session.sessionId });
if (currentSessionState.status !== "in-progress") {
console.warn(`Work on session ${session.sessionId} was interrupted by user. Discarding results.`);
continue;
}
export const agent = async (deps) => {
const { sessionService } = deps;
const readySessions = await sessionService.getSessionsByStatus({ status: "ready" });

await sessionService.updateSessionStatus({
sessionId: session.sessionId,
status: "review"
});
if (readySessions.length === 0) {
console.log("No sessions with status 'ready' found");
return;
}

console.log(`\nSession ${session.sessionId} moved to review`);
console.log(`Found ${readySessions.length} ready sessions`);

for (const session of readySessions) {
try {
await processSession(session, deps);
} catch (error) {
console.warn(`Failed to process session ${session.sessionId}:`, error);
// Continue to next session
}
}

console.log(`\nAll ${readySessions.length} sessions processed`);
}
};

export const agentStart = async (deps) => {
while (true) {
await agent(deps);
// Wait 5 seconds before next run
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
};
63 changes: 63 additions & 0 deletions src/utils/agent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Build query options for Claude agent
*/
export const buildQueryOptions = ({ worktreePath, claudeSessionId, systemPrompt }) => {
const options = {
model: "opus",
settingSources: ['project'],
canUseTool: (toolName, inputData) => ({
behavior: "allow",
updatedInput: inputData,
}),
cwd: worktreePath,
};

if (systemPrompt) {
options.systemPrompt = systemPrompt;
}

if (claudeSessionId) {
options.resume = claudeSessionId;
}

return options;
};

/**
* Handle Claude session initialization message
*/
export const handleSessionInit = async ({ message, sessionId, claudeSessionId, sessionService }) => {
if (!claudeSessionId && message.type === 'system' && message.subtype === 'init' && message.session_id) {
await sessionService.addClaudeSessionRecord({
sessionId,
claudeSessionId: message.session_id,
});
console.log(`Claude session started and saved for ${sessionId}`);
return message.session_id;
}
return claudeSessionId;
};

/**
* Save message content to session
*/
export const saveMessageContent = async ({ message, sessionId, sessionService }) => {
if (message.message?.content) {
await sessionService.appendSessionMessages({
sessionId,
messages: [{
role: message.message.role,
content: message.message.content,
timestamp: Date.now()
}]
});
}
};

/**
* Check if session was interrupted by user
*/
export const checkInterruption = async ({ sessionId, sessionService }) => {
const state = await sessionService.getViewBySessionId({ sessionId });
return state.status !== "in-progress";
};
66 changes: 66 additions & 0 deletions tasks/TASK/000/TASK-046.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
---
title: Simple parallelization of agentStart for multi-thread processing
status: todo
priority: low
---

## Problem
agentStart processes all sessions sequentially. Different Discord threads should run in parallel while keeping same-thread sessions serial.

## Solution - Minimal Changes

### 1. Add thread tracking object in agentStart
```javascript
const runningThreads = {}; // Track which threads are processing
```

### 2. Split session fetching and processing
- Keep getSessionsByStatus to fetch sessions
- Group by thread ID
- Process each thread group independently

### 3. Key code changes in agent.js:
```javascript
// In agentStart function
const runningThreads = {};

while (true) {
const readySessions = await sessionService.getSessionsByStatus({ status: "ready" });

// Group sessions by Discord thread
const sessionsByThread = {};
for (const session of readySessions) {
const threadId = await getThreadId(session, discordService);
if (!sessionsByThread[threadId]) sessionsByThread[threadId] = [];
sessionsByThread[threadId].push(session);
}

// Process each thread's sessions
for (const [threadId, sessions] of Object.entries(sessionsByThread)) {
if (runningThreads[threadId]) continue; // Skip if already running

runningThreads[threadId] = true;
processThreadSessions(sessions, deps).finally(() => {
delete runningThreads[threadId];
});
}

await new Promise(r => setTimeout(r, 5000));
}
```

### 4. Add helper function to process thread sessions
```javascript
async function processThreadSessions(sessions, deps) {
for (const session of sessions) {
// Existing session processing logic
await processSession(session, deps);
}
}
```

## Benefits
- Different threads run in parallel
- Same thread stays serial (no race conditions)
- Super simple - just an object to track running threads
- No complex libraries or major refactoring