From b50b5827ffa1032029e1f85e0aa0f3f5ea90c6be Mon Sep 17 00:00:00 2001 From: Yoav Farhi Date: Tue, 17 Mar 2026 12:42:14 +0200 Subject: [PATCH 1/2] feat(logs): add --tail flag for real-time log streaming via SSE - Add streamFunctionLogs() async generator in core layer using ky with timeout:false, SSE line parsing, and malformed JSON resilience - Export FunctionLogEntry type and add StreamLogFilters interface - Register --tail (-f) and --json flags on the logs command - Implement tailAction with multi-function support (one SSE connection per function), SIGINT cleanup, and stream shutdown policy - Validate incompatible flag combinations (--tail with --since/--until/ --limit/--order) - JSON mode outputs NDJSON with source field for piping Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/cli/src/cli/commands/project/logs.ts | 145 +++++++++++++++++- .../cli/src/core/resources/function/api.ts | 99 ++++++++++++ .../cli/src/core/resources/function/schema.ts | 8 +- 3 files changed, 245 insertions(+), 7 deletions(-) diff --git a/packages/cli/src/cli/commands/project/logs.ts b/packages/cli/src/cli/commands/project/logs.ts index 94a18a61..b7543fb9 100644 --- a/packages/cli/src/cli/commands/project/logs.ts +++ b/packages/cli/src/cli/commands/project/logs.ts @@ -1,3 +1,4 @@ +import { log } from "@clack/prompts"; import { Command, Option } from "commander"; import type { CLIContext } from "@/cli/types.js"; import { runCommand } from "@/cli/utils/index.js"; @@ -5,13 +6,16 @@ import type { RunCommandResult } from "@/cli/utils/runCommand.js"; import { ApiError, InvalidInputError } from "@/core/errors.js"; import { readProjectConfig } from "@/core/index.js"; import type { + FunctionLogEntry, FunctionLogFilters, FunctionLogsResponse, LogLevel, + StreamLogFilters, } from "@/core/resources/function/index.js"; import { fetchFunctionLogs, LogLevelSchema, + streamFunctionLogs, } from "@/core/resources/function/index.js"; interface LogsOptions { @@ -22,6 +26,7 @@ interface LogsOptions { limit?: string; order?: string; json?: boolean; + tail?: boolean; } /** @@ -171,6 +176,131 @@ function validateLimit(limit: string | undefined): void { } } +function validateTailFlags(options: LogsOptions): void { + if (!options.tail) return; + + const incompatible: [string, string | undefined][] = [ + ["--since", options.since], + ["--until", options.until], + ["--limit", options.limit], + ["--order", options.order], + ]; + + for (const [flag, value] of incompatible) { + if (value !== undefined) { + throw new InvalidInputError(`Cannot use ${flag} with --tail`); + } + } +} + +function formatStreamEntry( + entry: FunctionLogEntry, + functionName: string, + json: boolean, +): string { + if (json) { + return JSON.stringify({ + source: functionName, + time: entry.time, + level: entry.level, + message: entry.message, + }); + } + const normalized = normalizeLogEntry(entry, functionName); + return formatEntry(normalized); +} + +async function consumeStream( + functionName: string, + filters: StreamLogFilters, + signal: AbortSignal, + json: boolean, +): Promise<"ended" | "aborted"> { + try { + for await (const entry of streamFunctionLogs( + functionName, + filters, + signal, + )) { + process.stdout.write(`${formatStreamEntry(entry, functionName, json)}\n`); + } + return "ended"; + } catch (error) { + if (signal.aborted) return "aborted"; + throw error; + } +} + +async function tailAction(options: LogsOptions): Promise { + validateTailFlags(options); + + const specifiedFunctions = parseFunctionNames(options.function); + const allProjectFunctions = await getAllFunctionNames(); + const functionNames = + specifiedFunctions.length > 0 ? specifiedFunctions : allProjectFunctions; + + if (functionNames.length === 0) { + return { outroMessage: "No functions found in this project." }; + } + + const filters: StreamLogFilters = {}; + if (options.level) { + filters.level = options.level as LogLevel; + } + + const json = options.json ?? false; + const functionList = functionNames.join(", "); + log.info(`Tailing logs for ${functionList}... (Ctrl+C to stop)`); + + const controller = new AbortController(); + const onSigint = () => controller.abort(); + process.on("SIGINT", onSigint); + + let userAborted = false; + + try { + if (functionNames.length === 1) { + const result = await consumeStream( + functionNames[0], + filters, + controller.signal, + json, + ); + userAborted = result === "aborted"; + } else { + const results = await Promise.allSettled( + functionNames.map((fn) => + consumeStream(fn, filters, controller.signal, json), + ), + ); + + const allFailed = results.every((r) => r.status === "rejected"); + + if (allFailed && !controller.signal.aborted) { + const firstError = results.find( + (r) => r.status === "rejected", + ) as PromiseRejectedResult; + throw firstError.reason; + } + + for (let i = 0; i < results.length; i++) { + const result = results[i]; + if (result.status === "rejected" && !controller.signal.aborted) { + log.warn(`Connection lost for function '${functionNames[i]}'`); + } + } + + userAborted = controller.signal.aborted; + } + } finally { + process.removeListener("SIGINT", onSigint); + } + + return { + outroMessage: userAborted ? "Stream closed" : "Stream ended", + }; +} + async function logsAction(options: LogsOptions): Promise { validateLimit(options.limit); const specifiedFunctions = parseFunctionNames(options.function); @@ -207,7 +337,9 @@ async function logsAction(options: LogsOptions): Promise { export function getLogsCommand(context: CLIContext): Command { return new Command("logs") - .description("Fetch function logs for this app") + .description( + "Fetch function logs for this app (use --tail to stream in real-time)", + ) .option( "--function ", "Filter by function name(s), comma-separated. If omitted, fetches logs for all project functions", @@ -231,11 +363,12 @@ export function getLogsCommand(context: CLIContext): Command { .addOption( new Option("--order ", "Sort order").choices(["asc", "desc"]), ) + .option("--json", "Output raw JSON") + .option("-f, --tail", "Stream logs in real-time") .action(async (options: LogsOptions) => { - await runCommand( - () => logsAction(options), - { requireAuth: true }, - context, - ); + const action = options.tail + ? () => tailAction(options) + : () => logsAction(options); + await runCommand(action, { requireAuth: true }, context); }); } diff --git a/packages/cli/src/core/resources/function/api.ts b/packages/cli/src/core/resources/function/api.ts index dfa04030..e3f93c97 100644 --- a/packages/cli/src/core/resources/function/api.ts +++ b/packages/cli/src/core/resources/function/api.ts @@ -4,12 +4,15 @@ import { ApiError, SchemaValidationError } from "@/core/errors.js"; import type { DeploySingleFunctionResponse, FunctionFile, + FunctionLogEntry, FunctionLogFilters, FunctionLogsResponse, ListFunctionsResponse, + StreamLogFilters, } from "@/core/resources/function/schema.js"; import { DeploySingleFunctionResponseSchema, + FunctionLogEntrySchema, FunctionLogsResponseSchema, ListFunctionsResponseSchema, } from "@/core/resources/function/schema.js"; @@ -133,3 +136,99 @@ export async function fetchFunctionLogs( return result.data; } + +/** + * Build query string for stream filter options. + */ +function buildStreamQueryString(filters: StreamLogFilters): URLSearchParams { + const params = new URLSearchParams(); + if (filters.level) { + params.set("level", filters.level); + } + return params; +} + +/** + * Stream real-time logs for a specific function via SSE. + * + * Uses getAppClient() (ky) with timeout: false to preserve token refresh, + * 401 retry, and ApiError mapping. Reads the response body as a ReadableStream + * and parses SSE data: lines into typed log entries. + * + * IMPORTANT: Do NOT call response.json() or response.text() — those buffer + * the entire body. Only read from response.body incrementally. + */ +export async function* streamFunctionLogs( + functionName: string, + filters: StreamLogFilters = {}, + signal?: AbortSignal, +): AsyncGenerator { + const appClient = getAppClient(); + const searchParams = buildStreamQueryString(filters); + + let response: KyResponse; + try { + response = await appClient.get( + `functions-mgmt/${encodeURIComponent(functionName)}/logs/stream`, + { + searchParams, + timeout: false, + signal, + }, + ); + } catch (error) { + throw await ApiError.fromHttpError( + error, + `streaming function logs: '${functionName}'`, + ); + } + + if (!response.body) { + throw new ApiError("Server returned empty response for log stream"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Process complete lines + for ( + let newlineIndex = buffer.indexOf("\n"); + newlineIndex !== -1; + newlineIndex = buffer.indexOf("\n") + ) { + const line = buffer.slice(0, newlineIndex).trimEnd(); + buffer = buffer.slice(newlineIndex + 1); + + // Skip empty lines (SSE event delimiters) and comments (heartbeats) + if (line === "" || line.startsWith(":")) { + continue; + } + + // Parse SSE data: lines + if (line.startsWith("data: ")) { + const jsonStr = line.slice(6); // Remove "data: " prefix + try { + const parsed = FunctionLogEntrySchema.safeParse( + JSON.parse(jsonStr), + ); + if (parsed.success) { + yield parsed.data; + } + } catch { + // Malformed JSON — skip this line silently. + } + } + } + } + } finally { + reader.releaseLock(); + } +} diff --git a/packages/cli/src/core/resources/function/schema.ts b/packages/cli/src/core/resources/function/schema.ts index 7d3a5ae2..f61233a6 100644 --- a/packages/cli/src/core/resources/function/schema.ts +++ b/packages/cli/src/core/resources/function/schema.ts @@ -128,12 +128,14 @@ export const LogLevelSchema = z.enum(["info", "warning", "error", "debug"]); export type LogLevel = z.infer; -const FunctionLogEntrySchema = z.object({ +export const FunctionLogEntrySchema = z.object({ time: z.string(), level: LogLevelSchema, message: z.string(), }); +export type FunctionLogEntry = z.infer; + export const FunctionLogsResponseSchema = z.array(FunctionLogEntrySchema); export type FunctionLogsResponse = z.infer; @@ -145,3 +147,7 @@ export interface FunctionLogFilters { limit?: number; order?: "asc" | "desc"; } + +export interface StreamLogFilters { + level?: LogLevel; +} From 51775990e853cd618488a338dc14230b52ac558d Mon Sep 17 00:00:00 2001 From: Yoav Farhi Date: Tue, 17 Mar 2026 12:42:23 +0200 Subject: [PATCH 2/2] test(logs): add tests for --tail streaming and SSE parsing - Core unit tests: SSE parsing, heartbeat filtering, malformed JSON, null body, abort signal, level filter passthrough - Integration tests: streaming output, --json with source field, multi-function tailing, flag validation, heartbeat ignore, stream error handling - Add mockFunctionLogsStream helper to TestAPIServer Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/cli/tests/cli/logs.spec.ts | 155 ++++++++++++++++++ .../cli/tests/cli/testkit/TestAPIServer.ts | 33 ++++ .../tests/core/stream-function-logs.spec.ts | 119 ++++++++++++++ 3 files changed, 307 insertions(+) create mode 100644 packages/cli/tests/core/stream-function-logs.spec.ts diff --git a/packages/cli/tests/cli/logs.spec.ts b/packages/cli/tests/cli/logs.spec.ts index 3899c803..5b2c1599 100644 --- a/packages/cli/tests/cli/logs.spec.ts +++ b/packages/cli/tests/cli/logs.spec.ts @@ -199,4 +199,159 @@ describe("logs command", () => { t.expectResult(result).toSucceed(); }); + + it("fails when --tail is used with --since", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + const result = await t.run( + "logs", + "--tail", + "--since", + "2024-01-01T00:00:00Z", + ); + t.expectResult(result).toFail(); + t.expectResult(result).toContain("Cannot use --since with --tail"); + }); + + it("fails when --tail is used with --until", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + const result = await t.run( + "logs", + "--tail", + "--until", + "2024-01-01T00:00:00Z", + ); + t.expectResult(result).toFail(); + t.expectResult(result).toContain("Cannot use --until with --tail"); + }); + + it("fails when --tail is used with --limit", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + const result = await t.run("logs", "--tail", "--limit", "10"); + t.expectResult(result).toFail(); + t.expectResult(result).toContain("Cannot use --limit with --tail"); + }); + + it("fails when --tail is used with --order", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + const result = await t.run("logs", "--tail", "--order", "asc"); + t.expectResult(result).toFail(); + t.expectResult(result).toContain("Cannot use --order with --tail"); + }); + + it("streams logs with --tail and prints entries", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + t.api.mockFunctionLogsStream("my-function", [ + { + time: "2024-01-15T10:30:00.000Z", + level: "info", + message: "Streamed entry 1", + }, + { + time: "2024-01-15T10:30:01.000Z", + level: "error", + message: "Streamed entry 2", + }, + ]); + + const result = await t.run("logs", "--tail", "--function", "my-function"); + + t.expectResult(result).toSucceed(); + t.expectResult(result).toContain("Streamed entry 1"); + t.expectResult(result).toContain("Streamed entry 2"); + t.expectResult(result).toContain("Stream ended"); + }); + + it("streams logs with --tail --json and includes source field", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + t.api.mockFunctionLogsStream("my-function", [ + { + time: "2024-01-15T10:30:00.000Z", + level: "info", + message: "JSON entry", + }, + ]); + + const result = await t.run( + "logs", + "--tail", + "--json", + "--function", + "my-function", + ); + + t.expectResult(result).toSucceed(); + t.expectResult(result).toContain('"source":"my-function"'); + t.expectResult(result).toContain('"message":"JSON entry"'); + }); + + it("streams logs for multiple functions with --tail", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + t.api.mockFunctionLogsStream("fn1", [ + { time: "2024-01-15T10:30:00Z", level: "info", message: "From fn1" }, + ]); + t.api.mockFunctionLogsStream("fn2", [ + { time: "2024-01-15T10:30:01Z", level: "info", message: "From fn2" }, + ]); + + const result = await t.run("logs", "--tail", "--function", "fn1,fn2"); + + t.expectResult(result).toSucceed(); + t.expectResult(result).toContain("[fn1]"); + t.expectResult(result).toContain("[fn2]"); + }); + + it("shows error when all --tail streams fail", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + t.api.mockFunctionLogsStreamError("my-function", { + status: 500, + body: { error: "Server error" }, + }); + + const result = await t.run("logs", "--tail", "--function", "my-function"); + + t.expectResult(result).toFail(); + }); + + it("shows Stream ended when server closes --tail stream", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + t.api.mockFunctionLogsStream("my-function", [ + { + time: "2024-01-15T10:30:00.000Z", + level: "info", + message: "Last entry", + }, + ]); + + const result = await t.run("logs", "--tail", "--function", "my-function"); + + t.expectResult(result).toSucceed(); + t.expectResult(result).toContain("Stream ended"); + }); + + it("ignores SSE heartbeat comments with --tail", async () => { + await t.givenLoggedInWithProject(fixture("basic")); + t.api.mockRoute( + "GET", + `/api/apps/test-app-id/functions-mgmt/my-function/logs/stream`, + (_req, res) => { + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }); + res.write(": heartbeat\n\n"); + res.write( + 'data: {"time":"2024-01-15T10:30:00Z","level":"info","message":"real entry"}\n\n', + ); + res.write(": heartbeat\n\n"); + res.end(); + }, + ); + + const result = await t.run("logs", "--tail", "--function", "my-function"); + + t.expectResult(result).toSucceed(); + t.expectResult(result).toContain("real entry"); + t.expectResult(result).toNotContain("heartbeat"); + }); }); diff --git a/packages/cli/tests/cli/testkit/TestAPIServer.ts b/packages/cli/tests/cli/testkit/TestAPIServer.ts index 44185bd0..7d87c14c 100644 --- a/packages/cli/tests/cli/testkit/TestAPIServer.ts +++ b/packages/cli/tests/cli/testkit/TestAPIServer.ts @@ -494,6 +494,39 @@ export class TestAPIServer { ); } + mockFunctionLogsStream( + functionName: string, + entries: FunctionLogsResponse, + ): this { + this.pendingRoutes.push({ + method: "GET", + path: `/api/apps/${this.appId}/functions-mgmt/${functionName}/logs/stream`, + handler: (_req, res) => { + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }); + for (const entry of entries) { + res.write(`data: ${JSON.stringify(entry)}\n\n`); + } + res.end(); + }, + }); + return this; + } + + mockFunctionLogsStreamError( + functionName: string, + error: ErrorResponse, + ): this { + return this.addErrorRoute( + "GET", + `/api/apps/${this.appId}/functions-mgmt/${functionName}/logs/stream`, + error, + ); + } + // ─── SECRETS ENDPOINTS ─────────────────────────────────── mockSecretsList(response: SecretsListResponse): this { diff --git a/packages/cli/tests/core/stream-function-logs.spec.ts b/packages/cli/tests/core/stream-function-logs.spec.ts new file mode 100644 index 00000000..87292743 --- /dev/null +++ b/packages/cli/tests/core/stream-function-logs.spec.ts @@ -0,0 +1,119 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +function sseStream(lines: string[]): ReadableStream { + const encoder = new TextEncoder(); + const text = `${lines.join("\n")}\n`; + return new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(text)); + controller.close(); + }, + }); +} + +function mockAppClientStream(body: ReadableStream) { + const mockGet = vi.fn().mockResolvedValue({ body }); + vi.doMock("@/core/clients/index.js", () => ({ + getAppClient: () => ({ get: mockGet }), + })); + return mockGet; +} + +describe("streamFunctionLogs", () => { + beforeEach(() => { + vi.resetModules(); + vi.doMock("@/core/project/index.js", () => ({ + getAppConfig: () => ({ id: "test-app-id" }), + })); + }); + + it("yields parsed log entries from SSE data lines", async () => { + mockAppClientStream( + sseStream([ + 'data: {"time":"2024-01-15T10:30:00Z","level":"info","message":"hello"}', + "", + 'data: {"time":"2024-01-15T10:30:01Z","level":"error","message":"fail"}', + "", + ]), + ); + const { streamFunctionLogs } = await import( + "@/core/resources/function/api.js" + ); + const entries = []; + for await (const entry of streamFunctionLogs("my-func")) { + entries.push(entry); + } + expect(entries).toHaveLength(2); + expect(entries[0].message).toBe("hello"); + expect(entries[1].message).toBe("fail"); + }); + + it("ignores SSE heartbeat comments", async () => { + mockAppClientStream( + sseStream([ + ": heartbeat", + 'data: {"time":"2024-01-15T10:30:00Z","level":"info","message":"only entry"}', + "", + ": heartbeat", + ]), + ); + const { streamFunctionLogs } = await import( + "@/core/resources/function/api.js" + ); + const entries = []; + for await (const entry of streamFunctionLogs("my-func")) { + entries.push(entry); + } + expect(entries).toHaveLength(1); + expect(entries[0].message).toBe("only entry"); + }); + + it("skips malformed JSON in data lines without throwing", async () => { + mockAppClientStream( + sseStream([ + "data: not valid json", + "", + 'data: {"time":"2024-01-15T10:30:00Z","level":"info","message":"good"}', + "", + ]), + ); + const { streamFunctionLogs } = await import( + "@/core/resources/function/api.js" + ); + const entries = []; + for await (const entry of streamFunctionLogs("my-func")) { + entries.push(entry); + } + expect(entries).toHaveLength(1); + expect(entries[0].message).toBe("good"); + }); + + it("throws ApiError when response body is null", async () => { + const mockGet = vi.fn().mockResolvedValue({ body: null }); + vi.doMock("@/core/clients/index.js", () => ({ + getAppClient: () => ({ get: mockGet }), + })); + const { streamFunctionLogs } = await import( + "@/core/resources/function/api.js" + ); + const gen = streamFunctionLogs("my-func"); + await expect(gen.next()).rejects.toThrow( + "Server returned empty response for log stream", + ); + }); + + it("passes level filter as searchParams", async () => { + const mockGet = mockAppClientStream(sseStream([])); + const { streamFunctionLogs } = await import( + "@/core/resources/function/api.js" + ); + for await (const _ of streamFunctionLogs("my-func", { + level: "error", + })) { + // empty + } + const callArgs = mockGet.mock.calls[0]; + const options = callArgs[1]; + expect(options.searchParams.get("level")).toBe("error"); + }); +});