Skip to content

Commit 6c601e0

Browse files
committed
s2 WIP
1 parent cd4d83c commit 6c601e0

File tree

33 files changed

+1318
-112
lines changed

33 files changed

+1318
-112
lines changed

apps/webapp/app/env.server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,14 @@ const EnvironmentSchema = z
12041204
EVENT_LOOP_MONITOR_UTILIZATION_SAMPLE_RATE: z.coerce.number().default(0.05),
12051205

12061206
VERY_SLOW_QUERY_THRESHOLD_MS: z.coerce.number().int().optional(),
1207+
1208+
REALTIME_STREAMS_S2_BASIN: z.string().optional(),
1209+
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
1210+
REALTIME_STREAMS_S2_LOG_LEVEL: z
1211+
.enum(["log", "error", "warn", "info", "debug"])
1212+
.default("info"),
1213+
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
1214+
REALTIME_STREAMS_S2_RESUME_TTL_SECONDS: z.coerce.number().int().default(86400),
12071215
})
12081216
.and(GithubAppEnvSchema);
12091217

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export const HeadersSchema = z.object({
3333
"x-trigger-client": z.string().nullish(),
3434
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
3535
"x-trigger-request-idempotency-key": z.string().nullish(),
36+
"x-trigger-realtime-streams-version": z.string().nullish(),
3637
traceparent: z.string().optional(),
3738
tracestate: z.string().optional(),
3839
});
@@ -63,6 +64,7 @@ const { action, loader } = createActionApiRoute(
6364
"x-trigger-client": triggerClient,
6465
"x-trigger-engine-version": engineVersion,
6566
"x-trigger-request-idempotency-key": requestIdempotencyKey,
67+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
6668
} = headers;
6769

6870
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
@@ -108,14 +110,7 @@ const { action, loader } = createActionApiRoute(
108110
options: body.options,
109111
isFromWorker,
110112
traceContext,
111-
});
112-
113-
logger.debug("[otelContext]", {
114-
taskId: params.taskId,
115-
headers,
116-
options: body.options,
117-
isFromWorker,
118-
traceContext,
113+
realtimeStreamsVersion,
119114
});
120115

121116
const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);
@@ -131,6 +126,7 @@ const { action, loader } = createActionApiRoute(
131126
traceContext,
132127
spanParentAsLink: spanParentAsLink === 1,
133128
oneTimeUseToken,
129+
realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
134130
},
135131
engineVersion ?? undefined
136132
);

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,13 @@
1-
import { ActionFunctionArgs } from "@remix-run/server-runtime";
21
import { z } from "zod";
32
import { $replica } from "~/db.server";
4-
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
3+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
54
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
65

76
const ParamsSchema = z.object({
87
runId: z.string(),
98
streamId: z.string(),
109
});
1110

12-
export async function action({ request, params }: ActionFunctionArgs) {
13-
const $params = ParamsSchema.parse(params);
14-
15-
// Extract client ID from header, default to "default" if not provided
16-
const clientId = request.headers.get("X-Client-Id") || "default";
17-
18-
// Handle HEAD request to get last chunk index for this client
19-
if (request.method === "HEAD") {
20-
const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
21-
$params.runId,
22-
$params.streamId,
23-
clientId
24-
);
25-
26-
return new Response(null, {
27-
status: 200,
28-
headers: {
29-
"X-Last-Chunk-Index": lastChunkIndex.toString(),
30-
},
31-
});
32-
}
33-
34-
if (!request.body) {
35-
return new Response("No body provided", { status: 400 });
36-
}
37-
38-
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
39-
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
40-
41-
return v1RealtimeStreams.ingestData(
42-
request.body,
43-
$params.runId,
44-
$params.streamId,
45-
clientId,
46-
resumeFromChunkNumber
47-
);
48-
}
49-
5011
export const loader = createLoaderApiRoute(
5112
{
5213
params: ParamsSchema,
@@ -82,7 +43,12 @@ export const loader = createLoaderApiRoute(
8243
// Get Last-Event-ID header for resuming from a specific position
8344
const lastEventId = request.headers.get("Last-Event-ID") || undefined;
8445

85-
return v1RealtimeStreams.streamResponse(
46+
const realtimeStream = getRealtimeStreamInstance(
47+
authentication.environment,
48+
run.realtimeStreamsVersion
49+
);
50+
51+
return realtimeStream.streamResponse(
8652
request,
8753
run.friendlyId,
8854
params.streamId,

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { z } from "zod";
22
import { $replica } from "~/db.server";
3-
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
3+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
44
import {
55
createActionApiRoute,
66
createLoaderApiRoute,
@@ -55,6 +55,7 @@ const { action } = createActionApiRoute(
5555

5656
// Extract client ID from header, default to "default" if not provided
5757
const clientId = request.headers.get("X-Client-Id") || "default";
58+
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
5859

5960
if (!request.body) {
6061
return new Response("No body provided", { status: 400 });
@@ -63,7 +64,9 @@ const { action } = createActionApiRoute(
6364
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
6465
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
6566

66-
return v1RealtimeStreams.ingestData(
67+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
68+
69+
return realtimeStream.ingestData(
6770
request.body,
6871
targetId,
6972
params.streamId,
@@ -101,7 +104,7 @@ const loader = createLoaderApiRoute(
101104
});
102105
},
103106
},
104-
async ({ request, params, resource: run }) => {
107+
async ({ request, params, resource: run, authentication }) => {
105108
if (!run) {
106109
return new Response("Run not found", { status: 404 });
107110
}
@@ -124,8 +127,11 @@ const loader = createLoaderApiRoute(
124127

125128
// Extract client ID from header, default to "default" if not provided
126129
const clientId = request.headers.get("X-Client-Id") || "default";
130+
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
131+
132+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
127133

128-
const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
134+
const lastChunkIndex = await realtimeStream.getLastChunkIndex(
129135
targetId,
130136
params.streamId,
131137
clientId

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ export class RunEngineTriggerTaskService {
347347
createdAt: options.overrideCreatedAt,
348348
bulkActionId: body.options?.bulkActionId,
349349
planType,
350+
realtimeStreamsVersion: options.realtimeStreamsVersion,
350351
},
351352
this.prisma
352353
);

0 commit comments

Comments
 (0)