Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,15 @@ Returns `{ loggedIn, baseUrl, workspaceId, deviceId, loggedInAt, lastSyncAt }`.
thomas cloud sync --json
```

Returns `{ schemaVersion, policiesCount, bundlesCount, bindingsCount, providersCount, redactRulesVersion, syncedAt }`. Hits the cloud, writes the snapshot to `~/.thomas/cloud-cache.json` (the local proxy reads from this cache for policy decisions). When v1 ships, the snapshot is empty by design — this just exercises the wiring.
Returns `{ schemaVersion, policiesCount, bundlesCount, bindingsCount, providersCount, redactRulesVersion, syncedAt }`. Hits the cloud, writes the snapshot to `~/.thomas/cloud-cache.json`.

The local proxy's policy decision pipeline reads from this cache **before** the local `~/.thomas/policies.json` store. Resolution order on each request:

1. cloud cache (if logged in to thomas-cloud and the workspace has a binding for this agent)
2. local store (`thomas policy set` results)
3. route fallback (`thomas route ...`)

So a centrally-managed policy on thomas-cloud automatically takes effect once the user logs in + syncs; offline / pre-login users keep getting their local policies unchanged.

### "Sign out of cloud"

Expand Down
139 changes: 139 additions & 0 deletions src/cloud/policy-bridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Translate the cloud snapshot's per-agent binding into a local PolicyConfig
// the proxy's existing decide() pipeline understands.
//
// The wire shape from /v1/sync mirrors apps/api/app/schemas/{policy,bundle,binding}.py
// — same fields, same camelCase. We do runtime parsing here (no codegen) and
// synthesize a "cost-cascade" PolicyConfig regardless of the cloud binding's
// kind, so the rest of thomas keeps a single decision pipeline:
//
// binding kind=static → synthetic policy: primary=staticTarget, no cascade.
// The cascade decision becomes a no-op; primary wins.
// binding kind=policy → look up the policy in the snapshot, translate the
// field names (providerId/triggerSpendDayUsd → provider/
// triggerSpendDay) and feed it through unchanged.
// binding kind=bundle → v1 stub: use the highest-priority leg as primary,
// no cascade. The real bundle balancer (per-leg cap
// accounting + drain order) lands in a follow-up PR.
//
// "No binding for this agent" returns null; callers fall back to the local
// ~/.thomas/policies.json store. Same for "no cloud login" / "stale cache".

import type { AgentId } from "../agents/types.js";
import type { PolicyConfig } from "../policy/types.js";
import { readCache } from "./cache.js";

type WireModelRef = { providerId: string; model: string };

type WireCascadeStep = {
triggerSpendDayUsd: number;
fallback: WireModelRef;
};

type WirePolicySpec = {
schemaVersion: number;
primary: WireModelRef;
cascade?: WireCascadeStep[];
failoverTo?: WireModelRef | null;
};

type WirePolicy = {
id: string;
name: string;
spec: WirePolicySpec;
enabled: boolean;
};

type WireBundleLeg = {
providerId: string;
model: string;
capUsdPerDay?: number | null;
capCallsPerDay?: number | null;
priority: number;
};

type WireBundle = {
id: string;
name: string;
spec: { schemaVersion: number; legs: WireBundleLeg[] };
enabled: boolean;
};

type WireBinding = {
agentId: string;
bindingKind: "policy" | "bundle" | "static";
targetId?: string | null;
staticTarget?: WireModelRef | null;
};

/**
* Look up a policy config for `agentId` in the cloud cache. Returns undefined
* when there's no cache, no binding for this agent, or the binding refers to
* a disabled / missing policy. Caller should fall through to the local
* `~/.thomas/policies.json` store on undefined.
*/
export async function loadCloudPolicyForAgent(
agentId: AgentId,
): Promise<PolicyConfig | undefined> {
const snapshot = await readCache();
// Empty defaults — when the user hasn't logged in to cloud, the cache file
// returns the EMPTY constant from cache.ts and these are all empty arrays.
const bindings = snapshot.bindings as WireBinding[];
const binding = bindings.find((b) => b.agentId === agentId);
if (!binding) return undefined;

if (binding.bindingKind === "static") {
if (!binding.staticTarget) return undefined;
return staticAsPolicy(binding.staticTarget);
}
if (binding.bindingKind === "policy") {
const policies = snapshot.policies as WirePolicy[];
const policy = policies.find((p) => p.id === binding.targetId && p.enabled);
if (!policy) return undefined;
return wireToPolicyConfig(policy.spec);
}
if (binding.bindingKind === "bundle") {
const bundles = snapshot.bundles as WireBundle[];
const bundle = bundles.find((b) => b.id === binding.targetId && b.enabled);
if (!bundle || bundle.spec.legs.length === 0) return undefined;
return bundleAsPolicy(bundle);
}
return undefined;
}

/** Convert a static binding to a no-cascade PolicyConfig. */
function staticAsPolicy(target: WireModelRef): PolicyConfig {
return {
id: "cost-cascade",
primary: { provider: target.providerId, model: target.model },
cascade: [],
};
}

/** Convert a cloud PolicySpec (camelCase wire) to local PolicyConfig (legacy
* field names). Cascade is already sorted ascending on the cloud side. */
function wireToPolicyConfig(spec: WirePolicySpec): PolicyConfig {
return {
id: "cost-cascade",
primary: { provider: spec.primary.providerId, model: spec.primary.model },
cascade: (spec.cascade ?? []).map((step) => ({
triggerSpendDay: step.triggerSpendDayUsd,
fallback: { provider: step.fallback.providerId, model: step.fallback.model },
})),
...(spec.failoverTo
? { failoverTo: { provider: spec.failoverTo.providerId, model: spec.failoverTo.model } }
: {}),
};
}

/** v1 bundle stub: use the highest-priority leg as the primary target. The
* real per-leg cap balancer lands later — when it does, this function gets
* replaced by something that consumes today's spend per leg. */
function bundleAsPolicy(bundle: WireBundle): PolicyConfig {
// Schema validates legs are sorted ascending by priority on write. Take [0].
const head = bundle.spec.legs[0]!;
return {
id: "cost-cascade",
primary: { provider: head.providerId, model: head.model },
cascade: [],
};
}
41 changes: 34 additions & 7 deletions src/policy/decide.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,51 @@
// Pure decision: given a policy + today's spend, compute the effective target.
// Tested against fixtures of (policy, spend) inputs in tests/policy.test.ts.
//
// `decideForAgent` resolves the policy in this order:
// 1. cloud cache (~/.thomas/cloud-cache.json) — written by `thomas cloud sync`
// 2. local store (~/.thomas/policies.json) — set by `thomas policy set`
// 3. fallback target — the route from routes.json
// The first hit wins. Cloud takes precedence so a centrally-managed policy
// supersedes a leftover local one once the user logs in. Offline (or pre-login)
// users keep getting their local policies — no behavior change.

import { readRuns } from "../runs/store.js";
import type { AgentId } from "../agents/types.js";
import { loadCloudPolicyForAgent } from "../cloud/policy-bridge.js";
import { readRuns } from "../runs/store.js";
import { getPolicy } from "./store.js";
import type { PolicyConfig, PolicyDecision } from "./types.js";

export async function decideForAgent(
agentId: AgentId,
fallbackTarget: { provider: string; model: string },
): Promise<PolicyDecision> {
const policy = await getPolicy(agentId);
if (!policy) {
return { target: fallbackTarget, reason: "no policy configured", policyId: null };
const cloudPolicy = await loadCloudPolicyForAgent(agentId);
if (cloudPolicy) {
const spendDay = await spendSinceStartOfDay(agentId);
return { ...decide(cloudPolicy, spendDay), policy: cloudPolicy, source: "cloud" };
}
const localPolicy = await getPolicy(agentId);
if (localPolicy) {
const spendDay = await spendSinceStartOfDay(agentId);
return { ...decide(localPolicy, spendDay), policy: localPolicy, source: "local" };
}
const spendDay = await spendSinceStartOfDay(agentId);
return decide(policy, spendDay);
return {
target: fallbackTarget,
reason: "no policy configured",
policyId: null,
policy: null,
source: "none",
};
}

export function decide(policy: PolicyConfig, spendDay: number): PolicyDecision {
/**
* Pure cascade evaluation. Returns target + reason + policyId. Caller is
* responsible for stamping `policy` and `source` (decideForAgent does this).
*/
export function decide(
policy: PolicyConfig,
spendDay: number,
): Omit<PolicyDecision, "policy" | "source"> {
// cascade rules are evaluated in order; first matching trigger wins.
// Caller is expected to have ordered them ascending by trigger.
for (const rule of policy.cascade) {
Expand Down
12 changes: 12 additions & 0 deletions src/policy/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,16 @@ export type PolicyDecision = {
target: { provider: string; model: string };
reason: string;
policyId: PolicyConfig["id"] | null;
/**
* The full policy that produced this decision, if any. Lets callers reach
* for `failoverTo` (or future fields) without re-reading the store. Null
* when the decision was just "use the fallback target" (no policy bound).
*/
policy: PolicyConfig | null;
/**
* Where the policy came from. "cloud" = pulled from ~/.thomas/cloud-cache.json.
* "local" = ~/.thomas/policies.json. "none" = no policy was bound.
* Surfaced for telemetry / debugging — not part of the decision logic.
*/
source: "cloud" | "local" | "none";
};
8 changes: 3 additions & 5 deletions src/proxy/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { getProvider, type ProviderSpec } from "../providers/registry.js";
import type { AgentId, AgentSpec, Protocol } from "../agents/types.js";
import { decideForAgent } from "../policy/decide.js";
import { shouldFailover } from "../policy/failover.js";
import { getPolicy } from "../policy/store.js";
import { computeCost } from "../runs/pricing.js";
import { appendRun } from "../runs/store.js";
import { StreamUsageWatcher, ZERO_USAGE, extractUsageFromBody, type Usage } from "../runs/usage.js";
Expand Down Expand Up @@ -65,15 +64,14 @@ async function handle(req: IncomingMessage, res: ServerResponse): Promise<void>
}

// Apply policy (cost cascade, etc.) — may rewrite provider+model.
// Cloud-backed cache takes precedence over local ~/.thomas/policies.json
// when the user is logged in to thomas-cloud; see src/policy/decide.ts.
const decision = await decideForAgent(found.agentId as AgentId, {
provider: route.provider,
model: route.model,
});
const effective = decision.target;
const policyConfig =
decision.policyId === "cost-cascade"
? await getPolicy(found.agentId as AgentId)
: undefined;
const policyConfig = decision.policy ?? undefined;

const inboundBody = await readBody(req);
const inboundPath = req.url ?? "";
Expand Down
Loading
Loading