From 2943832024db66138195962c6c36bb26702d9436 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 13 Nov 2025 13:37:28 +0000 Subject: [PATCH] docs: realtime streams v2 --- docs/docs.json | 79 +- .../example-projects/batch-llm-evaluator.mdx | 6 +- docs/realtime/backend/overview.mdx | 6 +- docs/realtime/backend/streams.mdx | 617 ++++++++------- docs/realtime/overview.mdx | 2 +- docs/realtime/react-hooks/streams.mdx | 170 +++- docs/runs/metadata.mdx | 13 +- docs/tasks/streams.mdx | 730 ++++++++++++++++++ 8 files changed, 1314 insertions(+), 309 deletions(-) create mode 100644 docs/tasks/streams.mdx diff --git a/docs/docs.json b/docs/docs.json index aeb5650ae9..13b8b7706d 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -10,7 +10,11 @@ }, "favicon": "/images/favicon.png", "contextual": { - "options": ["copy", "view", "claude"] + "options": [ + "copy", + "view", + "claude" + ] }, "navigation": { "dropdowns": [ @@ -36,7 +40,11 @@ "pages": [ { "group": "Tasks", - "pages": ["tasks/overview", "tasks/schemaTask", "tasks/scheduled"] + "pages": [ + "tasks/overview", + "tasks/schemaTask", + "tasks/scheduled" + ] }, "triggering", "runs", @@ -51,7 +59,12 @@ "errors-retrying", { "group": "Wait", - "pages": ["wait", "wait-for", "wait-until", "wait-for-token"] + "pages": [ + "wait", + "wait-for", + "wait-until", + "wait-for-token" + ] }, "queue-concurrency", "versioning", @@ -60,6 +73,7 @@ "runs/max-duration", "tags", "runs/metadata", + "tasks/streams", "run-usage", "context", "runs/priority", @@ -99,7 +113,9 @@ }, { "group": "Development", - "pages": ["cli-dev"] + "pages": [ + "cli-dev" + ] }, { "group": "Deployment", @@ -111,7 +127,10 @@ "deployment/atomic-deployment", { "group": "Deployment integrations", - "pages": ["github-integration", "vercel-integration"] + "pages": [ + "github-integration", + "vercel-integration" + ] } ] }, @@ -167,11 +186,20 @@ }, { "group": "MCP Server", - "pages": ["mcp-introduction", "mcp-tools", "mcp-agent-rules"] + "pages": [ + "mcp-introduction", + "mcp-tools", + "mcp-agent-rules" + ] }, { "group": "Using the Dashboard", - "pages": ["run-tests", "troubleshooting-alerts", "replaying", "bulk-actions"] + "pages": [ + "run-tests", + "troubleshooting-alerts", + "replaying", + "bulk-actions" + ] }, { "group": "Troubleshooting", @@ -193,19 +221,30 @@ "self-hosting/kubernetes", { "group": "Environment variables", - "pages": ["self-hosting/env/webapp", "self-hosting/env/supervisor"] + "pages": [ + "self-hosting/env/webapp", + "self-hosting/env/supervisor" + ] }, "open-source-self-hosting" ] }, - { "group": "Open source", - "pages": ["open-source-contributing", "github-repo", "changelog", "roadmap"] + "pages": [ + "open-source-contributing", + "github-repo", + "changelog", + "roadmap" + ] }, { "group": "Help", - "pages": ["community", "help-slack", "help-email"] + "pages": [ + "community", + "help-slack", + "help-email" + ] } ] }, @@ -226,7 +265,10 @@ }, { "group": "Tasks API", - "pages": ["management/tasks/trigger", "management/tasks/batch-trigger"] + "pages": [ + "management/tasks/trigger", + "management/tasks/batch-trigger" + ] }, { "group": "Runs API", @@ -272,7 +314,9 @@ "groups": [ { "group": "Introduction", - "pages": ["guides/introduction"] + "pages": [ + "guides/introduction" + ] }, { "group": "Frameworks", @@ -401,7 +445,9 @@ }, { "group": "Migration guides", - "pages": ["migration-mergent"] + "pages": [ + "migration-mergent" + ] }, { "group": "Community packages", @@ -422,7 +468,10 @@ "href": "https://trigger.dev" }, "api": { - "openapi": ["openapi.yml", 
"v3-openapi.yaml"], + "openapi": [ + "openapi.yml", + "v3-openapi.yaml" + ], "playground": { "display": "simple" } diff --git a/docs/guides/example-projects/batch-llm-evaluator.mdx b/docs/guides/example-projects/batch-llm-evaluator.mdx index 5a04331391..d000962c09 100644 --- a/docs/guides/example-projects/batch-llm-evaluator.mdx +++ b/docs/guides/example-projects/batch-llm-evaluator.mdx @@ -44,6 +44,10 @@ This demo is a full stack example that uses the following: - The `AnthropicEval` component: [src/components/evals/Anthropic.tsx](https://github.com/triggerdotdev/examples/blob/main/batch-llm-evaluator/src/components/evals/Anthropic.tsx) - The `XAIEval` component: [src/components/evals/XAI.tsx](https://github.com/triggerdotdev/examples/blob/main/batch-llm-evaluator/src/components/evals/XAI.tsx) - The `OpenAIEval` component: [src/components/evals/OpenAI.tsx](https://github.com/triggerdotdev/examples/blob/main/batch-llm-evaluator/src/components/evals/OpenAI.tsx) -- Each of these components then uses [useRealtimeRunWithStreams](/realtime/react-hooks/streams) to subscribe to the different LLM responses. +- Each of these components then uses [useRealtimeRunWithStreams](/realtime/react-hooks/streams#userealtimerunwithstreams) to subscribe to the different LLM responses. + + + This example uses the older `useRealtimeRunWithStreams` hook. For new projects, consider using the new [`useRealtimeStream`](/realtime/react-hooks/streams#userealtimestream-recommended) hook (SDK 4.1.0+) for a simpler API and better type safety with defined streams. + diff --git a/docs/realtime/backend/overview.mdx b/docs/realtime/backend/overview.mdx index de13fd70ec..7adb30bfa0 100644 --- a/docs/realtime/backend/overview.mdx +++ b/docs/realtime/backend/overview.mdx @@ -14,7 +14,11 @@ There are three main categories of functionality: - **[Subscribe functions](/realtime/backend/subscribe)** - Subscribe to run updates using async iterators - **[Metadata](/realtime/backend/subscribe#subscribe-to-metadata-updates-from-your-tasks)** - Update and subscribe to run metadata in real-time -- **[Streams](/realtime/backend/streams)** - Emit and consume real-time streaming data from your tasks +- **[Streams](/realtime/backend/streams)** - Read and consume real-time streaming data from your tasks + + + To learn how to emit streams from your tasks, see our [Realtime Streams](/tasks/streams) documentation. + ## Authentication diff --git a/docs/realtime/backend/streams.mdx b/docs/realtime/backend/streams.mdx index 9593578da1..11c21bd2f4 100644 --- a/docs/realtime/backend/streams.mdx +++ b/docs/realtime/backend/streams.mdx @@ -1,351 +1,414 @@ --- title: Streams sidebarTitle: Streams -description: Emit and consume real-time streaming data from your tasks +description: Read and consume real-time streaming data from your tasks in your backend --- -The Streams API allows you to stream data from your tasks to the outside world in realtime using the [metadata](/runs/metadata) system. This is particularly useful for streaming LLM outputs or any other real-time data. +The Streams API allows you to read streaming data from your Trigger.dev tasks in your backend code. This is particularly useful for consuming AI/LLM outputs, progress updates, or any other real-time data that your tasks emit. - For frontend applications using React, see our [React hooks streams - documentation](/realtime/react-hooks/streams) for consuming streams in your UI. + To learn how to emit streams from your tasks, see our [Realtime Streams](/tasks/streams) documentation. 
For frontend applications using React, see our [React hooks streams documentation](/realtime/react-hooks/streams). -## How streams work +## Reading streams -Streams use the metadata system to send data chunks in real-time. You register a stream with a specific key using `metadata.stream`, and then consumers can subscribe to receive those chunks as they're emitted. +### Using defined streams (Recommended) -## Basic streaming example +The recommended approach is to use [defined streams](/tasks/streams#defining-typed-streams-recommended) for full type safety: -Here's how to stream data from OpenAI in your task: +```ts +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; + +async function consumeStream(runId: string) { + // Read from the defined stream + const stream = await aiStream.read(runId); + + let fullText = ""; + + for await (const chunk of stream) { + console.log("Received chunk:", chunk); // chunk is typed! + fullText += chunk; + } + + console.log("Final text:", fullText); +} +``` + +### Direct stream reading + +If you prefer not to use defined streams, you can read directly by specifying the stream key: ```ts -import { task, metadata } from "@trigger.dev/sdk"; -import OpenAI from "openai"; +import { streams } from "@trigger.dev/sdk"; -const openai = new OpenAI({ - apiKey: process.env.OPENAI_API_KEY, -}); +async function consumeStream(runId: string) { + // Read from a stream by key + const stream = await streams.read(runId, "ai-output"); -export type STREAMS = { - openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider -}; + for await (const chunk of stream) { + console.log("Received chunk:", chunk); + } +} +``` -export const myTask = task({ - id: "my-task", - run: async (payload: { prompt: string }) => { - const completion = await openai.chat.completions.create({ - messages: [{ role: "user", content: payload.prompt }], - model: "gpt-3.5-turbo", - stream: true, - }); +### Reading from the default stream - // Register the stream with the key "openai" - // This will "tee" the stream and send it to the metadata system - const stream = await metadata.stream("openai", completion); +Every run has a default stream, so you can omit the stream key: - let text = ""; +```ts +import { streams } from "@trigger.dev/sdk"; - // You can read the returned stream as an async iterator - for await (const chunk of stream) { - logger.log("Received chunk", { chunk }); +async function consumeDefaultStream(runId: string) { + // Read from the default stream + const stream = await streams.read(runId); - // The type of the chunk is determined by the provider - text += chunk.choices.map((choice) => choice.delta?.content).join(""); + for await (const chunk of stream) { + console.log("Received chunk:", chunk); + } +} +``` + +## Stream options + +The `read()` method accepts several options for controlling stream behavior: + +### Timeout + +Set a timeout to stop reading if no data is received within a specified time: + +```ts +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; + +async function consumeWithTimeout(runId: string) { + const stream = await aiStream.read(runId, { + timeoutInSeconds: 120, // Wait up to 2 minutes for data + }); + + try { + for await (const chunk of stream) { + console.log("Received chunk:", chunk); + } + } catch (error) { + if (error.name === "TimeoutError") { + console.log("Stream timed out"); } + } +} +``` - return { text }; - }, -}); +### Start index + +Resume reading from a 
specific chunk index (useful for reconnection scenarios): + +```ts +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; + +async function resumeStream(runId: string, lastChunkIndex: number) { + // Start reading from the chunk after the last one we received + const stream = await aiStream.read(runId, { + startIndex: lastChunkIndex + 1, + }); + + for await (const chunk of stream) { + console.log("Received chunk:", chunk); + } +} ``` -## Subscribing to streams from backend +### Abort signal -You can subscribe to the stream using the `runs.subscribeToRun` method with `.withStreams()`: +Use an `AbortSignal` to cancel stream reading: ```ts -import { runs } from "@trigger.dev/sdk"; -import type { myTask, STREAMS } from "./trigger/my-task"; - -// Somewhere in your backend -async function subscribeToStream(runId: string) { - // Use a for-await loop to subscribe to the stream - for await (const part of runs.subscribeToRun(runId).withStreams()) { - switch (part.type) { - case "run": { - console.log("Received run", part.run); - break; - } - case "openai": { - // part.chunk is of type OpenAI.ChatCompletionChunk - console.log("Received OpenAI chunk", part.chunk); - break; +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; + +async function consumeWithCancellation(runId: string) { + const controller = new AbortController(); + + // Cancel after 30 seconds + setTimeout(() => controller.abort(), 30000); + + const stream = await aiStream.read(runId, { + signal: controller.signal, + }); + + try { + for await (const chunk of stream) { + console.log("Received chunk:", chunk); + + // Optionally abort based on content + if (chunk.includes("STOP")) { + controller.abort(); } } + } catch (error) { + if (error.name === "AbortError") { + console.log("Stream was cancelled"); + } } } ``` -## Multiple streams +### Combining options -You can register and subscribe to multiple streams in the same task: +You can combine multiple options: ```ts -import { task, metadata } from "@trigger.dev/sdk"; -import OpenAI from "openai"; - -const openai = new OpenAI({ - apiKey: process.env.OPENAI_API_KEY, -}); - -export type STREAMS = { - openai: OpenAI.ChatCompletionChunk; - fetch: string; // The response body will be an array of strings -}; - -export const myTask = task({ - id: "my-task", - run: async (payload: { prompt: string }) => { - const completion = await openai.chat.completions.create({ - messages: [{ role: "user", content: payload.prompt }], - model: "gpt-3.5-turbo", - stream: true, - }); - - // Register the OpenAI stream - await metadata.stream("openai", completion); - - const response = await fetch("https://jsonplaceholder.typicode.com/posts"); - - if (!response.body) { - return; +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; + +async function advancedStreamConsumption(runId: string) { + const controller = new AbortController(); + + const stream = await aiStream.read(runId, { + timeoutInSeconds: 300, // 5 minute timeout + startIndex: 0, // Start from the beginning + signal: controller.signal, // Allow cancellation + }); + + try { + for await (const chunk of stream) { + console.log("Received chunk:", chunk); + } + } catch (error) { + if (error.name === "AbortError") { + console.log("Stream was cancelled"); + } else if (error.name === "TimeoutError") { + console.log("Stream timed out"); + } else { + console.error("Stream error:", error); } + } +} +``` + +## Practical examples - // Register a fetch 
response stream
-    // Pipe the response.body through a TextDecoderStream to convert it to a string
-    await metadata.stream("fetch", response.body.pipeThrough(new TextDecoderStream()));
-  },
-});
+### Reading AI streaming responses
+
+Here's a complete example of consuming an AI stream from your backend:
+
+```ts
+import { streams } from "@trigger.dev/sdk";
+import { aiStream } from "./trigger/streams";
+
+async function consumeAIStream(runId: string) {
+  const stream = await aiStream.read(runId, {
+    timeoutInSeconds: 300, // AI responses can take time
+  });
+
+  let fullResponse = "";
+  const chunks: string[] = [];
+
+  for await (const chunk of stream) {
+    chunks.push(chunk);
+    fullResponse += chunk;
+
+    // Process each chunk as it arrives
+    console.log("Chunk received:", chunk);
+
+    // Could send to websocket, SSE, etc.
+    // await sendToClient(chunk);
+  }
+
+  console.log("Stream complete!");
+  console.log("Total chunks:", chunks.length);
+  console.log("Full response:", fullResponse);
+
+  return { fullResponse, chunks };
+}
+```

-<Note>
-  You may notice above that we aren't consuming either of the streams in the task. In the
-  background, we'll wait until all streams are consumed before the task is considered complete (with
-  a max timeout of 60 seconds). If you have a longer running stream, make sure to consume it in the
-  task.
-</Note>
+### Reading multiple streams

-Then subscribe to both streams:
+If a task emits multiple streams, you can read them concurrently or sequentially:

 ```ts
-import { runs } from "@trigger.dev/sdk";
-import type { myTask, STREAMS } from "./trigger/my-task";
-
-// Somewhere in your backend
-async function subscribeToStream(runId: string) {
-  for await (const part of runs.subscribeToRun(runId).withStreams()) {
-    switch (part.type) {
-      case "run": {
-        console.log("Received run", part.run);
-        break;
-      }
-      case "openai": {
-        // part.chunk is of type OpenAI.ChatCompletionChunk
-        console.log("Received OpenAI chunk", part.chunk);
-        break;
-      }
-      case "fetch": {
-        // part.chunk is a string
-        console.log("Received fetch chunk", part.chunk);
-        break;
-      }
-    }
+import { streams } from "@trigger.dev/sdk";
+import { aiStream, progressStream } from "./trigger/streams";
+
+async function consumeMultipleStreams(runId: string) {
+  // Read streams concurrently
+  const [aiData, progressData] = await Promise.all([
+    consumeStream(aiStream, runId),
+    consumeStream(progressStream, runId),
+  ]);
+
+  return { aiData, progressData };
+}
+
+async function consumeStream<T>(
+  streamDef: { read: (runId: string) => Promise<AsyncIterable<T>> },
+  runId: string
+): Promise<T[]> {
+  const stream = await streamDef.read(runId);
+  const chunks: T[] = [];
+
+  for await (const chunk of stream) {
+    chunks.push(chunk);
   }
+
+  return chunks;
 }
 ```

-## Using with the AI SDK
+### Piping streams to HTTP responses

-The [AI SDK](https://sdk.vercel.ai/docs/introduction) provides a higher-level API for working with AI models. 
You can use it with the Streams API: +You can pipe streams directly to HTTP responses for server-sent events (SSE): ```ts -import { openai } from "@ai-sdk/openai"; -import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk"; -import { streamText } from "ai"; -import { z } from "zod"; - -export type STREAMS = { - openai: string; -}; - -export const aiStreaming = schemaTask({ - id: "ai-streaming", - description: "Stream data from the AI sdk", - schema: z.object({ - model: z.string().default("o1-preview"), - prompt: z.string().default("Hello, how are you?"), - }), - run: async ({ model, prompt }) => { - logger.info("Running OpenAI model", { model, prompt }); - - const result = streamText({ - model: openai(model), - prompt, - }); - - // pass the textStream to the metadata system - const stream = await metadata.stream("openai", result.textStream); - - let text = ""; +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; +import type { NextRequest } from "next/server"; - for await (const chunk of stream) { - logger.log("Received chunk", { chunk }); +export async function GET(request: NextRequest) { + const runId = request.nextUrl.searchParams.get("runId"); - text += chunk; // chunk is a string - } + if (!runId) { + return new Response("Missing runId", { status: 400 }); + } - return { text }; - }, -}); + const stream = await aiStream.read(runId, { + timeoutInSeconds: 300, + }); + + // Create a readable stream for SSE + const encoder = new TextEncoder(); + const readableStream = new ReadableStream({ + async start(controller) { + try { + for await (const chunk of stream) { + // Format as SSE + const data = `data: ${JSON.stringify({ chunk })}\n\n`; + controller.enqueue(encoder.encode(data)); + } + controller.close(); + } catch (error) { + controller.error(error); + } + }, + }); + + return new Response(readableStream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + }); +} ``` -## Using AI SDK with tools +### Implementing retry logic -When using tools with the AI SDK, you can access tool calls and results using the `fullStream`: +Handle transient errors with retry logic: ```ts -import { openai } from "@ai-sdk/openai"; -import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk"; -import { streamText, tool, type TextStreamPart } from "ai"; -import { z } from "zod"; - -const tools = { - getWeather: tool({ - description: "Get the weather in a location", - parameters: z.object({ - location: z.string().describe("The location to get the weather for"), - }), - execute: async ({ location }) => ({ - location, - temperature: 72 + Math.floor(Math.random() * 21) - 10, - }), - }), -}; - -export type STREAMS = { - // Give the stream a type of TextStreamPart along with the tools - openai: TextStreamPart<{ getWeather: typeof tools.getWeather }>; -}; - -export const aiStreamingWithTools = schemaTask({ - id: "ai-streaming-with-tools", - description: "Stream data from the AI SDK and use tools", - schema: z.object({ - model: z.string().default("gpt-4o-mini"), - prompt: z - .string() - .default( - "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed." 
- ), - }), - run: async ({ model, prompt }) => { - logger.info("Running OpenAI model", { model, prompt }); - - const result = streamText({ - model: openai(model), - prompt, - tools, // Pass in the tools to use - maxSteps: 5, // Allow streamText to repeatedly call the model - }); - - // pass the fullStream to the metadata system - const stream = await metadata.stream("openai", result.fullStream); - - let text = ""; +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; + +async function consumeStreamWithRetry( + runId: string, + maxRetries = 3 +): Promise { + let lastChunkIndex = 0; + const allChunks: string[] = []; + let attempt = 0; + + while (attempt < maxRetries) { + try { + const stream = await aiStream.read(runId, { + startIndex: lastChunkIndex, + timeoutInSeconds: 120, + }); + + for await (const chunk of stream) { + allChunks.push(chunk); + lastChunkIndex++; + } - for await (const chunk of stream) { - logger.log("Received chunk", { chunk }); + // Success! Break out of retry loop + break; + } catch (error) { + attempt++; - // chunk is a TextStreamPart - if (chunk.type === "text-delta") { - text += chunk.textDelta; + if (attempt >= maxRetries) { + throw new Error(`Failed after ${maxRetries} attempts: ${error.message}`); } + + console.log(`Retry attempt ${attempt} after error:`, error.message); + + // Wait before retrying (exponential backoff) + await new Promise((resolve) => setTimeout(resolve, 1000 * Math.pow(2, attempt))); } + } - return { text }; - }, -}); + return allChunks; +} ``` -## Using toolTask +### Processing streams in chunks -You can define a Trigger.dev task that can be used as a tool, and will automatically be invoked with `triggerAndWait` when the tool is called: +Process streams in batches for efficiency: ```ts -import { openai } from "@ai-sdk/openai"; -import { logger, metadata, runs, schemaTask, toolTask } from "@trigger.dev/sdk"; -import { streamText, tool, type TextStreamPart } from "ai"; -import { z } from "zod"; - -export const getWeather = toolTask({ - id: "get-weather", - description: "Get the weather for a location", - // Define the parameters for the tool, which becomes the task payload - parameters: z.object({ - location: z.string(), - }), - run: async ({ location }) => { - // return mock data - return { - location, - temperature: 72 + Math.floor(Math.random() * 21) - 10, - }; - }, -}); - -export type STREAMS = { - // Give the stream a type of TextStreamPart along with the tools - openai: TextStreamPart<{ getWeather: typeof getWeather.tool }>; -}; - -export const aiStreamingWithTools = schemaTask({ - id: "ai-streaming-with-tools", - description: "Stream data from the AI SDK and use tools", - schema: z.object({ - model: z.string().default("gpt-4o-mini"), - prompt: z - .string() - .default( - "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed." 
- ), - }), - run: async ({ model, prompt }) => { - logger.info("Running OpenAI model", { model, prompt }); - - const result = streamText({ - model: openai(model), - prompt, - tools: { - getWeather: getWeather.tool, // pass weatherTask.tool as a tool - }, - maxSteps: 5, // Allow streamText to repeatedly call the model - }); - - // pass the fullStream to the metadata system - const stream = await metadata.stream("openai", result.fullStream); - - let text = ""; +import { streams } from "@trigger.dev/sdk"; +import { aiStream } from "./trigger/streams"; - for await (const chunk of stream) { - logger.log("Received chunk", { chunk }); +async function processStreamInBatches(runId: string, batchSize = 10) { + const stream = await aiStream.read(runId); - // chunk is a TextStreamPart - if (chunk.type === "text-delta") { - text += chunk.textDelta; - } + let batch: string[] = []; + + for await (const chunk of stream) { + batch.push(chunk); + + if (batch.length >= batchSize) { + // Process the batch + await processBatch(batch); + batch = []; } + } + + // Process remaining chunks + if (batch.length > 0) { + await processBatch(batch); + } +} + +async function processBatch(chunks: string[]) { + console.log(`Processing batch of ${chunks.length} chunks`); + // Do something with the batch + // e.g., save to database, send to queue, etc. +} +``` + +## Using with `runs.subscribeToRun()` + +For more advanced use cases where you need both the run status and streams, you can use the `runs.subscribeToRun()` method with `.withStreams()`: - return { text }; - }, -}); +```ts +import { runs } from "@trigger.dev/sdk"; +import type { myTask } from "./trigger/myTask"; + +async function subscribeToRunAndStreams(runId: string) { + for await (const update of runs.subscribeToRun(runId).withStreams()) { + switch (update.type) { + case "run": + console.log("Run update:", update.run.status); + break; + case "default": + console.log("Stream chunk:", update.chunk); + break; + } + } +} ``` + + + For most use cases, we recommend using `streams.read()` with defined streams for better type safety and clearer code. Use `runs.subscribeToRun().withStreams()` only when you need to track both run status and stream data simultaneously. + diff --git a/docs/realtime/overview.mdx b/docs/realtime/overview.mdx index 998a2e6295..8819c4003f 100644 --- a/docs/realtime/overview.mdx +++ b/docs/realtime/overview.mdx @@ -18,7 +18,7 @@ Once subscribed, you'll receive the complete [run object](/realtime/run-object) - **Status changes** - queued → executing → completed, etc. - **Metadata updates** - Custom progress tracking, status updates, user data, etc. ([React hooks](/realtime/react-hooks/subscribe#using-metadata-to-show-progress-in-your-ui) | [backend](/realtime/backend/subscribe#subscribe-to-metadata-updates-from-your-tasks)) - **Tag changes** - When [tags](/tags) are added or removed from the run -- **Stream data** - Real-time data chunks from your tasks, perfect for AI/LLM streaming ([React hooks](/realtime/react-hooks/streams) | [backend](/realtime/backend/streams)) +- **Realtime Streams** - Stream real-time data from your tasks, perfect for AI/LLM outputs and progress updates. 
([Learn more](/tasks/streams) | [React hooks](/realtime/react-hooks/streams) | [Backend](/realtime/backend/streams)) ## Using Realtime in your applications diff --git a/docs/realtime/react-hooks/streams.mdx b/docs/realtime/react-hooks/streams.mdx index aaf153bf3d..670941e899 100644 --- a/docs/realtime/react-hooks/streams.mdx +++ b/docs/realtime/react-hooks/streams.mdx @@ -7,13 +7,161 @@ description: Subscribe to real-time streams from your tasks in React components These hooks allow you to consume real-time streams from your tasks. Streams are useful for displaying AI/LLM outputs as they're generated, or any other real-time data from your tasks. - To learn how to emit streams from your tasks, see our [backend streams - documentation](/realtime/backend/streams). + To learn how to emit streams from your tasks, see our [Realtime Streams](/tasks/streams) + documentation. +## useRealtimeStream (Recommended) + + + Available in SDK version **4.1.0 or later**. This is the recommended way to consume streams in + your React components. + + +The `useRealtimeStream` hook allows you to subscribe to a specific stream by its run ID and stream key. This hook is designed to work seamlessly with [defined streams](/tasks/streams#defining-typed-streams-recommended) for full type safety. + +### Basic Usage + +```tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; + +export function StreamViewer({ + runId, + publicAccessToken, +}: { + runId: string; + publicAccessToken: string; +}) { + const { parts, error } = useRealtimeStream(runId, "ai-output", { + accessToken: publicAccessToken, + }); + + if (error) return
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <span key={i}>{part}</span>
+      ))}
+    </div>
+ ); +} +``` + +### With Defined Streams + +The recommended approach is to use defined streams for full type safety: + +```tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; +import { aiStream } from "@/app/streams"; + +export function StreamViewer({ + runId, + publicAccessToken, +}: { + runId: string; + publicAccessToken: string; +}) { + // Pass the defined stream directly - full type safety! + const { parts, error } = useRealtimeStream(aiStream, runId, { + accessToken: publicAccessToken, + timeoutInSeconds: 600, + onData: (chunk) => { + console.log("New chunk:", chunk); // chunk is typed! + }, + }); + + if (error) return
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <span key={i}>{part}</span>
+      ))}
+    </div>
+ ); +} +``` + +### Streaming AI Responses + +Here's a complete example showing how to display streaming AI responses: + +```tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; +import { aiStream } from "@/trigger/streams"; +import { Streamdown } from "streamdown"; + +export function AIStreamViewer({ + runId, + publicAccessToken, +}: { + runId: string; + publicAccessToken: string; +}) { + const { parts, error } = useRealtimeStream(aiStream, runId, { + accessToken: publicAccessToken, + timeoutInSeconds: 300, + }); + + if (error) return
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading stream...</div>;
+
+  const text = parts.join("");
+
+  return (
+    <div>
+      <Streamdown>{text}</Streamdown>
+    </div>
+ ); +} +``` + +### Options + +The `useRealtimeStream` hook accepts the following options: + +```tsx +const { parts, error } = useRealtimeStream(streamOrRunId, streamKeyOrOptions, { + accessToken: "pk_...", // Required: Public access token + baseURL: "https://api.trigger.dev", // Optional: Custom API URL + timeoutInSeconds: 60, // Optional: Timeout (default: 60) + startIndex: 0, // Optional: Start from specific chunk + throttleInMs: 16, // Optional: Throttle updates (default: 16ms) + onData: (chunk) => {}, // Optional: Callback for each chunk +}); +``` + +### Using Default Stream + +You can omit the stream key to use the default stream: + +```tsx +const { parts, error } = useRealtimeStream(runId, { + accessToken: publicAccessToken, +}); +``` + +For more information on defining and using streams, see the [Realtime Streams v2](/tasks/streams) documentation. + ## useRealtimeRunWithStreams -The `useRealtimeRunWithStreams` hook allows you to subscribe to a run by its ID and also receive any streams that are emitted by the task. + + For new projects, we recommend using `useRealtimeStream` instead (available in SDK 4.1.0+). This + hook is still supported for backward compatibility and use cases where you need to subscribe to + both the run and all its streams at once. + + +The `useRealtimeRunWithStreams` hook allows you to subscribe to a run by its ID and also receive any streams that are emitted by the task. This is useful when you need to access both the run metadata and multiple streams simultaneously. ```tsx "use client"; // This is needed for Next.js App Router or other RSC frameworks @@ -115,9 +263,9 @@ export function MyComponent({ } ``` -## Streaming AI responses +### Streaming AI responses with useRealtimeRunWithStreams -Here's a more complete example showing how to display streaming OpenAI responses: +Here's an example showing how to display streaming OpenAI responses using `useRealtimeRunWithStreams`: ```tsx import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks"; @@ -143,9 +291,9 @@ function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccess } ``` -## AI SDK with tools +### AI SDK with tools -When using the AI SDK with tools, you can access tool calls and results: +When using the AI SDK with tools with `useRealtimeRunWithStreams`, you can access tool calls and results: ```tsx import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks"; @@ -186,11 +334,9 @@ function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccess } ``` -## Common options +### Throttling updates -### experimental_throttleInMs - -The `*withStreams` variants of the Realtime hooks accept an `experimental_throttleInMs` option to throttle the updates from the server. This can be useful if you are getting too many updates and want to reduce the number of updates. +The `useRealtimeRunWithStreams` hook accepts an `experimental_throttleInMs` option to throttle the updates from the server. This can be useful if you are getting too many updates and want to reduce the number of updates. ```tsx import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks"; @@ -219,3 +365,5 @@ export function MyComponent({ ``` All other options (accessToken, baseURL, enabled, id) work the same as the other realtime hooks. + +For the newer `useRealtimeStream` hook, use the `throttleInMs` option instead (see [options above](#options)). 
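+
+For example, a minimal sketch reusing the options documented above (it assumes a run that writes to a stream keyed `"openai"`, as in the earlier examples):
+
+```tsx
+// Throttle useRealtimeStream updates to roughly ten renders per second
+const { parts } = useRealtimeStream(runId, "openai", {
+  accessToken: publicAccessToken,
+  throttleInMs: 100,
+});
+```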
diff --git a/docs/runs/metadata.mdx b/docs/runs/metadata.mdx index 2b6f6955a7..d0dff36758 100644 --- a/docs/runs/metadata.mdx +++ b/docs/runs/metadata.mdx @@ -264,7 +264,14 @@ export const myTask = task({ ### stream -Capture a stream of values and make the stream available when using Realtime. See our [Realtime streams](/realtime/react-hooks/streams) documentation for more information. + + As of SDK version **4.1.0**, `metadata.stream()` has been replaced by [Realtime Streams + v2](/tasks/streams). We recommend using the new `streams.pipe()` API for better reliability, + unlimited stream length, and improved developer experience. The examples below are provided for + backward compatibility. + + +Capture a stream of values and make the stream available when using Realtime. See our [Realtime Streams v2](/tasks/streams) documentation for the recommended approach. ```ts import { task, metadata } from "@trigger.dev/sdk"; @@ -435,14 +442,14 @@ metadata.parent.append("logs", "Step 1 complete"); metadata.parent.remove("logs", "Step 1 complete"); metadata.parent.increment("progress", 0.4); metadata.parent.decrement("otherProgress", 0.1); -metadata.parent.stream("llm", readableStream); +metadata.parent.stream("llm", readableStream); // Use streams.pipe() instead (v4.1+) metadata.root.set("progress", 0.5); metadata.root.append("logs", "Step 1 complete"); metadata.root.remove("logs", "Step 1 complete"); metadata.root.increment("progress", 0.4); metadata.root.decrement("otherProgress", 0.1); -metadata.root.stream("llm", readableStream); +metadata.root.stream("llm", readableStream); // Use streams.pipe() instead (v4.1+) ``` You can also chain the update methods together: diff --git a/docs/tasks/streams.mdx b/docs/tasks/streams.mdx new file mode 100644 index 0000000000..2d494977a3 --- /dev/null +++ b/docs/tasks/streams.mdx @@ -0,0 +1,730 @@ +--- +title: "Realtime Streams" +sidebarTitle: "Streams" +description: "Stream data in realtime from your Trigger.dev tasks to your frontend or backend applications." +--- + +Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow. + + + Streams v2 requires SDK version **4.1.0 or later**. Make sure to upgrade your `@trigger.dev/sdk` + and `@trigger.dev/react-hooks` packages to use these features. If you're on an earlier version, + see the [metadata.stream()](/runs/metadata#stream) documentation. + + +## Overview + +Streams v2 is a major upgrade that provides: + +- **Unlimited stream length** (previously capped at 2000 chunks) +- **Unlimited active streams per run** (previously 5) +- **Improved reliability** with automatic resumption on connection loss +- **28-day stream retention** (previously 1 day) +- **Multiple client streams** can pipe to a single stream +- **Enhanced dashboard visibility** for viewing stream data in real-time + +## Enabling Streams v2 + +Streams v2 is **automatically enabled** when triggering runs from the SDK using 4.1.0 or later. If you aren't triggering via the SDK, you'll need to explicitly enable v2 streams via setting the `x-trigger-realtime-streams-version=v2` header when triggering the task. 
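+
+For example, if you trigger over plain HTTP, opting in looks roughly like this (a sketch; the endpoint path and payload shape are illustrative, only the header name comes from this page):
+
+```ts
+// Hypothetical direct HTTP trigger with v2 streams explicitly enabled
+await fetch("https://api.trigger.dev/api/v1/tasks/my-task/trigger", {
+  method: "POST",
+  headers: {
+    Authorization: `Bearer ${process.env.TRIGGER_SECRET_KEY}`,
+    "Content-Type": "application/json",
+    "x-trigger-realtime-streams-version": "v2",
+  },
+  body: JSON.stringify({ payload: { prompt: "Hello" } }),
+});
+```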
+
+If you'd like to **opt out** of v2 streams, you can do so in one of the following two ways:
+
+### Option 1: Configure the SDK
+
+```ts
+import { auth } from "@trigger.dev/sdk";
+
+auth.configure({
+  future: {
+    v2RealtimeStreams: false,
+  },
+});
+```
+
+### Option 2: Environment Variable
+
+Set the `TRIGGER_V2_REALTIME_STREAMS=0` environment variable in your backend code (where you trigger tasks).
+
+## Limits Comparison
+
+| Limit                            | Streams v1  | Streams v2 |
+| -------------------------------- | ----------- | ---------- |
+| Maximum stream length            | 2000 chunks | Unlimited  |
+| Number of active streams per run | 5           | Unlimited  |
+| Maximum streams per run          | 10          | Unlimited  |
+| Maximum stream TTL               | 1 day       | 28 days    |
+| Maximum stream size              | 10 MB       | 300 MiB    |
+
+## Quick Start
+
+The recommended workflow for using Realtime Streams v2:
+
+1. **Define your streams** in a shared location using `streams.define()`
+2. **Use the defined stream** in your tasks with `.pipe()`, `.append()`, or `.writer()`
+3. **Read from the stream** using `.read()` or the `useRealtimeStream` hook in React
+
+This approach gives you full type safety, better code organization, and easier maintenance as your application grows.
+
+## Defining Typed Streams (Recommended)
+
+The recommended way to work with streams is to define them once with `streams.define()`. This allows you to specify the chunk type and stream ID in one place, and then reuse that definition throughout your codebase with full type safety.
+
+### Creating a Defined Stream
+
+Define your streams in a shared location (like `app/streams.ts` or `trigger/streams.ts`):
+
+```ts
+import { streams, InferStreamType } from "@trigger.dev/sdk";
+
+// Define a stream with a specific type
+export const aiStream = streams.define<string>({
+  id: "ai-output",
+});
+
+// Export the type for use in frontend components
+export type AIStreamPart = InferStreamType<typeof aiStream>;
+```
+
+You can define streams for any JSON-serializable type:
+
+```ts
+import { streams, InferStreamType } from "@trigger.dev/sdk";
+import { UIMessageChunk } from "ai";
+
+// Stream for AI UI message chunks
+export const aiStream = streams.define<UIMessageChunk>({
+  id: "ai",
+});
+
+// Stream for progress updates
+export const progressStream = streams.define<{ step: string; percent: number }>({
+  id: "progress",
+});
+
+// Stream for simple text
+export const logStream = streams.define<string>({
+  id: "logs",
+});
+
+// Export types
+export type AIStreamPart = InferStreamType<typeof aiStream>;
+export type ProgressStreamPart = InferStreamType<typeof progressStream>;
+export type LogStreamPart = InferStreamType<typeof logStream>;
+```
+
+### Using Defined Streams in Tasks
+
+Once defined, you can use all stream methods on your defined stream:
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { aiStream } from "./streams";
+
+export const streamTask = task({
+  id: "stream-task",
+  run: async (payload: { prompt: string }) => {
+    // Get a stream from an AI service, database, etc.
+ const stream = await getAIStream(payload.prompt); + + // Pipe the stream using your defined stream + const { stream: readableStream, waitUntilComplete } = aiStream.pipe(stream); + + // Option A: Iterate over the stream locally + for await (const chunk of readableStream) { + console.log("Received chunk:", chunk); + } + + // Option B: Wait for the stream to complete + await waitUntilComplete(); + + return { message: "Stream completed" }; + }, +}); +``` + +#### Reading from a Stream + +Use the defined stream's `read()` method to consume data from anywhere (frontend, backend, or another task): + +```ts +import { aiStream } from "./streams"; + +const stream = await aiStream.read(runId); + +for await (const chunk of stream) { + console.log(chunk); // chunk is typed as the stream's chunk type +} +``` + +With options: + +```ts +const stream = await aiStream.read(runId, { + timeoutInSeconds: 60, // Stop if no data for 60 seconds + startIndex: 10, // Start from the 10th chunk +}); +``` + +#### Appending to a Stream + +Use the defined stream's `append()` method to add a single chunk: + +```ts +import { task } from "@trigger.dev/sdk"; +import { aiStream, progressStream, logStream } from "./streams"; + +export const appendTask = task({ + id: "append-task", + run: async (payload) => { + // Append to different streams with full type safety + await logStream.append("Processing started"); + await progressStream.append({ step: "Initialization", percent: 0 }); + + // Do some work... + + await progressStream.append({ step: "Processing", percent: 50 }); + await logStream.append("Step 1 complete"); + + // Do more work... + + await progressStream.append({ step: "Complete", percent: 100 }); + await logStream.append("All steps complete"); + }, +}); +``` + +#### Writing Multiple Chunks + +Use the defined stream's `writer()` method for more complex stream writing: + +```ts +import { task } from "@trigger.dev/sdk"; +import { logStream } from "./streams"; + +export const writerTask = task({ + id: "writer-task", + run: async (payload) => { + const { waitUntilComplete } = logStream.writer({ + execute: ({ write, merge }) => { + // Write individual chunks + write("Chunk 1"); + write("Chunk 2"); + + // Merge another stream + const additionalStream = ReadableStream.from(["Chunk 3", "Chunk 4", "Chunk 5"]); + merge(additionalStream); + }, + }); + + await waitUntilComplete(); + }, +}); +``` + +### Using Defined Streams in React + +Defined streams work seamlessly with the `useRealtimeStream` hook: + +```tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; +import { aiStream } from "@/app/streams"; + +export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) { + // Pass the defined stream directly - full type safety! + const { parts, error } = useRealtimeStream(aiStream, runId, { + accessToken, + timeoutInSeconds: 600, + }); + + if (error) return
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <span key={i}>{part}</span>
+      ))}
+    </div>
+ ); +} +``` + +## Direct Stream Methods (Without Defining) + + + We strongly recommend using `streams.define()` instead of direct methods. Defined streams provide + better organization, full type safety, and make it easier to maintain your codebase as it grows. + + +If you have a specific reason to avoid defined streams, you can use stream methods directly by specifying the stream key each time. + +### Direct Piping + +```ts +import { streams, task } from "@trigger.dev/sdk"; + +export const directStreamTask = task({ + id: "direct-stream", + run: async (payload: { prompt: string }) => { + const stream = await getAIStream(payload.prompt); + + // Specify the stream key directly + const { stream: readableStream, waitUntilComplete } = streams.pipe("ai-output", stream); + + await waitUntilComplete(); + }, +}); +``` + +### Direct Reading + +```ts +import { streams } from "@trigger.dev/sdk"; + +// Specify the stream key when reading +const stream = await streams.read(runId, "ai-output"); + +for await (const chunk of stream) { + console.log(chunk); +} +``` + +### Direct Appending + +```ts +import { streams, task } from "@trigger.dev/sdk"; + +export const directAppendTask = task({ + id: "direct-append", + run: async (payload) => { + // Specify the stream key each time + await streams.append("logs", "Processing started"); + await streams.append("progress", "50%"); + await streams.append("logs", "Complete"); + }, +}); +``` + +### Direct Writing + +```ts +import { streams, task } from "@trigger.dev/sdk"; + +export const directWriterTask = task({ + id: "direct-writer", + run: async (payload) => { + const { waitUntilComplete } = streams.writer("output", { + execute: ({ write, merge }) => { + write("Chunk 1"); + write("Chunk 2"); + }, + }); + + await waitUntilComplete(); + }, +}); +``` + +## Default Stream + +Every run has a "default" stream, allowing you to skip the stream key entirely. This is useful for simple cases where you only need one stream per run. + +Using direct methods: + +```ts +import { streams, task } from "@trigger.dev/sdk"; + +export const defaultStreamTask = task({ + id: "default-stream", + run: async (payload) => { + const stream = getDataStream(); + + // No stream key needed - uses "default" + const { waitUntilComplete } = streams.pipe(stream); + + await waitUntilComplete(); + }, +}); + +// Reading from the default stream +const readStream = await streams.read(runId); +``` + +## Targeting Different Runs + +You can pipe streams to parent, root, or any other run using the `target` option. This works with both defined streams and direct methods. 
+ +### With Defined Streams + +```ts +import { task } from "@trigger.dev/sdk"; +import { logStream } from "./streams"; + +export const childTask = task({ + id: "child-task", + run: async (payload, { ctx }) => { + const stream = getDataStream(); + + // Pipe to parent run + logStream.pipe(stream, { target: "parent" }); + + // Pipe to root run + logStream.pipe(stream, { target: "root" }); + + // Pipe to self (default behavior) + logStream.pipe(stream, { target: "self" }); + + // Pipe to a specific run ID + logStream.pipe(stream, { target: payload.otherRunId }); + }, +}); +``` + +### With Direct Methods + +```ts +import { streams, task } from "@trigger.dev/sdk"; + +export const childTask = task({ + id: "child-task", + run: async (payload, { ctx }) => { + const stream = getDataStream(); + + // Pipe to parent run + streams.pipe("output", stream, { target: "parent" }); + + // Pipe to root run + streams.pipe("output", stream, { target: "root" }); + + // Pipe to a specific run ID + streams.pipe("output", stream, { target: payload.otherRunId }); + }, +}); +``` + +## Streaming from Outside a Task + +If you specify a `target` run ID, you can pipe streams from anywhere (like a Next.js API route): + +```ts +import { streams } from "@trigger.dev/sdk"; +import { openai } from "@ai-sdk/openai"; +import { streamText } from "ai"; + +export async function POST(req: Request) { + const { messages, runId } = await req.json(); + + const result = streamText({ + model: openai("gpt-4o"), + messages, + }); + + // Pipe AI stream to a Trigger.dev run + const { stream } = streams.pipe("ai-stream", result.toUIMessageStream(), { + target: runId, + }); + + return new Response(stream as any, { + headers: { "Content-Type": "text/event-stream" }, + }); +} +``` + +## React Hook + +Use the `useRealtimeStream` hook to subscribe to streams in your React components. + +### With Defined Streams (Recommended) + +```tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; +import { aiStream } from "@/app/streams"; + +export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) { + // Pass the defined stream directly for full type safety + const { parts, error } = useRealtimeStream(aiStream, runId, { + accessToken, + timeoutInSeconds: 600, + onData: (chunk) => { + console.log("New chunk:", chunk); // chunk is typed! + }, + }); + + if (error) return
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <span key={i}>{part}</span>
+      ))}
+    </div>
+ ); +} +``` + +### With Direct Stream Keys + +If you prefer not to use defined streams, you can specify the stream key directly: + +```tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; + +export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) { + const { parts, error } = useRealtimeStream(runId, "ai-output", { + accessToken, + timeoutInSeconds: 600, + }); + + if (error) return
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <span key={i}>{part}</span>
+      ))}
+    </div>
+  );
+}
+```
+
+### Using Default Stream
+
+```tsx
+// Omit stream key to use the default stream
+const { parts, error } = useRealtimeStream(runId, {
+  accessToken,
+});
+```
+
+### Hook Options
+
+```tsx
+const { parts, error } = useRealtimeStream(streamDef, runId, {
+  accessToken: "pk_...", // Required: Public access token
+  baseURL: "https://api.trigger.dev", // Optional: Custom API URL
+  timeoutInSeconds: 60, // Optional: Timeout (default: 60)
+  startIndex: 0, // Optional: Start from specific chunk
+  throttleInMs: 16, // Optional: Throttle updates (default: 16ms)
+  onData: (chunk) => {}, // Optional: Callback for each chunk
+});
+```
+
+## Complete Example: AI Streaming
+
+### Define the stream
+
+```ts
+// app/streams.ts
+import { streams, InferStreamType } from "@trigger.dev/sdk";
+import { UIMessageChunk } from "ai";
+
+export const aiStream = streams.define<UIMessageChunk>({
+  id: "ai",
+});
+
+export type AIStreamPart = InferStreamType<typeof aiStream>;
+```
+
+### Create the task
+
+```ts
+// trigger/ai-task.ts
+import { task } from "@trigger.dev/sdk";
+import { openai } from "@ai-sdk/openai";
+import { streamText } from "ai";
+import { aiStream } from "@/app/streams";
+
+export const generateAI = task({
+  id: "generate-ai",
+  run: async (payload: { prompt: string }) => {
+    const result = streamText({
+      model: openai("gpt-4o"),
+      prompt: payload.prompt,
+    });
+
+    const { waitUntilComplete } = aiStream.pipe(result.toUIMessageStream());
+
+    await waitUntilComplete();
+
+    return { success: true };
+  },
+});
+```
+
+### Frontend component
+
+```tsx
+// components/ai-stream.tsx
+"use client";
+
+import { useRealtimeStream } from "@trigger.dev/react-hooks";
+import { aiStream } from "@/app/streams";
+
+export function AIStream({ accessToken, runId }: { accessToken: string; runId: string }) {
+  const { parts, error } = useRealtimeStream(aiStream, runId, {
+    accessToken,
+    timeoutInSeconds: 300,
+  });
+
+  if (error) return 
<div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <span key={i}>{part}</span>
+      ))}
+    </div>
+  );
+}
+```
+
+## Migration from v1
+
+If you're using the old `metadata.stream()` API, here's how to migrate to the recommended v2 approach:
+
+### Step 1: Define Your Streams
+
+Create a shared streams definition file:
+
+```ts
+// app/streams.ts or trigger/streams.ts
+import { streams, InferStreamType } from "@trigger.dev/sdk";
+
+export const myStream = streams.define<string>({
+  id: "my-stream",
+});
+
+export type MyStreamPart = InferStreamType<typeof myStream>;
+```
+
+### Step 2: Update Your Tasks
+
+Replace `metadata.stream()` with the defined stream's `pipe()` method:
+
+```ts
+// Before (v1)
+import { metadata, task } from "@trigger.dev/sdk";
+
+export const myTask = task({
+  id: "my-task",
+  run: async (payload) => {
+    const stream = getDataStream();
+    await metadata.stream("my-stream", stream);
+  },
+});
+```
+
+```ts
+// After (v2 - Recommended)
+import { task } from "@trigger.dev/sdk";
+import { myStream } from "./streams";
+
+export const myTask = task({
+  id: "my-task",
+  run: async (payload) => {
+    const stream = getDataStream();
+
+    // Don't await - returns immediately
+    const { waitUntilComplete } = myStream.pipe(stream);
+
+    // Optionally wait for completion
+    await waitUntilComplete();
+  },
+});
+```
+
+### Step 3: Update Your Frontend
+
+Use the defined stream with `useRealtimeStream`:
+
+```tsx
+// Before
+const { parts, error } = useRealtimeStream(runId, "my-stream", {
+  accessToken,
+});
+```
+
+```tsx
+// After
+import { myStream } from "@/app/streams";
+
+const { parts, error } = useRealtimeStream(myStream, runId, {
+  accessToken,
+});
+```
+
+### Alternative: Direct Methods (Not Recommended)
+
+If you prefer not to use defined streams, you can use direct methods:
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const myTask = task({
+  id: "my-task",
+  run: async (payload) => {
+    const stream = getDataStream();
+    const { waitUntilComplete } = streams.pipe("my-stream", stream);
+    await waitUntilComplete();
+  },
+});
+```
+
+## Reliability Features
+
+Streams v2 includes automatic reliability improvements:
+
+- **Automatic resumption**: If a connection is lost, both appending and reading will automatically resume from the last successful chunk
+- **No data loss**: Network issues won't cause stream data to be lost
+- **Idempotent operations**: Duplicate chunks are automatically handled
+
+These improvements happen automatically - no code changes needed.
+
+## Dashboard Integration
+
+Streams are now visible in the Trigger.dev dashboard, allowing you to:
+
+- View stream data in real-time as it's generated
+- Inspect historical stream data for completed runs
+- Debug streaming issues with full visibility into chunk delivery