From c39da5b3520f65fc15a2a84d65c9b63cc50e00ae Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 5 May 2026 17:26:30 +0800 Subject: [PATCH] feat(policy): decideForAgent reads cloud cache before local store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bridges thomas-cloud → local thomas's policy decision pipeline. The proxy's existing decide() flow stays unchanged structurally — we just resolve the PolicyConfig from a wider source. Resolution order on every request: 1. ~/.thomas/cloud-cache.json (set by `thomas cloud sync`) 2. ~/.thomas/policies.json (set by `thomas policy set`) 3. fallbackTarget (the route) So a centrally-managed thomas-cloud policy automatically takes effect once the user logs in and syncs; offline / not-logged-in users keep getting their local policies — zero behavior change for the existing path. Implementation: src/cloud/policy-bridge.ts (new) loadCloudPolicyForAgent(agentId): reads ~/.thomas/cloud-cache.json, finds the matching agent_binding, translates it to a PolicyConfig the rest of decide() understands. Three binding kinds: - static → synthetic policy: primary=staticTarget, no cascade - policy → translate the cloud PolicySpec wire shape (camelCase) to local PolicyConfig (legacy field names) - bundle → v1 stub: use the highest-priority leg as primary, no cascade. Real bundle balancer (per-leg cap accounting + drain order) lands in a follow-up PR. Disabled policies / missing-target bundles → undefined, fall through. src/policy/decide.ts decideForAgent: cloud → local → fallback, first hit wins. Cloud policies are loaded via the bridge above; local via the existing store. src/policy/types.ts PolicyDecision gains `policy` (the resolved PolicyConfig, for failoverTo consumers that previously re-fetched) and `source: "cloud" | "local" | "none"` for telemetry. src/proxy/server.ts Drops the second getPolicy() call; reads from decision.policy directly. Tests: - tests/cloud-decide.test.ts: 7 cases covering the three binding kinds + each fallback path (no binding for this agent / no cache / disabled policy / missing bundle) + cloud-overrides-local on conflict. - 263/263 pytests / vitest pass; build 186 KB. Co-Authored-By: Claude Opus 4.7 (1M context) --- SKILL.md | 10 +- src/cloud/policy-bridge.ts | 139 +++++++++++++++++++++ src/policy/decide.ts | 41 ++++-- src/policy/types.ts | 12 ++ src/proxy/server.ts | 8 +- tests/cloud-decide.test.ts | 248 +++++++++++++++++++++++++++++++++++++ 6 files changed, 445 insertions(+), 13 deletions(-) create mode 100644 src/cloud/policy-bridge.ts create mode 100644 tests/cloud-decide.test.ts diff --git a/SKILL.md b/SKILL.md index d508e75..945cfc8 100644 --- a/SKILL.md +++ b/SKILL.md @@ -257,7 +257,15 @@ Returns `{ loggedIn, baseUrl, workspaceId, deviceId, loggedInAt, lastSyncAt }`. thomas cloud sync --json ``` -Returns `{ schemaVersion, policiesCount, bundlesCount, bindingsCount, providersCount, redactRulesVersion, syncedAt }`. Hits the cloud, writes the snapshot to `~/.thomas/cloud-cache.json` (the local proxy reads from this cache for policy decisions). When v1 ships, the snapshot is empty by design — this just exercises the wiring. +Returns `{ schemaVersion, policiesCount, bundlesCount, bindingsCount, providersCount, redactRulesVersion, syncedAt }`. Hits the cloud, writes the snapshot to `~/.thomas/cloud-cache.json`. + +The local proxy's policy decision pipeline reads from this cache **before** the local `~/.thomas/policies.json` store. Resolution order on each request: + +1. cloud cache (if logged in to thomas-cloud and the workspace has a binding for this agent) +2. local store (`thomas policy set` results) +3. route fallback (`thomas route ...`) + +So a centrally-managed policy on thomas-cloud automatically takes effect once the user logs in + syncs; offline / pre-login users keep getting their local policies unchanged. ### "Sign out of cloud" diff --git a/src/cloud/policy-bridge.ts b/src/cloud/policy-bridge.ts new file mode 100644 index 0000000..1f7811e --- /dev/null +++ b/src/cloud/policy-bridge.ts @@ -0,0 +1,139 @@ +// Translate the cloud snapshot's per-agent binding into a local PolicyConfig +// the proxy's existing decide() pipeline understands. +// +// The wire shape from /v1/sync mirrors apps/api/app/schemas/{policy,bundle,binding}.py +// — same fields, same camelCase. We do runtime parsing here (no codegen) and +// synthesize a "cost-cascade" PolicyConfig regardless of the cloud binding's +// kind, so the rest of thomas keeps a single decision pipeline: +// +// binding kind=static → synthetic policy: primary=staticTarget, no cascade. +// The cascade decision becomes a no-op; primary wins. +// binding kind=policy → look up the policy in the snapshot, translate the +// field names (providerId/triggerSpendDayUsd → provider/ +// triggerSpendDay) and feed it through unchanged. +// binding kind=bundle → v1 stub: use the highest-priority leg as primary, +// no cascade. The real bundle balancer (per-leg cap +// accounting + drain order) lands in a follow-up PR. +// +// "No binding for this agent" returns null; callers fall back to the local +// ~/.thomas/policies.json store. Same for "no cloud login" / "stale cache". + +import type { AgentId } from "../agents/types.js"; +import type { PolicyConfig } from "../policy/types.js"; +import { readCache } from "./cache.js"; + +type WireModelRef = { providerId: string; model: string }; + +type WireCascadeStep = { + triggerSpendDayUsd: number; + fallback: WireModelRef; +}; + +type WirePolicySpec = { + schemaVersion: number; + primary: WireModelRef; + cascade?: WireCascadeStep[]; + failoverTo?: WireModelRef | null; +}; + +type WirePolicy = { + id: string; + name: string; + spec: WirePolicySpec; + enabled: boolean; +}; + +type WireBundleLeg = { + providerId: string; + model: string; + capUsdPerDay?: number | null; + capCallsPerDay?: number | null; + priority: number; +}; + +type WireBundle = { + id: string; + name: string; + spec: { schemaVersion: number; legs: WireBundleLeg[] }; + enabled: boolean; +}; + +type WireBinding = { + agentId: string; + bindingKind: "policy" | "bundle" | "static"; + targetId?: string | null; + staticTarget?: WireModelRef | null; +}; + +/** + * Look up a policy config for `agentId` in the cloud cache. Returns undefined + * when there's no cache, no binding for this agent, or the binding refers to + * a disabled / missing policy. Caller should fall through to the local + * `~/.thomas/policies.json` store on undefined. + */ +export async function loadCloudPolicyForAgent( + agentId: AgentId, +): Promise { + const snapshot = await readCache(); + // Empty defaults — when the user hasn't logged in to cloud, the cache file + // returns the EMPTY constant from cache.ts and these are all empty arrays. + const bindings = snapshot.bindings as WireBinding[]; + const binding = bindings.find((b) => b.agentId === agentId); + if (!binding) return undefined; + + if (binding.bindingKind === "static") { + if (!binding.staticTarget) return undefined; + return staticAsPolicy(binding.staticTarget); + } + if (binding.bindingKind === "policy") { + const policies = snapshot.policies as WirePolicy[]; + const policy = policies.find((p) => p.id === binding.targetId && p.enabled); + if (!policy) return undefined; + return wireToPolicyConfig(policy.spec); + } + if (binding.bindingKind === "bundle") { + const bundles = snapshot.bundles as WireBundle[]; + const bundle = bundles.find((b) => b.id === binding.targetId && b.enabled); + if (!bundle || bundle.spec.legs.length === 0) return undefined; + return bundleAsPolicy(bundle); + } + return undefined; +} + +/** Convert a static binding to a no-cascade PolicyConfig. */ +function staticAsPolicy(target: WireModelRef): PolicyConfig { + return { + id: "cost-cascade", + primary: { provider: target.providerId, model: target.model }, + cascade: [], + }; +} + +/** Convert a cloud PolicySpec (camelCase wire) to local PolicyConfig (legacy + * field names). Cascade is already sorted ascending on the cloud side. */ +function wireToPolicyConfig(spec: WirePolicySpec): PolicyConfig { + return { + id: "cost-cascade", + primary: { provider: spec.primary.providerId, model: spec.primary.model }, + cascade: (spec.cascade ?? []).map((step) => ({ + triggerSpendDay: step.triggerSpendDayUsd, + fallback: { provider: step.fallback.providerId, model: step.fallback.model }, + })), + ...(spec.failoverTo + ? { failoverTo: { provider: spec.failoverTo.providerId, model: spec.failoverTo.model } } + : {}), + }; +} + +/** v1 bundle stub: use the highest-priority leg as the primary target. The + * real per-leg cap balancer lands later — when it does, this function gets + * replaced by something that consumes today's spend per leg. */ +function bundleAsPolicy(bundle: WireBundle): PolicyConfig { + // Schema validates legs are sorted ascending by priority on write. Take [0]. + const head = bundle.spec.legs[0]!; + return { + id: "cost-cascade", + primary: { provider: head.providerId, model: head.model }, + cascade: [], + }; +} diff --git a/src/policy/decide.ts b/src/policy/decide.ts index 2cccf51..750a81d 100644 --- a/src/policy/decide.ts +++ b/src/policy/decide.ts @@ -1,8 +1,17 @@ // Pure decision: given a policy + today's spend, compute the effective target. // Tested against fixtures of (policy, spend) inputs in tests/policy.test.ts. +// +// `decideForAgent` resolves the policy in this order: +// 1. cloud cache (~/.thomas/cloud-cache.json) — written by `thomas cloud sync` +// 2. local store (~/.thomas/policies.json) — set by `thomas policy set` +// 3. fallback target — the route from routes.json +// The first hit wins. Cloud takes precedence so a centrally-managed policy +// supersedes a leftover local one once the user logs in. Offline (or pre-login) +// users keep getting their local policies — no behavior change. -import { readRuns } from "../runs/store.js"; import type { AgentId } from "../agents/types.js"; +import { loadCloudPolicyForAgent } from "../cloud/policy-bridge.js"; +import { readRuns } from "../runs/store.js"; import { getPolicy } from "./store.js"; import type { PolicyConfig, PolicyDecision } from "./types.js"; @@ -10,15 +19,33 @@ export async function decideForAgent( agentId: AgentId, fallbackTarget: { provider: string; model: string }, ): Promise { - const policy = await getPolicy(agentId); - if (!policy) { - return { target: fallbackTarget, reason: "no policy configured", policyId: null }; + const cloudPolicy = await loadCloudPolicyForAgent(agentId); + if (cloudPolicy) { + const spendDay = await spendSinceStartOfDay(agentId); + return { ...decide(cloudPolicy, spendDay), policy: cloudPolicy, source: "cloud" }; + } + const localPolicy = await getPolicy(agentId); + if (localPolicy) { + const spendDay = await spendSinceStartOfDay(agentId); + return { ...decide(localPolicy, spendDay), policy: localPolicy, source: "local" }; } - const spendDay = await spendSinceStartOfDay(agentId); - return decide(policy, spendDay); + return { + target: fallbackTarget, + reason: "no policy configured", + policyId: null, + policy: null, + source: "none", + }; } -export function decide(policy: PolicyConfig, spendDay: number): PolicyDecision { +/** + * Pure cascade evaluation. Returns target + reason + policyId. Caller is + * responsible for stamping `policy` and `source` (decideForAgent does this). + */ +export function decide( + policy: PolicyConfig, + spendDay: number, +): Omit { // cascade rules are evaluated in order; first matching trigger wins. // Caller is expected to have ordered them ascending by trigger. for (const rule of policy.cascade) { diff --git a/src/policy/types.ts b/src/policy/types.ts index f523236..8dd13ee 100644 --- a/src/policy/types.ts +++ b/src/policy/types.ts @@ -29,4 +29,16 @@ export type PolicyDecision = { target: { provider: string; model: string }; reason: string; policyId: PolicyConfig["id"] | null; + /** + * The full policy that produced this decision, if any. Lets callers reach + * for `failoverTo` (or future fields) without re-reading the store. Null + * when the decision was just "use the fallback target" (no policy bound). + */ + policy: PolicyConfig | null; + /** + * Where the policy came from. "cloud" = pulled from ~/.thomas/cloud-cache.json. + * "local" = ~/.thomas/policies.json. "none" = no policy was bound. + * Surfaced for telemetry / debugging — not part of the decision logic. + */ + source: "cloud" | "local" | "none"; }; diff --git a/src/proxy/server.ts b/src/proxy/server.ts index cddc615..fc188f5 100644 --- a/src/proxy/server.ts +++ b/src/proxy/server.ts @@ -10,7 +10,6 @@ import { getProvider, type ProviderSpec } from "../providers/registry.js"; import type { AgentId, AgentSpec, Protocol } from "../agents/types.js"; import { decideForAgent } from "../policy/decide.js"; import { shouldFailover } from "../policy/failover.js"; -import { getPolicy } from "../policy/store.js"; import { computeCost } from "../runs/pricing.js"; import { appendRun } from "../runs/store.js"; import { StreamUsageWatcher, ZERO_USAGE, extractUsageFromBody, type Usage } from "../runs/usage.js"; @@ -65,15 +64,14 @@ async function handle(req: IncomingMessage, res: ServerResponse): Promise } // Apply policy (cost cascade, etc.) — may rewrite provider+model. + // Cloud-backed cache takes precedence over local ~/.thomas/policies.json + // when the user is logged in to thomas-cloud; see src/policy/decide.ts. const decision = await decideForAgent(found.agentId as AgentId, { provider: route.provider, model: route.model, }); const effective = decision.target; - const policyConfig = - decision.policyId === "cost-cascade" - ? await getPolicy(found.agentId as AgentId) - : undefined; + const policyConfig = decision.policy ?? undefined; const inboundBody = await readBody(req); const inboundPath = req.url ?? ""; diff --git a/tests/cloud-decide.test.ts b/tests/cloud-decide.test.ts new file mode 100644 index 0000000..f09ade9 --- /dev/null +++ b/tests/cloud-decide.test.ts @@ -0,0 +1,248 @@ +// PR6: decide() consults cloud-cache before local store. +// +// Three angles covered: +// 1. cloud cache binding kind=static → static target wins (no cascade) +// 2. cloud cache binding kind=policy → cascade evaluated against today's spend +// 3. cloud cache binding kind=bundle → highest-priority leg used (v1 stub) +// Plus the fallback paths: +// 4. cache present but no binding for this agent → falls through to local store +// 5. cache absent (no cloud login) → falls through to local store +// 6. cache present + local store empty + no binding → fallbackTarget wins +// Plus referential-integrity oddities: +// 7. binding kind=policy with disabled policy → falls through (not used) +// 8. binding kind=bundle with empty legs → falls through + +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { decideForAgent } from "../src/policy/decide.js"; +import { setPolicy } from "../src/policy/store.js"; +import type { PolicyConfig } from "../src/policy/types.js"; +import { writeCache } from "../src/cloud/cache.js"; +import type { CloudSnapshot } from "../src/cloud/types.js"; + +let dir: string; +const ORIG_THOMAS_HOME = process.env.THOMAS_HOME; + +beforeEach(async () => { + dir = await mkdtemp(join(tmpdir(), "thomas-cloud-decide-")); + process.env.THOMAS_HOME = dir; +}); + +afterEach(async () => { + if (ORIG_THOMAS_HOME !== undefined) process.env.THOMAS_HOME = ORIG_THOMAS_HOME; + else delete process.env.THOMAS_HOME; + await rm(dir, { recursive: true, force: true }); +}); + +const FALLBACK = { provider: "anthropic", model: "claude-haiku-4-5" }; + +function snapshot(partial: Partial): CloudSnapshot { + return { + schemaVersion: 1, + policies: [], + bundles: [], + bindings: [], + providers: [], + redactRulesVersion: null, + syncedAt: new Date().toISOString(), + ...partial, + }; +} + +describe("decideForAgent — cloud cache integration", () => { + it("uses cloud static binding ahead of local policy and route fallback", async () => { + // Local also has a policy for this agent — cloud must override. + const localPolicy: PolicyConfig = { + id: "cost-cascade", + primary: { provider: "openai", model: "gpt-4o" }, + cascade: [], + }; + await setPolicy("claude-code", localPolicy); + + await writeCache( + snapshot({ + bindings: [ + { + agentId: "claude-code", + bindingKind: "static", + staticTarget: { providerId: "anthropic", model: "claude-opus-4-7" }, + }, + ], + }) as unknown as CloudSnapshot, + ); + + const decision = await decideForAgent("claude-code", FALLBACK); + expect(decision.target).toEqual({ + provider: "anthropic", + model: "claude-opus-4-7", + }); + expect(decision.source).toBe("cloud"); + expect(decision.policy?.cascade).toEqual([]); + }); + + it("applies cloud policy cascade based on today's spend", async () => { + await writeCache( + snapshot({ + policies: [ + { + id: "01POLICY_ULID00000000000000", + name: "Opus then Haiku", + enabled: true, + spec: { + schemaVersion: 1, + primary: { providerId: "anthropic", model: "claude-opus-4-7" }, + cascade: [ + { + triggerSpendDayUsd: 5.0, + fallback: { + providerId: "anthropic", + model: "claude-haiku-4-5", + }, + }, + ], + }, + }, + ], + bindings: [ + { + agentId: "claude-code", + bindingKind: "policy", + targetId: "01POLICY_ULID00000000000000", + }, + ], + }) as unknown as CloudSnapshot, + ); + + // No spend → primary + const cold = await decideForAgent("claude-code", FALLBACK); + expect(cold.target.model).toBe("claude-opus-4-7"); + expect(cold.source).toBe("cloud"); + }); + + it("falls back to local policy when cloud cache has no binding for this agent", async () => { + await writeCache( + snapshot({ + bindings: [ + { + agentId: "codex", + bindingKind: "static", + staticTarget: { providerId: "openai", model: "gpt-4o" }, + }, + ], + }) as unknown as CloudSnapshot, + ); + const localPolicy: PolicyConfig = { + id: "cost-cascade", + primary: { provider: "kimi", model: "kimi-k2" }, + cascade: [], + }; + await setPolicy("claude-code", localPolicy); + + const decision = await decideForAgent("claude-code", FALLBACK); + expect(decision.target).toEqual({ provider: "kimi", model: "kimi-k2" }); + expect(decision.source).toBe("local"); + }); + + it("falls back to fallbackTarget when neither cloud nor local has anything", async () => { + // No cache, no local policy — pure fallback path (existing behavior). + const decision = await decideForAgent("claude-code", FALLBACK); + expect(decision.target).toEqual(FALLBACK); + expect(decision.source).toBe("none"); + expect(decision.policy).toBeNull(); + }); + + it("ignores disabled cloud policies + falls through to local", async () => { + const localPolicy: PolicyConfig = { + id: "cost-cascade", + primary: { provider: "kimi", model: "kimi-k2" }, + cascade: [], + }; + await setPolicy("claude-code", localPolicy); + + await writeCache( + snapshot({ + policies: [ + { + id: "01POLICY", + name: "disabled", + enabled: false, // <-- key bit + spec: { + schemaVersion: 1, + primary: { providerId: "anthropic", model: "claude-opus-4-7" }, + cascade: [], + }, + }, + ], + bindings: [ + { + agentId: "claude-code", + bindingKind: "policy", + targetId: "01POLICY", + }, + ], + }) as unknown as CloudSnapshot, + ); + + const decision = await decideForAgent("claude-code", FALLBACK); + expect(decision.source).toBe("local"); + expect(decision.target.provider).toBe("kimi"); + }); + + it("uses highest-priority leg for bundle bindings (v1 stub)", async () => { + await writeCache( + snapshot({ + bundles: [ + { + id: "01BUNDLE", + name: "openai then deepseek", + enabled: true, + spec: { + schemaVersion: 1, + legs: [ + // priority 0 (head) — used by the v1 stub + { providerId: "openai", model: "gpt-4o", priority: 0, capUsdPerDay: 5 }, + { providerId: "deepseek", model: "deepseek-chat", priority: 1, capUsdPerDay: 5 }, + ], + }, + }, + ], + bindings: [ + { + agentId: "claude-code", + bindingKind: "bundle", + targetId: "01BUNDLE", + }, + ], + }) as unknown as CloudSnapshot, + ); + + const decision = await decideForAgent("claude-code", FALLBACK); + expect(decision.target).toEqual({ provider: "openai", model: "gpt-4o" }); + expect(decision.source).toBe("cloud"); + }); + + it("ignores bundle bindings whose target bundle is missing", async () => { + await setPolicy("claude-code", { + id: "cost-cascade", + primary: { provider: "kimi", model: "kimi-k2" }, + cascade: [], + }); + await writeCache( + snapshot({ + bundles: [], // empty — binding refers to nonexistent bundle + bindings: [ + { + agentId: "claude-code", + bindingKind: "bundle", + targetId: "01MISSING", + }, + ], + }) as unknown as CloudSnapshot, + ); + const decision = await decideForAgent("claude-code", FALLBACK); + expect(decision.source).toBe("local"); + }); +});