Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/flat-olives-grab.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/express": patch
"@trigger.dev/nextjs": patch
"@trigger.dev/astro": patch
"@trigger.dev/remix": patch
"@trigger.dev/core": patch
---

Send client version back to the server via headers
6 changes: 6 additions & 0 deletions .changeset/warm-carrots-float.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": patch
"@trigger.dev/sdk": patch
---

Better performance when resuming a run, especially one with a large amount of tasks
1 change: 1 addition & 0 deletions apps/webapp/app/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export const DEFAULT_MAX_CONCURRENT_RUNS = 10;
export const MAX_CONCURRENT_RUNS_LIMIT = 20;
export const PREPROCESS_RETRY_LIMIT = 2;
export const EXECUTE_JOB_RETRY_LIMIT = 10;
export const MAX_RUN_YIELDED_EXECUTIONS = 100;
86 changes: 85 additions & 1 deletion apps/webapp/app/models/task.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Task, TaskAttempt } from "@trigger.dev/database";
import { ServerTask } from "@trigger.dev/core";
import { CachedTask, ServerTask } from "@trigger.dev/core";

export type TaskWithAttempts = Task & { attempts: TaskAttempt[] };

Expand All @@ -25,3 +25,87 @@ export function taskWithAttemptsToServerTask(task: TaskWithAttempts): ServerTask
operation: task.operation,
};
}

export type TaskForCaching = Pick<
Task,
"id" | "status" | "idempotencyKey" | "noop" | "output" | "parentId"
>;

export function prepareTasksForCaching(
possibleTasks: TaskForCaching[],
maxSize: number
): {
tasks: CachedTask[];
cursor: string | undefined;
} {
const tasks = possibleTasks.filter((task) => task.status === "COMPLETED" && !task.noop);

// Select tasks using greedy approach
const tasksToRun: CachedTask[] = [];
let remainingSize = maxSize;

for (const task of tasks) {
const cachedTask = prepareTaskForCaching(task);
const size = calculateCachedTaskSize(cachedTask);

if (size <= remainingSize) {
tasksToRun.push(cachedTask);
remainingSize -= size;
}
}

return {
tasks: tasksToRun,
cursor: tasks.length > tasksToRun.length ? tasks[tasksToRun.length].id : undefined,
};
}

export function prepareTasksForCachingLegacy(
possibleTasks: TaskForCaching[],
maxSize: number
): {
tasks: CachedTask[];
cursor: string | undefined;
} {
const tasks = possibleTasks.filter((task) => task.status === "COMPLETED");

// Prepare tasks and calculate their sizes
const availableTasks = tasks.map((task) => {
const cachedTask = prepareTaskForCaching(task);
return { task: cachedTask, size: calculateCachedTaskSize(cachedTask) };
});

// Sort tasks in ascending order by size
availableTasks.sort((a, b) => a.size - b.size);

// Select tasks using greedy approach
const tasksToRun: CachedTask[] = [];
let remainingSize = maxSize;

for (const { task, size } of availableTasks) {
if (size <= remainingSize) {
tasksToRun.push(task);
remainingSize -= size;
}
}

return {
tasks: tasksToRun,
cursor: undefined,
};
}

function prepareTaskForCaching(task: TaskForCaching): CachedTask {
return {
id: task.idempotencyKey, // We should eventually move this back to task.id
status: task.status,
idempotencyKey: task.idempotencyKey,
noop: task.noop,
output: task.output as any,
parentId: task.parentId,
};
}

function calculateCachedTaskSize(task: CachedTask): number {
return JSON.stringify(task).length;
}
85 changes: 82 additions & 3 deletions apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import type { ActionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { TaskStatus } from "@trigger.dev/database";
import { RunTaskBodyOutput, RunTaskBodyOutputSchema, ServerTask } from "@trigger.dev/core";
import {
API_VERSIONS,
RunTaskBodyOutput,
RunTaskBodyOutputSchema,
RunTaskResponseWithCachedTasksBody,
ServerTask,
} from "@trigger.dev/core";
import { z } from "zod";
import { $transaction, PrismaClient, prisma } from "~/db.server";
import { taskWithAttemptsToServerTask } from "~/models/task.server";
import { prepareTasksForCaching, taskWithAttemptsToServerTask } from "~/models/task.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ulid } from "~/services/ulid.server";
Expand All @@ -16,6 +22,8 @@ const ParamsSchema = z.object({

const HeadersSchema = z.object({
"idempotency-key": z.string(),
"trigger-version": z.string().optional().nullable(),
"x-cached-tasks-cursor": z.string().optional().nullable(),
});

export async function action({ request, params }: ActionArgs) {
Expand All @@ -37,7 +45,11 @@ export async function action({ request, params }: ActionArgs) {
return json({ error: "Invalid or Missing idempotency key" }, { status: 400 });
}

const { "idempotency-key": idempotencyKey } = headers.data;
const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-cached-tasks-cursor": cachedTasksCursor,
} = headers.data;

const { runId } = ParamsSchema.parse(params);

Expand All @@ -48,6 +60,8 @@ export async function action({ request, params }: ActionArgs) {
body: anyBody,
runId,
idempotencyKey,
triggerVersion,
cachedTasksCursor,
});

const body = RunTaskBodyOutputSchema.safeParse(anyBody);
Expand All @@ -71,6 +85,26 @@ export async function action({ request, params }: ActionArgs) {
return json({ error: "Something went wrong" }, { status: 500 });
}

if (triggerVersion === API_VERSIONS.LAZY_LOADED_CACHED_TASKS) {
const requestMigration = new ChangeRequestLazyLoadedCachedTasks();

const responseBody = await requestMigration.call(runId, task, cachedTasksCursor);

logger.debug(
"RunTaskService.call() response migrating with ChangeRequestLazyLoadedCachedTasks",
{
responseBody,
cachedTasksCursor,
}
);

return json(responseBody, {
headers: {
"trigger-version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
},
});
}

return json(task);
} catch (error) {
if (error instanceof Error) {
Expand All @@ -81,6 +115,51 @@ export async function action({ request, params }: ActionArgs) {
}
}

class ChangeRequestLazyLoadedCachedTasks {
#prismaClient: PrismaClient;

constructor(prismaClient: PrismaClient = prisma) {
this.#prismaClient = prismaClient;
}

public async call(
runId: string,
task: ServerTask,
cursor?: string | null
): Promise<RunTaskResponseWithCachedTasksBody> {
if (!cursor) {
return {
task,
};
}

// We need to limit the cached tasks to not be too large >2MB when serialized
const TOTAL_CACHED_TASK_BYTE_LIMIT = 2000000;

const nextTasks = await this.#prismaClient.task.findMany({
where: {
runId,
status: "COMPLETED",
noop: false,
},
take: 250,
cursor: {
id: cursor,
},
orderBy: {
id: "asc",
},
});

const preparedTasks = prepareTasksForCaching(nextTasks, TOTAL_CACHED_TASK_BYTE_LIMIT);

return {
task,
cachedTasks: preparedTasks,
};
}
}

export class RunTaskService {
#prismaClient: PrismaClient;

Expand Down
19 changes: 3 additions & 16 deletions apps/webapp/app/routes/resources.projects.$projectId.endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,15 @@ import { parse } from "@conform-to/zod";
import { ActionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import {
CreateEndpointError,
CreateEndpointService,
} from "~/services/endpoints/createEndpoint.server";
import { requireUserId } from "~/services/session.server";
import { RuntimeEnvironmentTypeSchema } from "@trigger.dev/core";
import { env } from "process";
import { CreateEndpointError } from "~/services/endpoints/createEndpoint.server";
import { ValidateCreateEndpointService } from "~/services/endpoints/validateCreateEndpoint.server";

const ParamsSchema = z.object({
projectId: z.string(),
});

export const bodySchema = z.object({
environmentId: z.string(),
url: z.string().url("Must be a valid URL"),
});

export async function action({ request, params }: ActionArgs) {
const userId = await requireUserId(request);
const { projectId } = ParamsSchema.parse(params);

export async function action({ request }: ActionArgs) {
const formData = await request.formData();
const submission = parse(formData, { schema: bodySchema });

Expand All @@ -48,7 +35,7 @@ export async function action({ request, params }: ActionArgs) {
}

const service = new ValidateCreateEndpointService();
const result = await service.call({
await service.call({
url: submission.value.url,
environment,
});
Expand Down
51 changes: 23 additions & 28 deletions apps/webapp/app/services/endpointApi.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {
API_VERSIONS,
ApiEventLog,
DeliverEventResponseSchema,
DeserializedJson,
EndpointHeadersSchema,
ErrorWithStackSchema,
HttpSourceRequest,
HttpSourceResponseSchema,
Expand Down Expand Up @@ -89,6 +91,15 @@ export class EndpointApi {
};
}

const headers = EndpointHeadersSchema.safeParse(Object.fromEntries(response.headers.entries()));

if (headers.success && headers.data["trigger-version"]) {
return {
...pongResponse.data,
triggerVersion: headers.data["trigger-version"],
};
}

return pongResponse.data;
}

Expand Down Expand Up @@ -129,41 +140,15 @@ export class EndpointApi {
const anyBody = await response.json();

const data = IndexEndpointResponseSchema.parse(anyBody);
const headers = EndpointHeadersSchema.parse(Object.fromEntries(response.headers.entries()));

return {
ok: true,
data,
headers,
} as const;
}

async deliverEvent(event: ApiEventLog) {
const response = await safeFetch(this.url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-trigger-api-key": this.apiKey,
"x-trigger-action": "DELIVER_EVENT",
},
body: JSON.stringify(event),
});

if (!response) {
throw new Error(`Could not connect to endpoint ${this.url}`);
}

if (!response.ok) {
throw new Error(`Could not connect to endpoint ${this.url}. Status code: ${response.status}`);
}

const anyBody = await response.json();

logger.debug("deliverEvent() response from endpoint", {
body: anyBody,
});

return DeliverEventResponseSchema.parse(anyBody);
}

async executeJobRequest(options: RunJobBody) {
const startTimeInMs = performance.now();

Expand Down Expand Up @@ -338,6 +323,15 @@ export class EndpointApi {
};
}

const headers = EndpointHeadersSchema.safeParse(Object.fromEntries(response.headers.entries()));

if (headers.success && headers.data["trigger-version"]) {
return {
...validateResponse.data,
triggerVersion: headers.data["trigger-version"],
};
}

return validateResponse.data;
}
}
Expand All @@ -359,6 +353,7 @@ function addStandardRequestOptions(options: RequestInit) {
headers: {
...options.headers,
"user-agent": "triggerdotdev-server/2.0.0",
"x-trigger-version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
},
};
}
2 changes: 2 additions & 0 deletions apps/webapp/app/services/endpoints/createEndpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ export class CreateEndpointService {
slug: id,
url: endpointUrl,
indexingHookIdentifier: indexingHookIdentifier(),
version: pong.triggerVersion,
},
update: {
url: endpointUrl,
version: pong.triggerVersion,
},
});

Expand Down
Loading