Skip to content

Commit dc8c965

Browse files
ericallam and matt-aitken
authored and committed
Improves the performance of run resuming (#522)
* Improves the performance of run resuming When runs resume, we try and make sure that tasks that have already been completed are cached and reused. Worst case scenario the client needs to hit the API server once for a non-cached task that is indeed completed on the server, but this can get pretty expensive when there are a larger number of tasks. This commit does 2 different things to help: - noop tasks are no longer “cached” using the cachedTasks strategy, instead their idempotency keys are shoved into a bloom filter and the client tests for their inclusion in the bloom filter before running them (since they don’t have any concept of output, this works) - Additional cached tasks are lazy loaded when a task is run. This allows us to progressively fetch additional tasks to be cached on the client, which will cut down on cache misses by a decent amount * Create warm-carrots-float.md * Make io.yield backwards compatible with older platform versions * Better support old clients connecting to server versions that support lazy loading cached tasks * Fixed type errors when setting headers with unknown value * Better "yield not supported" error message * Rename _version to _serverVersion to be more clear
1 parent 3ecbef2 commit dc8c965

File tree

27 files changed

+1029
-210
lines changed

27 files changed

+1029
-210
lines changed

.changeset/flat-olives-grab.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/express": patch
4+
"@trigger.dev/nextjs": patch
5+
"@trigger.dev/astro": patch
6+
"@trigger.dev/remix": patch
7+
"@trigger.dev/core": patch
8+
---
9+
10+
Send client version back to the server via headers

.changeset/warm-carrots-float.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Better performance when resuming a run, especially one with a large amount of tasks

apps/webapp/app/consts.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export const DEFAULT_MAX_CONCURRENT_RUNS = 10;
55
export const MAX_CONCURRENT_RUNS_LIMIT = 20;
66
export const PREPROCESS_RETRY_LIMIT = 2;
77
export const EXECUTE_JOB_RETRY_LIMIT = 10;
8+
export const MAX_RUN_YIELDED_EXECUTIONS = 100;

apps/webapp/app/models/task.server.ts

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Task, TaskAttempt } from "@trigger.dev/database";
2-
import { ServerTask } from "@trigger.dev/core";
2+
import { CachedTask, ServerTask } from "@trigger.dev/core";
33

44
export type TaskWithAttempts = Task & { attempts: TaskAttempt[] };
55

@@ -25,3 +25,87 @@ export function taskWithAttemptsToServerTask(task: TaskWithAttempts): ServerTask
2525
operation: task.operation,
2626
};
2727
}
28+
29+
export type TaskForCaching = Pick<
30+
Task,
31+
"id" | "status" | "idempotencyKey" | "noop" | "output" | "parentId"
32+
>;
33+
34+
export function prepareTasksForCaching(
35+
possibleTasks: TaskForCaching[],
36+
maxSize: number
37+
): {
38+
tasks: CachedTask[];
39+
cursor: string | undefined;
40+
} {
41+
const tasks = possibleTasks.filter((task) => task.status === "COMPLETED" && !task.noop);
42+
43+
// Select tasks using greedy approach
44+
const tasksToRun: CachedTask[] = [];
45+
let remainingSize = maxSize;
46+
47+
for (const task of tasks) {
48+
const cachedTask = prepareTaskForCaching(task);
49+
const size = calculateCachedTaskSize(cachedTask);
50+
51+
if (size <= remainingSize) {
52+
tasksToRun.push(cachedTask);
53+
remainingSize -= size;
54+
}
55+
}
56+
57+
return {
58+
tasks: tasksToRun,
59+
cursor: tasks.length > tasksToRun.length ? tasks[tasksToRun.length].id : undefined,
60+
};
61+
}
62+
63+
export function prepareTasksForCachingLegacy(
64+
possibleTasks: TaskForCaching[],
65+
maxSize: number
66+
): {
67+
tasks: CachedTask[];
68+
cursor: string | undefined;
69+
} {
70+
const tasks = possibleTasks.filter((task) => task.status === "COMPLETED");
71+
72+
// Prepare tasks and calculate their sizes
73+
const availableTasks = tasks.map((task) => {
74+
const cachedTask = prepareTaskForCaching(task);
75+
return { task: cachedTask, size: calculateCachedTaskSize(cachedTask) };
76+
});
77+
78+
// Sort tasks in ascending order by size
79+
availableTasks.sort((a, b) => a.size - b.size);
80+
81+
// Select tasks using greedy approach
82+
const tasksToRun: CachedTask[] = [];
83+
let remainingSize = maxSize;
84+
85+
for (const { task, size } of availableTasks) {
86+
if (size <= remainingSize) {
87+
tasksToRun.push(task);
88+
remainingSize -= size;
89+
}
90+
}
91+
92+
return {
93+
tasks: tasksToRun,
94+
cursor: undefined,
95+
};
96+
}
97+
98+
function prepareTaskForCaching(task: TaskForCaching): CachedTask {
99+
return {
100+
id: task.idempotencyKey, // We should eventually move this back to task.id
101+
status: task.status,
102+
idempotencyKey: task.idempotencyKey,
103+
noop: task.noop,
104+
output: task.output as any,
105+
parentId: task.parentId,
106+
};
107+
}
108+
109+
function calculateCachedTaskSize(task: CachedTask): number {
110+
return JSON.stringify(task).length;
111+
}

apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
import type { ActionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
33
import { TaskStatus } from "@trigger.dev/database";
4-
import { RunTaskBodyOutput, RunTaskBodyOutputSchema, ServerTask } from "@trigger.dev/core";
4+
import {
5+
API_VERSIONS,
6+
RunTaskBodyOutput,
7+
RunTaskBodyOutputSchema,
8+
RunTaskResponseWithCachedTasksBody,
9+
ServerTask,
10+
} from "@trigger.dev/core";
511
import { z } from "zod";
612
import { $transaction, PrismaClient, prisma } from "~/db.server";
7-
import { taskWithAttemptsToServerTask } from "~/models/task.server";
13+
import { prepareTasksForCaching, taskWithAttemptsToServerTask } from "~/models/task.server";
814
import { authenticateApiRequest } from "~/services/apiAuth.server";
915
import { logger } from "~/services/logger.server";
1016
import { ulid } from "~/services/ulid.server";
@@ -16,6 +22,8 @@ const ParamsSchema = z.object({
1622

1723
const HeadersSchema = z.object({
1824
"idempotency-key": z.string(),
25+
"trigger-version": z.string().optional().nullable(),
26+
"x-cached-tasks-cursor": z.string().optional().nullable(),
1927
});
2028

2129
export async function action({ request, params }: ActionArgs) {
@@ -37,7 +45,11 @@ export async function action({ request, params }: ActionArgs) {
3745
return json({ error: "Invalid or Missing idempotency key" }, { status: 400 });
3846
}
3947

40-
const { "idempotency-key": idempotencyKey } = headers.data;
48+
const {
49+
"idempotency-key": idempotencyKey,
50+
"trigger-version": triggerVersion,
51+
"x-cached-tasks-cursor": cachedTasksCursor,
52+
} = headers.data;
4153

4254
const { runId } = ParamsSchema.parse(params);
4355

@@ -48,6 +60,8 @@ export async function action({ request, params }: ActionArgs) {
4860
body: anyBody,
4961
runId,
5062
idempotencyKey,
63+
triggerVersion,
64+
cachedTasksCursor,
5165
});
5266

5367
const body = RunTaskBodyOutputSchema.safeParse(anyBody);
@@ -71,6 +85,26 @@ export async function action({ request, params }: ActionArgs) {
7185
return json({ error: "Something went wrong" }, { status: 500 });
7286
}
7387

88+
if (triggerVersion === API_VERSIONS.LAZY_LOADED_CACHED_TASKS) {
89+
const requestMigration = new ChangeRequestLazyLoadedCachedTasks();
90+
91+
const responseBody = await requestMigration.call(runId, task, cachedTasksCursor);
92+
93+
logger.debug(
94+
"RunTaskService.call() response migrating with ChangeRequestLazyLoadedCachedTasks",
95+
{
96+
responseBody,
97+
cachedTasksCursor,
98+
}
99+
);
100+
101+
return json(responseBody, {
102+
headers: {
103+
"trigger-version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
104+
},
105+
});
106+
}
107+
74108
return json(task);
75109
} catch (error) {
76110
if (error instanceof Error) {
@@ -81,6 +115,51 @@ export async function action({ request, params }: ActionArgs) {
81115
}
82116
}
83117

118+
class ChangeRequestLazyLoadedCachedTasks {
119+
#prismaClient: PrismaClient;
120+
121+
constructor(prismaClient: PrismaClient = prisma) {
122+
this.#prismaClient = prismaClient;
123+
}
124+
125+
public async call(
126+
runId: string,
127+
task: ServerTask,
128+
cursor?: string | null
129+
): Promise<RunTaskResponseWithCachedTasksBody> {
130+
if (!cursor) {
131+
return {
132+
task,
133+
};
134+
}
135+
136+
// We need to limit the cached tasks to not be too large >2MB when serialized
137+
const TOTAL_CACHED_TASK_BYTE_LIMIT = 2000000;
138+
139+
const nextTasks = await this.#prismaClient.task.findMany({
140+
where: {
141+
runId,
142+
status: "COMPLETED",
143+
noop: false,
144+
},
145+
take: 250,
146+
cursor: {
147+
id: cursor,
148+
},
149+
orderBy: {
150+
id: "asc",
151+
},
152+
});
153+
154+
const preparedTasks = prepareTasksForCaching(nextTasks, TOTAL_CACHED_TASK_BYTE_LIMIT);
155+
156+
return {
157+
task,
158+
cachedTasks: preparedTasks,
159+
};
160+
}
161+
}
162+
84163
export class RunTaskService {
85164
#prismaClient: PrismaClient;
86165

apps/webapp/app/routes/resources.projects.$projectId.endpoint.ts

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,15 @@ import { parse } from "@conform-to/zod";
22
import { ActionArgs, json } from "@remix-run/server-runtime";
33
import { z } from "zod";
44
import { prisma } from "~/db.server";
5-
import {
6-
CreateEndpointError,
7-
CreateEndpointService,
8-
} from "~/services/endpoints/createEndpoint.server";
9-
import { requireUserId } from "~/services/session.server";
10-
import { RuntimeEnvironmentTypeSchema } from "@trigger.dev/core";
11-
import { env } from "process";
5+
import { CreateEndpointError } from "~/services/endpoints/createEndpoint.server";
126
import { ValidateCreateEndpointService } from "~/services/endpoints/validateCreateEndpoint.server";
137

14-
const ParamsSchema = z.object({
15-
projectId: z.string(),
16-
});
17-
188
export const bodySchema = z.object({
199
environmentId: z.string(),
2010
url: z.string().url("Must be a valid URL"),
2111
});
2212

23-
export async function action({ request, params }: ActionArgs) {
24-
const userId = await requireUserId(request);
25-
const { projectId } = ParamsSchema.parse(params);
26-
13+
export async function action({ request }: ActionArgs) {
2714
const formData = await request.formData();
2815
const submission = parse(formData, { schema: bodySchema });
2916

@@ -48,7 +35,7 @@ export async function action({ request, params }: ActionArgs) {
4835
}
4936

5037
const service = new ValidateCreateEndpointService();
51-
const result = await service.call({
38+
await service.call({
5239
url: submission.value.url,
5340
environment,
5441
});

apps/webapp/app/services/endpointApi.server.ts

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import {
2+
API_VERSIONS,
23
ApiEventLog,
34
DeliverEventResponseSchema,
45
DeserializedJson,
6+
EndpointHeadersSchema,
57
ErrorWithStackSchema,
68
HttpSourceRequest,
79
HttpSourceResponseSchema,
@@ -89,6 +91,15 @@ export class EndpointApi {
8991
};
9092
}
9193

94+
const headers = EndpointHeadersSchema.safeParse(Object.fromEntries(response.headers.entries()));
95+
96+
if (headers.success && headers.data["trigger-version"]) {
97+
return {
98+
...pongResponse.data,
99+
triggerVersion: headers.data["trigger-version"],
100+
};
101+
}
102+
92103
return pongResponse.data;
93104
}
94105

@@ -129,41 +140,15 @@ export class EndpointApi {
129140
const anyBody = await response.json();
130141

131142
const data = IndexEndpointResponseSchema.parse(anyBody);
143+
const headers = EndpointHeadersSchema.parse(Object.fromEntries(response.headers.entries()));
132144

133145
return {
134146
ok: true,
135147
data,
148+
headers,
136149
} as const;
137150
}
138151

139-
async deliverEvent(event: ApiEventLog) {
140-
const response = await safeFetch(this.url, {
141-
method: "POST",
142-
headers: {
143-
"Content-Type": "application/json",
144-
"x-trigger-api-key": this.apiKey,
145-
"x-trigger-action": "DELIVER_EVENT",
146-
},
147-
body: JSON.stringify(event),
148-
});
149-
150-
if (!response) {
151-
throw new Error(`Could not connect to endpoint ${this.url}`);
152-
}
153-
154-
if (!response.ok) {
155-
throw new Error(`Could not connect to endpoint ${this.url}. Status code: ${response.status}`);
156-
}
157-
158-
const anyBody = await response.json();
159-
160-
logger.debug("deliverEvent() response from endpoint", {
161-
body: anyBody,
162-
});
163-
164-
return DeliverEventResponseSchema.parse(anyBody);
165-
}
166-
167152
async executeJobRequest(options: RunJobBody) {
168153
const startTimeInMs = performance.now();
169154

@@ -338,6 +323,15 @@ export class EndpointApi {
338323
};
339324
}
340325

326+
const headers = EndpointHeadersSchema.safeParse(Object.fromEntries(response.headers.entries()));
327+
328+
if (headers.success && headers.data["trigger-version"]) {
329+
return {
330+
...validateResponse.data,
331+
triggerVersion: headers.data["trigger-version"],
332+
};
333+
}
334+
341335
return validateResponse.data;
342336
}
343337
}
@@ -359,6 +353,7 @@ function addStandardRequestOptions(options: RequestInit) {
359353
headers: {
360354
...options.headers,
361355
"user-agent": "triggerdotdev-server/2.0.0",
356+
"x-trigger-version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
362357
},
363358
};
364359
}

apps/webapp/app/services/endpoints/createEndpoint.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,11 @@ export class CreateEndpointService {
7474
slug: id,
7575
url: endpointUrl,
7676
indexingHookIdentifier: indexingHookIdentifier(),
77+
version: pong.triggerVersion,
7778
},
7879
update: {
7980
url: endpointUrl,
81+
version: pong.triggerVersion,
8082
},
8183
});
8284

0 commit comments

Comments
 (0)