Merged
2 changes: 1 addition & 1 deletion pages/docs/examples/durable-endpoints.mdx
@@ -9,7 +9,7 @@ export const description = "Learn how to use Inngest Durable Endpoints to make a

[Durable Endpoints](/docs/learn/durable-endpoints) let you add durability to regular HTTP handlers without separate function definitions or event triggers. You wrap your existing API route with `inngest.endpoint()` and use `step.run()` inline to get automatic retries and memoization for each step.

This is useful when you want your endpoint to orchestrate multiple operations (like booking a flight, processing a payment, and sending a confirmation) and guarantee that each step completes exactly once, even if the handler crashes or restarts partway through.
This is useful when you want your endpoint to orchestrate multiple operations (like booking a flight, processing a payment, and sending a confirmation) and guarantee that each step completes exactly once, even if the handler crashes or restarts partway through. You can also [stream data to clients](/docs/learn/durable-endpoints/streaming) during execution.


## How it differs from traditional Inngest
10 changes: 8 additions & 2 deletions pages/docs/learn/durable-endpoints.mdx
@@ -290,13 +290,19 @@ fetch(`/api/your-durable-endpoint`)
| TypeScript | ✅ Beta | >= 3.x (with `endpointAdapter`) |
| Go | ✅ | >= v0.14.0 |

## Streaming <VersionBadge version="Developer Preview" />

Durable Endpoints can stream data back to clients in real time using Server-Sent Events (SSE). Stream LLM tokens, progress updates, or any other data while keeping full durability guarantees. If a step fails and retries, streamed data from that step is automatically rolled back on the client.

Read the [full guide](/docs/learn/durable-endpoints/streaming?ref=docs-durable-endpoints) for setup, client integration, rollback semantics, and more.

## Limitations

Durable Endpoints is currently in beta. The following limitations apply:

- **Flow control is not supported** — Features like concurrency limits and rate limiting are not available for Durable Endpoints
- **POST body is not yet supported** — Prefer using query strings for passing data. POST body support is coming soon
- **Standard HTTP responses only** — Durable Endpoints should return a standard HTTP response, not an SSE stream
- **Standard HTTP responses only** — Durable Endpoints should return a standard HTTP response. Streaming responses [are supported](#streaming)

## Examples

@@ -330,4 +336,4 @@ The following demos are also available to check out and run locally with the Inn
- [Durable Endpoint - TypeScript SDK Reference](/docs/reference/typescript/durable-endpoints)
- [Steps Overview](/docs/learn/inngest-steps)

</LanguageSelector>
</LanguageSelector>
278 changes: 278 additions & 0 deletions pages/docs/learn/durable-endpoints/streaming.mdx
@@ -0,0 +1,278 @@
import { Callout, VersionBadge } from "src/shared/Docs/mdx";

export const description = 'Learn how to durably stream data back to clients from Durable Endpoints';

# Durable Endpoints Streaming <VersionBadge version="Developer Preview" />

Durable Endpoints can stream data back to clients in real time using Server-Sent Events (SSE). This lets you stream AI inference tokens, progress updates, or any other data, while keeping the durability guarantees of [durable steps](/docs/learn/inngest-steps).

Streaming works across multiple steps within a single endpoint invocation, and handles the transition from sync to async mode seamlessly. If a step fails and retries, any data streamed during that step is automatically rolled back on the client.

<Callout>
Streaming SSE from Durable Endpoints is currently only available in the TypeScript SDK. This guide assumes you've already [set up a Durable Endpoint](/docs/learn/durable-endpoints).
</Callout>

## When to use streaming

- **AI inference** — Stream LLM tokens to the browser as they're generated, so users see results immediately.
- **Status updates** — Send progress messages during long-running endpoint executions.
- **Making existing streaming endpoints durable** — Wrap your existing streaming HTTP endpoints with steps to add retry and observability at no cost to functionality.

If you don't need to stream data directly to an HTTP client, consider using [Realtime](/docs/features/realtime) to push updates from background Inngest functions via pub/sub channels.

## Example

In this example, we'll create an HTTP endpoint that generates a haiku and then translates it to French. The client is the browser, which renders the haiku and its translation in real time as the streamed LLM output arrives.

### Server

Import `step` from `inngest` and `stream` from `inngest/experimental/durable-endpoints`, then use `stream.push()` or `stream.pipe()` inside your endpoint handler:

```typescript
import Anthropic from "@anthropic-ai/sdk";
import { step } from "inngest";
import { stream } from "inngest/experimental/durable-endpoints";
import { inngest } from "@/inngest";

export const GET = inngest.endpoint(async () => {
  // Option A: push() with an SDK event callback
  const text = await step.run("generate", async () => {
    stream.push("Generating...\n");

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 512,
      messages: [{ role: "user", content: "Write a haiku about durability." }],
    });

    response.on("text", (token) => stream.push(token));
    return await response.finalText();
  });

  // Option B: pipe() — streams each chunk AND returns the collected text
  await step.run("translate", async () => {
    stream.push(`\nTranslating...\n`);

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 256,
      messages: [{ role: "user", content: `Translate to French: ${text}` }],
    });

    return stream.pipe(async function* () {
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          yield event.delta.text;
        }
      }
    });
  });

  return new Response("\nDone!");
});
```

### Client

Use `fetchWithStream()` from `inngest/experimental/durable-endpoints/client` to consume the stream. It handles SSE parsing, sync-to-async redirects, and commit/rollback automatically. Chunks arrive on the client in the order they are pushed or yielded on the server.

```typescript
"use client";

import { useState, useRef } from "react";
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";

export default function Generate() {
  const [chunks, setChunks] = useState<string[]>([]);
  const uncommittedCountRef = useRef(0);

  async function run() {
    setChunks([]);
    uncommittedCountRef.current = 0;

    const resp = await fetchWithStream("/api/generate", {
      onData: ({ data }) => {
        if (typeof data === "string") {
          uncommittedCountRef.current++;
          setChunks((prev) => [...prev, data]);
        }
      },
      onRollback: () => {
        // A step failed and will retry — remove the chunks it produced
        const count = uncommittedCountRef.current;
        setChunks((prev) => prev.slice(0, prev.length - count));
        uncommittedCountRef.current = 0;
      },
      onCommit: () => {
        // Step completed — its chunks are now permanent
        uncommittedCountRef.current = 0;
      },
    });

    // The endpoint's return value is available as the Response body
    const result = await resp.text();
    setChunks((prev) => [...prev, result]);
  }

  return (
    <div>
      <button onClick={run}>Generate</button>
      <pre>{chunks.join("")}</pre>
    </div>
  );
}
```

## Server API

### `stream.push(data)`

Send a single chunk of data to the client as an SSE event.

```typescript
stream.push("Loading...");
stream.push({ progress: 50, message: "Halfway there" });
```

- Accepts any JSON-serializable value.
- Fire-and-forget. Does not block execution or return a value.
- A no-op outside of an Inngest execution context, so your code behaves the same when called outside of a durable endpoint.

`push()` is ideal for one-off status messages or streaming via provider SDK event callbacks.

### `stream.pipe(source)`

Pipe a stream source to the client, resolving with the concatenated text of all chunks. Each chunk is sent as an SSE event in real time.

The simplest case is piping a `ReadableStream`, like a `fetch` response body:

```typescript
const response = await fetch("https://api.example.com/stream");
const text = await stream.pipe(response.body);
// `text` contains the full response; the client received it chunk by chunk
```

When you need to transform or filter chunks before they're sent, pass an async generator function. Each `yield` sends one chunk to the client:

```typescript
const text = await stream.pipe(async function* () {
  for await (const event of response) {
    // Only yield the parts you want the client to see
    if (event.type === "content_block_delta") {
      yield event.delta.text;
    }
  }
});
```

`pipe()` accepts three source types:

- **`ReadableStream`** — piped directly, decoded from bytes to string chunks.
- **`AsyncIterable<string>`** — each value in the iterable becomes a chunk.
- **`() => AsyncIterable<string>`** — a function that returns an async iterable. This is what lets you pass `async function*` generators directly to `pipe()`.

A no-op outside of an Inngest execution context (resolves with an empty string).
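To make the three source shapes concrete, here is a minimal stand-in that reduces each source type to a single concatenated string, mirroring how chunks are collected. The `collectChunks` helper is hypothetical and is not the SDK implementation:

```typescript
// Hypothetical helper illustrating how pipe()'s three source types
// all reduce to a sequence of string chunks (not the SDK implementation).
type PipeSource =
  | ReadableStream<Uint8Array>
  | AsyncIterable<string>
  | (() => AsyncIterable<string>);

async function collectChunks(source: PipeSource): Promise<string> {
  let out = "";
  if (source instanceof ReadableStream) {
    // Byte streams are decoded into string chunks before concatenation
    const decoder = new TextDecoder();
    const reader = source.getReader();
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      out += decoder.decode(value, { stream: true });
    }
  } else {
    // A generator function is called to obtain the iterable;
    // a plain AsyncIterable is consumed directly
    const iterable = typeof source === "function" ? source() : source;
    for await (const chunk of iterable) out += chunk;
  }
  return out;
}
```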

For the full `stream.push()` and `stream.pipe()` API reference, see the [Streaming reference](/docs/reference/typescript/v4/durable-endpoints#streaming).

## Client API

### `fetchWithStream(url, options)`

The primary way to consume a streaming Durable Endpoint. Import it from `inngest/experimental/durable-endpoints/client`:

```typescript
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";
```

`fetchWithStream()` returns a `Promise<Response>`. `await` it to drive the stream to completion. When the endpoint finishes, the returned `Response` contains the endpoint's final return value. If the endpoint does not use streaming, `fetchWithStream()` returns the raw `Response` as-is.

The core callbacks handle the majority of streaming use cases:

- **`onData({ data, hashedStepId })`** — Called for each chunk. `data` is the deserialized value; `hashedStepId` identifies which step produced it (or `null` if streamed outside a step). Data should be considered uncommitted until `onCommit` fires.
- **`onRollback({ hashedStepId })`** — Called when a step fails and will retry. Your code is responsible for tracking and removing the chunks produced by that step (see the [example above](#client) for a pattern using a ref counter).
- **`onCommit({ hashedStepId })`** — Called when a step completes successfully. Chunks from that step are now permanent and will never be rolled back.

Because `stream.push()` accepts any JSON-serializable value, `data` in the `onData` callback is typed as `unknown`. Narrow the type in your callback as needed:

```typescript
const uncommittedCount = { current: 0 };

const resp = await fetchWithStream("/api/generate", {
  onData: ({ data }) => {
    if (typeof data === "string") {
      uncommittedCount.current++;
      console.log("Chunk:", data);
    }
  },
  onRollback: () => {
    // Discard uncommitted chunks and reset counter
    uncommittedCount.current = 0;
  },
  onCommit: () => {
    // Chunks are permanent — reset counter
    uncommittedCount.current = 0;
  },
});

const result = await resp.text();
```

For all available options see the [full API reference](/docs/reference/typescript/v4/durable-endpoints#client-fetchwithstream).

## How it works

### Sync-to-async transitions

When a client calls a streaming Durable Endpoint, the SSE stream flows directly from your app to the client. If the endpoint needs to go async (e.g. due to `step.sleep()`, `step.waitForEvent()`, or a retry), the SDK sends a redirect event telling the client where to reconnect, and the stream continues through the Inngest server.

`fetchWithStream()` handles this redirect automatically. The client sees a single continuous stream regardless of sync-to-async transitions.

### Streaming activation

Streaming is activated lazily. The endpoint only sends an SSE response if:

- The client sends the `Accept: text/event-stream` header (which `fetchWithStream()` does automatically), **and**
- Your code calls `stream.push()` or `stream.pipe()` during execution.

If neither `push()` nor `pipe()` is called, the endpoint behaves like a regular non-streaming Durable Endpoint.

### Rollback on retry

Each chunk is tagged with the step that produced it (via `hashedStepId`). When a step completes, `onCommit` fires and those chunks become permanent. When a step fails and retries, `onRollback` fires and your client code should discard the uncommitted chunks from that step. On the retry attempt, the step streams fresh data that replaces what was rolled back. See the [example above](#client) for an implementation pattern.

Data streamed outside of a `step.run()` is never rolled back.
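One way to model this bookkeeping on the client is a small buffer that tracks committed chunks separately from the pending chunks of each step. The `ChunkBuffer` class below is an illustrative sketch only (its name and methods are hypothetical, not part of the SDK):

```typescript
// Sketch of client-side commit/rollback bookkeeping (not part of the SDK).
// Chunks are buffered per step; a commit makes them permanent, a rollback
// discards everything that step streamed so far.
class ChunkBuffer {
  private committed: string[] = [];
  private pending = new Map<string, string[]>();

  onData(stepId: string | null, chunk: string) {
    if (stepId === null) {
      // Data streamed outside a step is never rolled back
      this.committed.push(chunk);
      return;
    }
    const list = this.pending.get(stepId) ?? [];
    list.push(chunk);
    this.pending.set(stepId, list);
  }

  onCommit(stepId: string) {
    this.committed.push(...(this.pending.get(stepId) ?? []));
    this.pending.delete(stepId);
  }

  onRollback(stepId: string) {
    this.pending.delete(stepId);
  }

  // Everything the UI should currently render (committed first; with at
  // most one streaming step at a time, this matches arrival order)
  render(): string {
    const pending = [...this.pending.values()].flat();
    return [...this.committed, ...pending].join("");
  }
}
```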

### SSE event types

The stream uses SSE with the following event types. The `inngest.*` events are internal protocol events handled by `fetchWithStream()` automatically; only `inngest.stream` events contain user data.

| Event name | Payload | Purpose |
|---|---|---|
| `inngest.metadata` | `{ runId }` | Always first. Identifies the run. |
| `inngest.stream` | `{ data, hashedStepId? }` | User data from `push()` / `pipe()`. |
| `inngest.commit` | `{ hashedStepId }` | Step succeeded. Its streamed data is permanent. |
| `inngest.rollback` | `{ hashedStepId }` | Step failed. Discard its uncommitted data. |
| `inngest.redirect_info` | `{ runId, url }` | Tells the client to reconnect for async continuation. |
| `inngest.response` | `{ status, response: { body, headers, statusCode } }` | Terminal event. Closes the stream. |
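For illustration, a generic SSE frame (`event: <name>` and `data: <json>` lines separated by a blank line) can be parsed in a few lines. The `parseSseFrames` helper below is a hypothetical sketch of the wire format only; in practice `fetchWithStream()` handles all of this for you:

```typescript
// Minimal SSE frame parser (illustrative sketch; fetchWithStream() does
// this automatically). Each frame is "event: <name>\ndata: <json>\n\n".
interface SseEvent {
  event: string;
  data: unknown;
}

function parseSseFrames(raw: string): SseEvent[] {
  return raw
    .split("\n\n")
    .filter((frame) => frame.trim().length > 0)
    .map((frame) => {
      let event = "message"; // SSE default event name
      let data = "";
      for (const line of frame.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) data += line.slice(5).trim();
      }
      return { event, data: JSON.parse(data) };
    });
}
```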

## Limitations

Streaming SSE from Durable Endpoints is currently in developer preview. In addition to any [general Durable Endpoint limitations](/docs/learn/durable-endpoints#limitations), the following apply:

- **15-minute timeout** — Client connections time out after 15 minutes, so your endpoint (including any retries) should complete within this window to ensure the stream is delivered end-to-end.
- **No rollback outside of steps** — Data streamed outside of a `step.run()` is never rolled back. If you need rollback guarantees, stream from within a step.
- **One streaming parallel step** — You can stream from at most one parallel step. Streaming from multiple parallel steps will result in interleaved output that cannot be disambiguated by the client.
- **No streaming from child functions** — `step.invoke()` calls cannot stream data back to the parent function's client.
- **Raw `Response` objects may be lost on async transition** — If your endpoint returns a `Response` (like a file download) and goes async, the Response is lost because it can't be memoized. Use `stream.push()` or `stream.pipe()` instead.

## SDK support

| SDK | Support | Version |
|-----|---------|---------|
| TypeScript | Developer Preview | >= 4.x (with `endpointAdapter`) |
3 changes: 3 additions & 0 deletions pages/docs/reference/typescript/v4/client/create.mdx
@@ -68,6 +68,9 @@ const inngest = new Inngest({
<Property name="middleware" type="array">
A stack of [middleware](/docs/features/middleware) to add to the client.
</Property>
<Property name="endpointAdapter" type="InngestEndpointAdapter.Like">
An endpoint adapter that enables [Durable Endpoints](/docs/reference/typescript/v4/durable-endpoints). When provided, `inngest.endpoint()` and `inngest.endpointProxy()` become available.
</Property>
</Properties>

<Callout>