From 7c4e8851bd4db9be5187265b59ec2f43bd2ffdb8 Mon Sep 17 00:00:00 2001 From: martinjms Date: Sat, 11 Apr 2026 19:42:23 +0300 Subject: [PATCH 1/3] Add observability: token tracking, error logging, action audit trail New modules: - logger.js: structured JSONL logger with cost estimation, used by all components - tools/logs.js: 3 tools (log_cost_summary, log_search, log_errors) so the agent can query its own usage and debug issues Wired into: - Brain: logs every API call (tokens, cost, advisor usage, duration) and every tool execution (name, args, result, duration, errors) - Kernel: logs auto-commits, rollbacks, verification pass/fail - Webchat: logs inbound messages Logs written to .state/logs/ as daily-rotated JSONL files. Cost estimation built in for Sonnet, Opus, and Haiku pricing. Closes #21 Co-Authored-By: Claude Opus 4.6 (1M context) --- server/kernel.js | 5 + server/modules/brain/index.js | 29 ++++ server/modules/channels/webchat/index.js | 3 + server/modules/logger.js | 212 +++++++++++++++++++++++ server/modules/tools/logs.js | 74 ++++++++ server/tests/logger.test.js | 171 ++++++++++++++++++ 6 files changed, 494 insertions(+) create mode 100644 server/modules/logger.js create mode 100644 server/modules/tools/logs.js create mode 100644 server/tests/logger.test.js diff --git a/server/kernel.js b/server/kernel.js index b3a3303..b0f7dd0 100644 --- a/server/kernel.js +++ b/server/kernel.js @@ -14,6 +14,7 @@ import 'dotenv/config'; import express from 'express'; import cors from 'cors'; +import logger from './modules/logger.js'; import http from 'http'; import { WebSocketServer } from 'ws'; import path from 'path'; @@ -155,6 +156,7 @@ async function autoCommit(filename) { git('add -A'); git(`commit -m "Auto: ${filename}" --no-verify`); console.log(`[kernel] Auto-committed: ${filename}`); + logger.action({ component: 'kernel', action: 'auto-commit', details: filename }); } // ============================================ @@ -194,6 +196,7 @@ async function
runVerification() { stdio: ['pipe', 'pipe', 'pipe'] }); console.log('[kernel] Verification PASSED'); + logger.action({ component: 'kernel', action: 'verification-passed' }); // Mark all unverified changes as verified try { @@ -208,6 +211,7 @@ async function runVerification() { } catch (err) { console.error('[kernel] Verification FAILED'); console.error(err.stdout?.slice(-1000) || err.message); + logger.error({ component: 'kernel', error: 'Verification failed', context: err.stdout?.slice(-500) || err.message }); return false; } } @@ -219,6 +223,7 @@ async function rollback() { } console.log(`[kernel] Rolling back to ${lastGoodCommit.slice(0, 8)}...`); + logger.action({ component: 'kernel', action: 'rollback', details: { commit: lastGoodCommit.slice(0, 8) } }); clearTimeout(commitDebounce); git(`reset --hard ${lastGoodCommit}`); diff --git a/server/modules/brain/index.js b/server/modules/brain/index.js index 5c26b76..8fad5e5 100644 --- a/server/modules/brain/index.js +++ b/server/modules/brain/index.js @@ -13,6 +13,7 @@ */ import Anthropic from '@anthropic-ai/sdk'; +import logger from '../logger.js'; import fs from 'fs/promises'; import path from 'path'; @@ -90,6 +91,7 @@ export async function* process(message, { const messages = [...history, { role: 'user', content: message }]; while (true) { + const turnStart = Date.now(); const stream = client.beta.messages.stream({ model: 'claude-sonnet-4-6', max_tokens: 8192, @@ -110,6 +112,22 @@ export async function* process(message, { const response = await stream.finalMessage(); messages.push({ role: 'assistant', content: response.content }); + // Log API usage and cost + const usage = response.usage || {}; + const advisorIter = usage.iterations?.find(i => i.type === 'advisor_message'); + logger.api({ + component: 'brain', + model: 'claude-sonnet-4-6', + input_tokens: usage.input_tokens || 0, + output_tokens: usage.output_tokens || 0, + cache_read_tokens: usage.cache_read_input_tokens || 0, + cache_creation_tokens: 
usage.cache_creation_input_tokens || 0, + advisor_input_tokens: advisorIter?.input_tokens || 0, + advisor_output_tokens: advisorIter?.output_tokens || 0, + duration_ms: Date.now() - turnStart, + stop_reason: response.stop_reason + }); + // No tool calls — we're done if (response.stop_reason !== 'tool_use') break; @@ -122,13 +140,24 @@ export async function* process(message, { yield { type: 'tool_call', name: block.name, args: block.input }; + const toolStart = Date.now(); let result; try { result = await executeTool(block.name, block.input); } catch (err) { result = { error: err.message }; + logger.error({ component: 'brain', error: err, context: `tool:${block.name}` }); } + logger.tool({ + component: 'brain', + name: block.name, + args: block.input, + result, + duration_ms: Date.now() - toolStart, + error: result?.error + }); + const content = typeof result === 'string' ? result : JSON.stringify(result, null, 2); diff --git a/server/modules/channels/webchat/index.js b/server/modules/channels/webchat/index.js index 776a90f..777eb20 100644 --- a/server/modules/channels/webchat/index.js +++ b/server/modules/channels/webchat/index.js @@ -15,6 +15,7 @@ */ import * as brain from '../../brain/index.js'; +import logger from '../../logger.js'; const MAX_HISTORY = 50; @@ -40,6 +41,8 @@ export function createChannel({ getTools, executeTool, addWSPath, setActive, set if (data.type !== 'chat_message' || !data.text?.trim()) return; + logger.message({ channel: 'webchat', direction: 'inbound', text: data.text }); + if (busy) { ws.send(JSON.stringify({ type: 'error', error: 'Still processing previous message' })); return; diff --git a/server/modules/logger.js b/server/modules/logger.js new file mode 100644 index 0000000..602324d --- /dev/null +++ b/server/modules/logger.js @@ -0,0 +1,212 @@ +/** + * Structured Logger — observability foundation for all components + * + * Every API call, tool execution, error, and action flows through here. 
+ * Logs are written to .state/logs/ as JSONL files (one JSON object per line), + * rotated daily. Components import and use the logger directly. + * + * Usage: + * import logger from '../modules/logger.js'; + * logger.api({ model: 'sonnet', input_tokens: 500, output_tokens: 100 }); + * logger.tool({ name: 'execute_shell', args: {...}, result: '...', tokens: 50 }); + * logger.error({ component: 'brain', error: 'timeout', stack: '...' }); + * logger.action({ component: 'kernel', action: 'hot-reload', file: 'memory.js' }); + */ + +import fs from 'fs/promises'; +import { appendFileSync, mkdirSync, existsSync } from 'fs'; +import path from 'path'; + +const STATE_DIR = path.join(process.env.WORKSPACE || '/workspace-local', '.state', 'logs'); + +// Ensure log directory exists synchronously at load time +try { mkdirSync(STATE_DIR, { recursive: true }); } catch {} + +// Pricing per million tokens (as of April 2026) +const PRICING = { + 'claude-opus-4-6': { input: 15.00, output: 75.00, cache_read: 1.50 }, + 'claude-sonnet-4-6': { input: 3.00, output: 15.00, cache_read: 0.30 }, + 'claude-haiku-4-5-20251001': { input: 0.80, output: 4.00, cache_read: 0.08 }, + // Aliases + 'opus': { input: 15.00, output: 75.00, cache_read: 1.50 }, + 'sonnet': { input: 3.00, output: 15.00, cache_read: 0.30 }, + 'haiku': { input: 0.80, output: 4.00, cache_read: 0.08 }, +}; + +function getLogFile(category) { + const date = new Date().toISOString().slice(0, 10); // YYYY-MM-DD + return path.join(STATE_DIR, `${category}-${date}.jsonl`); +} + +function write(category, data) { + const entry = { + timestamp: new Date().toISOString(), + ...data + }; + + try { + appendFileSync(getLogFile(category), JSON.stringify(entry) + '\n'); + } catch (err) { + console.error(`[logger] Failed to write ${category} log:`, err.message); + } + + return entry; +} + +function estimateCost(model, inputTokens = 0, outputTokens = 0, cacheReadTokens = 0) { + const prices = PRICING[model]; + if (!prices) return null; + return ( 
+ (inputTokens / 1_000_000) * prices.input + + (outputTokens / 1_000_000) * prices.output + + (cacheReadTokens / 1_000_000) * prices.cache_read + ); +} + +const logger = { + /** + * Log an API call to Anthropic (brain, worker, unconscious processes) + */ + api({ component = 'brain', model, input_tokens = 0, output_tokens = 0, + cache_read_tokens = 0, cache_creation_tokens = 0, + advisor_input_tokens = 0, advisor_output_tokens = 0, + duration_ms, stop_reason, error }) { + const cost = estimateCost(model, input_tokens, output_tokens, cache_read_tokens); + const advisor_cost = advisor_input_tokens || advisor_output_tokens + ? estimateCost('opus', advisor_input_tokens, advisor_output_tokens) + : 0; + return write('api', { + component, model, input_tokens, output_tokens, + cache_read_tokens, cache_creation_tokens, + advisor_input_tokens, advisor_output_tokens, + cost_usd: cost, advisor_cost_usd: advisor_cost, + total_cost_usd: (cost || 0) + (advisor_cost || 0), + duration_ms, stop_reason, error + }); + }, + + /** + * Log a tool execution + */ + tool({ component = 'brain', name, args, result, duration_ms, error }) { + return write('tools', { + component, name, + args: typeof args === 'string' ? args.slice(0, 500) : JSON.stringify(args || {}).slice(0, 500), + result: typeof result === 'string' ? result.slice(0, 500) : JSON.stringify(result || {}).slice(0, 500), + duration_ms, error + }); + }, + + /** + * Log an error + */ + error({ component, error, context, stack }) { + const entry = write('errors', { + component, + error: typeof error === 'string' ? error : error?.message || String(error), + context, + stack: stack || (error instanceof Error ? error.stack : undefined) + }); + // Also print to stderr for immediate visibility + console.error(`[${component}] ERROR: ${entry.error}`); + return entry; + }, + + /** + * Log an action (file change, reload, rollback, message, etc.) 
+ */ + action({ component, action, details }) { + return write('actions', { + component, action, + details: typeof details === 'string' ? details : JSON.stringify(details || {}).slice(0, 1000) + }); + }, + + /** + * Log a channel message (inbound or outbound) + */ + message({ channel, direction, userId, text, tokens }) { + return write('messages', { + channel, direction, userId, + text_length: text?.length || 0, + text_preview: text?.slice(0, 200), + tokens + }); + }, + + /** + * Read log entries from a specific category and date + */ + async readLog(category, date, limit = 100) { + const file = date + ? path.join(STATE_DIR, `${category}-${date}.jsonl`) + : getLogFile(category); + + try { + const content = await fs.readFile(file, 'utf-8'); + const lines = content.trim().split('\n').filter(Boolean); + return lines.slice(-limit).map(line => { + try { return JSON.parse(line); } catch { return { raw: line }; } + }); + } catch { + return []; + } + }, + + /** + * Get a cost summary for a date range + */ + async costSummary(date) { + const entries = await logger.readLog('api', date, 10000); + const byComponent = {}; + let totalCost = 0; + let totalInputTokens = 0; + let totalOutputTokens = 0; + + for (const entry of entries) { + const comp = entry.component || 'unknown'; + if (!byComponent[comp]) { + byComponent[comp] = { calls: 0, cost_usd: 0, input_tokens: 0, output_tokens: 0 }; + } + byComponent[comp].calls++; + byComponent[comp].cost_usd += entry.total_cost_usd || 0; + byComponent[comp].input_tokens += entry.input_tokens || 0; + byComponent[comp].output_tokens += entry.output_tokens || 0; + totalCost += entry.total_cost_usd || 0; + totalInputTokens += entry.input_tokens || 0; + totalOutputTokens += entry.output_tokens || 0; + } + + return { + date: date || new Date().toISOString().slice(0, 10), + total_calls: entries.length, + total_cost_usd: Math.round(totalCost * 10000) / 10000, + total_input_tokens: totalInputTokens, + total_output_tokens: totalOutputTokens, + 
by_component: byComponent + }; + }, + + /** + * List available log dates + */ + async listDates() { + try { + const files = await fs.readdir(STATE_DIR); + const dates = new Set(); + for (const f of files) { + const match = f.match(/\d{4}-\d{2}-\d{2}/); + if (match) dates.add(match[0]); + } + return [...dates].sort().reverse(); + } catch { + return []; + } + }, + + // Export for cost calculations elsewhere + estimateCost, + PRICING +}; + +export default logger; diff --git a/server/modules/tools/logs.js b/server/modules/tools/logs.js new file mode 100644 index 0000000..75bc9a9 --- /dev/null +++ b/server/modules/tools/logs.js @@ -0,0 +1,74 @@ +/** + * Log Tools — the agent can query its own observability data + * + * Tools: + * log_cost_summary — token usage and cost breakdown for a date + * log_search — search recent logs by category + * log_errors — show recent errors + */ + +import logger from '../logger.js'; + +export const tools = [ + { + name: 'log_cost_summary', + description: 'Get token usage and cost breakdown for today or a specific date. Shows total spend, breakdown by component (brain, worker, heartbeat, etc.), and token counts.', + parameters: { + type: 'object', + properties: { + date: { type: 'string', description: 'Date in YYYY-MM-DD format (default: today)' } + } + }, + async execute({ date } = {}) { + return await logger.costSummary(date); + } + }, + + { + name: 'log_search', + description: 'Search recent log entries by category. 
Categories: api, tools, errors, actions, messages.', + parameters: { + type: 'object', + properties: { + category: { + type: 'string', + description: 'Log category: api, tools, errors, actions, messages', + enum: ['api', 'tools', 'errors', 'actions', 'messages'] + }, + date: { type: 'string', description: 'Date in YYYY-MM-DD format (default: today)' }, + limit: { type: 'number', description: 'Max entries to return (default 20)' } + }, + required: ['category'] + }, + async execute({ category, date, limit = 20 }) { + const entries = await logger.readLog(category, date, limit); + return { + success: true, + category, + date: date || new Date().toISOString().slice(0, 10), + count: entries.length, + entries + }; + } + }, + + { + name: 'log_errors', + description: 'Show recent errors across all components. Quick way to see what went wrong.', + parameters: { + type: 'object', + properties: { + limit: { type: 'number', description: 'Max errors to return (default 10)' }, + date: { type: 'string', description: 'Date in YYYY-MM-DD format (default: today)' } + } + }, + async execute({ limit = 10, date } = {}) { + const errors = await logger.readLog('errors', date, limit); + return { + success: true, + count: errors.length, + errors + }; + } + } +]; diff --git a/server/tests/logger.test.js b/server/tests/logger.test.js new file mode 100644 index 0000000..3a4c606 --- /dev/null +++ b/server/tests/logger.test.js @@ -0,0 +1,171 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import fs from 'fs/promises'; +import path from 'path'; + +// Set workspace to temp dir before importing +const testDir = '/tmp/test-logger-' + Date.now(); +process.env.WORKSPACE = testDir; + +// Dynamic import after setting env +const { default: logger } = await import('../modules/logger.js'); + +describe('Logger', () => { + beforeEach(async () => { + await fs.mkdir(path.join(testDir, '.state', 'logs'), { recursive: true }); + }); + + afterEach(async () => { + try { await 
fs.rm(testDir, { recursive: true, force: true }); } catch {} + }); + + describe('api logging', () => { + it('logs an API call with cost estimation', () => { + const entry = logger.api({ + component: 'brain', + model: 'claude-sonnet-4-6', + input_tokens: 1000, + output_tokens: 500, + duration_ms: 1200, + stop_reason: 'end_turn' + }); + + expect(entry.timestamp).toBeDefined(); + expect(entry.component).toBe('brain'); + expect(entry.model).toBe('claude-sonnet-4-6'); + expect(entry.input_tokens).toBe(1000); + expect(entry.output_tokens).toBe(500); + expect(entry.cost_usd).toBeGreaterThan(0); + expect(entry.total_cost_usd).toBeGreaterThan(0); + }); + + it('tracks advisor costs separately', () => { + const entry = logger.api({ + model: 'claude-sonnet-4-6', + input_tokens: 1000, + output_tokens: 500, + advisor_input_tokens: 800, + advisor_output_tokens: 600 + }); + + expect(entry.advisor_cost_usd).toBeGreaterThan(0); + expect(entry.total_cost_usd).toBeGreaterThan(entry.cost_usd); + }); + }); + + describe('tool logging', () => { + it('logs a tool execution', () => { + const entry = logger.tool({ + name: 'execute_shell', + args: { command: 'ls' }, + result: 'file1.js\nfile2.js', + duration_ms: 50 + }); + + expect(entry.name).toBe('execute_shell'); + expect(entry.duration_ms).toBe(50); + }); + + it('truncates long args and results', () => { + const entry = logger.tool({ + name: 'test', + args: { data: 'x'.repeat(1000) }, + result: 'y'.repeat(1000) + }); + + expect(entry.args.length).toBeLessThanOrEqual(500); + expect(entry.result.length).toBeLessThanOrEqual(500); + }); + }); + + describe('error logging', () => { + it('logs an error with component context', () => { + const entry = logger.error({ + component: 'brain', + error: new Error('API timeout'), + context: 'tool:execute_shell' + }); + + expect(entry.component).toBe('brain'); + expect(entry.error).toBe('API timeout'); + expect(entry.stack).toBeDefined(); + }); + }); + + describe('action logging', () => { + it('logs a 
kernel action', () => { + const entry = logger.action({ + component: 'kernel', + action: 'hot-reload', + details: 'memory.js' + }); + + expect(entry.component).toBe('kernel'); + expect(entry.action).toBe('hot-reload'); + }); + }); + + describe('message logging', () => { + it('logs a channel message with preview', () => { + const entry = logger.message({ + channel: 'webchat', + direction: 'inbound', + text: 'Hello, can you help me?' + }); + + expect(entry.channel).toBe('webchat'); + expect(entry.direction).toBe('inbound'); + expect(entry.text_length).toBe(23); + expect(entry.text_preview).toBe('Hello, can you help me?'); + }); + }); + + describe('readLog', () => { + it('reads logged entries back', async () => { + logger.api({ model: 'sonnet', input_tokens: 100, output_tokens: 50 }); + logger.api({ model: 'sonnet', input_tokens: 200, output_tokens: 100 }); + + const entries = await logger.readLog('api'); + expect(entries.length).toBe(2); + expect(entries[0].input_tokens).toBe(100); + expect(entries[1].input_tokens).toBe(200); + }); + + it('returns empty for missing logs', async () => { + const entries = await logger.readLog('api', '1999-01-01'); + expect(entries).toEqual([]); + }); + }); + + describe('costSummary', () => { + it('calculates cost breakdown by component', async () => { + logger.api({ component: 'brain', model: 'claude-sonnet-4-6', input_tokens: 1000, output_tokens: 500 }); + logger.api({ component: 'heartbeat', model: 'claude-sonnet-4-6', input_tokens: 500, output_tokens: 100 }); + logger.api({ component: 'brain', model: 'claude-sonnet-4-6', input_tokens: 2000, output_tokens: 1000 }); + + const summary = await logger.costSummary(); + expect(summary.total_calls).toBe(3); + expect(summary.total_cost_usd).toBeGreaterThan(0); + expect(summary.by_component.brain.calls).toBe(2); + expect(summary.by_component.heartbeat.calls).toBe(1); + }); + }); + + describe('estimateCost', () => { + it('calculates Sonnet costs correctly', () => { + // 1M input tokens at $3/M 
= $3 + const cost = logger.estimateCost('claude-sonnet-4-6', 1_000_000, 0, 0); + expect(cost).toBe(3); + }); + + it('calculates Opus costs correctly', () => { + // 1M input tokens at $15/M = $15 + const cost = logger.estimateCost('claude-opus-4-6', 1_000_000, 0, 0); + expect(cost).toBe(15); + }); + + it('returns null for unknown models', () => { + const cost = logger.estimateCost('unknown-model', 1000, 500, 0); + expect(cost).toBeNull(); + }); + }); +}); From 42cee6cf3cd8b106c77bad88ca2ee10ecedd8e90 Mon Sep 17 00:00:00 2001 From: martinjms Date: Sun, 12 Apr 2026 00:59:00 +0300 Subject: [PATCH 2/3] Add per-tool logging, full heartbeat traces, and cron traces Per-tool logs: each tool execution writes to both the aggregate tools log and a dedicated tools-{name} log (e.g. tools-execute_shell). Enables isolated debugging per tool across time. Reasoning traces: heartbeat and cron cycles now capture the entire brain conversation (text, tool calls, tool results) to traces/ logs. You can reconstruct exactly what the agent thought and did in each autonomous cycle, including partial traces on failure. 75 tests passing (3 new trace/per-tool tests). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- server/modules/logger.js | 32 +++++++++++++++-- server/modules/scheduler/heartbeat.js | 40 +++++++++++++++++++-- server/modules/scheduler/index.js | 38 ++++++++++++++++---- server/modules/tools/logs.js | 5 ++- server/tests/logger.test.js | 51 +++++++++++++++++++++++++++ 5 files changed, 150 insertions(+), 16 deletions(-) diff --git a/server/modules/logger.js b/server/modules/logger.js index 602324d..e263e4c 100644 --- a/server/modules/logger.js +++ b/server/modules/logger.js @@ -86,15 +86,23 @@ const logger = { }, /** - * Log a tool execution + * Log a tool execution — writes to both the aggregate tools log + * and a per-tool log (tools-{name}-YYYY-MM-DD.jsonl) */ tool({ component = 'brain', name, args, result, duration_ms, error }) { - return write('tools', { + const entry = { component, name, args: typeof args === 'string' ? args.slice(0, 500) : JSON.stringify(args || {}).slice(0, 500), result: typeof result === 'string' ? result.slice(0, 500) : JSON.stringify(result || {}).slice(0, 500), duration_ms, error - }); + }; + write('tools', entry); + // Per-tool log for isolated debugging + if (name) { + const safeName = name.replace(/[^a-zA-Z0-9_-]/g, '_'); + write(`tools-${safeName}`, entry); + } + return entry; }, /** @@ -122,6 +130,24 @@ const logger = { }); }, + /** + * Log a full reasoning trace (heartbeat, cron, or any autonomous execution). + * Captures the entire brain conversation: text, tool calls, tool results. 
+ */ + trace({ component = 'heartbeat', trigger, events, duration_ms, total_tokens, total_cost_usd }) { + return write('traces', { + component, trigger, + events: (events || []).map(e => { + if (e.type === 'text') return { type: 'text', text: e.text?.slice(0, 2000) }; + if (e.type === 'tool_call') return { type: 'tool_call', name: e.name, args: JSON.stringify(e.args || {}).slice(0, 500) }; + if (e.type === 'tool_result') return { type: 'tool_result', name: e.name, result: e.result?.slice(0, 1000) }; + return { type: e.type }; + }), + event_count: events?.length || 0, + duration_ms, total_tokens, total_cost_usd + }); + }, + /** * Log a channel message (inbound or outbound) */ diff --git a/server/modules/scheduler/heartbeat.js b/server/modules/scheduler/heartbeat.js index 109be87..e94bd81 100644 --- a/server/modules/scheduler/heartbeat.js +++ b/server/modules/scheduler/heartbeat.js @@ -3,15 +3,18 @@ * * The brain can write tasks to HEARTBEAT.md; the heartbeat loop picks * them up and executes them autonomously between conversations. + * + * Every heartbeat cycle is fully traced: the entire reasoning chain + * (text output, tool calls, tool results) is logged to traces/ so + * you can reconstruct exactly what the agent did and why. */ import fs from 'fs/promises'; import path from 'path'; +import logger from '../logger.js'; const HEARTBEAT_FILE = path.join(process.env.WORKSPACE || '/workspace-local', 'HEARTBEAT.md'); -const CHANGES_FILE = path.join(process.env.WORKSPACE || '/workspace-local', '.state', 'changes.json'); - const SYSTEM_PROMPT = `You are Eigenself running an autonomous heartbeat cycle. You have access to your own journal (journal_plan, journal_read) and memory tools. @@ -54,6 +57,7 @@ async function readTasks() { /** * Run a single heartbeat cycle. + * Captures the full reasoning trace for debugging and review. * Returns true if any work was done. 
*/ export async function runHeartbeat({ getTools, executeTool }) { @@ -61,6 +65,7 @@ export async function runHeartbeat({ getTools, executeTool }) { if (tasks.length === 0) return false; console.log(`[heartbeat] ${tasks.length} task(s) pending`); + logger.action({ component: 'heartbeat', action: 'cycle-start', details: { task_count: tasks.length, tasks } }); // Import brain fresh each cycle (hot-reloadable) const brainPath = `file://${path.join(process.env.WORKSPACE || '/workspace-local', 'server/modules/brain/index.js')}?t=${Date.now()}`; @@ -69,6 +74,10 @@ export async function runHeartbeat({ getTools, executeTool }) { const taskList = tasks.map((t, i) => `${i + 1}. ${t}`).join('\n'); const message = `Active tasks:\n${taskList}`; + // Collect all events for the full trace + const traceEvents = []; + const cycleStart = Date.now(); + try { for await (const event of brain.process(message, { history: [], @@ -76,14 +85,39 @@ export async function runHeartbeat({ getTools, executeTool }) { executeTool, systemPrompt: SYSTEM_PROMPT })) { + traceEvents.push(event); + if (event.type === 'tool_call') { console.log(`[heartbeat] Tool: ${event.name}`); } } - console.log('[heartbeat] Cycle complete'); + + const duration = Date.now() - cycleStart; + console.log(`[heartbeat] Cycle complete (${duration}ms, ${traceEvents.length} events)`); + + // Log the full reasoning trace + logger.trace({ + component: 'heartbeat', + trigger: taskList, + events: traceEvents, + duration_ms: duration + }); + + logger.action({ component: 'heartbeat', action: 'cycle-complete', details: { duration_ms: duration, events: traceEvents.length } }); return true; } catch (err) { + const duration = Date.now() - cycleStart; console.error('[heartbeat] Error:', err.message); + + // Log trace even on failure — partial trace is still useful for debugging + logger.trace({ + component: 'heartbeat', + trigger: taskList, + events: [...traceEvents, { type: 'error', error: err.message }], + duration_ms: duration + }); + 
+ logger.error({ component: 'heartbeat', error: err, context: `tasks: ${taskList.slice(0, 200)}` }); return false; } } diff --git a/server/modules/scheduler/index.js b/server/modules/scheduler/index.js index 4ac9956..d224529 100644 --- a/server/modules/scheduler/index.js +++ b/server/modules/scheduler/index.js @@ -11,6 +11,7 @@ import path from 'path'; import { runHeartbeat } from './heartbeat.js'; import { getDueJobs } from './cron.js'; +import logger from '../logger.js'; const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '300000'); // 5 min default @@ -82,13 +83,36 @@ async function runCronJob(job, { getTools, executeTool }) { const brainPath = `file://${path.join(process.env.WORKSPACE || '/workspace-local', 'server/modules/brain/index.js')}?t=${Date.now()}`; const brain = await import(brainPath); - for await (const event of brain.process(job.task, { - history: [], - getTools, - executeTool - })) { - if (event.type === 'tool_call') { - console.log(`[scheduler] [${job.id}] Tool: ${event.name}`); + const traceEvents = []; + const jobStart = Date.now(); + + logger.action({ component: 'cron', action: 'job-start', details: { id: job.id, task: job.task } }); + + try { + for await (const event of brain.process(job.task, { + history: [], + getTools, + executeTool + })) { + traceEvents.push(event); + if (event.type === 'tool_call') { + console.log(`[scheduler] [${job.id}] Tool: ${event.name}`); + } } + + logger.trace({ + component: 'cron', + trigger: `${job.id}: ${job.task}`, + events: traceEvents, + duration_ms: Date.now() - jobStart + }); + } catch (err) { + logger.trace({ + component: 'cron', + trigger: `${job.id}: ${job.task}`, + events: [...traceEvents, { type: 'error', error: err.message }], + duration_ms: Date.now() - jobStart + }); + throw err; } } diff --git a/server/modules/tools/logs.js b/server/modules/tools/logs.js index 75bc9a9..e26e8ee 100644 --- a/server/modules/tools/logs.js +++ b/server/modules/tools/logs.js @@ -26,14 +26,13 @@ 
export const tools = [ { name: 'log_search', - description: 'Search recent log entries by category. Categories: api, tools, errors, actions, messages.', + description: 'Search recent log entries by category. Categories: api, tools, tools-{name} (per-tool), errors, actions, messages, traces. For per-tool logs, use "tools-execute_shell", "tools-read_file", etc.', parameters: { type: 'object', properties: { category: { type: 'string', - description: 'Log category: api, tools, errors, actions, messages', - enum: ['api', 'tools', 'errors', 'actions', 'messages'] + description: 'Log category: api, tools, tools-{tool_name}, errors, actions, messages, traces' }, date: { type: 'string', description: 'Date in YYYY-MM-DD format (default: today)' }, limit: { type: 'number', description: 'Max entries to return (default 20)' } diff --git a/server/tests/logger.test.js b/server/tests/logger.test.js index 3a4c606..e8505b8 100644 --- a/server/tests/logger.test.js +++ b/server/tests/logger.test.js @@ -75,6 +75,57 @@ describe('Logger', () => { expect(entry.args.length).toBeLessThanOrEqual(500); expect(entry.result.length).toBeLessThanOrEqual(500); }); + + it('writes per-tool log file', async () => { + logger.tool({ name: 'execute_shell', args: { command: 'ls' }, result: 'ok' }); + logger.tool({ name: 'read_file', args: { path: 'test.txt' }, result: 'content' }); + logger.tool({ name: 'execute_shell', args: { command: 'pwd' }, result: '/tmp' }); + + const shellEntries = await logger.readLog('tools-execute_shell'); + const fileEntries = await logger.readLog('tools-read_file'); + + expect(shellEntries.length).toBe(2); + expect(fileEntries.length).toBe(1); + expect(shellEntries[0].name).toBe('execute_shell'); + }); + }); + + describe('trace logging', () => { + it('logs a full reasoning trace', () => { + const entry = logger.trace({ + component: 'heartbeat', + trigger: '1. Check system status', + events: [ + { type: 'text', text: 'Let me check the system...' 
}, + { type: 'tool_call', name: 'execute_shell', args: { command: 'uptime' } }, + { type: 'tool_result', name: 'execute_shell', result: 'up 5 days' }, + { type: 'text', text: 'System is running fine.' }, + { type: 'done' } + ], + duration_ms: 3500 + }); + + expect(entry.component).toBe('heartbeat'); + expect(entry.event_count).toBe(5); + expect(entry.events.length).toBe(5); + expect(entry.events[0].type).toBe('text'); + expect(entry.events[1].type).toBe('tool_call'); + expect(entry.events[1].name).toBe('execute_shell'); + }); + + it('can be read back', async () => { + logger.trace({ + component: 'cron', + trigger: 'daily report', + events: [{ type: 'text', text: 'Running report...' }], + duration_ms: 1000 + }); + + const entries = await logger.readLog('traces'); + expect(entries.length).toBe(1); + expect(entries[0].component).toBe('cron'); + expect(entries[0].trigger).toBe('daily report'); + }); }); describe('error logging', () => { From 0e5d7c51f0973406a29332b104780370eed9dbdc Mon Sep 17 00:00:00 2001 From: martinjms Date: Sun, 12 Apr 2026 01:14:51 +0300 Subject: [PATCH 3/3] Add trace_id for cost-per-task tracking across all components MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every execution context (heartbeat cycle, cron job, conversation turn) gets a unique trace_id that flows through brain → API calls → tool calls. This enables: - log_cost_per_task: "heartbeat hb_1712880000_a cost $0.0234 (3 API calls)" - Correlate all API calls and tool executions to the task that triggered them - See exactly which heartbeat cycle was expensive and why Also adds costByTrace() to the logger for aggregating costs by trace. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- server/modules/brain/index.js | 7 ++- server/modules/logger.js | 69 ++++++++++++++++++++++++--- server/modules/scheduler/heartbeat.js | 10 ++-- server/modules/scheduler/index.js | 8 +++- server/modules/tools/logs.js | 17 +++++++ 5 files changed, 99 insertions(+), 12 deletions(-) diff --git a/server/modules/brain/index.js b/server/modules/brain/index.js index 8fad5e5..90acd56 100644 --- a/server/modules/brain/index.js +++ b/server/modules/brain/index.js @@ -85,8 +85,11 @@ export async function* process(message, { history = [], getTools, executeTool, - systemPrompt = SYSTEM_PROMPTS.chat + systemPrompt = SYSTEM_PROMPTS.chat, + trace_id }) { + // Generate trace_id if not provided (webchat/telegram generate their own) + const traceId = trace_id || logger.generateTraceId('brain'); const tools = toAnthropicTools(getTools()); const messages = [...history, { role: 'user', content: message }]; @@ -116,6 +119,7 @@ export async function* process(message, { const usage = response.usage || {}; const advisorIter = usage.iterations?.find(i => i.type === 'advisor_message'); logger.api({ + trace_id: traceId, component: 'brain', model: 'claude-sonnet-4-6', input_tokens: usage.input_tokens || 0, @@ -150,6 +154,7 @@ export async function* process(message, { } logger.tool({ + trace_id: traceId, component: 'brain', name: block.name, args: block.input, diff --git a/server/modules/logger.js b/server/modules/logger.js index e263e4c..402d5a2 100644 --- a/server/modules/logger.js +++ b/server/modules/logger.js @@ -63,11 +63,22 @@ function estimateCost(model, inputTokens = 0, outputTokens = 0, cacheReadTokens ); } +/** + * Generate a unique trace ID for correlating logs within one execution context + * (e.g., one heartbeat cycle, one cron job, one user conversation turn). 
+ */ +let traceCounter = 0; +function generateTraceId(prefix = 'tr') { + return `${prefix}_${Date.now()}_${(traceCounter++).toString(36)}`; +} + const logger = { + generateTraceId, + /** * Log an API call to Anthropic (brain, worker, unconscious processes) */ - api({ component = 'brain', model, input_tokens = 0, output_tokens = 0, + api({ trace_id, component = 'brain', model, input_tokens = 0, output_tokens = 0, cache_read_tokens = 0, cache_creation_tokens = 0, advisor_input_tokens = 0, advisor_output_tokens = 0, duration_ms, stop_reason, error }) { @@ -76,7 +87,7 @@ const logger = { ? estimateCost('opus', advisor_input_tokens, advisor_output_tokens) : 0; return write('api', { - component, model, input_tokens, output_tokens, + trace_id, component, model, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, advisor_input_tokens, advisor_output_tokens, cost_usd: cost, advisor_cost_usd: advisor_cost, @@ -89,9 +100,9 @@ const logger = { * Log a tool execution — writes to both the aggregate tools log * and a per-tool log (tools-{name}-YYYY-MM-DD.jsonl) */ - tool({ component = 'brain', name, args, result, duration_ms, error }) { + tool({ trace_id, component = 'brain', name, args, result, duration_ms, error }) { const entry = { - component, name, + trace_id, component, name, args: typeof args === 'string' ? args.slice(0, 500) : JSON.stringify(args || {}).slice(0, 500), result: typeof result === 'string' ? result.slice(0, 500) : JSON.stringify(result || {}).slice(0, 500), duration_ms, error @@ -134,9 +145,9 @@ const logger = { * Log a full reasoning trace (heartbeat, cron, or any autonomous execution). * Captures the entire brain conversation: text, tool calls, tool results. 
*/ - trace({ component = 'heartbeat', trigger, events, duration_ms, total_tokens, total_cost_usd }) { + trace({ trace_id, component = 'heartbeat', trigger, events, duration_ms, total_tokens, total_cost_usd }) { return write('traces', { - component, trigger, + trace_id, component, trigger, events: (events || []).map(e => { if (e.type === 'text') return { type: 'text', text: e.text?.slice(0, 2000) }; if (e.type === 'tool_call') return { type: 'tool_call', name: e.name, args: JSON.stringify(e.args || {}).slice(0, 500) }; @@ -213,6 +224,52 @@ const logger = { }; }, + /** + * Get cost breakdown by trace_id — shows cost per heartbeat cycle, cron job, or conversation turn + */ + async costByTrace(date) { + const apiEntries = await logger.readLog('api', date, 10000); + const traces = {}; + + for (const entry of apiEntries) { + const id = entry.trace_id || 'untraced'; + if (!traces[id]) { + traces[id] = { + trace_id: id, + component: entry.component, + api_calls: 0, + input_tokens: 0, + output_tokens: 0, + advisor_tokens: 0, + cost_usd: 0, + first_seen: entry.timestamp, + last_seen: entry.timestamp + }; + } + const t = traces[id]; + t.api_calls++; + t.input_tokens += entry.input_tokens || 0; + t.output_tokens += entry.output_tokens || 0; + t.advisor_tokens += (entry.advisor_input_tokens || 0) + (entry.advisor_output_tokens || 0); + t.cost_usd += entry.total_cost_usd || 0; + t.last_seen = entry.timestamp; + } + + const sorted = Object.values(traces).sort((a, b) => + new Date(b.first_seen) - new Date(a.first_seen) + ); + + return { + date: date || new Date().toISOString().slice(0, 10), + trace_count: sorted.length, + total_cost_usd: Math.round(sorted.reduce((sum, t) => sum + t.cost_usd, 0) * 10000) / 10000, + traces: sorted.map(t => ({ + ...t, + cost_usd: Math.round(t.cost_usd * 10000) / 10000 + })) + }; + }, + /** * List available log dates */ diff --git a/server/modules/scheduler/heartbeat.js b/server/modules/scheduler/heartbeat.js index e94bd81..87a7c98 100644 --- 
a/server/modules/scheduler/heartbeat.js +++ b/server/modules/scheduler/heartbeat.js @@ -65,7 +65,8 @@ export async function runHeartbeat({ getTools, executeTool }) { if (tasks.length === 0) return false; console.log(`[heartbeat] ${tasks.length} task(s) pending`); - logger.action({ component: 'heartbeat', action: 'cycle-start', details: { task_count: tasks.length, tasks } }); + const traceId = logger.generateTraceId('hb'); + logger.action({ component: 'heartbeat', action: 'cycle-start', details: { trace_id: traceId, task_count: tasks.length, tasks } }); // Import brain fresh each cycle (hot-reloadable) const brainPath = `file://${path.join(process.env.WORKSPACE || '/workspace-local', 'server/modules/brain/index.js')}?t=${Date.now()}`; @@ -83,7 +84,8 @@ export async function runHeartbeat({ getTools, executeTool }) { history: [], getTools, executeTool, - systemPrompt: SYSTEM_PROMPT + systemPrompt: SYSTEM_PROMPT, + trace_id: traceId })) { traceEvents.push(event); @@ -97,13 +99,14 @@ export async function runHeartbeat({ getTools, executeTool }) { // Log the full reasoning trace logger.trace({ + trace_id: traceId, component: 'heartbeat', trigger: taskList, events: traceEvents, duration_ms: duration }); - logger.action({ component: 'heartbeat', action: 'cycle-complete', details: { duration_ms: duration, events: traceEvents.length } }); + logger.action({ component: 'heartbeat', action: 'cycle-complete', details: { trace_id: traceId, duration_ms: duration, events: traceEvents.length } }); return true; } catch (err) { const duration = Date.now() - cycleStart; @@ -111,6 +114,7 @@ export async function runHeartbeat({ getTools, executeTool }) { // Log trace even on failure — partial trace is still useful for debugging logger.trace({ + trace_id: traceId, component: 'heartbeat', trigger: taskList, events: [...traceEvents, { type: 'error', error: err.message }], diff --git a/server/modules/scheduler/index.js b/server/modules/scheduler/index.js index d224529..8286f64 100644 --- 
a/server/modules/scheduler/index.js +++ b/server/modules/scheduler/index.js @@ -83,16 +83,18 @@ async function runCronJob(job, { getTools, executeTool }) { const brainPath = `file://${path.join(process.env.WORKSPACE || '/workspace-local', 'server/modules/brain/index.js')}?t=${Date.now()}`; const brain = await import(brainPath); + const traceId = logger.generateTraceId('cron'); const traceEvents = []; const jobStart = Date.now(); - logger.action({ component: 'cron', action: 'job-start', details: { id: job.id, task: job.task } }); + logger.action({ component: 'cron', action: 'job-start', details: { trace_id: traceId, id: job.id, task: job.task } }); try { for await (const event of brain.process(job.task, { history: [], getTools, - executeTool + executeTool, + trace_id: traceId })) { traceEvents.push(event); if (event.type === 'tool_call') { @@ -101,6 +103,7 @@ async function runCronJob(job, { getTools, executeTool }) { } logger.trace({ + trace_id: traceId, component: 'cron', trigger: `${job.id}: ${job.task}`, events: traceEvents, @@ -108,6 +111,7 @@ async function runCronJob(job, { getTools, executeTool }) { }); } catch (err) { logger.trace({ + trace_id: traceId, component: 'cron', trigger: `${job.id}: ${job.task}`, events: [...traceEvents, { type: 'error', error: err.message }], diff --git a/server/modules/tools/logs.js b/server/modules/tools/logs.js index e26e8ee..c78f0d4 100644 --- a/server/modules/tools/logs.js +++ b/server/modules/tools/logs.js @@ -69,5 +69,22 @@ export const tools = [ errors }; } + }, + + { + name: 'log_cost_per_task', + description: 'Show cost breakdown per task/heartbeat/cron execution. Each execution has a trace_id that correlates all its API calls and tool uses. 
Shows exactly how much each heartbeat cycle or conversation turn cost.', + parameters: { + type: 'object', + properties: { + date: { type: 'string', description: 'Date in YYYY-MM-DD format (default: today)' }, + limit: { type: 'number', description: 'Max traces to return (default 20)' } + } + }, + async execute({ date, limit = 20 } = {}) { + const result = await logger.costByTrace(date); + result.traces = result.traces.slice(0, limit); + return result; + } } ];