feat(usage): async buffered token-usage recording #467
Decouple model invocations from the synchronous chargeback write path
with a producer-consumer buffer modeled on CoPaw PR #3766. Producers
call enqueueTokenUsage from the request hot-path; a periodic flush
drains the queue, batches inserts into one SQLite transaction, and
emits a usage.batch_flushed event onto each affected session's
hash-chain so the chargeback feed remains tamper-evident.
- New src/usage/token-usage-buffer.ts: bounded queue, periodic +
opportunistic flush, drain-on-shutdown, in-flight serialization.
- New recordUsageEventBatch in src/memory/db.ts: single-transaction
bulk insert into usage_events.
- Lifecycle wired into gateway.ts (start beside observability ingest;
awaited stop in shutdown handler).
- 4 sync recordUsageEvent call sites switched to enqueueTokenUsage,
threading auditRunId so the batch event chains under the same run
as the per-call model.usage event:
gateway-chat-service.ts, gateway-service.ts (delegate +
bootstrap-autostart paths), scheduled-task-runner.ts.
- New tests/token-usage-buffer.test.ts (6 tests): drop-on-invalid,
batched flush + audit emission, opportunistic flush, queue
overflow, drain-on-stop, idempotent start.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
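The buffer described above can be sketched as follows. This is a minimal illustration with hypothetical names (`UsageEvent`, `enqueue`, `flush`, `MAX_QUEUE` are not the PR's API); the real module additionally handles opportunistic flush, in-flight serialization, and audit emission:

```typescript
// Minimal sketch of a bounded producer-consumer usage buffer.
// All names here are illustrative, not the PR's actual exports.
interface UsageEvent {
  sessionId: string;
  agentId: string;
  inputTokens: number;
  outputTokens: number;
}

const MAX_QUEUE = 1000;
const queue: UsageEvent[] = [];
let dropped = 0;

// Producer side: called from the request hot-path; never blocks.
function enqueue(event: UsageEvent): boolean {
  if (!event.sessionId || !event.agentId) return false; // drop invalid events
  if (queue.length >= MAX_QUEUE) {
    dropped++; // bounded: shed load instead of blocking the caller
    return false;
  }
  queue.push(event);
  return true;
}

// Consumer side: a periodic flush drains everything queued so far as one
// batch, which would map to a single SQLite transaction in the real module.
function flush(writeBatch: (batch: UsageEvent[]) => void): number {
  if (queue.length === 0) return 0;
  const batch = queue.splice(0, queue.length);
  writeBatch(batch);
  return batch.length;
}
```

The key property is that producers only ever pay the cost of an array push (or a counter increment on overflow), so model invocations never wait on the database.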
Pull request overview
This PR introduces an asynchronous, bounded producer-consumer buffer for recording model token usage, decoupling request hot paths from synchronous SQLite chargeback writes while emitting tamper-evident batch audit events.
Changes:
- Added a new token-usage buffer with periodic/opportunistic flushing, bounded queueing, and drain-on-shutdown behavior.
- Added `recordUsageEventBatch()` to bulk-insert `usage_events` rows in a single SQLite transaction.
- Switched several synchronous `recordUsageEvent` call sites to `enqueueTokenUsage`, and wired buffer lifecycle into gateway startup/shutdown with new unit tests.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/usage/token-usage-buffer.ts` | New async queue + flush pipeline; emits `usage.batch_flushed` audit events. |
| `src/memory/db.ts` | Adds `recordUsageEventBatch()` bulk insert helper for the chargeback table. |
| `src/gateway/gateway.ts` | Starts the buffer on startup and drains it during shutdown. |
| `src/gateway/gateway-chat-service.ts` | Replaces per-call synchronous usage writes with enqueueing (threads `auditRunId`). |
| `src/gateway/gateway-service.ts` | Replaces usage writes with enqueueing in delegation + bootstrap autostart paths. |
| `src/scheduler/scheduled-task-runner.ts` | Enqueues usage instead of writing synchronously (threads `auditRunId`). |
| `tests/token-usage-buffer.test.ts` | Adds unit tests for buffering/flush/backpressure/drain/idempotent start. |
```typescript
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { afterEach, beforeEach, describe, expect, test } from 'vitest';

const ORIGINAL_HOME = process.env.HOME;

function makeTempHome(): string {
  return fs.mkdtempSync(path.join(os.tmpdir(), 'hybridclaw-usage-buffer-'));
}

function createTempDbPath(): string {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'hybridclaw-usage-buf-'));
  return path.join(dir, 'usage.db');
}

afterEach(() => {
  if (ORIGINAL_HOME === undefined) delete process.env.HOME;
  else process.env.HOME = ORIGINAL_HOME;
});

describe.sequential('token usage buffer', () => {
  beforeEach(async () => {
    // Each test re-imports the buffer to get a clean module state.
  });

  test('enqueueTokenUsage drops events with missing session/agent', async () => {
    const dbPath = createTempDbPath();
    const { initDatabase } = await import('../src/memory/db.ts');
    initDatabase({ quiet: true, dbPath });
    const {
      _resetTokenUsageBufferForTests,
      enqueueTokenUsage,
      getTokenUsageBufferStats,
    } = await import('../src/usage/token-usage-buffer.ts');
```
The tests change process.env.HOME to isolate audit output, but the buffer/audit modules are imported in the first test before HOME is set and modules are never reset. Because ESM imports are cached, later tests that set HOME may still use a DATA_DIR computed from the original HOME, causing audit files to be written to the real home directory. Align with existing audit tests by calling vi.resetModules() in beforeEach/afterEach (and setting HOME before imports), so each test gets a clean module load with the intended HOME.
```typescript
const models = Array.from(new Set(events.map((e) => e.model))).sort();
const agents = Array.from(new Set(events.map((e) => e.agentId))).sort();
const runId = events[0]?.auditRunId || makeAuditRunId('usage-batch');
const parentRunId = events[0]?.auditParentRunId;
```
emitBatchAuditEvents() groups only by sessionId but then chooses runId/parentRunId from events[0]. If a session’s batch contains usage events from multiple audit runs (common in chat where each turn has its own runId), the usage.batch_flushed event will be chained under an unrelated run and lose correct parentage for the other events. Consider grouping by (sessionId, auditRunId, auditParentRunId) (or emitting one batch audit event per distinct run) so the batch attestation is threaded under the correct run(s).
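The suggested grouping can be sketched as follows (the event shape and the helper name are assumptions for illustration, not the PR's code):

```typescript
// Group buffered events by (sessionId, auditRunId, auditParentRunId) so each
// usage.batch_flushed event chains under the run its events actually belong to.
interface BufferedEvent {
  sessionId: string;
  auditRunId?: string;
  auditParentRunId?: string;
}

function groupForAudit<T extends BufferedEvent>(events: T[]): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const e of events) {
    // JSON-encode the composite key so separator collisions are impossible.
    const key = JSON.stringify([
      e.sessionId,
      e.auditRunId ?? null,
      e.auditParentRunId ?? null,
    ]);
    const bucket = groups.get(key);
    if (bucket) bucket.push(e);
    else groups.set(key, [e]);
  }
  return groups;
}
```

Emitting one batch audit event per map entry would then thread each attestation under the correct run rather than whatever run `events[0]` happened to carry.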
```typescript
const inputTokens = sumInt(events, (e) => e.inputTokens);
const outputTokens = sumInt(events, (e) => e.outputTokens);
const declaredTotal = sumInt(events, (e) => e.totalTokens ?? 0);
const totalTokens = declaredTotal || inputTokens + outputTokens;
const toolCalls = sumInt(events, (e) => e.toolCalls ?? 0);
const costUsd = events.reduce((acc, e) => acc + (e.costUsd ?? 0), 0);
```
totalTokens is computed as declaredTotal || inputTokens + outputTokens, where declaredTotal is the sum of e.totalTokens ?? 0. If only some events provide totalTokens and others omit it, declaredTotal becomes non-zero and the fallback is skipped, undercounting the batch. Safer approach: compute per-event total as normalize(totalTokens ?? input+output) and then sum across events so mixed presence doesn’t skew totals.
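The fix amounts to applying the fallback per event before summing. A sketch (the event shape and helper name are illustrative, not the PR's code):

```typescript
interface Tokens {
  inputTokens?: number;
  outputTokens?: number;
  totalTokens?: number;
}

// Per-event fallback: use the declared total when present, otherwise
// input + output, and only then sum across the batch — so events that omit
// totalTokens no longer get counted as zero when others declare it.
function sumTotalTokens(events: Tokens[]): number {
  return events.reduce((acc, e) => {
    const perEvent =
      e.totalTokens ?? (e.inputTokens ?? 0) + (e.outputTokens ?? 0);
    return acc + Math.max(0, Math.floor(perEvent));
  }, 0);
}
```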
```typescript
function computeBatchHash(events: TokenUsageEvent[]): string {
  // Deterministic, order-sensitive hash of the batch payload — enough to
  // detect tampering between the buffer flush and the SQLite write.
  const lines = events.map((e) =>
    [
      e.sessionId,
      e.agentId,
      e.model,
      e.inputTokens ?? 0,
      e.outputTokens ?? 0,
      e.totalTokens ?? 0,
      e.toolCalls ?? 0,
      e.costUsd ?? 0,
      e.timestamp ?? '',
    ].join('|'),
  );
```
computeBatchHash() hashes raw event fields, but recordUsageEventBatch() normalizes values before writing (floors token counts, coerces empty model to 'unknown', may synthesize/trim timestamps, clamps cost). That means batchHash may not correspond to the canonical payload actually persisted in usage_events, which weakens the “tamper-evident” claim and can create false mismatches. Consider hashing the same normalized representation that is inserted into SQLite (either normalize in the buffer before hashing, or have recordUsageEventBatch() return the normalized rows used for insertion and hash those).
Suggested change:

```typescript
function normalizeUsageTimestamp(timestamp?: string): string {
  if (typeof timestamp !== 'string') {
    return new Date().toISOString();
  }
  const trimmed = timestamp.trim();
  if (!trimmed) {
    return new Date().toISOString();
  }
  const parsed = new Date(trimmed);
  return Number.isNaN(parsed.getTime()) ? trimmed : parsed.toISOString();
}

function normalizeUsageEventForHash(event: TokenUsageEvent) {
  const inputTokens =
    typeof event.inputTokens === 'number' &&
    Number.isFinite(event.inputTokens) &&
    event.inputTokens > 0
      ? Math.floor(event.inputTokens)
      : 0;
  const outputTokens =
    typeof event.outputTokens === 'number' &&
    Number.isFinite(event.outputTokens) &&
    event.outputTokens > 0
      ? Math.floor(event.outputTokens)
      : 0;
  const totalTokens =
    typeof event.totalTokens === 'number' &&
    Number.isFinite(event.totalTokens) &&
    event.totalTokens > 0
      ? Math.floor(event.totalTokens)
      : 0;
  const toolCalls =
    typeof event.toolCalls === 'number' &&
    Number.isFinite(event.toolCalls) &&
    event.toolCalls > 0
      ? Math.floor(event.toolCalls)
      : 0;
  const costUsd =
    typeof event.costUsd === 'number' && Number.isFinite(event.costUsd)
      ? Math.max(0, event.costUsd)
      : 0;
  return {
    sessionId: event.sessionId,
    agentId: event.agentId,
    model:
      typeof event.model === 'string' && event.model.trim()
        ? event.model.trim()
        : 'unknown',
    inputTokens,
    outputTokens,
    totalTokens,
    toolCalls,
    costUsd,
    timestamp: normalizeUsageTimestamp(event.timestamp),
  };
}

function computeBatchHash(events: TokenUsageEvent[]): string {
  // Deterministic, order-sensitive hash of the canonical payload written to
  // SQLite so the batch hash matches the persisted representation.
  const lines = events.map((event) => {
    const normalized = normalizeUsageEventForHash(event);
    return [
      normalized.sessionId,
      normalized.agentId,
      normalized.model,
      normalized.inputTokens,
      normalized.outputTokens,
      normalized.totalTokens,
      normalized.toolCalls,
      normalized.costUsd,
      normalized.timestamp,
    ].join('|');
  });
```
Summary
- `enqueueTokenUsage` is non-blocking and bounded; a periodic flush drains the queue into one SQLite transaction.
- Each flush emits a `usage.batch_flushed` event per affected session with `eventCount`, summed tokens/cost, sorted `models[]`/`agents[]`, and a SHA-256 `batchHash` over the canonical payload — so the chargeback feed is tamper-evident batch-by-batch on top of the per-call `model.usage` events.
- Writes the same `usage_events` rows the aggregators already consume, just batched.

Changes

- `src/usage/token-usage-buffer.ts`: bounded queue, periodic + opportunistic flush, drain-on-shutdown, in-flight serialization, drop counters for observable backpressure.
- `recordUsageEventBatch` in `src/memory/db.ts`: single-transaction bulk insert with the same row normalization as `recordUsageEvent`.
- `src/gateway/gateway.ts` — `startTokenUsageBuffer()` next to `startObservabilityIngest()`; awaited `stopTokenUsageBuffer()` in the shutdown handler.
- 4 sync `recordUsageEvent` call sites switched to `enqueueTokenUsage`, threading `auditRunId` so the batch event chains under the same run as the per-call `model.usage` event:
  - `src/gateway/gateway-chat-service.ts`
  - `src/gateway/gateway-service.ts` (delegate + bootstrap-autostart paths)
  - `src/scheduler/scheduled-task-runner.ts`

Test plan

- `tests/token-usage-buffer.test.ts` — 6 tests, all passing: drop-on-invalid, batched flush + audit emission, opportunistic flush, queue overflow, drain-on-stop, idempotent start.
- `tsc --noEmit` clean.
- `tsc --noEmit --noUnusedLocals --noUnusedParameters` (lint) clean.
- `biome check` clean.
- Related suites (`audit-events`, `memory-service`, `gateway-history-summary`, `session-trace-export`, `agent-registry` — 70 tests) all pass.
- Remaining failures are confined to unrelated suites (`admin-terminal`, `eval-command`, `host-runner`, `plugin-manager`) and reproduce identically on the unmodified base — pre-existing flakes, not regressions.

🤖 Generated with Claude Code