Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 302 additions & 33 deletions SKILL.md

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions src/cloud/cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Read / write ~/.thomas/cloud-cache.json — the snapshot pulled from /v1/sync.

import { readJson, writeJsonAtomic } from "../config/io.js";
import { paths } from "../config/paths.js";
import type { CloudSnapshot } from "./types.js";

const EMPTY: CloudSnapshot = {
schemaVersion: 1,
policies: [],
bundles: [],
bindings: [],
providers: [],
redactRulesVersion: null,
syncedAt: "",
};

export async function readCache(): Promise<CloudSnapshot> {
return readJson<CloudSnapshot>(paths.cloudCache, EMPTY);
}

export async function writeCache(snapshot: CloudSnapshot): Promise<void> {
await writeJsonAtomic(paths.cloudCache, snapshot);
}
99 changes: 99 additions & 0 deletions src/cloud/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// HTTP client for thomas-cloud — single concern: assemble URLs, attach the
// device token (when present), and surface failures as ThomasErrors with
// stable codes the calling command can interpret.
//
// Stays small on purpose. No retry / backoff for v1 — if the cloud is down
// or slow, fail loud so the user knows. Background sync (PR4 candidate) can
// add retry around this.

import { ThomasError } from "../cli/json.js";
import type { ErrorCode } from "../cli/output.js";

export type CloudFetchOptions = {
baseUrl: string;
/** When set, sent as Authorization: Bearer ${deviceToken}. */
deviceToken?: string;
/** Per-request timeout in ms. Default 10s. */
timeoutMs?: number;
};

export async function cloudFetch(
path: string,
init: RequestInit,
opts: CloudFetchOptions,
): Promise<Response> {
const url = `${opts.baseUrl.replace(/\/+$/, "")}${path}`;
const headers = new Headers(init.headers);
headers.set("accept", "application/json");
if (init.body && !headers.has("content-type")) {
headers.set("content-type", "application/json");
}
if (opts.deviceToken) {
headers.set("authorization", `Bearer ${opts.deviceToken}`);
}
const ctrl = new AbortController();
const timer = setTimeout(() => ctrl.abort(), opts.timeoutMs ?? 10_000);
try {
return await fetch(url, { ...init, headers, signal: ctrl.signal });
} catch (err) {
throw asThomasError(err, url);
} finally {
clearTimeout(timer);
}
}

/** GET <path> + parse JSON, with the auth/timeout treatment above. */
export async function cloudGetJson<T>(
path: string,
opts: CloudFetchOptions,
): Promise<T> {
const resp = await cloudFetch(path, { method: "GET" }, opts);
return await readJsonOrThrow<T>(resp, path);
}

export async function cloudPostJson<T>(
path: string,
body: unknown,
opts: CloudFetchOptions,
): Promise<T> {
const resp = await cloudFetch(
path,
{ method: "POST", body: JSON.stringify(body) },
opts,
);
return await readJsonOrThrow<T>(resp, path);
}

async function readJsonOrThrow<T>(resp: Response, path: string): Promise<T> {
if (resp.ok) return (await resp.json()) as T;
const code = mapStatusToCode(resp.status);
let detail: unknown;
try {
detail = await resp.json();
} catch {
detail = await resp.text().catch(() => "");
}
throw new ThomasError({
code,
message: `${resp.status} ${resp.statusText} from ${path}`,
details: detail,
});
}

function mapStatusToCode(status: number): ErrorCode {
if (status === 401 || status === 403) return "E_CLOUD_UNAUTHORIZED";
return "E_CLOUD_UNREACHABLE";
}

function asThomasError(err: unknown, url: string): ThomasError {
const isAbort = err instanceof Error && err.name === "AbortError";
return new ThomasError({
code: isAbort ? "E_CLOUD_TIMEOUT" : "E_CLOUD_UNREACHABLE",
message: isAbort
? `request to ${url} timed out`
: `could not reach ${url}: ${err instanceof Error ? err.message : String(err)}`,
remediation: isAbort
? "Check your network or set THOMAS_CLOUD_BASE_URL to a reachable host."
: "Verify thomas-cloud is up. For local dev, set THOMAS_CLOUD_BASE_URL=http://localhost:8000.",
});
}
95 changes: 95 additions & 0 deletions src/cloud/device.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Device-code grant flow on the client side.
//
// Mirrors RFC 8628 polling shape, matched 1:1 to apps/api/app/api/devices.py:
// 1. POST /v1/devices/begin → { device_code, user_code, verification_uri, interval, expires_in }
// 2. user opens the URL, signs in, approves
// 3. POST /v1/devices/poll → 400 authorization_pending (loop) | 200 { device_token, ... }
//
// The polling loop is the only place this CLI does an interactive long-poll.
// We respect the server-provided interval; on 400 authorization_pending we
// keep going; on any other 4xx/5xx we surface immediately.

import { ThomasError } from "../cli/json.js";
import { cloudFetch, cloudPostJson, type CloudFetchOptions } from "./client.js";
import type { DeviceBeginRequest, DeviceBeginResponse, DevicePollResponse } from "./types.js";

export type LoginProgress =
| { kind: "begun"; userCode: string; verificationUri: string; verificationUriComplete: string; intervalMs: number; expiresInMs: number }
| { kind: "still_pending" }
| { kind: "approved"; result: DevicePollResponse };

export async function beginDeviceLogin(
req: DeviceBeginRequest,
opts: CloudFetchOptions,
): Promise<DeviceBeginResponse> {
return cloudPostJson<DeviceBeginResponse>("/v1/devices/begin", req, opts);
}

export type PollDeviceLoginOptions = CloudFetchOptions & {
deviceCode: string;
/** ms; defaults to begin response's interval. Floored at 1s. */
intervalMs: number;
/** Total wall-clock budget; loop exits once exceeded. */
expiresAt: number;
/** Called once per poll iteration so the caller can render a spinner / abort. */
onTick?: () => boolean | void;
};

/** Loops until approved, expired, or onTick returns false. */
export async function pollDeviceLogin(
opts: PollDeviceLoginOptions,
): Promise<DevicePollResponse> {
const interval = Math.max(1000, opts.intervalMs);
// Tiny initial delay so the user sees the URL before we start hammering.
await sleep(interval);
while (Date.now() < opts.expiresAt) {
if (opts.onTick && opts.onTick() === false) {
throw new ThomasError({
code: "E_INTERNAL",
message: "login aborted by caller",
});
}
const resp = await cloudFetch(
"/v1/devices/poll",
{ method: "POST", body: JSON.stringify({ device_code: opts.deviceCode }) },
opts,
);
if (resp.ok) {
return (await resp.json()) as DevicePollResponse;
}
// 400 with detail.error == "authorization_pending" → keep waiting.
// Any other status → surface immediately.
let detail: { error?: string } | undefined;
try {
const body = (await resp.json()) as { detail?: { error?: string } };
detail = body.detail;
} catch {
// ignore parse errors; fall through to the generic-error path
}
if (resp.status === 400 && detail?.error === "authorization_pending") {
await sleep(interval);
continue;
}
if (resp.status === 400 && detail?.error === "expired_token") {
throw new ThomasError({
code: "E_CLOUD_TIMEOUT",
message: "device code expired before approval",
remediation: "Run `thomas cloud login` again.",
});
}
throw new ThomasError({
code: "E_CLOUD_UNAUTHORIZED",
message: `unexpected ${resp.status} from /v1/devices/poll`,
details: detail ?? null,
});
}
throw new ThomasError({
code: "E_CLOUD_TIMEOUT",
message: "login timed out before approval",
remediation: "Run `thomas cloud login` again and approve in the browser within the time limit.",
});
}

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
43 changes: 43 additions & 0 deletions src/cloud/identity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Read / write ~/.thomas/cloud.json — the device token + workspace binding.
//
// The device token is a bearer secret. It's stored at 0600 (already enforced by
// io.writeJsonAtomic) and never leaves this file at runtime — clients pass it
// to fetch() via Authorization headers.

import { unlink } from "node:fs/promises";

import { readJson, writeJsonAtomic } from "../config/io.js";
import { paths } from "../config/paths.js";
import type { CloudIdentity } from "./types.js";

export async function readIdentity(): Promise<CloudIdentity | undefined> {
const value = await readJson<CloudIdentity | null>(paths.cloud, null);
return value ?? undefined;
}

export async function writeIdentity(identity: CloudIdentity): Promise<void> {
await writeJsonAtomic(paths.cloud, identity);
}

export async function clearIdentity(): Promise<boolean> {
try {
await unlink(paths.cloud);
return true;
} catch {
return false;
}
}

export async function updateLastSync(syncedAt: string): Promise<void> {
const identity = await readIdentity();
if (!identity) return;
await writeIdentity({ ...identity, lastSyncAt: syncedAt });
}

/**
* Default base URL for the SaaS. Override with THOMAS_CLOUD_BASE_URL for
* local dev (e.g. http://localhost:8000) or a private deployment.
*/
export function defaultBaseUrl(): string {
return process.env.THOMAS_CLOUD_BASE_URL?.replace(/\/+$/, "") ?? "https://thomas.trustunknown.com";
}
47 changes: 47 additions & 0 deletions src/cloud/sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// One-shot pull from /v1/sync. Returns the snapshot AND writes it to the
// on-disk cache as a side effect — both reads and the write are part of
// `thomas cloud sync`.

import { ThomasError } from "../cli/json.js";
import { writeCache } from "./cache.js";
import { cloudGetJson, type CloudFetchOptions } from "./client.js";
import { readIdentity, updateLastSync } from "./identity.js";
import type { CloudSnapshot } from "./types.js";

type SyncWireResponse = {
schemaVersion: number;
policies: unknown[];
bundles: unknown[];
bindings: unknown[];
providers: unknown[];
redactRulesVersion: string | null;
};

export async function syncFromCloud(): Promise<CloudSnapshot> {
const identity = await readIdentity();
if (!identity) {
throw new ThomasError({
code: "E_CLOUD_NOT_LOGGED_IN",
message: "no cloud login on this machine",
remediation: "Run `thomas cloud login` first.",
});
}
const opts: CloudFetchOptions = {
baseUrl: identity.baseUrl,
deviceToken: identity.deviceToken,
};
const wire = await cloudGetJson<SyncWireResponse>("/v1/sync", opts);
const syncedAt = new Date().toISOString();
const snapshot: CloudSnapshot = {
schemaVersion: 1,
policies: wire.policies ?? [],
bundles: wire.bundles ?? [],
bindings: wire.bindings ?? [],
providers: wire.providers ?? [],
redactRulesVersion: wire.redactRulesVersion ?? null,
syncedAt,
};
await writeCache(snapshot);
await updateLastSync(syncedAt);
return snapshot;
}
47 changes: 47 additions & 0 deletions src/cloud/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Wire shapes that thomas-cloud's HTTP API speaks. Mirrors the Pydantic
// models in apps/api/app/api/{devices,sync}.py. Kept in one file so changes
// to the cloud contract surface as one diff here.

export type DeviceBeginRequest = {
label: string;
platform?: string;
thomas_version?: string;
};

export type DeviceBeginResponse = {
device_code: string;
user_code: string;
verification_uri: string;
verification_uri_complete: string;
interval: number;
expires_in: number;
};

export type DevicePollResponse = {
device_token: string;
workspace_id: string;
device_id: string;
};

// Saved at ~/.thomas/cloud.json after a successful login. 0600 mode.
export type CloudIdentity = {
baseUrl: string;
deviceToken: string;
deviceId: string;
workspaceId: string;
loggedInAt: string;
// Optional, set after first /v1/sync.
lastSyncAt?: string;
};

// Saved at ~/.thomas/cloud-cache.json after each successful /v1/sync.
export type CloudSnapshot = {
schemaVersion: 1;
policies: unknown[];
bundles: unknown[];
bindings: unknown[];
providers: unknown[];
redactRulesVersion: string | null;
// Local timestamp of when this snapshot was pulled. Used to flag staleness.
syncedAt: string;
};
Loading
Loading