diff --git a/.changeset/flat-olives-grab.md b/.changeset/flat-olives-grab.md
new file mode 100644
index 0000000000..4e5fe2a307
--- /dev/null
+++ b/.changeset/flat-olives-grab.md
@@ -0,0 +1,10 @@
+---
+"@trigger.dev/sdk": patch
+"@trigger.dev/express": patch
+"@trigger.dev/nextjs": patch
+"@trigger.dev/astro": patch
+"@trigger.dev/remix": patch
+"@trigger.dev/core": patch
+---
+
+Send client version back to the server via headers
diff --git a/.changeset/warm-carrots-float.md b/.changeset/warm-carrots-float.md
new file mode 100644
index 0000000000..cf51b63570
--- /dev/null
+++ b/.changeset/warm-carrots-float.md
@@ -0,0 +1,6 @@
+---
+"@trigger.dev/core": patch
+"@trigger.dev/sdk": patch
+---
+
+Better performance when resuming a run, especially one with a large number of tasks
diff --git a/apps/webapp/app/consts.ts b/apps/webapp/app/consts.ts
index c6e9c85cdd..51ab2cc444 100644
--- a/apps/webapp/app/consts.ts
+++ b/apps/webapp/app/consts.ts
@@ -5,3 +5,4 @@ export const DEFAULT_MAX_CONCURRENT_RUNS = 10;
 export const MAX_CONCURRENT_RUNS_LIMIT = 20;
 export const PREPROCESS_RETRY_LIMIT = 2;
 export const EXECUTE_JOB_RETRY_LIMIT = 10;
+export const MAX_RUN_YIELDED_EXECUTIONS = 100;
diff --git a/apps/webapp/app/models/task.server.ts b/apps/webapp/app/models/task.server.ts
index 19951eb6f7..ac468779af 100644
--- a/apps/webapp/app/models/task.server.ts
+++ b/apps/webapp/app/models/task.server.ts
@@ -1,5 +1,5 @@
 import type { Task, TaskAttempt } from "@trigger.dev/database";
-import { ServerTask } from "@trigger.dev/core";
+import { CachedTask, ServerTask } from "@trigger.dev/core";
 
 export type TaskWithAttempts = Task & { attempts: TaskAttempt[] };
 
@@ -25,3 +25,87 @@ export function taskWithAttemptsToServerTask(task: TaskWithAttempts): ServerTask
     operation: task.operation,
   };
 }
+
+export type TaskForCaching = Pick<
+  Task,
+  "id" | "status" | "idempotencyKey" | "noop" | "output" | "parentId"
+>;
+
+export function prepareTasksForCaching(
+  possibleTasks: TaskForCaching[],
+  maxSize: number
+): {
+  tasks: CachedTask[];
+  cursor: string | undefined;
+} {
+  const tasks = possibleTasks.filter((task) => task.status === "COMPLETED" && !task.noop);
+
+  // Select tasks using a greedy approach
+  const tasksToRun: CachedTask[] = [];
+  let remainingSize = maxSize;
+
+  for (const task of tasks) {
+    const cachedTask = prepareTaskForCaching(task);
+    const size = calculateCachedTaskSize(cachedTask);
+
+    if (size <= remainingSize) {
+      tasksToRun.push(cachedTask);
+      remainingSize -= size;
+    }
+  }
+
+  return {
+    tasks: tasksToRun,
+    cursor: tasks.length > tasksToRun.length ? tasks[tasksToRun.length].id : undefined,
+  };
+}
+
+export function prepareTasksForCachingLegacy(
+  possibleTasks: TaskForCaching[],
+  maxSize: number
+): {
+  tasks: CachedTask[];
+  cursor: string | undefined;
+} {
+  const tasks = possibleTasks.filter((task) => task.status === "COMPLETED");
+
+  // Prepare tasks and calculate their sizes
+  const availableTasks = tasks.map((task) => {
+    const cachedTask = prepareTaskForCaching(task);
+    return { task: cachedTask, size: calculateCachedTaskSize(cachedTask) };
+  });
+
+  // Sort tasks in ascending order by size
+  availableTasks.sort((a, b) => a.size - b.size);
+
+  // Select tasks using a greedy approach
+  const tasksToRun: CachedTask[] = [];
+  let remainingSize = maxSize;
+
+  for (const { task, size } of availableTasks) {
+    if (size <= remainingSize) {
+      tasksToRun.push(task);
+      remainingSize -= size;
+    }
+  }
+
+  return {
+    tasks: tasksToRun,
+    cursor: undefined,
+  };
+}
+
+function prepareTaskForCaching(task: TaskForCaching): CachedTask {
+  return {
+    id: task.idempotencyKey, // We should eventually move this back to task.id
+    status: task.status,
+    idempotencyKey: task.idempotencyKey,
+    noop: task.noop,
+    output: task.output as any,
+    parentId: task.parentId,
+  };
+}
+
+function calculateCachedTaskSize(task: CachedTask): number {
+  return JSON.stringify(task).length;
+}
diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts
index 0e5eaecd99..54c9645348 100644
--- a/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts
+++ b/apps/webapp/app/routes/api.v1.runs.$runId.tasks.ts
@@ -1,10 +1,16 @@
 import type { ActionArgs } from "@remix-run/server-runtime";
 import { json } from "@remix-run/server-runtime";
 import { TaskStatus } from "@trigger.dev/database";
-import { RunTaskBodyOutput, RunTaskBodyOutputSchema, ServerTask } from "@trigger.dev/core";
+import {
+  API_VERSIONS,
+  RunTaskBodyOutput,
+  RunTaskBodyOutputSchema,
+  RunTaskResponseWithCachedTasksBody,
+  ServerTask,
+} from "@trigger.dev/core";
 import { z } from "zod";
 import { $transaction, PrismaClient, prisma } from "~/db.server";
-import { taskWithAttemptsToServerTask } from "~/models/task.server";
+import { prepareTasksForCaching, taskWithAttemptsToServerTask } from "~/models/task.server";
 import { authenticateApiRequest } from "~/services/apiAuth.server";
 import { logger } from "~/services/logger.server";
 import { ulid } from "~/services/ulid.server";
@@ -16,6 +22,8 @@ const ParamsSchema = z.object({
 
 const HeadersSchema = z.object({
   "idempotency-key": z.string(),
+  "trigger-version": z.string().optional().nullable(),
+  "x-cached-tasks-cursor": z.string().optional().nullable(),
 });
 
 export async function action({ request, params }: ActionArgs) {
@@ -37,7 +45,11 @@ export async function action({ request, params }: ActionArgs) {
     return json({ error: "Invalid or Missing idempotency key" }, { status: 400 });
   }
 
-  const { "idempotency-key": idempotencyKey } = headers.data;
+  const {
+    "idempotency-key": idempotencyKey,
+    "trigger-version": triggerVersion,
+    "x-cached-tasks-cursor": cachedTasksCursor,
+  } = headers.data;
 
   const { runId } = ParamsSchema.parse(params);
 
@@ -48,6 +60,8 @@ export async function action({ request, params }: ActionArgs) {
     body: anyBody,
     runId,
     idempotencyKey,
+    triggerVersion,
+    cachedTasksCursor,
   });
 
   const body = RunTaskBodyOutputSchema.safeParse(anyBody);
@@ -71,6 +85,26 @@ export async function action({ request, params }: ActionArgs) {
       return json({ error: "Something went wrong" }, { status: 500 });
     }
 
+    if (triggerVersion === API_VERSIONS.LAZY_LOADED_CACHED_TASKS) {
+      const requestMigration = new ChangeRequestLazyLoadedCachedTasks();
+
+      const responseBody = await requestMigration.call(runId, task, cachedTasksCursor);
+
+      logger.debug(
+        "RunTaskService.call() response migrating with ChangeRequestLazyLoadedCachedTasks",
+        {
+          responseBody,
+          cachedTasksCursor,
+        }
+      );
+
+      return json(responseBody, {
+        headers: {
+          "trigger-version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
+        },
+      });
+    }
+
     return json(task);
   } catch (error) {
     if (error instanceof Error) {
@@ -81,6 +115,51 @@ export async function action({ request, params }: ActionArgs) {
   }
 }
 
+class ChangeRequestLazyLoadedCachedTasks {
+  #prismaClient: PrismaClient;
+
+  constructor(prismaClient: PrismaClient = prisma) {
+    this.#prismaClient = prismaClient;
+  }
+
+  public async call(
+    runId: string,
+    task: ServerTask,
+    cursor?: string | null
+  ): Promise<RunTaskResponseWithCachedTasksBody> {
+    if (!cursor) {
+      return {
+        task,
+      };
+    }
+
+    // We need to limit the cached tasks to not be too large >2MB when serialized
+    const TOTAL_CACHED_TASK_BYTE_LIMIT = 2000000;
+
+    const nextTasks = await this.#prismaClient.task.findMany({
+      where: {
+        runId,
+        status: "COMPLETED",
+        noop: false,
+      },
+      take: 250,
+      cursor: {
+        id: cursor,
+      },
+      orderBy: {
+        id: "asc",
+      },
+    });
+
+    const preparedTasks = prepareTasksForCaching(nextTasks, TOTAL_CACHED_TASK_BYTE_LIMIT);
+
+    return {
+      task,
+      cachedTasks: preparedTasks,
+    };
+  }
+}
+
 export class RunTaskService {
   #prismaClient: PrismaClient;
 
diff --git a/apps/webapp/app/routes/resources.projects.$projectId.endpoint.ts b/apps/webapp/app/routes/resources.projects.$projectId.endpoint.ts
index 0412919ecf..ece220cb72 100644
--- a/apps/webapp/app/routes/resources.projects.$projectId.endpoint.ts
+++ b/apps/webapp/app/routes/resources.projects.$projectId.endpoint.ts
@@ -2,28 +2,15 @@ import { parse } from "@conform-to/zod";
 import { ActionArgs, json } from "@remix-run/server-runtime";
 import { z } from "zod";
 import { prisma } from "~/db.server";
-import {
-  CreateEndpointError,
-  CreateEndpointService,
-} from "~/services/endpoints/createEndpoint.server";
-import { requireUserId } from "~/services/session.server";
-import { RuntimeEnvironmentTypeSchema } from "@trigger.dev/core";
-import { env } from "process";
+import { CreateEndpointError } from "~/services/endpoints/createEndpoint.server";
 import { ValidateCreateEndpointService } from "~/services/endpoints/validateCreateEndpoint.server";
 
-const ParamsSchema = z.object({
-  projectId: z.string(),
-});
-
 export const bodySchema = z.object({
   environmentId: z.string(),
   url: z.string().url("Must be a valid URL"),
 });
 
-export async function action({ request, params }: ActionArgs) {
-  const userId = await requireUserId(request);
-  const { projectId } = ParamsSchema.parse(params);
-
+export async function action({ request }: ActionArgs) {
   const formData = await request.formData();
   const submission = parse(formData, { schema: bodySchema });
 
@@ -48,7 +35,7 @@ export async function action({ request }: ActionArgs) {
   }
 
   const service = new ValidateCreateEndpointService();
-  const result = await service.call({
+  await service.call({
     url: submission.value.url,
     environment,
   });
diff --git a/apps/webapp/app/services/endpointApi.server.ts b/apps/webapp/app/services/endpointApi.server.ts
index 8cce435fa0..494930a073 100644
--- a/apps/webapp/app/services/endpointApi.server.ts
+++ b/apps/webapp/app/services/endpointApi.server.ts
@@ -1,7 +1,9 @@
 import {
+  API_VERSIONS,
   ApiEventLog,
   DeliverEventResponseSchema,
   DeserializedJson,
+  EndpointHeadersSchema,
   ErrorWithStackSchema,
   HttpSourceRequest,
   HttpSourceResponseSchema,
@@ -89,6 +91,15 @@
       };
    }
 
+    const headers = EndpointHeadersSchema.safeParse(Object.fromEntries(response.headers.entries()));
+
+    if (headers.success && headers.data["trigger-version"]) {
+      return {
+        ...pongResponse.data,
+        triggerVersion: headers.data["trigger-version"],
+      };
+    }
+
     return pongResponse.data;
   }
 
@@ -129,41 +140,15 @@
     const anyBody = await response.json();
 
     const data = IndexEndpointResponseSchema.parse(anyBody);
+    const headers = EndpointHeadersSchema.parse(Object.fromEntries(response.headers.entries()));
 
     return {
       ok: true,
       data,
+      headers,
     } as const;
   }
 
-  async deliverEvent(event: ApiEventLog) {
-    const response = await safeFetch(this.url, {
-      method: "POST",
-      headers: {
-        "Content-Type": "application/json",
-        "x-trigger-api-key": this.apiKey,
-        "x-trigger-action": "DELIVER_EVENT",
-      },
-      body: JSON.stringify(event),
-    });
-
-    if (!response) {
-      throw new Error(`Could not connect to endpoint ${this.url}`);
-    }
-
-    if (!response.ok) {
-      throw new Error(`Could not connect to endpoint ${this.url}. Status code: ${response.status}`);
-    }
-
-    const anyBody = await response.json();
-
-    logger.debug("deliverEvent() response from endpoint", {
-      body: anyBody,
-    });
-
-    return DeliverEventResponseSchema.parse(anyBody);
-  }
-
   async executeJobRequest(options: RunJobBody) {
     const startTimeInMs = performance.now();
 
@@ -338,6 +323,15 @@
       };
     }
 
+    const headers = EndpointHeadersSchema.safeParse(Object.fromEntries(response.headers.entries()));
+
+    if (headers.success && headers.data["trigger-version"]) {
+      return {
+        ...validateResponse.data,
+        triggerVersion: headers.data["trigger-version"],
+      };
+    }
+
     return validateResponse.data;
   }
 }
@@ -359,6 +353,7 @@ function addStandardRequestOptions(options: RequestInit) {
     headers: {
       ...options.headers,
       "user-agent": "triggerdotdev-server/2.0.0",
+      "x-trigger-version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
     },
   };
 }
diff --git a/apps/webapp/app/services/endpoints/createEndpoint.server.ts b/apps/webapp/app/services/endpoints/createEndpoint.server.ts
index bd1839222f..20c62e7ef2 100644
--- a/apps/webapp/app/services/endpoints/createEndpoint.server.ts
+++ b/apps/webapp/app/services/endpoints/createEndpoint.server.ts
@@ -74,9 +74,11 @@ export class CreateEndpointService {
           slug: id,
           url: endpointUrl,
           indexingHookIdentifier: indexingHookIdentifier(),
+          version: pong.triggerVersion,
         },
         update: {
           url: endpointUrl,
+          version: pong.triggerVersion,
         },
       });
diff --git a/apps/webapp/app/services/endpoints/indexEndpoint.server.ts b/apps/webapp/app/services/endpoints/indexEndpoint.server.ts
index bc529fe0cf..2e6da01825 100644
--- a/apps/webapp/app/services/endpoints/indexEndpoint.server.ts
+++ b/apps/webapp/app/services/endpoints/indexEndpoint.server.ts
@@ -41,6 +41,7 @@ export class IndexEndpointService {
     }
 
     const { jobs, sources, dynamicTriggers, dynamicSchedules } = indexResponse.data;
+    const { "trigger-version": triggerVersion } = indexResponse.headers;
 
     logger.debug("Indexing endpoint", {
       endpointId: endpoint.id,
@@ -48,6 +49,7 @@ export class IndexEndpointService {
       endpointSlug: endpoint.slug,
       source: source,
       sourceData: sourceData,
+      triggerVersion,
       stats: {
         jobs: jobs.length,
         sources: sources.length,
@@ -56,6 +58,17 @@ export class IndexEndpointService {
       },
     });
 
+    if (triggerVersion && triggerVersion !== endpoint.version) {
+      await this.#prismaClient.endpoint.update({
+        where: {
+          id: endpoint.id,
+        },
+        data: {
+          version: triggerVersion,
+        },
+      });
+    }
+
     const indexStats = {
       jobs: 0,
       sources: 0,
diff --git a/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts b/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts
index d578a26d9d..4bbe5231fc 100644
--- a/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts
+++ b/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts
@@ -58,9 +58,11 @@ export class ValidateCreateEndpointService {
         slug: validationResult.endpointId,
         url: endpointUrl,
         indexingHookIdentifier: indexingHookIdentifier(),
+        version: validationResult.triggerVersion,
       },
       update: {
         url: endpointUrl,
+        version: validationResult.triggerVersion,
       },
     });
diff --git a/apps/webapp/app/services/runs/performRunExecutionV1.server.ts b/apps/webapp/app/services/runs/performRunExecutionV1.server.ts
index 591f6f00b1..7cfdf2d966 100644
--- a/apps/webapp/app/services/runs/performRunExecutionV1.server.ts
+++ b/apps/webapp/app/services/runs/performRunExecutionV1.server.ts
@@ -263,6 +263,7 @@ export class PerformRunExecutionV1Service {
         .flat()
         .filter(Boolean)
         .map((t) => CachedTaskSchema.parse(t)),
+      yieldedExecutions: run.yieldedExecutions,
     });
 
     if (!response) {
@@ -354,6 +355,11 @@ export class PerformRunExecutionV1Service {
 
         break;
       }
+      case "YIELD_EXECUTION": {
+        await this.#resumeYieldedExecution(execution, safeBody.data.key);
+
+        break;
+      }
       default: {
         const _exhaustiveCheck: never = status;
         throw new Error(`Non-exhaustive match for value: ${status}`);
@@ -393,6 +399,40 @@ export class PerformRunExecutionV1Service {
     });
   }
 
+  async #resumeYieldedExecution(execution: FoundRunExecution, key: string) {
+    const { run } = execution;
+
+    return await $transaction(this.#prismaClient, async (tx) => {
+      await tx.jobRunExecution.update({
+        where: {
+          id: execution.id,
+        },
+        data: {
+          status: "SUCCESS",
+          completedAt: new Date(),
+          run: {
+            update: {
+              yieldedExecutions: {
+                push: key,
+              },
+            },
+          },
+        },
+      });
+
+      const newJobExecution = await tx.jobRunExecution.create({
+        data: {
+          runId: run.id,
+          reason: "EXECUTE_JOB",
+          status: "PENDING",
+          retryLimit: EXECUTE_JOB_RETRY_LIMIT,
+        },
+      });
+
+      await enqueueRunExecutionV1(newJobExecution, run.queue.id, run.queue.maxJobs, tx);
+    });
+  }
+
   async #resumeRunWithTask(execution: FoundRunExecution, data: RunJobResumeWithTask) {
     const { run } = execution;
 
diff --git a/apps/webapp/app/services/runs/performRunExecutionV2.server.ts b/apps/webapp/app/services/runs/performRunExecutionV2.server.ts
index 61feab0185..1af87b8c30 100644
--- a/apps/webapp/app/services/runs/performRunExecutionV2.server.ts
+++ b/apps/webapp/app/services/runs/performRunExecutionV2.server.ts
@@ -1,12 +1,17 @@
 import {
-  CachedTask,
+  API_VERSIONS,
+  BloomFilter,
+  ConnectionAuth,
+  EndpointHeadersSchema,
   RunJobError,
   RunJobInvalidPayloadError,
   RunJobResumeWithTask,
   RunJobRetryWithTask,
   RunJobSuccess,
   RunJobUnresolvedAuthError,
+  RunSourceContext,
   RunSourceContextSchema,
+  supportsFeature,
 } from "@trigger.dev/core";
 import { RuntimeEnvironmentType, type Task } from "@trigger.dev/database";
 import { generateErrorMessage } from "zod-error";
@@ -18,10 +23,17 @@
 import { formatError } from "~/utils/formatErrors.server";
 import { safeJsonZodParse } from "~/utils/json";
 import { EndpointApi } from "../endpointApi.server";
 import { logger } from "../logger.server";
+import { prepareTasksForCaching, prepareTasksForCachingLegacy } from "~/models/task.server";
+import { MAX_RUN_YIELDED_EXECUTIONS } from "~/consts";
+import { ApiEventLog } from "@trigger.dev/core";
+import { RunJobBody } from "@trigger.dev/core";
 
 type FoundRun = NonNullable<Awaited<ReturnType<typeof findRun>>>;
 type FoundTask = FoundRun["tasks"][number];
 
+// We need to limit the cached tasks to not be too large >3.5MB when serialized
+const TOTAL_CACHED_TASK_BYTE_LIMIT = 3500000;
+
 export type PerformRunExecutionV2Input = {
   id: string;
   reason: "PREPROCESS" | "EXECUTE_JOB";
@@ -230,38 +242,19 @@ export class PerformRunExecutionV2Service {
     const sourceContext = RunSourceContextSchema.safeParse(run.event.sourceContext);
 
-    const { response, parser, errorParser, durationInMs } = await client.executeJobRequest({
+    const executionBody = await this.#createExecutionBody(
+      run,
+      [run.tasks, resumedTask].flat().filter(Boolean),
+      startedAt,
+      isRetry,
+      connections.auth,
       event,
-      job: {
-        id: run.version.job.slug,
-        version: run.version.version,
-      },
-      run: {
-        id: run.id,
-        isTest: run.isTest,
-        startedAt,
-        isRetry,
-      },
-      environment: {
-        id: run.environment.id,
-        slug: run.environment.slug,
-        type: run.environment.type,
-      },
-      organization: {
-        id: run.organization.id,
-        slug: run.organization.slug,
-        title: run.organization.title,
-      },
-      account: run.externalAccount
-        ? {
-            id: run.externalAccount.identifier,
-            metadata: run.externalAccount.metadata,
-          }
-        : undefined,
-      connections: connections.auth,
-      source: sourceContext.success ? sourceContext.data : undefined,
-      tasks: prepareTasksForRun([run.tasks, resumedTask].flat().filter(Boolean)),
-    });
+      sourceContext.success ? sourceContext.data : undefined
+    );
+
+    const { response, parser, errorParser, durationInMs } = await client.executeJobRequest(
+      executionBody
+    );
 
     if (!response) {
       return await this.#failRunExecutionWithRetry({
@@ -269,6 +262,25 @@ export class PerformRunExecutionV2Service {
       });
     }
 
+    // Update the endpoint version if it has changed
+    const rawHeaders = Object.fromEntries(response.headers.entries());
+    const headers = EndpointHeadersSchema.safeParse(rawHeaders);
+
+    if (
+      headers.success &&
+      headers.data["trigger-version"] &&
+      headers.data["trigger-version"] !== run.endpoint.version
+    ) {
+      await this.#prismaClient.endpoint.update({
+        where: {
+          id: run.endpoint.id,
+        },
+        data: {
+          version: headers.data["trigger-version"],
+        },
+      });
+    }
+
     const rawBody = await response.text();
 
     if (!response.ok) {
@@ -389,6 +401,10 @@ export class PerformRunExecutionV2Service {
         break;
       }
 
+      case "YIELD_EXECUTION": {
+        await this.#resumeYieldedRun(run, safeBody.data.key, isRetry, durationInMs, executionCount);
+        break;
+      }
       default: {
         const _exhaustiveCheck: never = status;
         throw new Error(`Non-exhaustive match for value: ${status}`);
@@ -396,6 +412,91 @@ export class PerformRunExecutionV2Service {
     }
   }
 
+  async #createExecutionBody(
+    run: FoundRun,
+    tasks: FoundTask[],
+    startedAt: Date,
+    isRetry: boolean,
+    connections: Record<string, ConnectionAuth>,
+    event: ApiEventLog,
+    source?: RunSourceContext
+  ): Promise<RunJobBody> {
+    if (supportsFeature("lazyLoadedCachedTasks", run.endpoint.version)) {
+      const preparedTasks = prepareTasksForCaching(tasks, TOTAL_CACHED_TASK_BYTE_LIMIT);
+
+      return {
+        event,
+        job: {
+          id: run.version.job.slug,
+          version: run.version.version,
+        },
+        run: {
+          id: run.id,
+          isTest: run.isTest,
+          startedAt,
+          isRetry,
+        },
+        environment: {
+          id: run.environment.id,
+          slug: run.environment.slug,
+          type: run.environment.type,
+        },
+        organization: {
+          id: run.organization.id,
+          slug: run.organization.slug,
+          title: run.organization.title,
+        },
+        account: run.externalAccount
+          ? {
+              id: run.externalAccount.identifier,
+              metadata: run.externalAccount.metadata,
+            }
+          : undefined,
+        connections,
+        source,
+        tasks: preparedTasks.tasks,
+        cachedTaskCursor: preparedTasks.cursor,
+        noopTasksSet: prepareNoOpTasksBloomFilter(tasks),
+        yieldedExecutions: run.yieldedExecutions,
+      };
+    }
+
+    const preparedTasks = prepareTasksForCachingLegacy(tasks, TOTAL_CACHED_TASK_BYTE_LIMIT);
+
+    return {
+      event,
+      job: {
+        id: run.version.job.slug,
+        version: run.version.version,
+      },
+      run: {
+        id: run.id,
+        isTest: run.isTest,
+        startedAt,
+        isRetry,
+      },
+      environment: {
+        id: run.environment.id,
+        slug: run.environment.slug,
+        type: run.environment.type,
+      },
+      organization: {
+        id: run.organization.id,
+        slug: run.organization.slug,
+        title: run.organization.title,
+      },
+      account: run.externalAccount
+        ? {
+            id: run.externalAccount.identifier,
+            metadata: run.externalAccount.metadata,
+          }
+        : undefined,
+      connections,
+      source,
+      tasks: preparedTasks.tasks,
+    };
+  }
+
   async #completeRunWithSuccess(run: FoundRun, data: RunJobSuccess, durationInMs: number) {
     await this.#prismaClient.jobRun.update({
       where: { id: run.id },
@@ -501,6 +602,56 @@ export class PerformRunExecutionV2Service {
     });
   }
 
+  async #resumeYieldedRun(
+    run: FoundRun,
+    key: string,
+    isRetry: boolean,
+    durationInMs: number,
+    executionCount: number
+  ) {
+    await $transaction(this.#prismaClient, async (tx) => {
+      if (run.yieldedExecutions.length + 1 > MAX_RUN_YIELDED_EXECUTIONS) {
+        return await this.#failRunExecution(
+          tx,
+          "EXECUTE_JOB",
+          run,
+          {
+            message: `Run has yielded too many times, the maximum is ${MAX_RUN_YIELDED_EXECUTIONS}`,
+          },
+          "FAILURE",
+          durationInMs
+        );
+      }
+
+      await tx.jobRun.update({
+        where: {
+          id: run.id,
+        },
+        data: {
+          executionDuration: {
+            increment: durationInMs,
+          },
+          executionCount: {
+            increment: 1,
+          },
+          yieldedExecutions: {
+            push: key,
+          },
+        },
+        select: {
+          yieldedExecutions: true,
+          executionCount: true,
+        },
+      });
+
+      await enqueueRunExecutionV2(run, tx, {
+        isRetry,
+        skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
+        executionCount,
+      });
+    });
+  }
+
   async #retryRunWithTask(
     run: FoundRun,
     data: RunJobRetryWithTask,
@@ -686,69 +837,16 @@ export class PerformRunExecutionV2Service {
   }
 }
 
-function prepareTasksForRun(possibleTasks: FoundTask[]): CachedTask[] {
-  const tasks = possibleTasks.filter((task) => task.status === "COMPLETED");
-
-  // We need to limit the cached tasks to not be too large >3.5MB when serialized
-  const TOTAL_CACHED_TASK_BYTE_LIMIT = 3500000;
+function prepareNoOpTasksBloomFilter(possibleTasks: FoundTask[]): string {
+  const tasks = possibleTasks.filter((task) => task.status === "COMPLETED" && task.noop);
 
-  const cachedTasks = new Map<string, CachedTask>(); // Cache for prepared tasks
-  const cachedTaskSizes = new Map<string, number>(); // Cache for calculated task sizes
+  const filter = new BloomFilter(BloomFilter.NOOP_TASK_SET_SIZE);
 
-  // Helper function to get the cached prepared task, or prepare and cache if not already cached
-  function getCachedTask(task: FoundTask): CachedTask {
-    const taskId = task.id;
-    if (!cachedTasks.has(taskId)) {
-      cachedTasks.set(taskId, prepareTaskForRun(task));
-    }
-    return cachedTasks.get(taskId)!;
+  for (const task of tasks) {
+    filter.add(task.idempotencyKey);
   }
 
-  // Helper function to get the cached task size, or calculate and cache if not already cached
-  function getCachedTaskSize(task: CachedTask): number {
-    const taskId = task.id;
-    if (!cachedTaskSizes.has(taskId)) {
-      cachedTaskSizes.set(taskId, calculateCachedTaskSize(task));
-    }
-    return cachedTaskSizes.get(taskId)!;
-  }
-
-  // Prepare tasks and calculate their sizes
-  const availableTasks = tasks.map((task) => {
-    const cachedTask = getCachedTask(task);
-    return { task: cachedTask, size: getCachedTaskSize(cachedTask) };
-  });
-
-  // Sort tasks in ascending order by size
-  availableTasks.sort((a, b) => a.size - b.size);
-
-  // Select tasks using greedy approach
-  const tasksToRun: CachedTask[] = [];
-  let remainingSize = TOTAL_CACHED_TASK_BYTE_LIMIT;
-
-  for (const { task, size } of availableTasks) {
-    if (size <= remainingSize) {
-      tasksToRun.push(task);
-      remainingSize -= size;
-    }
-  }
-
-  return tasksToRun;
-}
-
-function prepareTaskForRun(task: FoundTask): CachedTask {
-  return {
-    id: task.idempotencyKey, // We should eventually move this back to task.id
-    status: task.status,
-    idempotencyKey: task.idempotencyKey,
-    noop: task.noop,
-    output: task.output as any,
-    parentId: task.parentId,
-  };
-}
-
-function calculateCachedTaskSize(task: CachedTask): number {
-  return JSON.stringify(task).length;
+  return filter.serialize();
 }
 
 async function findRun(prisma: PrismaClientOrTransaction, id: string) {
@@ -783,6 +881,9 @@ async function findRun(prisma: PrismaClientOrTransaction, id: string) {
         output: true,
         parentId: true,
       },
+      orderBy: {
+        id: "asc",
+      },
     },
     event: true,
     version: {
diff --git a/packages/astro/src/index.ts b/packages/astro/src/index.ts
index c90ff6ae0b..e399aa0d71 100644
--- a/packages/astro/src/index.ts
+++ b/packages/astro/src/index.ts
@@ -42,6 +42,7 @@ export function createAstroRoute(client: TriggerClient) {
       // execution's response body
       return new Response(JSON.stringify(response.body), {
         status: response.status,
+        headers: response.headers,
       });
     } catch (err) {
       return new Response(JSON.stringify({ error: "Internal server error" }), {
diff --git a/packages/core/src/bloom.ts b/packages/core/src/bloom.ts
new file mode 100644
index 0000000000..63428ab76e
--- /dev/null
+++ b/packages/core/src/bloom.ts
@@ -0,0 +1,63 @@
+import { Buffer } from "node:buffer";
+
+export class BloomFilter {
+  private size: number;
+  private bitArray: Uint8Array;
+
+  constructor(size: number) {
+    this.size = size;
+    this.bitArray = new Uint8Array(Math.ceil(size / 8));
+  }
+
+  add(item: string): void {
+    const index = murmurHash3(item) % this.size;
+    this.bitArray[Math.floor(index / 8)] |= 1 << index % 8;
+  }
+
+  test(item: string): boolean {
+    const index = murmurHash3(item) % this.size;
+    return (this.bitArray[Math.floor(index / 8)] & (1 << index % 8)) !== 0;
+  }
+
+  // Serialize to a Base64 string
+  serialize(): string {
+    return Buffer.from(this.bitArray).toString("base64");
+  }
+
+  // Deserialize from a Base64 string
+  static deserialize(str: string, size: number): BloomFilter {
+    const filter = new BloomFilter(size);
+    filter.bitArray = Uint8Array.from(Buffer.from(str, "base64"));
+    return filter;
+  }
+
+  static NOOP_TASK_SET_SIZE = 32_768;
+}
+
+function murmurHash3(str: string, seed = 0): number {
+  let h1 = 0xdeadbeef ^ seed,
+    h2 = 0x41c6ce57 ^ seed;
+  for (let i = 0, ch; i < str.length; i++) {
+    ch = str.charCodeAt(i);
+    h1 = Math.imul(h1 ^ ch, 0xcc9e2d51);
+    h1 = (h1 << 15) | (h1 >>> 17);
+    h1 = Math.imul(h1, 0x1b873593);
+
+    h2 = Math.imul(h2 ^ ch, 0x85ebca6b);
+    h2 = (h2 << 13) | (h2 >>> 19);
+    h2 = Math.imul(h2, 0xc2b2ae35);
+  }
+
+  h1 ^= str.length;
+  h2 ^= str.length;
+
+  h1 = Math.imul(h1 ^ (h1 >>> 16), 0x85ebca6b);
+  h1 = Math.imul(h1 ^ (h1 >>> 13), 0xc2b2ae35);
+  h1 ^= h1 >>> 16;
+
+  h2 = Math.imul(h2 ^ (h2 >>> 16), 0x85ebca6b);
+  h2 = Math.imul(h2 ^ (h2 >>> 13), 0xc2b2ae35);
+  h2 ^= h2 >>> 16;
+
+  return 4294967296 * (2097151 & h2) + (h1 >>> 0);
+}
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index 50aee5efaa..0810ae8355 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -6,3 +6,30 @@ export * from "./retry";
 export * from "./replacements";
 export * from "./searchParams";
 export * from "./eventFilterMatches";
+export * from "./bloom";
+
+export const API_VERSIONS = {
+  LAZY_LOADED_CACHED_TASKS: "2023-09-29",
+} as const;
+
+export const PLATFORM_FEATURES = {
+  yieldExecution: API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
+  lazyLoadedCachedTasks: API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
+};
+
+export function supportsFeature<TFeatureName extends keyof typeof PLATFORM_FEATURES>(
+  featureName: TFeatureName,
+  version: string
+): boolean {
+  if (version === "unversioned" || version === "unknown") {
+    return false;
+  }
+
+  const supportedVersion = PLATFORM_FEATURES[featureName];
+
+  if (!supportedVersion) {
+    return false;
+  }
+
+  return version >= supportedVersion;
+}
diff --git a/packages/core/src/schemas/api.ts b/packages/core/src/schemas/api.ts
index b5f1f91610..62edc1f15d 100644
--- a/packages/core/src/schemas/api.ts
+++ b/packages/core/src/schemas/api.ts
@@ -13,7 +13,7 @@ import {
   RegisterDynamicSchedulePayloadSchema,
   ScheduleMetadataSchema,
 } from "./schedules";
-import { CachedTaskSchema, TaskSchema } from "./tasks";
+import { CachedTaskSchema, ServerTaskSchema, TaskSchema } from "./tasks";
 import { EventSpecificationSchema, TriggerMetadataSchema } from "./triggers";
 import { RunStatusSchema } from "./runs";
 import { JobRunStatusRecordSchema } from "./statuses";
@@ -176,11 +176,13 @@ export type HttpSourceRequestHeaders = z.output<typeof HttpSourceRequestHeadersSchema>;
 export const ValidateSuccessResponseSchema = z.object({
   ok: z.literal(true),
   endpointId: z.string(),
+  triggerVersion: z.string().optional(),
 });
 
 export const ValidateErrorResponseSchema = z.object({
   ok: z.literal(false),
   error: z.string(),
+  triggerVersion: z.string().optional(),
 });
 
 export const ValidateResponseSchema = z.discriminatedUnion("ok", [
@@ -292,6 +296,10 @@ export const IndexEndpointResponseSchema = z.object({
 
 export type IndexEndpointResponse = z.infer<typeof IndexEndpointResponseSchema>;
 
+export const EndpointHeadersSchema = z.object({
+  "trigger-version": z.string().optional(),
+});
+
 export const RawEventSchema = z.object({
   /** The `name` property must exactly match any subscriptions you want to trigger. */
@@ -394,6 +402,8 @@ export const RunSourceContextSchema = z.object({
   metadata: z.any(),
 });
 
+export type RunSourceContext = z.infer<typeof RunSourceContextSchema>;
+
 export const RunJobBodySchema = z.object({
   event: ApiEventLogSchema,
   job: z.object({
@@ -424,7 +434,10 @@ export const RunJobBodySchema = z.object({
     .optional(),
   source: RunSourceContextSchema.optional(),
   tasks: z.array(CachedTaskSchema).optional(),
+  cachedTaskCursor: z.string().optional(),
+  noopTasksSet: z.string().optional(),
   connections: z.record(ConnectionAuthSchema).optional(),
+  yieldedExecutions: z.string().array().optional(),
 });
 
 export type RunJobBody = z.infer<typeof RunJobBodySchema>;
@@ -437,6 +450,13 @@
 
 export type RunJobError = z.infer<typeof RunJobErrorSchema>;
 
+export const RunJobYieldExecutionErrorSchema = z.object({
+  status: z.literal("YIELD_EXECUTION"),
+  key: z.string(),
+});
+
+export type RunJobYieldExecutionError = z.infer<typeof RunJobYieldExecutionErrorSchema>;
+
 export const RunJobInvalidPayloadErrorSchema = z.object({
   status: z.literal("INVALID_PAYLOAD"),
   errors: z.array(SchemaErrorSchema),
@@ -482,6 +502,7 @@
 export type RunJobSuccess = z.infer<typeof RunJobSuccessSchema>;
 
 export const RunJobResponseSchema = z.discriminatedUnion("status", [
+  RunJobYieldExecutionErrorSchema,
  RunJobErrorSchema,
   RunJobUnresolvedAuthErrorSchema,
   RunJobInvalidPayloadErrorSchema,
@@ -638,6 +659,20 @@
 
 export type RunTaskBodyOutput = z.infer<typeof RunTaskBodyOutputSchema>;
 
+export const RunTaskResponseWithCachedTasksBodySchema = z.object({
+  task: ServerTaskSchema,
+  cachedTasks: z
+    .object({
+      tasks: z.array(CachedTaskSchema),
+      cursor: z.string().optional(),
+    })
+    .optional(),
+});
+
+export type RunTaskResponseWithCachedTasksBody = z.infer<
+  typeof RunTaskResponseWithCachedTasksBodySchema
+>;
+
 export const CompleteTaskBodyInputSchema = RunTaskBodyInputSchema.pick({
   properties: true,
   description: true,
diff --git a/packages/database/prisma/migrations/20230929100348_add_yielded_executions_to_job_run/migration.sql b/packages/database/prisma/migrations/20230929100348_add_yielded_executions_to_job_run/migration.sql
new file mode 100644
index 0000000000..c5ccba0866
--- /dev/null
+++ b/packages/database/prisma/migrations/20230929100348_add_yielded_executions_to_job_run/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "JobRun" ADD COLUMN "yieldedExecutions" TEXT[];
diff --git a/packages/database/prisma/migrations/20231003092741_add_version_to_endpoint/migration.sql b/packages/database/prisma/migrations/20231003092741_add_version_to_endpoint/migration.sql
new file mode 100644
index 0000000000..0317eaaef5
--- /dev/null
+++ b/packages/database/prisma/migrations/20231003092741_add_version_to_endpoint/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "Endpoint" ADD COLUMN "version" TEXT NOT NULL DEFAULT 'unknown';
diff --git a/packages/database/prisma/schema.prisma b/packages/database/prisma/schema.prisma
index 9e8b66da8b..192e0d66a4 100644
--- a/packages/database/prisma/schema.prisma
+++ b/packages/database/prisma/schema.prisma
@@ -367,6 +367,7 @@ model Endpoint {
   updatedAt              DateTime @updatedAt
 
   indexingHookIdentifier String?
+  version                String  @default("unknown")
 
   jobVersions JobVersion[]
   jobRuns     JobRun[]
@@ -713,6 +714,8 @@ model JobRun {
   isTest     Boolean @default(false)
   preprocess Boolean @default(false)
 
+  yieldedExecutions String[]
+
   tasks              Task[]
   runConnections     RunConnection[]
   missingConnections MissingConnection[]
diff --git a/packages/express/src/index.ts b/packages/express/src/index.ts
index fefc2f1d59..bd69a54fc2 100644
--- a/packages/express/src/index.ts
+++ b/packages/express/src/index.ts
@@ -77,6 +77,14 @@ export function createMiddleware(client: TriggerClient, path: string = "/api/tri
       return;
     }
 
+    if (response.headers) {
+      for (const [key, value] of Object.entries(response.headers)) {
+        if (typeof value === "string") {
+          res.setHeader(key, value);
+        }
+      }
+    }
+
     res.status(response.status).json(response.body);
   } catch (error) {
     next(error);
diff --git a/packages/nextjs/src/index.ts b/packages/nextjs/src/index.ts
index b48dd9e879..1f6660cb09 100644
--- a/packages/nextjs/src/index.ts
+++ b/packages/nextjs/src/index.ts
@@ -14,6 +14,14 @@ export function createPagesRoute(client: TriggerClient) {
       return;
     }
 
+    if (response.headers) {
+      for (const [key, value] of Object.entries(response.headers)) {
+        if (typeof value === "string") {
+          res.setHeader(key, value);
+        }
+      }
+    }
+
     res.status(response.status).json(response.body);
   };
 
@@ -35,7 +43,7 @@ export function createAppRoute(client: TriggerClient) {
       return NextResponse.json({ error: "Not found" }, { status: 404 });
     }
 
-    return NextResponse.json(response.body, { status: response.status });
+    return NextResponse.json(response.body, { status: response.status, headers: response.headers });
   };
 
   return {
diff --git a/packages/remix/src/index.ts b/packages/remix/src/index.ts
index 347e34da8b..04ec567e8a 100644
--- a/packages/remix/src/index.ts
+++ b/packages/remix/src/index.ts
@@ -9,7 +9,7 @@ export function createRemixRoute(client: TriggerClient) {
       return json({ error: "Not found" }, { status: 404 });
     }
 
-    return json(response.body, { status: response.status });
+    return json(response.body, { status: response.status, headers: response.headers });
   };
 
   return { action };
 }
diff --git a/packages/trigger-sdk/src/apiClient.ts b/packages/trigger-sdk/src/apiClient.ts
index 744a5cb8c9..42a2976a52 100644
--- a/packages/trigger-sdk/src/apiClient.ts
+++ b/packages/trigger-sdk/src/apiClient.ts
@@ -27,6 +27,8 @@ import {
   JobRunStatusRecordSchema,
   StatusUpdate,
   urlWithSearchParams,
+  RunTaskResponseWithCachedTasksBodySchema,
+  API_VERSIONS,
 } from "@trigger.dev/core";
 import fetch, { type RequestInit } from "node-fetch";
 
@@ -106,22 +108,35 @@ export class ApiClient {
     return await response.json();
   }
 
-  async runTask(runId: string, task: RunTaskBodyInput) {
+  async runTask(
+    runId: string,
+    task: RunTaskBodyInput,
+    options: { cachedTasksCursor?: string } = {}
+  ) {
     const apiKey = await this.#apiKey();
 
     this.#logger.debug("Running Task", {
       task,
     });
 
-    return await zodfetch(ServerTaskSchema, `${this.#apiUrl}/api/v1/runs/${runId}/tasks`, {
-      method: "POST",
-      headers: {
-        "Content-Type": "application/json",
-        Authorization: `Bearer ${apiKey}`,
-        "Idempotency-Key": task.idempotencyKey,
+    return await zodfetchWithVersions(
+      {
+        [API_VERSIONS.LAZY_LOADED_CACHED_TASKS]: RunTaskResponseWithCachedTasksBodySchema,
       },
-      body: JSON.stringify(task),
-    });
+      ServerTaskSchema,
+      `${this.#apiUrl}/api/v1/runs/${runId}/tasks`,
+      {
+        method: "POST",
+        headers: {
+          "Content-Type": "application/json",
+          Authorization: `Bearer ${apiKey}`,
+          "Idempotency-Key": task.idempotencyKey,
+          "X-Cached-Tasks-Cursor": options.cachedTasksCursor ?? "",
+          "Trigger-Version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS,
+        },
+        body: JSON.stringify(task),
+      }
+    );
   }
 
   async completeTask(runId: string, id: string, task: CompleteTaskBodyInput) {
@@ -481,15 +496,103 @@ function getApiKey(key?: string) {
   return { status: "valid" as const, apiKey };
 }
 
-async function zodfetch<TResponseBody extends any, TOptional extends boolean = false>(
-  schema: z.Schema<TResponseBody>,
+type VersionedResponseBodyMap = {
+  [key: string]: z.ZodTypeAny;
+};
+
+// The resulting type should be a discriminating union
+// For example, if the TVersions param is { "2023_09_29": z.string() } and the TUnversioned param is z.number(), the resulting type should be:
+// type VersionedResponseBody = { version: "2023_09_29"; body: string } | { version: "unversioned"; body: number }
+type VersionedResponseBody<
+  TVersions extends VersionedResponseBodyMap,
+  TUnversioned extends z.ZodTypeAny,
+> =
+  | {
+      [TVersion in keyof TVersions]: {
+        version: TVersion;
+        body: z.infer<TVersions[TVersion]>;
+      };
+    }[keyof TVersions]
+  | {
+      version: "unversioned";
+      body: z.infer<TUnversioned>;
+    };
+
+async function zodfetchWithVersions<
+  TVersionedResponseBodyMap extends VersionedResponseBodyMap,
+  TUnversionedResponseBodySchema extends z.ZodTypeAny,
+  TOptional extends boolean = false,
+>(
+  versionedSchemaMap: TVersionedResponseBodyMap,
+  unversionedSchema: TUnversionedResponseBodySchema,
+  url: string,
+  requestInit?: RequestInit,
+  options?: {
+    errorMessage?: string;
+    optional?: TOptional;
+  }
+): Promise<
+  TOptional extends true
+    ? VersionedResponseBody<TVersionedResponseBodyMap, TUnversionedResponseBodySchema> | undefined
+    : VersionedResponseBody<TVersionedResponseBodyMap, TUnversionedResponseBodySchema>
+> {
+  const response = await fetch(url, requestInit);
+
+  if (
+    (!requestInit || requestInit.method === "GET") &&
+    response.status === 404 &&
+    options?.optional
+  ) {
+    // @ts-ignore
+    return;
+  }
+
+  if (response.status >= 400 && response.status < 500) {
+    const body = await response.json();
+
+    throw new Error(body.error);
+  }
+
+  if (response.status !== 200) {
+    throw new Error(
+      options?.errorMessage ?? `Failed to fetch ${url}, got status code ${response.status}`
+    );
+  }
+
+  const jsonBody = await response.json();
+
+  const version = response.headers.get("trigger-version");
+
+  if (!version) {
+    return {
+      version: "unversioned",
+      body: unversionedSchema.parse(jsonBody),
+    };
+  }
+
+  const versionedSchema = versionedSchemaMap[version];
+
+  if (!versionedSchema) {
+    throw new Error(`Unknown version ${version}`);
+  }
+
+  return {
+    version,
+    body: versionedSchema.parse(jsonBody),
+  };
+}
+
+async function zodfetch<TResponseSchema extends z.ZodTypeAny, TOptional extends boolean = false>(
+  schema: TResponseSchema,
   url: string,
   requestInit?: RequestInit,
   options?: {
     errorMessage?: string;
     optional?: TOptional;
   }
-): Promise<TOptional extends true ? TResponseBody | undefined : TResponseBody> {
+): Promise<
+  TOptional extends true ? z.infer<TResponseSchema> | undefined : z.infer<TResponseSchema>
+> {
   const response = await fetch(url, requestInit);
 
   if (
diff --git a/packages/trigger-sdk/src/errors.ts b/packages/trigger-sdk/src/errors.ts
index 5ef0252039..9e2aeec0e3 100644
--- a/packages/trigger-sdk/src/errors.ts
+++ b/packages/trigger-sdk/src/errors.ts
@@ -16,6 +16,10 @@ export class CanceledWithTaskError {
   constructor(public task: ServerTask) {}
 }
 
+export class YieldExecutionError {
+  constructor(public key: string) {}
+}
+
 export class ParsedPayloadSchemaError {
   constructor(public schemaErrors: SchemaError[]) {}
 }
@@ -32,6 +36,7 @@ export function isTriggerError(
   return (
     err instanceof ResumeWithTaskError ||
     err instanceof RetryWithTaskError ||
-    err instanceof CanceledWithTaskError
+    err instanceof CanceledWithTaskError ||
+    err instanceof YieldExecutionError
   );
 }
diff --git a/packages/trigger-sdk/src/io.ts b/packages/trigger-sdk/src/io.ts
index 9286e71090..58d5a95e9c 100644
--- a/packages/trigger-sdk/src/io.ts
+++ b/packages/trigger-sdk/src/io.ts
@@ -1,4 +1,6 @@
 import {
+  API_VERSIONS,
+  BloomFilter,
   CachedTask,
   ConnectionAuth,
   CronOptions,
@@ -15,6 +17,7 @@
   SerializableJsonSchema,
   ServerTask,
   UpdateTriggerSourceBodyV2,
+  supportsFeature,
 } from "@trigger.dev/core";
 import { AsyncLocalStorage } from "node:async_hooks";
 import { webcrypto } from "node:crypto";
@@ -23,6 +26,7 @@
   CanceledWithTaskError,
   ResumeWithTaskError,
   RetryWithTaskError,
+  YieldExecutionError,
   isTriggerError,
 } from "./errors";
 import { calculateRetryAt } from "./retry";
@@ -46,6 +50,10 @@ export type IOOptions = {
   jobLogger?: Logger;
   jobLogLevel: LogLevel;
   cachedTasks?: Array<CachedTask>;
+  cachedTasksCursor?: string;
+  yieldedExecutions?: Array<string>;
+  noopTasksSet?: string;
+  serverVersion?: string | null;
 };
 
 type JsonPrimitive = string | number | boolean | null | undefined | Date | symbol;
@@ -63,6 +71,16 @@
   | undefined
   | void;
 
+export type IOStats = {
+  initialCachedTasks: number;
+  lazyLoadedCachedTasks: number;
+  executedTasks: number;
+  cachedTaskHits: number;
+  cachedTaskMisses: number;
+  noopCachedTaskHits: number;
+  noopCachedTaskMisses: number;
+};
+
 export class IO {
   private _id: string;
   private _apiClient: ApiClient;
@@ -72,7 +90,16 @@ export class IO {
   private _jobLogLevel: LogLevel;
   private _cachedTasks: Map<string, CachedTask>;
   private _taskStorage: AsyncLocalStorage<{ taskId: string }>;
+  private _cachedTasksCursor?: string;
   private _context: TriggerContext;
+  private _yieldedExecutions: Array<string>;
+  private _noopTasksBloomFilter: BloomFilter | undefined;
+  private _stats: IOStats;
+  private _serverVersion: string;
+
+  get stats() {
+    return this._stats;
+  }
 
   constructor(options: IOOptions) {
     this._id = options.id;
@@ -83,14 +110,37 @@ export class IO {
     this._jobLogger = options.jobLogger;
     this._jobLogLevel = options.jobLogLevel;
 
+    this._stats = {
+      initialCachedTasks: 0,
+      lazyLoadedCachedTasks: 0,
+      executedTasks: 0,
+      cachedTaskHits: 0,
+      cachedTaskMisses: 0,
+      noopCachedTaskHits: 0,
+      noopCachedTaskMisses: 0,
+    };
+
     if (options.cachedTasks) {
       options.cachedTasks.forEach((task) => {
        this._cachedTasks.set(task.idempotencyKey, task);
       });
+
+      this._stats.initialCachedTasks = options.cachedTasks.length;
     }
 
     this._taskStorage = new AsyncLocalStorage();
     this._context = options.context;
+    this._yieldedExecutions = options.yieldedExecutions ?? [];
+
+    if (options.noopTasksSet) {
+      this._noopTasksBloomFilter = BloomFilter.deserialize(
+        options.noopTasksSet,
+        BloomFilter.NOOP_TASK_SET_SIZE
+      );
+    }
+
+    this._cachedTasksCursor = options.cachedTasksCursor;
+    this._serverVersion = options.serverVersion ?? "unversioned";
   }
 
   /** @internal */
@@ -108,44 +158,48 @@
     return new IOLogger(async (level, message, data) => {
       let logLevel: LogLevel = "info";
 
-      switch (level) {
-        case "LOG": {
-          this._jobLogger?.log(message, data);
-          logLevel = "log";
-          break;
-        }
-        case "DEBUG": {
-          this._jobLogger?.debug(message, data);
-          logLevel = "debug";
-          break;
-        }
-        case "INFO": {
-          this._jobLogger?.info(message, data);
-          logLevel = "info";
-          break;
-        }
-        case "WARN": {
-          this._jobLogger?.warn(message, data);
-          logLevel = "warn";
-          break;
-        }
-        case "ERROR": {
-          this._jobLogger?.error(message, data);
-          logLevel = "error";
-          break;
-        }
-      }
-
       if (Logger.satisfiesLogLevel(logLevel, this._jobLogLevel)) {
-        await this.runTask([message, level], async (task) => {}, {
-          name: "log",
-          icon: "log",
-          description: message,
-          params: data,
-          properties: [{ label: "Level", text: level }],
-          style: { style: "minimal", variant: level.toLowerCase() },
-          noop: true,
-        });
+        await this.runTask(
+          [message, level],
+          async (task) => {
+            switch (level) {
+              case "LOG": {
+                this._jobLogger?.log(message, data);
+                logLevel = "log";
+                break;
+              }
+              case "DEBUG": {
+                this._jobLogger?.debug(message, data);
+                logLevel = "debug";
+                break;
+              }
+              case "INFO": {
+                this._jobLogger?.info(message, data);
+                logLevel = "info";
+                break;
+              }
+              case "WARN": {
+                this._jobLogger?.warn(message, data);
+                logLevel = "warn";
+                break;
+              }
+              case "ERROR": {
+                this._jobLogger?.error(message, data);
+                logLevel = "error";
+                break;
+              }
+            }
+          },
+          {
+            name: "log",
+            icon: "log",
+            description: message,
+            params: data,
+            properties: [{ label: "Level", text: level }],
+            style: { style: "minimal", variant: level.toLowerCase() },
+            noop: true,
+          }
+        );
       }
     });
   }
@@ -549,19 +603,59 @@
     if (cachedTask && cachedTask.status === "COMPLETED") {
       this._logger.debug("Using completed cached task", {
         idempotencyKey,
-        cachedTask,
       });
 
+      this._stats.cachedTaskHits++;
+
       return cachedTask.output as T;
     }
 
-    const task = await this._apiClient.runTask(this._id, {
-      idempotencyKey,
-      displayKey: typeof key === "string" ? key : key.join("."),
-      noop: false,
-      ...(options ?? {}),
-      parentId,
-    });
+    if (options?.noop && this._noopTasksBloomFilter) {
+      if (this._noopTasksBloomFilter.test(idempotencyKey)) {
+        this._logger.debug("task idempotency key exists in noopTasksBloomFilter", {
+          idempotencyKey,
+        });
+
+        this._stats.noopCachedTaskHits++;
+
+        return {} as T;
+      }
+    }
+
+    const response = await this._apiClient.runTask(
+      this._id,
+      {
+        idempotencyKey,
+        displayKey: typeof key === "string" ? key : undefined,
+        noop: false,
+        ...(options ?? {}),
+        parentId,
+      },
+      {
+        cachedTasksCursor: this._cachedTasksCursor,
+      }
+    );
+
+    const task =
+      response.version === API_VERSIONS.LAZY_LOADED_CACHED_TASKS
+        ? response.body.task
+        : response.body;
+
+    if (response.version === API_VERSIONS.LAZY_LOADED_CACHED_TASKS) {
+      this._cachedTasksCursor = response.body.cachedTasks?.cursor;
+
+      for (const cachedTask of response.body.cachedTasks?.tasks ?? []) {
+        if (!this._cachedTasks.has(cachedTask.idempotencyKey)) {
+          this._cachedTasks.set(cachedTask.idempotencyKey, cachedTask);
+
+          this._logger.debug("Injecting lazy loaded task into task cache", {
+            idempotencyKey: cachedTask.idempotencyKey,
+          });
+
+          this._stats.lazyLoadedCachedTasks++;
+        }
+      }
+    }
 
     if (task.status === "CANCELED") {
       this._logger.debug("Task canceled", {
@@ -573,12 +667,20 @@
     }
 
     if (task.status === "COMPLETED") {
-      this._logger.debug("Using task output", {
-        idempotencyKey,
-        task,
-      });
+      if (task.noop) {
+        this._logger.debug("Noop Task completed", {
+          idempotencyKey,
+        });
+
+        this._noopTasksBloomFilter?.add(task.idempotencyKey);
+      } else {
+        this._logger.debug("Cache miss", {
+          idempotencyKey,
+        });
 
-      this.#addToCachedTasks(task);
+        this._stats.cachedTaskMisses++;
+        this.#addToCachedTasks(task);
+      }
 
       return task.output as T;
     }
@@ -626,6 +728,8 @@
       properties: task.outputProperties ?? undefined,
     });
 
+    this._stats.executedTasks++;
+
     if (completedTask.status === "CANCELED") {
       throw new CanceledWithTaskError(completedTask);
     }
@@ -699,6 +803,30 @@
     return this._taskStorage.run({ taskId: task.id }, executeTask);
   }
 
+  /**
+   * `io.yield()` allows you to yield execution of the current run and resume it in a new function execution. Similar to `io.wait()` but does not create a task and resumes execution immediately.
+   */
+  yield(key: string) {
+    if (!supportsFeature("yieldExecution", this._serverVersion)) {
+      console.warn(
+        "[trigger.dev] io.yield() is not supported by the version of the Trigger.dev server you are using; you will need to upgrade your self-hosted Trigger.dev instance."
+      );
+
+      return;
+    }
+
+    if (this._yieldedExecutions.includes(key)) {
+      return;
+    }
+
+    throw new YieldExecutionError(key);
+  }
+
+  /**
+   * `io.brb()` is an alias of `io.yield()`
+   */
+  brb = this.yield.bind(this);
+
   /** `io.try()` allows you to run Tasks and catch any errors that are thrown, it's similar to a normal `try/catch` block but works with [io.runTask()](/sdk/io/runtask).
    * A regular `try/catch` block on its own won't work as expected with Tasks. Internally `runTask()` throws some special errors to control flow execution. This is necessary to deal with resumability, serverless timeouts, and retrying Tasks.
    * @param tryCallback The code you wish to run
diff --git a/packages/trigger-sdk/src/triggerClient.ts b/packages/trigger-sdk/src/triggerClient.ts
index 44c638dcf0..55412d89b4 100644
--- a/packages/trigger-sdk/src/triggerClient.ts
+++ b/packages/trigger-sdk/src/triggerClient.ts
@@ -1,4 +1,5 @@
 import {
+  API_VERSIONS,
   ConnectionAuth,
   DeserializedJson,
   ErrorWithStackSchema,
@@ -36,9 +37,10 @@
   ParsedPayloadSchemaError,
   ResumeWithTaskError,
   RetryWithTaskError,
+  YieldExecutionError,
 } from "./errors";
 import { TriggerIntegration } from "./integrations";
-import { IO } from "./io";
+import { IO, IOStats } from "./io";
 import { createIOWithIntegrations } from "./ioWithIntegrations";
 import { Job, JobOptions } from "./job";
 import { runLocalStorage } from "./runLocalStorage";
@@ -124,7 +126,10 @@ export class TriggerClient {
     this.id = options.id;
     this.#options = options;
     this.#client = new ApiClient(this.#options);
-    this.#internalLogger = new Logger("trigger.dev", this.#options.verbose ? "debug" : "log");
+    this.#internalLogger = new Logger("trigger.dev", this.#options.verbose ? "debug" : "log", [
"debug" : "log", [ + "output", + "noopTasksSet", + ]); } async handleRequest(request: Request): Promise { @@ -135,6 +140,7 @@ export class TriggerClient { }); const apiKey = request.headers.get("x-trigger-api-key"); + const triggerVersion = request.headers.get("x-trigger-version"); const authorization = this.authorized(apiKey); @@ -148,6 +154,7 @@ export class TriggerClient { body: { message: "Unauthorized: client missing apiKey", }, + headers: this.#standardResponseHeaders, }; } case "missing-header": { @@ -156,6 +163,7 @@ export class TriggerClient { body: { message: "Unauthorized: missing x-trigger-api-key header", }, + headers: this.#standardResponseHeaders, }; } case "unauthorized": { @@ -164,6 +172,7 @@ export class TriggerClient { body: { message: `Forbidden: client apiKey mismatch: Make sure you are using the correct API Key for your environment`, }, + headers: this.#standardResponseHeaders, }; } } @@ -174,6 +183,7 @@ export class TriggerClient { body: { message: "Method not allowed (only POST is allowed)", }, + headers: this.#standardResponseHeaders, }; } @@ -185,6 +195,7 @@ export class TriggerClient { body: { message: "Missing x-trigger-action header", }, + headers: this.#standardResponseHeaders, }; } @@ -199,6 +210,7 @@ export class TriggerClient { ok: false, error: "Missing endpoint ID", }, + headers: this.#standardResponseHeaders, }; } @@ -209,6 +221,7 @@ export class TriggerClient { ok: false, error: `Endpoint ID mismatch error. Expected ${this.id}, got ${endpointId}`, }, + headers: this.#standardResponseHeaders, }; } @@ -217,6 +230,7 @@ export class TriggerClient { body: { ok: true, }, + headers: this.#standardResponseHeaders, }; } case "INDEX_ENDPOINT": { @@ -241,6 +255,7 @@ export class TriggerClient { return { status: 200, body, + headers: this.#standardResponseHeaders, }; } case "INITIALIZE_TRIGGER": { @@ -270,6 +285,7 @@ export class TriggerClient { return { status: 200, body: dynamicTrigger.registeredTriggerForParams(body.data.params), + headers: this.#standardResponseHeaders, }; } case "EXECUTE_JOB": { @@ -296,11 +312,12 @@ export class TriggerClient { }; } - const results = await this.#executeJob(execution.data, job); + const results = await this.#executeJob(execution.data, job, triggerVersion); return { status: 200, body: results, + headers: this.#standardResponseHeaders, }; } case "PREPROCESS_RUN": { @@ -335,6 +352,7 @@ export class TriggerClient { abort: results.abort, properties: results.properties, }, + headers: this.#standardResponseHeaders, }; } case "DELIVER_HTTP_SOURCE_REQUEST": { @@ -400,6 +418,7 @@ export class TriggerClient { response, metadata, }, + headers: this.#standardResponseHeaders, }; } case "VALIDATE": { @@ -409,6 +428,7 @@ export class TriggerClient { ok: true, endpointId: this.id, }, + headers: this.#standardResponseHeaders, }; } } @@ -418,6 +438,7 @@ export class TriggerClient { body: { message: "Method not allowed", }, + headers: this.#standardResponseHeaders, }; } @@ -664,12 +685,14 @@ export class TriggerClient { async #executeJob( body: RunJobBody, - job: Job, Record> + job: Job, Record>, + triggerVersion: string | null ): Promise { this.#internalLogger.debug("executing job", { execution: body, job: job.id, version: job.version, + triggerVersion, }); const context = this.#createRunContext(body); @@ -677,6 +700,9 @@ export class TriggerClient { const io = new IO({ id: body.run.id, cachedTasks: body.tasks, + cachedTasksCursor: body.cachedTaskCursor, + yieldedExecutions: body.yieldedExecutions ?? 
[], + noopTasksSet: body.noopTasksSet, apiClient: this.#client, logger: this.#internalLogger, client: this, @@ -685,6 +711,7 @@ export class TriggerClient { jobLogger: this.#options.ioLogLocalEnabled ? new Logger(job.id, job.logLevel ?? this.#options.logLevel ?? "info") : undefined, + serverVersion: triggerVersion, }); const resolvedConnections = await this.#resolveConnections( @@ -715,8 +742,20 @@ export class TriggerClient { ); }); + if (this.#options.verbose) { + this.#logIOStats(io.stats); + } + return { status: "SUCCESS", output }; } catch (error) { + if (this.#options.verbose) { + this.#logIOStats(io.stats); + } + + if (error instanceof YieldExecutionError) { + return { status: "YIELD_EXECUTION", key: error.key }; + } + if (error instanceof ParsedPayloadSchemaError) { return { status: "INVALID_PAYLOAD", errors: error.schemaErrors }; } @@ -1108,6 +1147,18 @@ export class TriggerClient { authSource, }; } + + #logIOStats(stats: IOStats) { + this.#internalLogger.debug("IO stats", { + stats, + }); + } + + get #standardResponseHeaders() { + return { + "Trigger-Version": API_VERSIONS.LAZY_LOADED_CACHED_TASKS, + }; + } } function dynamicTriggerRegisterSourceJobId(id: string) { diff --git a/references/job-catalog/src/stressTest.ts b/references/job-catalog/src/stressTest.ts index a6414046cc..305dc152db 100644 --- a/references/job-catalog/src/stressTest.ts +++ b/references/job-catalog/src/stressTest.ts @@ -5,7 +5,7 @@ export const client = new TriggerClient({ id: "job-catalog", apiKey: process.env["TRIGGER_API_KEY"], apiUrl: process.env["TRIGGER_API_URL"], - verbose: false, + verbose: true, ioLogLocalEnabled: true, }); @@ -139,4 +139,67 @@ client.defineJob({ }, }); +client.defineJob({ + id: "stress.logs-of-logs", + name: "Lots of Logs", + version: "1.0.0", + trigger: eventTrigger({ + name: "lots.of.logs", + }), + run: async (payload, io, ctx) => { + // Do lots of logs + for (let i = 0; i < payload.iterations; i++) { + await io.logger.info(`before-yield: Iteration ${i} started`); + } + + // Each are 300KB + for (let i = 0; i < payload.iterations; i++) { + await io.runTask( + `before.yield.${i}`, + async (task) => { + return { + i, + extra: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n".repeat( + (300 * payload.size) / 60 + ), + }; + }, + { name: `before-yield: Task ${i}` } + ); + } + + io.yield("yield 1"); + + // Do lots of logs + for (let i = 0; i < payload.iterations; i++) { + await io.logger.info(`after-yield: Iteration ${i} started`); + } + + for (let i = 0; i < payload.iterations; i++) { + await io.runTask( + `after-yield.task.${i}`, + async (task) => { + return { + i, + extra: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n".repeat( + (300 * payload.size) / 60 + ), + }; + }, + { name: `after-yield: Task ${i}` } + ); + } + + await io.wait("wait-1", 10); + + await io.runTask( + `after-wait.task`, + async (task) => { + return { i: 0 }; + }, + { name: `after-wait: Task 0` } + ); + }, +}); + createExpressServer(client);
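
Reviewer note: a minimal sketch of the `noopTasksSet` round-trip introduced above, using only the `BloomFilter` API from `packages/core/src/bloom.ts` (the task keys are made up for illustration). The webapp serializes the filter into `RunJobBody.noopTasksSet` via `prepareNoOpTasksBloomFilter`, and the SDK rebuilds it in the `IO` constructor.

```ts
import { BloomFilter } from "@trigger.dev/core";

// Server side (see prepareNoOpTasksBloomFilter): collect the idempotency
// keys of completed noop tasks into a fixed-size bloom filter.
const filter = new BloomFilter(BloomFilter.NOOP_TASK_SET_SIZE);
filter.add("noop-task-key-1"); // hypothetical idempotency keys
filter.add("noop-task-key-2");

// Serialized to base64 and sent to the endpoint as RunJobBody.noopTasksSet.
const noopTasksSet = filter.serialize();

// Client side (see the IO constructor): rebuild the filter and test keys
// before calling runTask. A hit short-circuits the API call; a miss falls
// through to the server. Because the check is gated on options?.noop, a
// false positive can only skip a noop task, never a task with real output.
const restored = BloomFilter.deserialize(noopTasksSet, BloomFilter.NOOP_TASK_SET_SIZE);
restored.test("noop-task-key-1"); // true
restored.test("unknown-key"); // false (barring a false positive)
```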