Skip to content

Commit 343ba54

Browse files
authored
fix(streams): restore realtime stream writing for v3 tasks (#2675)
1 parent a70ab10 commit 343ba54

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,79 @@
1+
import { type ActionFunctionArgs } from "@remix-run/server-runtime";
12
import { z } from "zod";
23
import { $replica } from "~/db.server";
34
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
45
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
6+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
57

68
const ParamsSchema = z.object({
79
runId: z.string(),
810
streamId: z.string(),
911
});
1012

13+
// Plain action for backwards compatibility with older clients that don't send auth headers
14+
export async function action({ request, params }: ActionFunctionArgs) {
15+
const parsedParams = ParamsSchema.safeParse(params);
16+
17+
if (!parsedParams.success) {
18+
return new Response("Invalid parameters", { status: 400 });
19+
}
20+
21+
const { runId, streamId } = parsedParams.data;
22+
23+
// Look up the run without environment scoping for backwards compatibility
24+
const run = await $replica.taskRun.findFirst({
25+
where: {
26+
friendlyId: runId,
27+
},
28+
select: {
29+
id: true,
30+
friendlyId: true,
31+
runtimeEnvironment: {
32+
include: {
33+
project: true,
34+
organization: true,
35+
orgMember: true,
36+
},
37+
},
38+
},
39+
});
40+
41+
if (!run) {
42+
return new Response("Run not found", { status: 404 });
43+
}
44+
45+
// Extract client ID from header, default to "default" if not provided
46+
const clientId = request.headers.get("X-Client-Id") || "default";
47+
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
48+
49+
if (!request.body) {
50+
return new Response("No body provided", { status: 400 });
51+
}
52+
53+
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
54+
let resumeFromChunkNumber: number | undefined = undefined;
55+
if (resumeFromChunk) {
56+
const parsed = parseInt(resumeFromChunk, 10);
57+
if (isNaN(parsed) || parsed < 0) {
58+
return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, {
59+
status: 400,
60+
});
61+
}
62+
resumeFromChunkNumber = parsed;
63+
}
64+
65+
// The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment
66+
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion);
67+
68+
return realtimeStream.ingestData(
69+
request.body,
70+
run.friendlyId,
71+
streamId,
72+
clientId,
73+
resumeFromChunkNumber
74+
);
75+
}
76+
1177
export const loader = createLoaderApiRoute(
1278
{
1379
params: ParamsSchema,

0 commit comments

Comments
 (0)