diff --git a/apps/webapp/app/components/runs/v3/RetryDeploymentIndexingDialog.tsx b/apps/webapp/app/components/runs/v3/RetryDeploymentIndexingDialog.tsx deleted file mode 100644 index 8eb85f9946..0000000000 --- a/apps/webapp/app/components/runs/v3/RetryDeploymentIndexingDialog.tsx +++ /dev/null @@ -1,60 +0,0 @@ -import { ArrowPathIcon } from "@heroicons/react/20/solid"; -import { DialogClose } from "@radix-ui/react-dialog"; -import { Form, useNavigation } from "@remix-run/react"; -import { Button } from "~/components/primitives/Buttons"; -import { - DialogContent, - DialogDescription, - DialogFooter, - DialogHeader, -} from "~/components/primitives/Dialog"; -import { SpinnerWhite } from "~/components/primitives/Spinner"; - -type RetryDeploymentIndexingDialogProps = { - projectId: string; - deploymentShortCode: string; - redirectPath: string; -}; - -export function RetryDeploymentIndexingDialog({ - projectId, - deploymentShortCode, - redirectPath, -}: RetryDeploymentIndexingDialogProps) { - const navigation = useNavigation(); - - const formAction = `/resources/${projectId}/deployments/${deploymentShortCode}/retry-indexing`; - const isLoading = navigation.formAction === formAction; - - return ( - - Retry indexing this deployment? - - Retrying can be useful if indexing failed due to missing environment variables. Make sure - you set them before retrying. In most other cases, indexing will keep failing until you fix - any errors and re-deploy. - - - - - -
- -
-
-
- ); -} diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 3297616866..c0e926728d 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -683,6 +683,46 @@ const EnvironmentSchema = z.object({ COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), + SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), + SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1), + SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1), + SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), + SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), + SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(30_000), + SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS: z.coerce.number().int().default(30), + + SCHEDULE_WORKER_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + SCHEDULE_WORKER_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + SCHEDULE_WORKER_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + SCHEDULE_WORKER_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + SCHEDULE_WORKER_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + SCHEDULE_WORKER_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + SCHEDULE_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + SCHEDULE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + TASK_EVENT_PARTITIONING_ENABLED: z.string().default("0"), TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS: z.coerce.number().int().default(60), // 1 minute diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index ec5703b60b..9c3d65edb6 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -190,6 +190,11 @@ export class NextRunListPresenter { prisma: this.replica as PrismaClient, }); + function clampToNow(date: Date): Date { + const now = new Date(); + return date > now ? now : date; + } + const { runs, pagination } = await runsRepository.listRuns({ organizationId, environmentId, @@ -200,8 +205,8 @@ export class NextRunListPresenter { tags, scheduleId, period: periodMs ?? undefined, - from, - to, + from: time.from ? time.from.getTime() : undefined, + to: time.to ? clampToNow(time.to).getTime() : undefined, isTest, rootOnly, batchId, diff --git a/apps/webapp/app/presenters/v3/RunListPresenter.server.ts b/apps/webapp/app/presenters/v3/RunListPresenter.server.ts index 9244428646..5b81e137cd 100644 --- a/apps/webapp/app/presenters/v3/RunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunListPresenter.server.ts @@ -179,6 +179,12 @@ export class RunListPresenter extends BasePresenter { const periodMs = time.period ? parse(time.period) : undefined; + function clampToNow(date: Date): Date { + const now = new Date(); + + return date > now ? now : date; + } + //get the runs const runs = await this._replica.$queryRaw< { @@ -282,7 +288,9 @@ WHERE : Prisma.empty } ${ - time.to ? Prisma.sql`AND tr."createdAt" <= ${time.to.toISOString()}::timestamp` : Prisma.empty + time.to + ? Prisma.sql`AND tr."createdAt" <= ${clampToNow(time.to).toISOString()}::timestamp` + : Prisma.sql`AND tr."createdAt" <= CURRENT_TIMESTAMP` } ${ tags && tags.length > 0 diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments/route.tsx index 701e6843f9..a92f2159b9 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments/route.tsx @@ -1,10 +1,12 @@ -import { ArrowPathIcon, ArrowUturnLeftIcon, BookOpenIcon } from "@heroicons/react/20/solid"; -import { type MetaFunction, Outlet, useLocation, useParams, useNavigate } from "@remix-run/react"; +import { ArrowUturnLeftIcon, BookOpenIcon } from "@heroicons/react/20/solid"; +import { type MetaFunction, Outlet, useLocation, useNavigate, useParams } from "@remix-run/react"; import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { useEffect } from "react"; import { typedjson, useTypedLoaderData } from "remix-typedjson"; import { z } from "zod"; import { PromoteIcon } from "~/assets/icons/PromoteIcon"; import { DeploymentsNone, DeploymentsNoneDev } from "~/components/BlankStatePanels"; +import { GitMetadata } from "~/components/GitMetadata"; import { UserAvatar } from "~/components/UserProfilePhoto"; import { MainCenteredContainer, PageBody, PageContainer } from "~/components/layout/AppLayout"; import { Badge } from "~/components/primitives/Badge"; @@ -34,7 +36,6 @@ import { deploymentStatusDescription, deploymentStatuses, } from "~/components/runs/v3/DeploymentStatus"; -import { RetryDeploymentIndexingDialog } from "~/components/runs/v3/RetryDeploymentIndexingDialog"; import { PromoteDeploymentDialog, RollbackDeploymentDialog, @@ -52,8 +53,6 @@ import { EnvironmentParamSchema, docsPath, v3DeploymentPath } from "~/utils/path import { createSearchParams } from "~/utils/searchParams"; import { deploymentIndexingIsRetryable } from "~/v3/deploymentStatus"; import { compareDeploymentVersions } from "~/v3/utils/deploymentVersions"; -import { useEffect } from "react"; -import { GitMetadata } from "~/components/GitMetadata"; export const meta: MetaFunction = () => { return [ @@ -388,26 +387,6 @@ function DeploymentActionsCell({ /> )} - {canRetryIndexing && ( - - - - - - - )} } /> diff --git a/apps/webapp/app/routes/resources.$projectId.deployments.$deploymentShortCode.retry-indexing.ts b/apps/webapp/app/routes/resources.$projectId.deployments.$deploymentShortCode.retry-indexing.ts deleted file mode 100644 index 5472f76d84..0000000000 --- a/apps/webapp/app/routes/resources.$projectId.deployments.$deploymentShortCode.retry-indexing.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { parse } from "@conform-to/zod"; -import { ActionFunction, json } from "@remix-run/node"; -import { z } from "zod"; -import { prisma } from "~/db.server"; -import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/message.server"; -import { logger } from "~/services/logger.server"; -import { requireUserId } from "~/services/session.server"; -import { deploymentIndexingIsRetryable } from "~/v3/deploymentStatus"; -import { RetryDeploymentIndexingService } from "~/v3/services/retryDeploymentIndexing.server"; - -export const rollbackSchema = z.object({ - redirectUrl: z.string(), -}); - -const ParamSchema = z.object({ - projectId: z.string(), - deploymentShortCode: z.string(), -}); - -export const action: ActionFunction = async ({ request, params }) => { - const userId = await requireUserId(request); - const { projectId, deploymentShortCode } = ParamSchema.parse(params); - - console.log("projectId", projectId); - console.log("deploymentShortCode", deploymentShortCode); - - const formData = await request.formData(); - const submission = parse(formData, { schema: rollbackSchema }); - - if (!submission.value) { - return json(submission); - } - - try { - const project = await prisma.project.findUnique({ - where: { - id: projectId, - organization: { - members: { - some: { - userId, - }, - }, - }, - }, - }); - - if (!project) { - return redirectWithErrorMessage(submission.value.redirectUrl, request, "Project not found"); - } - - const deployment = await prisma.workerDeployment.findUnique({ - where: { - projectId_shortCode: { - projectId: project.id, - shortCode: deploymentShortCode, - }, - }, - }); - - if (!deployment) { - return redirectWithErrorMessage( - submission.value.redirectUrl, - request, - "Deployment not found" - ); - } - - if (!deploymentIndexingIsRetryable(deployment)) { - return redirectWithErrorMessage( - submission.value.redirectUrl, - request, - "Deployment indexing not in retryable state" - ); - } - - const startIndexing = new RetryDeploymentIndexingService(); - await startIndexing.call(deployment.id); - - return redirectWithSuccessMessage( - submission.value.redirectUrl, - request, - "Retrying deployment indexing" - ); - } catch (error) { - if (error instanceof Error) { - logger.error("Failed to retry deployment indexing", { - error: { - name: error.name, - message: error.message, - stack: error.stack, - }, - projectId, - deploymentShortCode, - }); - submission.error = { runParam: [error.message] }; - return json(submission); - } else { - logger.error("Failed to retry deployment indexing", { - error, - projectId, - deploymentShortCode, - }); - submission.error = { runParam: [JSON.stringify(error)] }; - return json(submission); - } - } -}; diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index f51c82cdc7..405d7aa831 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -14,7 +14,7 @@ import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; -import { workerQueue } from "~/services/worker.server"; +import { commonWorker } from "~/v3/commonWorker.server"; import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server"; import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server"; @@ -244,88 +244,80 @@ export class RunEngineBatchTriggerService extends WithRunEngine { } } } else { - return await $transaction(this._prisma, async (tx) => { - const batch = await tx.batchTaskRun.create({ - data: { - id: BatchId.fromFriendlyId(batchId), - friendlyId: batchId, - runtimeEnvironmentId: environment.id, - runCount: body.items.length, - runIds: [], - payload: payloadPacket.data, - payloadType: payloadPacket.dataType, - options, - batchVersion: "runengine:v1", - oneTimeUseToken: options.oneTimeUseToken, - }, + const batch = await this._prisma.batchTaskRun.create({ + data: { + id: BatchId.fromFriendlyId(batchId), + friendlyId: batchId, + runtimeEnvironmentId: environment.id, + runCount: body.items.length, + runIds: [], + payload: payloadPacket.data, + payloadType: payloadPacket.dataType, + options, + batchVersion: "runengine:v1", + oneTimeUseToken: options.oneTimeUseToken, + }, + }); + + if (body.parentRunId && body.resumeParentOnCompletion) { + await this._engine.blockRunWithCreatedBatch({ + runId: RunId.fromFriendlyId(body.parentRunId), + batchId: batch.id, + environmentId: environment.id, + projectId: environment.projectId, + organizationId: environment.organizationId, }); + } - if (body.parentRunId && body.resumeParentOnCompletion) { - await this._engine.blockRunWithCreatedBatch({ - runId: RunId.fromFriendlyId(body.parentRunId), + switch (this._batchProcessingStrategy) { + case "sequential": { + await this.#enqueueBatchTaskRun({ batchId: batch.id, - environmentId: environment.id, - projectId: environment.projectId, - organizationId: environment.organizationId, - tx, + processingId: batchId, + range: { start: 0, count: PROCESSING_BATCH_SIZE }, + attemptCount: 0, + strategy: this._batchProcessingStrategy, + parentRunId: body.parentRunId, + resumeParentOnCompletion: body.resumeParentOnCompletion, }); - } - switch (this._batchProcessingStrategy) { - case "sequential": { - await this.#enqueueBatchTaskRun( - { + break; + } + case "parallel": { + const ranges = Array.from({ + length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE), + }).map((_, index) => ({ + start: index * PROCESSING_BATCH_SIZE, + count: PROCESSING_BATCH_SIZE, + })); + + await Promise.all( + ranges.map((range, index) => + this.#enqueueBatchTaskRun({ batchId: batch.id, - processingId: batchId, - range: { start: 0, count: PROCESSING_BATCH_SIZE }, + processingId: `${index}`, + range, attemptCount: 0, strategy: this._batchProcessingStrategy, parentRunId: body.parentRunId, resumeParentOnCompletion: body.resumeParentOnCompletion, - }, - tx - ); - - break; - } - case "parallel": { - const ranges = Array.from({ - length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE), - }).map((_, index) => ({ - start: index * PROCESSING_BATCH_SIZE, - count: PROCESSING_BATCH_SIZE, - })); - - await Promise.all( - ranges.map((range, index) => - this.#enqueueBatchTaskRun( - { - batchId: batch.id, - processingId: `${index}`, - range, - attemptCount: 0, - strategy: this._batchProcessingStrategy, - parentRunId: body.parentRunId, - resumeParentOnCompletion: body.resumeParentOnCompletion, - }, - tx - ) - ) - ); + }) + ) + ); - break; - } + break; } + } - return batch; - }); + return batch; } } - async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { - await workerQueue.enqueue("runengine.processBatchTaskRun", options, { - tx, - jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`, + async #enqueueBatchTaskRun(options: BatchProcessingOptions) { + await commonWorker.enqueue({ + id: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`, + job: "runengine.processBatchTaskRun", + payload: options, }); } diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 29db094bfd..b621d7dee6 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -301,12 +301,14 @@ export class RunEngineTriggerTaskService { priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, releaseConcurrency: body.options?.releaseConcurrency, queueTimestamp: - parentRun && body.options?.resumeParentOnCompletion + options.queueTimestamp ?? + (parentRun && body.options?.resumeParentOnCompletion ? parentRun.queueTimestamp ?? undefined - : undefined, + : undefined), runChainState, scheduleId: options.scheduleId, scheduleInstanceId: options.scheduleInstanceId, + createdAt: options.overrideCreatedAt, }, this.prisma ); diff --git a/apps/webapp/app/services/email.server.ts b/apps/webapp/app/services/email.server.ts index 92d6f4e165..290addcdc9 100644 --- a/apps/webapp/app/services/email.server.ts +++ b/apps/webapp/app/services/email.server.ts @@ -4,7 +4,7 @@ import type { SendEmailOptions } from "remix-auth-email-link"; import { redirect } from "remix-typedjson"; import { env } from "~/env.server"; import type { AuthUser } from "./authUser"; -import { workerQueue } from "./worker.server"; +import { commonWorker } from "~/v3/commonWorker.server"; import { logger } from "./logger.server"; import { singleton } from "~/utils/singleton"; import { assertEmailAllowed } from "~/utils/email"; @@ -93,8 +93,12 @@ export async function sendPlainTextEmail(options: SendPlainTextOptions) { } export async function scheduleEmail(data: DeliverEmail, delay?: { seconds: number }) { - const runAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined; - await workerQueue.enqueue("scheduleEmail", data, { runAt }); + const availableAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined; + await commonWorker.enqueue({ + job: "scheduleEmail", + payload: data, + availableAt, + }); } export async function sendEmail(data: DeliverEmail) { diff --git a/apps/webapp/app/services/runsRepository.server.ts b/apps/webapp/app/services/runsRepository.server.ts index 6d2befbb2c..154e6967c3 100644 --- a/apps/webapp/app/services/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository.server.ts @@ -91,6 +91,10 @@ export class RunsRepository { if (options.to) { queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", { to: options.to }); + } else { + queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", { + to: Date.now(), + }); } if (typeof options.isTest === "boolean") { diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 75a7ded41d..15de002d28 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -3,10 +3,16 @@ import { DeliverEmailSchema } from "emails"; import { z } from "zod"; import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; +import { + BatchProcessingOptions as RunEngineBatchProcessingOptions, + RunEngineBatchTriggerService, +} from "~/runEngine/services/batchTrigger.server"; import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server"; +import { scheduleEngine } from "~/v3/scheduleEngine.server"; import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server"; import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server"; import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server"; +import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server"; import { CancelDevSessionRunsService, @@ -16,56 +22,49 @@ import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAt import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server"; import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy"; import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server"; -import { IndexDeploymentService } from "~/v3/services/indexDeployment.server"; import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server"; import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server"; -import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server"; import { RetryAttemptService } from "~/v3/services/retryAttempt.server"; import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server"; -import { TriggerScheduledTaskService } from "~/v3/services/triggerScheduledTask.server"; import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server"; import { sendEmail } from "./email.server"; -import { reportInvocationUsage } from "./platform.v3.server"; import { logger } from "./logger.server"; -import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; -import { - BatchProcessingOptions as RunEngineBatchProcessingOptions, - RunEngineBatchTriggerService, -} from "~/runEngine/services/batchTrigger.server"; const workerCatalog = { + // @deprecated, moved to commonWorker.server.ts scheduleEmail: DeliverEmailSchema, - // v3 tasks - "v3.indexDeployment": z.object({ - id: z.string(), - }), - "v3.resumeTaskRunDependencies": z.object({ - attemptId: z.string(), - }), + // @deprecated, moved to commonWorker.server.ts "v3.resumeBatchRun": z.object({ batchRunId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.resumeTaskDependency": z.object({ dependencyId: z.string(), sourceTaskAttemptId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.timeoutDeployment": z.object({ deploymentId: z.string(), fromStatus: z.string(), errorMessage: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.executeTasksWaitingForDeploy": z.object({ backgroundWorkerId: z.string(), }), + // @deprecated, moved to ScheduleEngine "v3.triggerScheduledTask": z.object({ instanceId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.performTaskRunAlerts": z.object({ runId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.deliverAlert": z.object({ alertId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.performDeploymentAlerts": z.object({ deploymentId: z.string(), }), @@ -75,30 +74,31 @@ const workerCatalog = { "v3.performBulkActionItem": z.object({ bulkActionItemId: z.string(), }), + // @deprecated, moved to legacyRunEngineWorker.server.ts "v3.requeueTaskRun": z.object({ runId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.retryAttempt": z.object({ runId: z.string(), }), - "v3.reportUsage": z.object({ - orgId: z.string(), - data: z.object({ - costInCents: z.string(), - }), - additionalData: z.record(z.any()).optional(), - }), + // @deprecated, moved to commonWorker.server.ts "v3.enqueueDelayedRun": z.object({ runId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.expireRun": z.object({ runId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.cancelTaskAttemptDependencies": z.object({ attemptId: z.string(), }), + // @deprecated, moved to commonWorker.server.ts "v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions, + // @deprecated, moved to commonWorker.server.ts "v3.processBatchTaskRun": BatchProcessingOptions, + // @deprecated, moved to commonWorker.server.ts "runengine.processBatchTaskRun": RunEngineBatchProcessingOptions, }; @@ -156,6 +156,7 @@ function getWorkerQueue() { }, }, tasks: { + // @deprecated, moved to commonWorker.server.ts scheduleEmail: { priority: 0, maxAttempts: 3, @@ -163,25 +164,7 @@ function getWorkerQueue() { await sendEmail(payload); }, }, - // v3 tasks - "v3.indexDeployment": { - priority: 0, - maxAttempts: 5, - handler: async (payload, job) => { - const service = new IndexDeploymentService(); - - return await service.call(payload.id); - }, - }, - "v3.resumeTaskRunDependencies": { - priority: 0, - maxAttempts: 5, - handler: async (payload, job) => { - const service = new ResumeTaskRunDependenciesService(); - - return await service.call(payload.attemptId); - }, - }, + // @deprecated, moved to commonWorker.server.ts "v3.resumeBatchRun": { priority: 0, maxAttempts: 5, @@ -191,6 +174,7 @@ function getWorkerQueue() { await service.call(payload.batchRunId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.resumeTaskDependency": { priority: 0, maxAttempts: 5, @@ -200,6 +184,7 @@ function getWorkerQueue() { return await service.call(payload.dependencyId, payload.sourceTaskAttemptId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.timeoutDeployment": { priority: 0, maxAttempts: 5, @@ -209,6 +194,7 @@ function getWorkerQueue() { return await service.call(payload.deploymentId, payload.fromStatus, payload.errorMessage); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.executeTasksWaitingForDeploy": { priority: 0, maxAttempts: 5, @@ -218,15 +204,18 @@ function getWorkerQueue() { return await service.call(payload.backgroundWorkerId); }, }, + // @deprecated, moved to ScheduleEngine "v3.triggerScheduledTask": { priority: 0, maxAttempts: 3, // total delay of 30 seconds handler: async (payload, job) => { - const service = new TriggerScheduledTaskService(); - - return await service.call(payload.instanceId, job.attempts === job.max_attempts); + await scheduleEngine.triggerScheduledTask({ + instanceId: payload.instanceId, + finalAttempt: job.attempts === job.max_attempts, + }); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.performTaskRunAlerts": { priority: 0, maxAttempts: 3, @@ -235,6 +224,7 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.deliverAlert": { priority: 0, maxAttempts: 8, @@ -244,6 +234,7 @@ function getWorkerQueue() { return await service.call(payload.alertId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.performDeploymentAlerts": { priority: 0, maxAttempts: 3, @@ -276,6 +267,7 @@ function getWorkerQueue() { maxAttempts: 3, handler: async (payload, job) => {}, // This is now handled by redisWorker }, + // @deprecated, moved to commonWorker.server.ts "v3.retryAttempt": { priority: 0, maxAttempts: 3, @@ -285,17 +277,7 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, - "v3.reportUsage": { - priority: 0, - maxAttempts: 8, - handler: async (payload, job) => { - await reportInvocationUsage( - payload.orgId, - Number(payload.data.costInCents), - payload.additionalData - ); - }, - }, + // @deprecated, moved to commonWorker.server.ts "v3.enqueueDelayedRun": { priority: 0, maxAttempts: 8, @@ -305,6 +287,7 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.expireRun": { priority: 0, maxAttempts: 8, @@ -314,6 +297,7 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.cancelTaskAttemptDependencies": { priority: 0, maxAttempts: 8, @@ -323,6 +307,7 @@ function getWorkerQueue() { return await service.call(payload.attemptId); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.cancelDevSessionRuns": { priority: 0, maxAttempts: 5, @@ -332,6 +317,7 @@ function getWorkerQueue() { return await service.call(payload); }, }, + // @deprecated, moved to commonWorker.server.ts "v3.processBatchTaskRun": { priority: 0, maxAttempts: 5, @@ -341,6 +327,7 @@ function getWorkerQueue() { await service.processBatchTaskRun(payload); }, }, + // @deprecated, moved to commonWorker.server.ts "runengine.processBatchTaskRun": { priority: 0, maxAttempts: 5, diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index 2bc6c43b4e..f735747ba6 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -1,14 +1,25 @@ -import { Worker as RedisWorker } from "@trigger.dev/redis-worker"; import { Logger } from "@trigger.dev/core/logger"; +import { Worker as RedisWorker } from "@trigger.dev/redis-worker"; +import { DeliverEmailSchema } from "emails"; import { z } from "zod"; import { env } from "~/env.server"; +import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; +import { sendEmail } from "~/services/email.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { DeliverAlertService } from "./services/alerts/deliverAlert.server"; import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server"; import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server"; -import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server"; +import { BatchTriggerV3Service } from "./services/batchTriggerV3.server"; +import { CancelDevSessionRunsService } from "./services/cancelDevSessionRuns.server"; +import { CancelTaskAttemptDependenciesService } from "./services/cancelTaskAttemptDependencies.server"; import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server"; +import { ExecuteTasksWaitingForDeployService } from "./services/executeTasksWaitingForDeploy"; +import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server"; +import { ResumeBatchRunService } from "./services/resumeBatchRun.server"; +import { ResumeTaskDependencyService } from "./services/resumeTaskDependency.server"; +import { RetryAttemptService } from "./services/retryAttempt.server"; +import { TimeoutDeploymentService } from "./services/timeoutDeployment.server"; function initializeWorker() { const redisOptions = { @@ -27,6 +38,110 @@ function initializeWorker() { name: "common-worker", redisOptions, catalog: { + scheduleEmail: { + schema: DeliverEmailSchema, + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + "v3.resumeBatchRun": { + schema: z.object({ + batchRunId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "v3.resumeTaskDependency": { + schema: z.object({ + dependencyId: z.string(), + sourceTaskAttemptId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "v3.timeoutDeployment": { + schema: z.object({ + deploymentId: z.string(), + fromStatus: z.string(), + errorMessage: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "v3.executeTasksWaitingForDeploy": { + schema: z.object({ + backgroundWorkerId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "v3.retryAttempt": { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + "v3.cancelTaskAttemptDependencies": { + schema: z.object({ + attemptId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 8, + }, + }, + "v3.cancelDevSessionRuns": { + schema: z.object({ + runIds: z.array(z.string()), + cancelledAt: z.coerce.date(), + reason: z.string(), + cancelledSessionId: z.string().optional(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "v3.processBatchTaskRun": { + schema: z.object({ + batchId: z.string(), + processingId: z.string(), + range: z.object({ start: z.number().int(), count: z.number().int() }), + attemptCount: z.number().int(), + strategy: z.enum(["sequential", "parallel"]), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "runengine.processBatchTaskRun": { + schema: z.object({ + batchId: z.string(), + processingId: z.string(), + range: z.object({ start: z.number().int(), count: z.number().int() }), + attemptCount: z.number().int(), + strategy: z.enum(["sequential", "parallel"]), + parentRunId: z.string().optional(), + resumeParentOnCompletion: z.boolean().optional(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, "v3.performTaskRunAlerts": { schema: z.object({ runId: z.string(), @@ -83,6 +198,45 @@ function initializeWorker() { shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS, logger: new Logger("CommonWorker", "debug"), jobs: { + scheduleEmail: async ({ payload }) => { + await sendEmail(payload); + }, + "v3.resumeBatchRun": async ({ payload }) => { + const service = new ResumeBatchRunService(); + await service.call(payload.batchRunId); + }, + "v3.resumeTaskDependency": async ({ payload }) => { + const service = new ResumeTaskDependencyService(); + await service.call(payload.dependencyId, payload.sourceTaskAttemptId); + }, + "v3.timeoutDeployment": async ({ payload }) => { + const service = new TimeoutDeploymentService(); + await service.call(payload.deploymentId, payload.fromStatus, payload.errorMessage); + }, + "v3.executeTasksWaitingForDeploy": async ({ payload }) => { + const service = new ExecuteTasksWaitingForDeployService(); + await service.call(payload.backgroundWorkerId); + }, + "v3.retryAttempt": async ({ payload }) => { + const service = new RetryAttemptService(); + await service.call(payload.runId); + }, + "v3.cancelTaskAttemptDependencies": async ({ payload }) => { + const service = new CancelTaskAttemptDependenciesService(); + await service.call(payload.attemptId); + }, + "v3.cancelDevSessionRuns": async ({ payload }) => { + const service = new CancelDevSessionRunsService(); + await service.call(payload); + }, + "v3.processBatchTaskRun": async ({ payload }) => { + const service = new BatchTriggerV3Service(payload.strategy); + await service.processBatchTaskRun(payload); + }, + "runengine.processBatchTaskRun": async ({ payload }) => { + const service = new RunEngineBatchTriggerService(payload.strategy); + await service.processBatchTaskRun(payload); + }, "v3.deliverAlert": async ({ payload }) => { const service = new DeliverAlertService(); diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts new file mode 100644 index 0000000000..04dbf4e8f4 --- /dev/null +++ b/apps/webapp/app/v3/scheduleEngine.server.ts @@ -0,0 +1,121 @@ +import { ScheduleEngine } from "@internal/schedule-engine"; +import { stringifyIO } from "@trigger.dev/core/v3"; +import { prisma } from "~/db.server"; +import { env } from "~/env.server"; +import { devPresence } from "~/presenters/v3/DevPresence.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { TriggerTaskService } from "./services/triggerTask.server"; +import { meter, tracer } from "./tracer.server"; + +export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine); + +export type { ScheduleEngine }; + +async function isDevEnvironmentConnectedHandler(environmentId: string) { + const environment = await prisma.runtimeEnvironment.findFirst({ + where: { + id: environmentId, + }, + select: { + currentSession: { + select: { + disconnectedAt: true, + }, + }, + project: { + select: { + engine: true, + }, + }, + }, + }); + + if (!environment) { + return false; + } + + if (environment.project.engine === "V1") { + const v3Disconnected = !environment.currentSession || environment.currentSession.disconnectedAt; + + return !v3Disconnected; + } + + const v4Connected = await devPresence.isConnected(environmentId); + + return v4Connected; +} + +function createScheduleEngine() { + const engine = new ScheduleEngine({ + prisma, + logLevel: env.SCHEDULE_ENGINE_LOG_LEVEL, + redis: { + host: env.SCHEDULE_WORKER_REDIS_HOST ?? "localhost", + port: env.SCHEDULE_WORKER_REDIS_PORT ?? 6379, + username: env.SCHEDULE_WORKER_REDIS_USERNAME, + password: env.SCHEDULE_WORKER_REDIS_PASSWORD, + keyPrefix: "schedule:", + enableAutoPipelining: true, + ...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + worker: { + concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT, + pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL, + shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS, + disabled: env.SCHEDULE_WORKER_ENABLED === "0", + }, + distributionWindow: { + seconds: env.SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS, + }, + tracer, + meter, + onTriggerScheduledTask: async ({ + taskIdentifier, + environment, + payload, + scheduleInstanceId, + scheduleId, + exactScheduleTime, + }) => { + try { + // This will trigger either v1 or v2 depending on the engine of the project + const triggerService = new TriggerTaskService(); + + const payloadPacket = await stringifyIO(payload); + + logger.debug("Triggering scheduled task", { + taskIdentifier, + environment, + payload, + scheduleInstanceId, + scheduleId, + exactScheduleTime, + }); + + const result = await triggerService.call( + taskIdentifier, + environment, + { payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } }, + { + customIcon: "scheduled", + scheduleId, + scheduleInstanceId, + queueTimestamp: exactScheduleTime, + overrideCreatedAt: exactScheduleTime, + } + ); + + return { success: !!result }; + } catch (error) { + return { + success: false, + error: error instanceof Error ? error.message : String(error), + }; + } + }, + isDevEnvironmentConnectedHandler: isDevEnvironmentConnectedHandler, + }); + + return engine; +} diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 467a81f7cf..110ae492a1 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -20,7 +20,7 @@ import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; -import { workerQueue } from "~/services/worker.server"; +import { commonWorker } from "../commonWorker.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server"; import { marqs } from "../marqs/index.server"; @@ -477,75 +477,67 @@ export class BatchTriggerV3Service extends BaseService { return batch; } else { - return await $transaction(this._prisma, "create batch run", async (tx) => { - const batch = await tx.batchTaskRun.create({ - data: { - friendlyId: batchId, - runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, - dependentTaskAttemptId: dependentAttempt?.id, - runCount: body.items.length, - runIds: runs.map((r) => r.id), - payload: payloadPacket.data, - payloadType: payloadPacket.dataType, - options, - batchVersion: "v3", - oneTimeUseToken: options.oneTimeUseToken, - }, - }); + const batch = await this._prisma.batchTaskRun.create({ + data: { + friendlyId: batchId, + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, + dependentTaskAttemptId: dependentAttempt?.id, + runCount: body.items.length, + runIds: runs.map((r) => r.id), + payload: payloadPacket.data, + payloadType: payloadPacket.dataType, + options, + batchVersion: "v3", + oneTimeUseToken: options.oneTimeUseToken, + }, + }); + + switch (this._batchProcessingStrategy) { + case "sequential": { + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: batchId, + range: { start: 0, count: PROCESSING_BATCH_SIZE }, + attemptCount: 0, + strategy: this._batchProcessingStrategy, + }); - switch (this._batchProcessingStrategy) { - case "sequential": { - await this.#enqueueBatchTaskRun( - { + break; + } + case "parallel": { + const ranges = Array.from({ + length: Math.ceil(newRunCount / PROCESSING_BATCH_SIZE), + }).map((_, index) => ({ + start: index * PROCESSING_BATCH_SIZE, + count: PROCESSING_BATCH_SIZE, + })); + + await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + processingJobsExpectedCount: ranges.length, + }, + }); + + await Promise.all( + ranges.map((range, index) => + this.#enqueueBatchTaskRun({ batchId: batch.id, - processingId: batchId, - range: { start: 0, count: PROCESSING_BATCH_SIZE }, + processingId: `${index}`, + range, attemptCount: 0, strategy: this._batchProcessingStrategy, - }, - tx - ); - - break; - } - case "parallel": { - const ranges = Array.from({ - length: Math.ceil(newRunCount / PROCESSING_BATCH_SIZE), - }).map((_, index) => ({ - start: index * PROCESSING_BATCH_SIZE, - count: PROCESSING_BATCH_SIZE, - })); - - await tx.batchTaskRun.update({ - where: { id: batch.id }, - data: { - processingJobsExpectedCount: ranges.length, - }, - }); - - await Promise.all( - ranges.map((range, index) => - this.#enqueueBatchTaskRun( - { - batchId: batch.id, - processingId: `${index}`, - range, - attemptCount: 0, - strategy: this._batchProcessingStrategy, - }, - tx - ) - ) - ); + }) + ) + ); - break; - } + break; } + } - return batch; - }); + return batch; } } @@ -899,10 +891,11 @@ export class BatchTriggerV3Service extends BaseService { return false; } - async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { - await workerQueue.enqueue("v3.processBatchTaskRun", options, { - tx, - jobKey: `BatchTriggerV2Service.process:${options.batchId}:${options.processingId}`, + async #enqueueBatchTaskRun(options: BatchProcessingOptions) { + await commonWorker.enqueue({ + id: `BatchTriggerV2Service.process:${options.batchId}:${options.processingId}`, + job: "v3.processBatchTaskRun", + payload: options, }); } diff --git a/apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts b/apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts index 98fbe762dc..f779d81641 100644 --- a/apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts +++ b/apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts @@ -1,10 +1,9 @@ -import { workerQueue } from "~/services/worker.server"; -import { BaseService } from "./baseService.server"; -import { PrismaClientOrTransaction } from "~/db.server"; import { z } from "zod"; +import { findLatestSession } from "~/models/runtimeEnvironment.server"; import { logger } from "~/services/logger.server"; +import { commonWorker } from "../commonWorker.server"; +import { BaseService } from "./baseService.server"; import { CancelTaskRunService } from "./cancelTaskRun.server"; -import { findLatestSession } from "~/models/runtimeEnvironment.server"; export const CancelDevSessionRunsServiceOptions = z.object({ runIds: z.array(z.string()), @@ -90,14 +89,14 @@ export class CancelDevSessionRunsService extends BaseService { } } - static async enqueue( - options: CancelDevSessionRunsServiceOptions, - runAt?: Date, - tx?: PrismaClientOrTransaction - ) { - return await workerQueue.enqueue("v3.cancelDevSessionRuns", options, { - tx, - runAt: runAt, + static async enqueue(options: CancelDevSessionRunsServiceOptions, runAt?: Date) { + return await commonWorker.enqueue({ + id: options.cancelledSessionId + ? `cancelDevSessionRuns:${options.cancelledSessionId}` + : undefined, + job: "v3.cancelDevSessionRuns", + payload: options, + availableAt: runAt, }); } } diff --git a/apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts b/apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts index 2e14ea1a73..f3ad291ac9 100644 --- a/apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts @@ -1,5 +1,5 @@ import { PrismaClientOrTransaction } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; +import { commonWorker } from "../commonWorker.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { CancelTaskRunService } from "./cancelTaskRun.server"; @@ -57,17 +57,14 @@ export class CancelTaskAttemptDependenciesService extends BaseService { } } - static async enqueue(attemptId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.cancelTaskAttemptDependencies", - { + static async enqueue(attemptId: string, runAt?: Date) { + return await commonWorker.enqueue({ + id: `cancelTaskAttemptDependencies:${attemptId}`, + job: "v3.cancelTaskAttemptDependencies", + payload: { attemptId, }, - { - tx, - runAt, - jobKey: `cancelTaskAttemptDependencies:${attemptId}`, - } - ); + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts index 1a8c136e5c..5d9d621a26 100644 --- a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts @@ -116,7 +116,7 @@ export class CancelTaskRunServiceV1 extends BaseService { attempts: ExtendedTaskRunAttempt[] ) { for (const attempt of attempts) { - await CancelTaskAttemptDependenciesService.enqueue(attempt.id, this._prisma); + await CancelTaskAttemptDependenciesService.enqueue(attempt.id); if (run.runtimeEnvironment.type === "DEVELOPMENT") { // Signal the task run attempt to stop diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index bf7968e91e..bc0dd04eaa 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -84,6 +84,6 @@ export class ChangeCurrentDeploymentService extends BaseService { }, }); - await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId, this._prisma); + await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId); } } diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 618e823630..d0b9d911b6 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -482,7 +482,7 @@ export class CompleteAttemptService extends BaseService { const retryDirectly = () => { logger.debug("[CompleteAttemptService] Retrying attempt directly", { runId: run.id }); - return RetryAttemptService.enqueue(run.id, this._prisma, new Date(executionRetry.timestamp)); + return RetryAttemptService.enqueue(run.id, new Date(executionRetry.timestamp)); }; // There's a checkpoint, so we need to go through the queue diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index f93d6848c4..57d6fd0aa6 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -23,9 +23,9 @@ import { clampMaxDuration } from "../utils/maxDuration"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { CheckScheduleService } from "./checkSchedule.server"; import { projectPubSub } from "./projectPubSub.server"; -import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server"; import { tryCatch } from "@trigger.dev/core/v3"; import { engine } from "../runEngine.server"; +import { scheduleEngine } from "../scheduleEngine.server"; export class CreateBackgroundWorkerService extends BaseService { public async call( @@ -510,7 +510,6 @@ export async function syncDeclarativeSchedules( }); const checkSchedule = new CheckScheduleService(prisma); - const registerNextService = new RegisterNextTaskScheduleInstanceService(prisma); //start out by assuming they're all missing const missingSchedules = new Set( @@ -569,7 +568,7 @@ export async function syncDeclarativeSchedules( missingSchedules.delete(existingSchedule.id); const instance = schedule.instances.at(0); if (instance) { - await registerNextService.call(instance.id); + await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); } else { throw new CreateDeclarativeScheduleError( `Missing instance for declarative schedule ${schedule.id}` @@ -601,7 +600,7 @@ export async function syncDeclarativeSchedules( const instance = newSchedule.instances.at(0); if (instance) { - await registerNextService.call(instance.id); + await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); } else { throw new CreateDeclarativeScheduleError( `Missing instance for declarative schedule ${newSchedule.id}` diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index 1856bf3ef5..76be016528 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -161,7 +161,7 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { }); } - await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma); + await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id); await PerformDeploymentAlertsService.enqueue(deployment.id); await TimeoutDeploymentService.dequeue(deployment.id, this._prisma); diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index 1931f085ff..4a7107b7eb 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -167,8 +167,7 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { deployment.id, "DEPLOYING", "Indexing timed out", - new Date(Date.now() + env.DEPLOY_TIMEOUT_MS), - this._prisma + new Date(Date.now() + env.DEPLOY_TIMEOUT_MS) ); return backgroundWorker; diff --git a/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts b/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts index b3e2e4f724..931c804614 100644 --- a/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts +++ b/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts @@ -1,7 +1,7 @@ import { PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; -import { workerQueue } from "~/services/worker.server"; +import { commonWorker } from "../commonWorker.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; @@ -99,22 +99,19 @@ export class ExecuteTasksWaitingForDeployService extends BaseService { if (runsWaitingForDeploy.length > maxCount) { await ExecuteTasksWaitingForDeployService.enqueue( backgroundWorkerId, - this._prisma, new Date(Date.now() + env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS) ); } } - static async enqueue(backgroundWorkerId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.executeTasksWaitingForDeploy", - { + static async enqueue(backgroundWorkerId: string, runAt?: Date) { + return await commonWorker.enqueue({ + id: `v3.executeTasksWaitingForDeploy:${backgroundWorkerId}`, + job: "v3.executeTasksWaitingForDeploy", + payload: { backgroundWorkerId, }, - { - tx, - runAt, - } - ); + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/indexDeployment.server.ts b/apps/webapp/app/v3/services/indexDeployment.server.ts deleted file mode 100644 index 2148891996..0000000000 --- a/apps/webapp/app/v3/services/indexDeployment.server.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { env } from "~/env.server"; -import { logger } from "~/services/logger.server"; -import { socketIo } from "../handleSocketIo.server"; -import { BaseService } from "./baseService.server"; -import { DeploymentIndexFailed } from "./deploymentIndexFailed.server"; -import { TimeoutDeploymentService } from "./timeoutDeployment.server"; -import { workerQueue } from "~/services/worker.server"; - -export class IndexDeploymentService extends BaseService { - public async call(id: string) { - const deployment = await this._prisma.workerDeployment.findFirst({ - where: { - id, - }, - include: { - environment: true, - }, - }); - - if (!deployment) { - logger.error(`No worker deployment with this ID: ${id}`); - return; - } - - if (!deployment.imageReference) { - logger.error(`No image reference for worker deployment: ${id}`); - return; - } - - if (deployment.workerId) { - logger.debug( - `Deployment have already been indexed for ${deployment.friendlyId}. Refreshing worker timestamp.` - ); - - await this._prisma.backgroundWorker.update({ - where: { - id: deployment.workerId, - }, - data: { - updatedAt: new Date(), - }, - }); - return; - } - - // just broadcast for now - there should only ever be one provider connected - try { - // timeout the deployment if 180 seconds have passed and the deployment is still not indexed - await TimeoutDeploymentService.enqueue( - deployment.id, - "DEPLOYING", - "Could not index deployment in time", - new Date(Date.now() + env.DEPLOY_TIMEOUT_MS) - ); - - const responses = await socketIo.providerNamespace.timeout(30_000).emitWithAck("INDEX", { - version: "v1", - shortCode: deployment.shortCode, - imageTag: deployment.imageReference, - apiKey: deployment.environment.apiKey, - apiUrl: env.APP_ORIGIN, - // identifiers - envId: deployment.environmentId, - envType: deployment.environment.type, - projectId: deployment.projectId, - orgId: deployment.environment.organizationId, - deploymentId: deployment.id, - }); - - logger.debug("Index ACK received", { responses }); - - if (responses.length > 0) { - const indexFailed = new DeploymentIndexFailed(); - - for (const response of responses) { - if (!response.success) { - await indexFailed.call(deployment.friendlyId, response.error); - } - } - } - } catch (error) { - logger.error("No index ACK received within timeout", { error }); - - const indexFailed = new DeploymentIndexFailed(); - - let indexError = { - message: `Could not index deployment: ${error}`, - name: "IndexError", - }; - - if (error instanceof Error) { - if (error.message === "operation has timed out") { - indexError = { message: "Provider failed to respond in time", name: "TimeoutError" }; - } else { - indexError = { message: error.message, name: error.name }; - } - } - - await indexFailed.call(deployment.friendlyId, indexError); - } - } - - static async enqueue(id: string) { - const runAt = new Date(Date.now() + 1000); // 1 second from now (give eventually-consistent DO time) - - await workerQueue.enqueue("v3.indexDeployment", { id }, { runAt }); - } -} diff --git a/apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts b/apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts deleted file mode 100644 index 9e2f0eda24..0000000000 --- a/apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { startActiveSpan } from "../tracer.server"; -import { calculateNextScheduledTimestampFromNow } from "../utils/calculateNextSchedule.server"; -import { BaseService } from "./baseService.server"; -import { TriggerScheduledTaskService } from "./triggerScheduledTask.server"; - -export class RegisterNextTaskScheduleInstanceService extends BaseService { - public async call(instanceId: string) { - const instance = await this._prisma.taskScheduleInstance.findFirst({ - where: { - id: instanceId, - }, - include: { - taskSchedule: true, - environment: true, - }, - }); - - if (!instance) { - return; - } - - const nextScheduledTimestamp = await startActiveSpan( - "calculateNextScheduledTimestamp", - async (span) => { - span.setAttribute("task_schedule_id", instance.taskSchedule.id); - span.setAttribute("task_schedule_instance_id", instance.id); - span.setAttribute( - "task_schedule_generator_expression", - instance.taskSchedule.generatorExpression - ); - span.setAttribute( - "last_scheduled_timestamp", - instance.lastScheduledTimestamp?.toISOString() ?? new Date().toISOString() - ); - - return calculateNextScheduledTimestampFromNow( - instance.taskSchedule.generatorExpression, - instance.taskSchedule.timezone - ); - } - ); - - await this._prisma.taskScheduleInstance.update({ - where: { - id: instanceId, - }, - data: { - nextScheduledTimestamp, - }, - }); - - // Enqueue triggering the task at the next scheduled timestamp - await TriggerScheduledTaskService.enqueue(instanceId, nextScheduledTimestamp); - } -} diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index af81781547..d07e38eda8 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -1,5 +1,5 @@ import { PrismaClientOrTransaction } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; +import { commonWorker } from "../commonWorker.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; @@ -334,17 +334,14 @@ export class ResumeBatchRunService extends BaseService { tx: PrismaClientOrTransaction, runAt?: Date ) { - return await workerQueue.enqueue( - "v3.resumeBatchRun", - { + return await commonWorker.enqueue({ + id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, + job: "v3.resumeBatchRun", + payload: { batchRunId, }, - { - tx, - runAt, - jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, - } - ); + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/resumeDependentParents.server.ts b/apps/webapp/app/v3/services/resumeDependentParents.server.ts index 1bacfc22ee..0de5baa451 100644 --- a/apps/webapp/app/v3/services/resumeDependentParents.server.ts +++ b/apps/webapp/app/v3/services/resumeDependentParents.server.ts @@ -180,7 +180,7 @@ export class ResumeDependentParentsService extends BaseService { } //resume the dependent task - await ResumeTaskDependencyService.enqueue(dependency.id, lastAttempt.id, this._prisma); + await ResumeTaskDependencyService.enqueue(dependency.id, lastAttempt.id); return { success: true, action: "resume-scheduled", diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index b4ccc2368a..a831fdd78c 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -1,9 +1,8 @@ -import { PrismaClientOrTransaction } from "~/db.server"; +import { TaskRunDependency } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; -import { workerQueue } from "~/services/worker.server"; import { marqs } from "~/v3/marqs/index.server"; +import { commonWorker } from "../commonWorker.server"; import { BaseService } from "./baseService.server"; -import { TaskRunDependency } from "@trigger.dev/database"; export class ResumeTaskDependencyService extends BaseService { public async call(dependencyId: string, sourceTaskAttemptId: string) { @@ -154,22 +153,14 @@ export class ResumeTaskDependencyService extends BaseService { } } - static async enqueue( - dependencyId: string, - sourceTaskAttemptId: string, - tx: PrismaClientOrTransaction, - runAt?: Date - ) { - return await workerQueue.enqueue( - "v3.resumeTaskDependency", - { + static async enqueue(dependencyId: string, sourceTaskAttemptId: string, runAt?: Date) { + return await commonWorker.enqueue({ + job: "v3.resumeTaskDependency", + payload: { dependencyId, sourceTaskAttemptId, }, - { - tx, - runAt, - } - ); + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts b/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts deleted file mode 100644 index 9615af5abf..0000000000 --- a/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { - BatchTaskRun, - BatchTaskRunItem, - TaskRunAttempt, - TaskRunDependency, -} from "@trigger.dev/database"; -import { $transaction, PrismaClientOrTransaction } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; -import { BaseService } from "./baseService.server"; -import { ResumeBatchRunService } from "./resumeBatchRun.server"; -import { ResumeTaskDependencyService } from "./resumeTaskDependency.server"; -import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server"; - -export class ResumeTaskRunDependenciesService extends BaseService { - public async call(attemptId: string) { - const taskAttempt = await this._prisma.taskRunAttempt.findFirst({ - where: { id: attemptId }, - include: { - taskRun: { - include: { - batchItems: { - include: { - batchTaskRun: true, - }, - }, - dependency: { - include: { - dependentAttempt: true, - dependentBatchRun: true, - }, - }, - }, - }, - backgroundWorkerTask: true, - runtimeEnvironment: true, - }, - }); - - if (!taskAttempt) { - return; - } - - if (taskAttempt.runtimeEnvironment.type === "DEVELOPMENT") { - return; - } - - const { batchItems, dependency } = taskAttempt.taskRun; - - if (!batchItems.length && !dependency) { - return; - } - - if (batchItems.length) { - for (const batchItem of batchItems) { - await this.#resumeBatchItem(batchItem, batchItem.batchTaskRun, taskAttempt); - } - return; - } - - if (dependency) { - await this.#resumeDependency(dependency, taskAttempt); - return; - } - } - - async #resumeBatchItem( - batchItem: BatchTaskRunItem, - batchTaskRun: BatchTaskRun, - taskAttempt: TaskRunAttempt - ) { - if (batchTaskRun.batchVersion === "v3") { - await completeBatchTaskRunItemV3( - batchItem.id, - batchTaskRun.id, - this._prisma, - true, - taskAttempt.id - ); - } else { - await $transaction(this._prisma, async (tx) => { - await tx.batchTaskRunItem.update({ - where: { - id: batchItem.id, - }, - data: { - status: "COMPLETED", - taskRunAttemptId: taskAttempt.id, - }, - }); - - await ResumeBatchRunService.enqueue(batchItem.batchTaskRunId, false, tx); - }); - } - } - - async #resumeDependency(dependency: TaskRunDependency, taskAttempt: TaskRunAttempt) { - await ResumeTaskDependencyService.enqueue(dependency.id, taskAttempt.id, this._prisma); - } - - static async enqueue(attemptId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.resumeTaskRunDependencies", - { - attemptId, - }, - { - tx, - runAt, - jobKey: `resumeTaskRunDependencies:${attemptId}`, - } - ); - } -} diff --git a/apps/webapp/app/v3/services/retryAttempt.server.ts b/apps/webapp/app/v3/services/retryAttempt.server.ts index 86844b5349..b4ab523576 100644 --- a/apps/webapp/app/v3/services/retryAttempt.server.ts +++ b/apps/webapp/app/v3/services/retryAttempt.server.ts @@ -1,8 +1,7 @@ -import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; +import { commonWorker } from "../commonWorker.server"; import { socketIo } from "../handleSocketIo.server"; -import { PrismaClientOrTransaction } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; +import { BaseService } from "./baseService.server"; export class RetryAttemptService extends BaseService { public async call(runId: string) { @@ -23,17 +22,14 @@ export class RetryAttemptService extends BaseService { }); } - static async enqueue(runId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.retryAttempt", - { + static async enqueue(runId: string, runAt?: Date) { + return await commonWorker.enqueue({ + id: `retryAttempt:${runId}`, + job: "v3.retryAttempt", + payload: { runId, }, - { - tx, - runAt, - jobKey: `retryAttempt:${runId}`, - } - ); + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/retryDeploymentIndexing.server.ts b/apps/webapp/app/v3/services/retryDeploymentIndexing.server.ts deleted file mode 100644 index 750448f7bb..0000000000 --- a/apps/webapp/app/v3/services/retryDeploymentIndexing.server.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { deploymentIndexingIsRetryable } from "../deploymentStatus"; -import { BaseService } from "./baseService.server"; -import { IndexDeploymentService } from "./indexDeployment.server"; - -export class RetryDeploymentIndexingService extends BaseService { - public async call(deploymentId: string) { - const deployment = await this._prisma.workerDeployment.findUnique({ - where: { - id: deploymentId, - }, - }); - - if (!deployment) { - throw new Error("Deployment not found"); - } - - if (!deploymentIndexingIsRetryable(deployment)) { - throw new Error("Deployment indexing not retryable"); - } - - const latestDeployment = await this._prisma.workerDeployment.findFirst({ - where: { - projectId: deployment.projectId, - }, - select: { - id: true, - }, - orderBy: { - createdAt: "desc", - }, - }); - - if (!latestDeployment || latestDeployment.id !== deployment.id) { - throw new Error("Deployment is not the latest"); - } - - await this._prisma.workerDeployment.update({ - where: { - id: deploymentId, - }, - data: { - status: "DEPLOYING", - }, - }); - - await IndexDeploymentService.enqueue(deployment.id); - - return deployment; - } -} diff --git a/apps/webapp/app/v3/services/timeoutDeployment.server.ts b/apps/webapp/app/v3/services/timeoutDeployment.server.ts index ef59c1ddf3..5cf69f76e4 100644 --- a/apps/webapp/app/v3/services/timeoutDeployment.server.ts +++ b/apps/webapp/app/v3/services/timeoutDeployment.server.ts @@ -1,8 +1,9 @@ import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; -import { workerQueue } from "~/services/worker.server"; +import { commonWorker } from "../commonWorker.server"; import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server"; import { PrismaClientOrTransaction } from "~/db.server"; +import { workerQueue } from "~/services/worker.server"; export class TimeoutDeploymentService extends BaseService { public async call(id: string, fromStatus: string, errorMessage: string) { @@ -46,26 +47,23 @@ export class TimeoutDeploymentService extends BaseService { deploymentId: string, fromStatus: string, errorMessage: string, - runAt: Date, - tx?: PrismaClientOrTransaction + runAt: Date ) { - await workerQueue.enqueue( - "v3.timeoutDeployment", - { + await commonWorker.enqueue({ + id: `timeoutDeployment:${deploymentId}`, + job: "v3.timeoutDeployment", + payload: { deploymentId, fromStatus, errorMessage, }, - { - runAt, - jobKey: `timeoutDeployment:${deploymentId}`, - jobKeyMode: "replace", - tx, - } - ); + availableAt: runAt, + }); } static async dequeue(deploymentId: string, tx?: PrismaClientOrTransaction) { + // For backwards compatibility during transition, we need to dequeue/ack from both workers await workerQueue.dequeue(`timeoutDeployment:${deploymentId}`, { tx }); + await commonWorker.ack(`timeoutDeployment:${deploymentId}`); } } diff --git a/apps/webapp/app/v3/services/triggerScheduledTask.server.ts b/apps/webapp/app/v3/services/triggerScheduledTask.server.ts deleted file mode 100644 index 52e0cfab63..0000000000 --- a/apps/webapp/app/v3/services/triggerScheduledTask.server.ts +++ /dev/null @@ -1,216 +0,0 @@ -import { stringifyIO } from "@trigger.dev/core/v3"; -import { type PrismaClientOrTransaction } from "~/db.server"; -import { devPresence } from "~/presenters/v3/DevPresence.server"; -import { logger } from "~/services/logger.server"; -import { workerQueue } from "~/services/worker.server"; -import { findCurrentWorkerDeployment } from "../models/workerDeployment.server"; -import { nextScheduledTimestamps } from "../utils/calculateNextSchedule.server"; -import { BaseService } from "./baseService.server"; -import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server"; -import { TriggerTaskService } from "./triggerTask.server"; - -export class TriggerScheduledTaskService extends BaseService { - public async call(instanceId: string, finalAttempt: boolean) { - const registerNextService = new RegisterNextTaskScheduleInstanceService(); - - const instance = await this._prisma.taskScheduleInstance.findFirst({ - where: { - id: instanceId, - }, - include: { - taskSchedule: true, - environment: { - include: { - project: true, - organization: true, - currentSession: true, - }, - }, - }, - }); - - if (!instance) { - return; - } - - if (instance.environment.organization.deletedAt) { - logger.debug("Organization is deleted, disabling schedule", { - instanceId, - scheduleId: instance.taskSchedule.friendlyId, - organizationId: instance.environment.organization.id, - }); - - return; - } - - if (instance.environment.project.deletedAt) { - logger.debug("Project is deleted, disabling schedule", { - instanceId, - scheduleId: instance.taskSchedule.friendlyId, - projectId: instance.environment.project.id, - }); - - return; - } - - if (instance.environment.archivedAt) { - logger.debug("Environment is archived, disabling schedule", { - instanceId, - scheduleId: instance.taskSchedule.friendlyId, - environmentId: instance.environment.id, - }); - - return; - } - - try { - let shouldTrigger = true; - - if (!instance.active) { - shouldTrigger = false; - } - - if (!instance.taskSchedule.active) { - shouldTrigger = false; - } - - if (!instance.nextScheduledTimestamp) { - shouldTrigger = false; - } - - if (instance.environment.type === "DEVELOPMENT") { - //v3 - const v3Disconnected = - !instance.environment.currentSession || - instance.environment.currentSession.disconnectedAt; - //v4 - const v4Connected = await devPresence.isConnected(instance.environment.id); - - if (v3Disconnected && !v4Connected) { - shouldTrigger = false; - } - } - - if (instance.environment.type !== "DEVELOPMENT") { - // Get the current backgroundWorker for this environment - const currentWorkerDeployment = await findCurrentWorkerDeployment({ - environmentId: instance.environment.id, - }); - - if (!currentWorkerDeployment) { - logger.debug("No current worker deployment found, skipping task trigger", { - instanceId, - scheduleId: instance.taskSchedule.friendlyId, - environmentId: instance.environment.id, - }); - - shouldTrigger = false; - } else if ( - !currentWorkerDeployment.worker || - !currentWorkerDeployment.worker.tasks.some( - (t) => t.slug === instance.taskSchedule.taskIdentifier - ) - ) { - logger.debug( - "Current worker deployment does not contain the scheduled task identifier, skipping task trigger", - { - instanceId, - scheduleId: instance.taskSchedule.friendlyId, - environmentId: instance.environment.id, - workerDeploymentId: currentWorkerDeployment.id, - workerId: currentWorkerDeployment.worker?.id, - taskIdentifier: instance.taskSchedule.taskIdentifier, - } - ); - - shouldTrigger = false; - } - } - - if (shouldTrigger) { - // Enqueue triggering the task - const triggerTask = new TriggerTaskService(); - - const payload = { - scheduleId: instance.taskSchedule.friendlyId, - type: instance.taskSchedule.type, - timestamp: instance.nextScheduledTimestamp, - lastTimestamp: instance.lastScheduledTimestamp ?? undefined, - externalId: instance.taskSchedule.externalId ?? undefined, - timezone: instance.taskSchedule.timezone, - upcoming: nextScheduledTimestamps( - instance.taskSchedule.generatorExpression, - instance.taskSchedule.timezone, - instance.nextScheduledTimestamp!, - 10 - ), - }; - - const payloadPacket = await stringifyIO(payload); - - logger.debug("Triggering scheduled task", { - instance, - payloadPacket, - }); - - const result = await triggerTask.call( - instance.taskSchedule.taskIdentifier, - instance.environment, - { payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } }, - { - customIcon: "scheduled", - scheduleId: instance.taskSchedule.id, - scheduleInstanceId: instance.id, - } - ); - - if (!result) { - logger.error("Failed to trigger task", { - instanceId, - scheduleId: instance.taskSchedule.friendlyId, - payloadPacket, - }); - } else { - await this._prisma.taskSchedule.update({ - where: { - id: instance.taskSchedule.id, - }, - data: { - lastRunTriggeredAt: new Date(), - }, - }); - } - } - - await this._prisma.taskScheduleInstance.update({ - where: { - id: instanceId, - }, - data: { - lastScheduledTimestamp: instance.nextScheduledTimestamp, - }, - }); - - await registerNextService.call(instanceId); - } catch (e) { - if (finalAttempt) { - logger.error("Failed to trigger scheduled task, rescheduling the next run", { - instanceId, - error: e, - }); - - await registerNextService.call(instanceId); - } else { - throw e; - } - } - } - - public static async enqueue(instanceId: string, runAt: Date, tx?: PrismaClientOrTransaction) { - return await workerQueue.enqueue( - "v3.triggerScheduledTask", - { instanceId }, - { tx, jobKey: `scheduled-task-instance:${instanceId}`, runAt } - ); - } -} diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index c055ddb904..954c271faa 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -31,6 +31,8 @@ export type TriggerTaskServiceOptions = { oneTimeUseToken?: string; scheduleId?: string; scheduleInstanceId?: string; + queueTimestamp?: Date; + overrideCreatedAt?: Date; }; export class OutOfEntitlementError extends Error { diff --git a/apps/webapp/app/v3/services/triggerTaskV1.server.ts b/apps/webapp/app/v3/services/triggerTaskV1.server.ts index ff33cd8011..f57a0cb6a0 100644 --- a/apps/webapp/app/v3/services/triggerTaskV1.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV1.server.ts @@ -364,6 +364,7 @@ export class TriggerTaskServiceV1 extends BaseService { : 0; const queueTimestamp = + options.queueTimestamp ?? dependentAttempt?.taskRun.queueTimestamp ?? dependentBatchRun?.dependentTaskAttempt?.taskRun.queueTimestamp ?? delayUntil ?? @@ -438,6 +439,7 @@ export class TriggerTaskServiceV1 extends BaseService { machinePreset: body.options?.machine, scheduleId: options.scheduleId, scheduleInstanceId: options.scheduleInstanceId, + createdAt: options.overrideCreatedAt, }, }); diff --git a/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts b/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts index 9f27bb4c70..6d5fa7d495 100644 --- a/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts +++ b/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts @@ -1,13 +1,12 @@ import { type Prisma, type TaskSchedule } from "@trigger.dev/database"; import cronstrue from "cronstrue"; import { nanoid } from "nanoid"; -import { $transaction } from "~/db.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { type UpsertSchedule } from "../schedules"; import { calculateNextScheduledTimestampFromNow } from "../utils/calculateNextSchedule.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { CheckScheduleService } from "./checkSchedule.server"; -import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server"; +import { scheduleEngine } from "../scheduleEngine.server"; export type UpsertTaskScheduleServiceOptions = UpsertSchedule; @@ -49,17 +48,15 @@ export class UpsertTaskScheduleService extends BaseService { }, }); - const result = await (async (tx) => { - if (existingSchedule) { - if (existingSchedule.type === "DECLARATIVE") { - throw new ServiceValidationError("Cannot update a declarative schedule"); - } - - return await this.#updateExistingSchedule(existingSchedule, schedule); - } else { - return await this.#createNewSchedule(schedule, projectId, deduplicationKey); + let result; + if (existingSchedule) { + if (existingSchedule.type === "DECLARATIVE") { + throw new ServiceValidationError("Cannot update a declarative schedule"); } - })(); + result = await this.#updateExistingSchedule(existingSchedule, schedule); + } else { + result = await this.#createNewSchedule(schedule, projectId, deduplicationKey); + } if (!result) { throw new ServiceValidationError("Failed to create or update schedule"); @@ -92,54 +89,45 @@ export class UpsertTaskScheduleService extends BaseService { projectId: string, deduplicationKey: string ) { - return await $transaction( - this._prisma, - "UpsertTaskSchedule.upsertNewSchedule", - async (tx, span) => { - const scheduleRecord = await tx.taskSchedule.create({ - data: { - projectId, - friendlyId: generateFriendlyId("sched"), - taskIdentifier: options.taskIdentifier, - deduplicationKey, - userProvidedDeduplicationKey: - options.deduplicationKey !== undefined && options.deduplicationKey !== "", - generatorExpression: options.cron, - generatorDescription: cronstrue.toString(options.cron), - timezone: options.timezone ?? "UTC", - externalId: options.externalId ? options.externalId : undefined, - }, - }); - - const registerNextService = new RegisterNextTaskScheduleInstanceService(tx); - - //create the instances (links to environments) + const scheduleRecord = await this._prisma.taskSchedule.create({ + data: { + projectId, + friendlyId: generateFriendlyId("sched"), + taskIdentifier: options.taskIdentifier, + deduplicationKey, + userProvidedDeduplicationKey: + options.deduplicationKey !== undefined && options.deduplicationKey !== "", + generatorExpression: options.cron, + generatorDescription: cronstrue.toString(options.cron), + timezone: options.timezone ?? "UTC", + externalId: options.externalId ? options.externalId : undefined, + }, + }); - for (const environmentId of options.environments) { - const instance = await tx.taskScheduleInstance.create({ - data: { - taskScheduleId: scheduleRecord.id, - environmentId, - }, + //create the instances (links to environments) + for (const environmentId of options.environments) { + const instance = await this._prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: scheduleRecord.id, + environmentId, + }, + include: { + environment: { include: { - environment: { + orgMember: { include: { - orgMember: { - include: { - user: true, - }, - }, + user: true, }, }, }, - }); + }, + }, + }); - await registerNextService.call(instance.id); - } + await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); + } - return { scheduleRecord }; - } - ); + return { scheduleRecord }; } async #updateExistingSchedule( @@ -164,89 +152,83 @@ export class UpsertTaskScheduleService extends BaseService { }, }); - return await $transaction( - this._prisma, - async (tx) => { - const scheduleRecord = await tx.taskSchedule.update({ - where: { - id: existingSchedule.id, - }, - data: { - generatorExpression: options.cron, - generatorDescription: cronstrue.toString(options.cron), - timezone: options.timezone ?? "UTC", - externalId: options.externalId ? options.externalId : null, - }, - }); + const scheduleRecord = await this._prisma.taskSchedule.update({ + where: { + id: existingSchedule.id, + }, + data: { + generatorExpression: options.cron, + generatorDescription: cronstrue.toString(options.cron), + timezone: options.timezone ?? "UTC", + externalId: options.externalId ? options.externalId : null, + }, + }); - const scheduleHasChanged = - scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || - scheduleRecord.timezone !== existingSchedule.timezone; + const scheduleHasChanged = + scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || + scheduleRecord.timezone !== existingSchedule.timezone; - // create the new instances - const newInstances: InstanceWithEnvironment[] = []; - const updatingInstances: InstanceWithEnvironment[] = []; + // create the new instances + const newInstances: InstanceWithEnvironment[] = []; + const updatingInstances: InstanceWithEnvironment[] = []; - for (const environmentId of options.environments) { - const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); + for (const environmentId of options.environments) { + const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); - if (existingInstance) { - // Update the existing instance - updatingInstances.push(existingInstance); - } else { - // Create a new instance - const instance = await tx.taskScheduleInstance.create({ - data: { - taskScheduleId: scheduleRecord.id, - environmentId, - }, + if (existingInstance) { + // Update the existing instance + updatingInstances.push(existingInstance); + } else { + // Create a new instance + const instance = await this._prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: scheduleRecord.id, + environmentId, + }, + include: { + environment: { include: { - environment: { + orgMember: { include: { - orgMember: { - include: { - user: true, - }, - }, + user: true, }, }, }, - }); - - newInstances.push(instance); - } - } + }, + }, + }); - // find the instances that need to be removed - const instancesToDeleted = existingInstances.filter( - (i) => !options.environments.includes(i.environmentId) - ); + newInstances.push(instance); + } + } - // delete the instances no longer selected - for (const instance of instancesToDeleted) { - await tx.taskScheduleInstance.delete({ - where: { - id: instance.id, - }, - }); - } + // find the instances that need to be removed + const instancesToDeleted = existingInstances.filter( + (i) => !options.environments.includes(i.environmentId) + ); - const registerService = new RegisterNextTaskScheduleInstanceService(tx); + // delete the instances no longer selected + for (const instance of instancesToDeleted) { + await this._prisma.taskScheduleInstance.delete({ + where: { + id: instance.id, + }, + }); + } - for (const instance of newInstances) { - await registerService.call(instance.id); - } + for (const instance of newInstances) { + // Register the new task schedule instances + await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); + } - if (scheduleHasChanged) { - for (const instance of updatingInstances) { - await registerService.call(instance.id); - } - } + if (scheduleHasChanged) { + for (const instance of updatingInstances) { + // Update the existing task schedule instances + await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id }); + } + } - return { scheduleRecord }; - }, - { timeout: 10_000 } - ); + return { scheduleRecord }; } #createReturnObject(taskSchedule: TaskSchedule, instances: InstanceWithEnvironment[]) { diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 8fd37d8c4d..892bc8ecec 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -52,6 +52,7 @@ "@heroicons/react": "^2.0.12", "@internal/redis": "workspace:*", "@internal/run-engine": "workspace:*", + "@internal/schedule-engine": "workspace:*", "@internal/tracing": "workspace:*", "@internal/zod-worker": "workspace:*", "@internationalized/date": "^3.5.1", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 620eac3f64..8ab7c85db9 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -366,6 +366,7 @@ export class RunEngine { runChainState, scheduleId, scheduleInstanceId, + createdAt, }: TriggerParams, tx?: PrismaClientOrTransaction ): Promise { @@ -439,6 +440,7 @@ export class RunEngine { runChainState, scheduleId, scheduleInstanceId, + createdAt, executionSnapshots: { create: { engine: "V2", diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index f07dd703ab..a9610ff536 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -133,6 +133,7 @@ export type TriggerParams = { runChainState?: RunChainState; scheduleId?: string; scheduleInstanceId?: string; + createdAt?: Date; }; export type EngineWorker = Worker; diff --git a/internal-packages/schedule-engine/README.md b/internal-packages/schedule-engine/README.md new file mode 100644 index 0000000000..28be432ee8 --- /dev/null +++ b/internal-packages/schedule-engine/README.md @@ -0,0 +1,110 @@ +# @internal/schedule-engine + +The `@internal/schedule-engine` package encapsulates all scheduling logic for Trigger.dev, providing a clean API boundary for managing scheduled tasks and their execution. + +## Architecture + +The ScheduleEngine follows the same pattern as the RunEngine, providing: + +- **Centralized Schedule Management**: All schedule-related operations go through the ScheduleEngine +- **Redis Worker Integration**: Built-in Redis-based distributed task scheduling +- **Distributed Execution**: Prevents thundering herd issues by distributing executions across time windows +- **Comprehensive Testing**: Built-in utilities for testing schedule behavior + +## Key Components + +### ScheduleEngine Class + +The main interface for all schedule operations: + +```typescript +import { ScheduleEngine } from "@internal/schedule-engine"; + +const engine = new ScheduleEngine({ + prisma, + redis: { + /* Redis configuration */ + }, + worker: { + /* Worker configuration */ + }, + distributionWindow: { seconds: 30 }, // Optional: default 30s +}); + +// Register next schedule instance +await engine.registerNextTaskScheduleInstance({ instanceId }); + +// Upsert a schedule +await engine.upsertTaskSchedule({ + projectId, + schedule: { + taskIdentifier: "my-task", + cron: "0 */5 * * *", + timezone: "UTC", + environments: ["env-1", "env-2"], + }, +}); +``` + +### Distributed Scheduling + +The engine includes built-in distributed scheduling to prevent all scheduled tasks from executing at exactly the same moment: + +```typescript +import { calculateDistributedExecutionTime } from "@internal/schedule-engine"; + +const exactTime = new Date("2024-01-01T12:00:00Z"); +const distributedTime = calculateDistributedExecutionTime(exactTime, 30); // 30-second window +``` + +### Schedule Calculation + +High-performance CRON schedule calculation with optimization for old timestamps: + +```typescript +import { + calculateNextScheduledTimestampFromNow, + nextScheduledTimestamps, +} from "@internal/schedule-engine"; + +const nextRun = calculateNextScheduledTimestampFromNow("0 */5 * * *", "UTC"); +const upcoming = nextScheduledTimestamps("0 */5 * * *", "UTC", nextRun, 5); +``` + +## Integration with Webapp + +The ScheduleEngine should be the **API boundary** between the webapp and schedule logic. Services in the webapp should call into the ScheduleEngine rather than implementing schedule logic directly. + +### Migration Path + +Currently, the webapp uses individual services like: + +- `RegisterNextTaskScheduleInstanceService` +- `TriggerScheduledTaskService` +- Schedule calculation utilities + +These should be replaced with ScheduleEngine method calls: + +```typescript +// Old approach +const service = new RegisterNextTaskScheduleInstanceService(tx); +await service.call(instanceId); + +// New approach +await scheduleEngine.registerNextTaskScheduleInstance({ instanceId }); +``` + +## Configuration + +The ScheduleEngine expects these configuration options: + +- `prisma`: PrismaClient instance +- `redis`: Redis connection configuration +- `worker`: Worker configuration (concurrency, polling intervals) +- `distributionWindow`: Optional time window for distributed execution +- `tracer`: Optional OpenTelemetry tracer +- `meter`: Optional OpenTelemetry meter + +## Testing + +The package includes comprehensive test utilities and examples. See the test directory for usage examples. diff --git a/internal-packages/schedule-engine/package.json b/internal-packages/schedule-engine/package.json new file mode 100644 index 0000000000..ecdf151247 --- /dev/null +++ b/internal-packages/schedule-engine/package.json @@ -0,0 +1,39 @@ +{ + "name": "@internal/schedule-engine", + "private": true, + "version": "0.0.1", + "main": "./dist/src/index.js", + "types": "./dist/src/index.d.ts", + "type": "module", + "exports": { + ".": { + "@triggerdotdev/source": "./src/index.ts", + "import": "./dist/src/index.js", + "types": "./dist/src/index.d.ts", + "default": "./dist/src/index.js" + } + }, + "dependencies": { + "@internal/redis": "workspace:*", + "@trigger.dev/redis-worker": "workspace:*", + "@internal/tracing": "workspace:*", + "@trigger.dev/core": "workspace:*", + "@trigger.dev/database": "workspace:*", + "cron-parser": "^4.9.0", + "cronstrue": "^2.50.0", + "nanoid": "3.3.8", + "zod": "3.23.8" + }, + "devDependencies": { + "@internal/testcontainers": "workspace:*", + "rimraf": "6.0.1" + }, + "scripts": { + "clean": "rimraf dist", + "typecheck": "tsc --noEmit -p tsconfig.build.json", + "test": "vitest --sequence.concurrent=false --no-file-parallelism", + "test:coverage": "vitest --sequence.concurrent=false --no-file-parallelism --coverage.enabled", + "build": "pnpm run clean && tsc -p tsconfig.build.json", + "dev": "tsc --watch -p tsconfig.build.json" + } +} \ No newline at end of file diff --git a/internal-packages/schedule-engine/src/engine/distributedScheduling.ts b/internal-packages/schedule-engine/src/engine/distributedScheduling.ts new file mode 100644 index 0000000000..c73b514b35 --- /dev/null +++ b/internal-packages/schedule-engine/src/engine/distributedScheduling.ts @@ -0,0 +1,29 @@ +/** + * Calculates a distributed execution time for a scheduled task. + * Tasks are distributed across a time window before the exact schedule time + * to prevent thundering herd issues while maintaining schedule accuracy. + */ +export function calculateDistributedExecutionTime( + exactScheduleTime: Date, + distributionWindowSeconds: number = 30 +): Date { + // Use the ISO string of the exact schedule time as the seed for consistency + const seed = exactScheduleTime.toISOString(); + + // Create a simple hash from the seed string + let hash = 0; + for (let i = 0; i < seed.length; i++) { + const char = seed.charCodeAt(i); + hash = (hash << 5) - hash + char; + hash = hash & hash; // Convert to 32-bit integer + } + + // Convert hash to a value between 0 and 1 + const normalized = Math.abs(hash) / Math.pow(2, 31); + + // Calculate offset in milliseconds (0 to distributionWindowSeconds * 1000) + const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000); + + // Return time that's offsetMs before the exact schedule time + return new Date(exactScheduleTime.getTime() - offsetMs); +} diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts new file mode 100644 index 0000000000..c1fa859d99 --- /dev/null +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -0,0 +1,661 @@ +import { + Counter, + getMeter, + getTracer, + Histogram, + Meter, + startSpan, + Tracer, +} from "@internal/tracing"; +import { Logger } from "@trigger.dev/core/logger"; +import { PrismaClient } from "@trigger.dev/database"; +import { Worker, type JobHandlerParams } from "@trigger.dev/redis-worker"; +import { calculateDistributedExecutionTime } from "./distributedScheduling.js"; +import { calculateNextScheduledTimestamp, nextScheduledTimestamps } from "./scheduleCalculation.js"; +import { + RegisterScheduleInstanceParams, + ScheduleEngineOptions, + TriggerScheduledTaskCallback, + TriggerScheduleParams, +} from "./types.js"; +import { scheduleWorkerCatalog } from "./workerCatalog.js"; +import { tryCatch } from "@trigger.dev/core/utils"; + +export class ScheduleEngine { + private worker: Worker; + private logger: Logger; + private tracer: Tracer; + private meter: Meter; + private distributionWindowSeconds: number; + + // Metrics + private scheduleRegistrationCounter: Counter; + private scheduleExecutionCounter: Counter; + private scheduleExecutionDuration: Histogram; + private scheduleExecutionFailureCounter: Counter; + private distributionOffsetHistogram: Histogram; + private devEnvironmentCheckCounter: Counter; + + prisma: PrismaClient; + + private onTriggerScheduledTask: TriggerScheduledTaskCallback; + + constructor(private readonly options: ScheduleEngineOptions) { + this.logger = + options.logger ?? new Logger("ScheduleEngine", (this.options.logLevel ?? "info") as any); + this.prisma = options.prisma; + this.distributionWindowSeconds = options.distributionWindow?.seconds ?? 30; + this.onTriggerScheduledTask = options.onTriggerScheduledTask; + + this.tracer = options.tracer ?? getTracer("schedule-engine"); + this.meter = options.meter ?? getMeter("schedule-engine"); + + // Initialize metrics + this.scheduleRegistrationCounter = this.meter.createCounter("schedule_registrations_total", { + description: "Total number of schedule registrations", + }); + + this.scheduleExecutionCounter = this.meter.createCounter("schedule_executions_total", { + description: "Total number of schedule executions", + }); + + this.scheduleExecutionDuration = this.meter.createHistogram("schedule_execution_duration_ms", { + description: "Duration of schedule execution in milliseconds", + unit: "ms", + }); + + this.scheduleExecutionFailureCounter = this.meter.createCounter( + "schedule_execution_failures_total", + { + description: "Total number of schedule execution failures", + } + ); + + this.distributionOffsetHistogram = this.meter.createHistogram( + "schedule_distribution_offset_ms", + { + description: "Distribution offset from exact schedule time in milliseconds", + unit: "ms", + } + ); + + this.devEnvironmentCheckCounter = this.meter.createCounter("dev_environment_checks_total", { + description: "Total number of development environment connectivity checks", + }); + + this.worker = new Worker({ + name: "schedule-engine-worker", + redisOptions: { + ...options.redis, + keyPrefix: `${options.redis.keyPrefix ?? ""}schedule:`, + }, + catalog: scheduleWorkerCatalog, + concurrency: { + limit: options.worker.concurrency, + }, + pollIntervalMs: options.worker.pollIntervalMs, + shutdownTimeoutMs: options.worker.shutdownTimeoutMs, + logger: new Logger("ScheduleEngineWorker", "debug"), + jobs: { + "schedule.triggerScheduledTask": this.#handleTriggerScheduledTaskJob.bind(this), + }, + }); + + if (!options.worker.disabled) { + this.worker.start(); + this.logger.info("Schedule engine worker started", { + concurrency: options.worker.concurrency, + pollIntervalMs: options.worker.pollIntervalMs, + distributionWindowSeconds: this.distributionWindowSeconds, + }); + } else { + this.logger.info("Schedule engine worker disabled"); + } + } + + /** + * Registers the next scheduled instance for a schedule + */ + async registerNextTaskScheduleInstance(params: RegisterScheduleInstanceParams) { + return startSpan(this.tracer, "registerNextTaskScheduleInstance", async (span) => { + const startTime = Date.now(); + + span.setAttribute("instanceId", params.instanceId); + + this.logger.debug("Starting schedule registration", { + instanceId: params.instanceId, + }); + + try { + const instance = await this.prisma.taskScheduleInstance.findFirst({ + where: { + id: params.instanceId, + }, + include: { + taskSchedule: true, + environment: true, + }, + }); + + if (!instance) { + this.logger.warn("Schedule instance not found during registration", { + instanceId: params.instanceId, + }); + span.setAttribute("error", "instance_not_found"); + return; + } + + span.setAttribute("task_schedule_id", instance.taskSchedule.id); + span.setAttribute("task_schedule_instance_id", instance.id); + span.setAttribute("task_identifier", instance.taskSchedule.taskIdentifier); + span.setAttribute("environment_type", instance.environment.type); + span.setAttribute("schedule_active", instance.active); + span.setAttribute("task_schedule_active", instance.taskSchedule.active); + span.setAttribute( + "task_schedule_generator_expression", + instance.taskSchedule.generatorExpression + ); + + const lastScheduledTimestamp = instance.lastScheduledTimestamp ?? new Date(); + span.setAttribute("last_scheduled_timestamp", lastScheduledTimestamp.toISOString()); + + const nextScheduledTimestamp = calculateNextScheduledTimestamp( + instance.taskSchedule.generatorExpression, + instance.taskSchedule.timezone, + lastScheduledTimestamp + ); + + span.setAttribute("next_scheduled_timestamp", nextScheduledTimestamp.toISOString()); + + const schedulingDelayMs = nextScheduledTimestamp.getTime() - Date.now(); + span.setAttribute("scheduling_delay_ms", schedulingDelayMs); + + this.logger.info("Calculated next schedule timestamp", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + nextScheduledTimestamp: nextScheduledTimestamp.toISOString(), + schedulingDelayMs, + generatorExpression: instance.taskSchedule.generatorExpression, + timezone: instance.taskSchedule.timezone, + }); + + await this.prisma.taskScheduleInstance.update({ + where: { + id: params.instanceId, + }, + data: { + nextScheduledTimestamp, + }, + }); + + // Enqueue the scheduled task + await this.enqueueScheduledTask(params.instanceId, nextScheduledTimestamp); + + // Record metrics + this.scheduleRegistrationCounter.add(1, { + environment_type: instance.environment.type, + schedule_type: instance.taskSchedule.type, + }); + + const duration = Date.now() - startTime; + this.logger.debug("Schedule registration completed", { + instanceId: params.instanceId, + durationMs: duration, + }); + + span.setAttribute("success", true); + span.setAttribute("duration_ms", duration); + } catch (error) { + const duration = Date.now() - startTime; + this.logger.error("Failed to register schedule instance", { + instanceId: params.instanceId, + durationMs: duration, + error: error instanceof Error ? error.message : String(error), + }); + + span.setAttribute("error", true); + span.setAttribute("error_message", error instanceof Error ? error.message : String(error)); + span.setAttribute("duration_ms", duration); + + throw error; + } + }); + } + + async #handleTriggerScheduledTaskJob({ + payload, + }: JobHandlerParams) { + await this.triggerScheduledTask({ + instanceId: payload.instanceId, + finalAttempt: false, // TODO: implement retry logic + exactScheduleTime: payload.exactScheduleTime, + }); + } + + /** + * Triggers a scheduled task (called by the Redis worker) + */ + async triggerScheduledTask(params: TriggerScheduleParams) { + return startSpan(this.tracer, "triggerScheduledTask", async (span) => { + const startTime = Date.now(); + + span.setAttribute("instanceId", params.instanceId); + span.setAttribute("finalAttempt", params.finalAttempt); + if (params.exactScheduleTime) { + span.setAttribute("exactScheduleTime", params.exactScheduleTime.toISOString()); + } + + this.logger.debug("Starting scheduled task trigger", { + instanceId: params.instanceId, + finalAttempt: params.finalAttempt, + exactScheduleTime: params.exactScheduleTime?.toISOString(), + }); + + let taskIdentifier: string | undefined; + let environmentType: string | undefined; + let scheduleType: string | undefined; + + try { + const instance = await this.prisma.taskScheduleInstance.findFirst({ + where: { + id: params.instanceId, + }, + include: { + taskSchedule: true, + environment: { + include: { + project: true, + organization: true, + orgMember: true, + }, + }, + }, + }); + + if (!instance) { + this.logger.debug("Schedule instance not found", { + instanceId: params.instanceId, + }); + span.setAttribute("error", "instance_not_found"); + return; + } + + taskIdentifier = instance.taskSchedule.taskIdentifier; + environmentType = instance.environment.type; + scheduleType = instance.taskSchedule.type; + + span.setAttribute("task_identifier", taskIdentifier); + span.setAttribute("environment_type", environmentType); + span.setAttribute("schedule_type", scheduleType); + span.setAttribute("organization_id", instance.environment.organization.id); + span.setAttribute("project_id", instance.environment.project.id); + span.setAttribute("environment_id", instance.environment.id); + + // Check if organization/project/environment is still valid + if (instance.environment.organization.deletedAt) { + this.logger.debug("Organization is deleted, skipping schedule", { + instanceId: params.instanceId, + scheduleId: instance.taskSchedule.friendlyId, + organizationId: instance.environment.organization.id, + }); + span.setAttribute("skip_reason", "organization_deleted"); + return; + } + + if (instance.environment.project.deletedAt) { + this.logger.debug("Project is deleted, skipping schedule", { + instanceId: params.instanceId, + scheduleId: instance.taskSchedule.friendlyId, + projectId: instance.environment.project.id, + }); + span.setAttribute("skip_reason", "project_deleted"); + return; + } + + if (instance.environment.archivedAt) { + this.logger.debug("Environment is archived, skipping schedule", { + instanceId: params.instanceId, + scheduleId: instance.taskSchedule.friendlyId, + environmentId: instance.environment.id, + }); + span.setAttribute("skip_reason", "environment_archived"); + return; + } + + let shouldTrigger = true; + let skipReason: string | undefined; + + if (!instance.active || !instance.taskSchedule.active) { + this.logger.debug("Schedule is inactive", { + instanceId: params.instanceId, + instanceActive: instance.active, + scheduleActive: instance.taskSchedule.active, + }); + shouldTrigger = false; + skipReason = "schedule_inactive"; + } + + if (!instance.nextScheduledTimestamp) { + this.logger.debug("No next scheduled timestamp", { + instanceId: params.instanceId, + }); + shouldTrigger = false; + skipReason = "no_next_timestamp"; + } + + // For development environments, check if there's an active session + if (instance.environment.type === "DEVELOPMENT") { + this.devEnvironmentCheckCounter.add(1, { + environment_id: instance.environment.id, + }); + + const [devConnectedError, isConnected] = await tryCatch( + this.options.isDevEnvironmentConnectedHandler(instance.environment.id) + ); + + if (devConnectedError) { + this.logger.error("Error checking if development environment is connected", { + instanceId: params.instanceId, + environmentId: instance.environment.id, + error: devConnectedError, + }); + span.setAttribute("dev_connection_check_error", true); + shouldTrigger = false; + skipReason = "dev_connection_check_failed"; + } else if (!isConnected) { + this.logger.debug("Development environment is disconnected", { + instanceId: params.instanceId, + environmentId: instance.environment.id, + }); + span.setAttribute("dev_connected", false); + shouldTrigger = false; + skipReason = "dev_disconnected"; + } else { + span.setAttribute("dev_connected", true); + } + } + + span.setAttribute("should_trigger", shouldTrigger); + if (skipReason) { + span.setAttribute("skip_reason", skipReason); + } + + if (shouldTrigger) { + const scheduleTimestamp = + params.exactScheduleTime ?? instance.nextScheduledTimestamp ?? new Date(); + + const payload = { + scheduleId: instance.taskSchedule.friendlyId, + type: instance.taskSchedule.type as "DECLARATIVE" | "IMPERATIVE", + timestamp: scheduleTimestamp, + lastTimestamp: instance.lastScheduledTimestamp ?? undefined, + externalId: instance.taskSchedule.externalId ?? undefined, + timezone: instance.taskSchedule.timezone, + upcoming: nextScheduledTimestamps( + instance.taskSchedule.generatorExpression, + instance.taskSchedule.timezone, + scheduleTimestamp, + 10 + ), + }; + + // Calculate execution timing metrics + const actualExecutionTime = new Date(); + const schedulingAccuracyMs = actualExecutionTime.getTime() - scheduleTimestamp.getTime(); + + span.setAttribute("scheduling_accuracy_ms", schedulingAccuracyMs); + span.setAttribute("actual_execution_time", actualExecutionTime.toISOString()); + + this.logger.info("Triggering scheduled task", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + scheduleTimestamp: scheduleTimestamp.toISOString(), + actualExecutionTime: actualExecutionTime.toISOString(), + schedulingAccuracyMs, + lastTimestamp: instance.lastScheduledTimestamp?.toISOString(), + }); + + const triggerStartTime = Date.now(); + + // Rewritten try/catch to use tryCatch utility + const [triggerError, result] = await tryCatch( + this.onTriggerScheduledTask({ + taskIdentifier: instance.taskSchedule.taskIdentifier, + environment: instance.environment, + payload, + scheduleInstanceId: instance.id, + scheduleId: instance.taskSchedule.id, + exactScheduleTime: params.exactScheduleTime, + }) + ); + + const triggerDuration = Date.now() - triggerStartTime; + + this.scheduleExecutionDuration.record(triggerDuration, { + environment_type: environmentType, + schedule_type: scheduleType, + }); + + if (triggerError) { + this.logger.error("Error calling trigger callback", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + durationMs: triggerDuration, + error: triggerError instanceof Error ? triggerError.message : String(triggerError), + }); + + this.scheduleExecutionFailureCounter.add(1, { + environment_type: environmentType, + schedule_type: scheduleType, + error_type: "callback_error", + }); + + span.setAttribute("trigger_error", true); + span.setAttribute( + "trigger_error_message", + triggerError instanceof Error ? triggerError.message : String(triggerError) + ); + } else if (result) { + if (result.success) { + // Update the last run triggered timestamp + await this.prisma.taskSchedule.update({ + where: { + id: instance.taskSchedule.id, + }, + data: { + lastRunTriggeredAt: new Date(), + }, + }); + + this.logger.info("Successfully triggered scheduled task", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + durationMs: triggerDuration, + }); + + this.scheduleExecutionCounter.add(1, { + environment_type: environmentType, + schedule_type: scheduleType, + status: "success", + }); + + span.setAttribute("trigger_success", true); + } else { + this.logger.error("Failed to trigger scheduled task", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + durationMs: triggerDuration, + error: result.error, + }); + + this.scheduleExecutionFailureCounter.add(1, { + environment_type: environmentType, + schedule_type: scheduleType, + error_type: "task_failure", + }); + + span.setAttribute("trigger_success", false); + if (result.error) { + span.setAttribute("trigger_error_message", result.error); + } + } + } + + span.setAttribute("trigger_duration_ms", triggerDuration); + } else { + this.logger.debug("Skipping task trigger due to conditions", { + instanceId: params.instanceId, + reason: skipReason, + }); + + this.scheduleExecutionCounter.add(1, { + environment_type: environmentType ?? "unknown", + schedule_type: scheduleType ?? "unknown", + status: "skipped", + }); + } + + // Always update the last scheduled timestamp and register next run + await this.prisma.taskScheduleInstance.update({ + where: { + id: params.instanceId, + }, + data: { + lastScheduledTimestamp: instance.nextScheduledTimestamp, + }, + }); + + // Register the next run + // Rewritten try/catch to use tryCatch utility + const [nextRunError] = await tryCatch( + this.registerNextTaskScheduleInstance({ instanceId: params.instanceId }) + ); + if (nextRunError) { + this.logger.error("Failed to schedule next run after execution", { + instanceId: params.instanceId, + error: nextRunError instanceof Error ? nextRunError.message : String(nextRunError), + }); + + span.setAttribute("next_run_registration_error", true); + span.setAttribute( + "next_run_error_message", + nextRunError instanceof Error ? nextRunError.message : String(nextRunError) + ); + + if (!params.finalAttempt) { + throw nextRunError; + } + } else { + span.setAttribute("next_run_registered", true); + } + + const totalDuration = Date.now() - startTime; + this.logger.debug("Scheduled task trigger completed", { + instanceId: params.instanceId, + totalDurationMs: totalDuration, + }); + + span.setAttribute("total_duration_ms", totalDuration); + span.setAttribute("success", true); + } catch (error) { + const totalDuration = Date.now() - startTime; + this.logger.error("Failed to trigger scheduled task", { + instanceId: params.instanceId, + totalDurationMs: totalDuration, + error: error instanceof Error ? error.message : String(error), + }); + + this.scheduleExecutionFailureCounter.add(1, { + environment_type: environmentType ?? "unknown", + schedule_type: scheduleType ?? "unknown", + error_type: "system_error", + }); + + span.setAttribute("error", true); + span.setAttribute("error_message", error instanceof Error ? error.message : String(error)); + span.setAttribute("total_duration_ms", totalDuration); + + throw error; + } + }); + } + + /** + * Enqueues a scheduled task with distributed execution timing + */ + private async enqueueScheduledTask(instanceId: string, exactScheduleTime: Date) { + return startSpan(this.tracer, "enqueueScheduledTask", async (span) => { + span.setAttribute("instanceId", instanceId); + span.setAttribute("exactScheduleTime", exactScheduleTime.toISOString()); + + const distributedExecutionTime = calculateDistributedExecutionTime( + exactScheduleTime, + this.distributionWindowSeconds + ); + + const distributionOffsetMs = exactScheduleTime.getTime() - distributedExecutionTime.getTime(); + + span.setAttribute("distributedExecutionTime", distributedExecutionTime.toISOString()); + span.setAttribute("distributionOffsetMs", distributionOffsetMs); + span.setAttribute("distributionWindowSeconds", this.distributionWindowSeconds); + + this.distributionOffsetHistogram.record(distributionOffsetMs, { + distribution_window_seconds: this.distributionWindowSeconds.toString(), + }); + + this.logger.debug("Enqueuing scheduled task with distributed execution", { + instanceId, + exactScheduleTime: exactScheduleTime.toISOString(), + distributedExecutionTime: distributedExecutionTime.toISOString(), + distributionOffsetMs, + distributionWindowSeconds: this.distributionWindowSeconds, + }); + + try { + await this.worker.enqueue({ + id: `scheduled-task-instance:${instanceId}`, + job: "schedule.triggerScheduledTask", + payload: { + instanceId, + exactScheduleTime, + }, + availableAt: distributedExecutionTime, + }); + + span.setAttribute("enqueue_success", true); + + this.logger.debug("Successfully enqueued scheduled task", { + instanceId, + jobId: `scheduled-task-instance:${instanceId}`, + }); + } catch (error) { + this.logger.error("Failed to enqueue scheduled task", { + instanceId, + error: error instanceof Error ? error.message : String(error), + }); + + span.setAttribute("enqueue_error", true); + span.setAttribute( + "enqueue_error_message", + error instanceof Error ? error.message : String(error) + ); + + throw error; + } + }); + } + + async quit() { + this.logger.info("Shutting down schedule engine"); + + try { + await this.worker.stop(); + this.logger.info("Schedule engine worker stopped successfully"); + } catch (error) { + this.logger.error("Error stopping schedule engine worker", { + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } + } +} diff --git a/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts b/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts new file mode 100644 index 0000000000..e556418706 --- /dev/null +++ b/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts @@ -0,0 +1,52 @@ +import { parseExpression } from "cron-parser"; + +export function calculateNextScheduledTimestampFromNow(schedule: string, timezone: string | null) { + return calculateNextScheduledTimestamp(schedule, timezone, new Date()); +} + +export function calculateNextScheduledTimestamp( + schedule: string, + timezone: string | null, + lastScheduledTimestamp: Date = new Date() +) { + const nextStep = calculateNextStep(schedule, timezone, lastScheduledTimestamp); + + if (nextStep.getTime() < Date.now()) { + // If the next step is in the past, we just need to calculate the next step from now + return calculateNextStep(schedule, timezone, new Date()); + } + + return nextStep; +} + +function calculateNextStep(schedule: string, timezone: string | null, currentDate: Date) { + return parseExpression(schedule, { + currentDate, + utc: timezone === null, + tz: timezone ?? undefined, + }) + .next() + .toDate(); +} + +export function nextScheduledTimestamps( + cron: string, + timezone: string | null, + lastScheduledTimestamp: Date, + count: number = 1 +) { + const result: Array = []; + let nextScheduledTimestamp = lastScheduledTimestamp; + + for (let i = 0; i < count; i++) { + nextScheduledTimestamp = calculateNextScheduledTimestamp( + cron, + timezone, + nextScheduledTimestamp + ); + + result.push(nextScheduledTimestamp); + } + + return result; +} diff --git a/internal-packages/schedule-engine/src/engine/types.ts b/internal-packages/schedule-engine/src/engine/types.ts new file mode 100644 index 0000000000..1d6425cfbf --- /dev/null +++ b/internal-packages/schedule-engine/src/engine/types.ts @@ -0,0 +1,72 @@ +import { Logger } from "@trigger.dev/core/logger"; +import { Meter, Tracer } from "@internal/tracing"; +import { Prisma, PrismaClient } from "@trigger.dev/database"; +import { RedisOptions } from "@internal/redis"; + +export type SchedulingEnvironment = Prisma.RuntimeEnvironmentGetPayload<{ + include: { project: true; organization: true; orgMember: true }; +}>; + +export type TriggerScheduledTaskParams = { + taskIdentifier: string; + environment: SchedulingEnvironment; + payload: { + scheduleId: string; + type: "DECLARATIVE" | "IMPERATIVE"; + timestamp: Date; + lastTimestamp?: Date; + externalId?: string; + timezone: string; + upcoming: Date[]; + }; + scheduleInstanceId: string; + scheduleId: string; + exactScheduleTime?: Date; +}; + +export interface TriggerScheduledTaskCallback { + (params: TriggerScheduledTaskParams): Promise<{ success: boolean; error?: string }>; +} + +export interface ScheduleEngineOptions { + logger?: Logger; + logLevel?: string; + prisma: PrismaClient; + redis: RedisOptions; + worker: { + concurrency: number; + pollIntervalMs?: number; + shutdownTimeoutMs?: number; + disabled?: boolean; + }; + distributionWindow?: { + seconds: number; + }; + tracer?: Tracer; + meter?: Meter; + onTriggerScheduledTask: TriggerScheduledTaskCallback; + isDevEnvironmentConnectedHandler: (environmentId: string) => Promise; +} + +export interface UpsertScheduleParams { + projectId: string; + schedule: { + friendlyId?: string; + taskIdentifier: string; + deduplicationKey?: string; + cron: string; + timezone?: string; + externalId?: string; + environments: string[]; + }; +} + +export interface TriggerScheduleParams { + instanceId: string; + finalAttempt: boolean; + exactScheduleTime?: Date; +} + +export interface RegisterScheduleInstanceParams { + instanceId: string; +} diff --git a/internal-packages/schedule-engine/src/engine/workerCatalog.ts b/internal-packages/schedule-engine/src/engine/workerCatalog.ts new file mode 100644 index 0000000000..ee634c6fb5 --- /dev/null +++ b/internal-packages/schedule-engine/src/engine/workerCatalog.ts @@ -0,0 +1,14 @@ +import { z } from "zod"; + +export const scheduleWorkerCatalog = { + "schedule.triggerScheduledTask": { + schema: z.object({ + instanceId: z.string(), + exactScheduleTime: z.coerce.date(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, +}; diff --git a/internal-packages/schedule-engine/src/index.ts b/internal-packages/schedule-engine/src/index.ts new file mode 100644 index 0000000000..22f6c07b30 --- /dev/null +++ b/internal-packages/schedule-engine/src/index.ts @@ -0,0 +1,6 @@ +export { ScheduleEngine } from "./engine/index.js"; +export type { + ScheduleEngineOptions, + TriggerScheduleParams, + TriggerScheduledTaskCallback, +} from "./engine/types.js"; diff --git a/internal-packages/schedule-engine/test/scheduleEngine.test.ts b/internal-packages/schedule-engine/test/scheduleEngine.test.ts new file mode 100644 index 0000000000..e3990a51cf --- /dev/null +++ b/internal-packages/schedule-engine/test/scheduleEngine.test.ts @@ -0,0 +1,267 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { describe, expect, vi } from "vitest"; +import { ScheduleEngine } from "../src/index.js"; +import { setTimeout } from "timers/promises"; +import { TriggerScheduledTaskParams } from "../src/engine/types.js"; + +describe("ScheduleEngine Integration", () => { + containerTest( + "should process full schedule lifecycle through worker with multiple executions", + { timeout: 240_000 }, // Increase timeout for multiple executions (4 minutes) + async ({ prisma, redisOptions }) => { + // Real callback function for testing expectations + const mockDevConnectedHandler = vi.fn().mockResolvedValue(true); + + const triggerCalls: Array<{ + params: TriggerScheduledTaskParams; + executionTime: Date; + }> = []; + + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { + concurrency: 1, + disabled: false, // Enable worker for full integration test + pollIntervalMs: 100, // Poll frequently for faster test execution + }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async (params) => { + const executionTime = new Date(); // Capture when callback is actually called + console.log( + `TriggerScheduledTask called at: ${executionTime.toISOString()} (execution #${ + triggerCalls.length + 1 + })` + ); + console.log("TriggerScheduledTask", params); + + triggerCalls.push({ params, executionTime }); + return { success: true }; + }, + isDevEnvironmentConnectedHandler: mockDevConnectedHandler, + }); + + try { + // Create real database records + const organization = await prisma.organization.create({ + data: { + title: "Test Organization", + slug: "test-org", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "Test Project", + slug: "test-project", + externalRef: "test-ref", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_test_1234", + pkApiKey: "pk_test_1234", + shortcode: "test-short", + }, + }); + + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: "sched_abc123", + taskIdentifier: "test-task", + projectId: project.id, + deduplicationKey: "test-dedup", + userProvidedDeduplicationKey: false, + generatorExpression: "* * * * *", // Every minute + generatorDescription: "Every minute", + timezone: "UTC", + type: "DECLARATIVE", + active: true, + externalId: "ext-123", + }, + }); + + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + active: true, + }, + }); + + // Calculate the expected next execution time (next minute boundary) + const now = new Date(); + const expectedExecutionTime = new Date(now); + expectedExecutionTime.setMinutes(now.getMinutes() + 1, 0, 0); // Next minute, 0 seconds, 0 milliseconds + + // Calculate the expected upcoming execution times (next 10 minutes after the first execution) + const expectedUpcoming = []; + for (let i = 1; i <= 10; i++) { + const upcoming = new Date(expectedExecutionTime); + upcoming.setMinutes(expectedExecutionTime.getMinutes() + i); + expectedUpcoming.push(upcoming); + } + + // Manually enqueue the first scheduled task to kick off the lifecycle + await engine.registerNextTaskScheduleInstance({ instanceId: scheduleInstance.id }); + + // Get the actual nextScheduledTimestamp that was calculated by the engine + const instanceAfterRegistration = await prisma.taskScheduleInstance.findFirst({ + where: { id: scheduleInstance.id }, + }); + const actualNextExecution = instanceAfterRegistration?.nextScheduledTimestamp; + expect(actualNextExecution).toBeDefined(); + expect(actualNextExecution).toEqual(expectedExecutionTime); + + // Wait for the first execution + console.log("Waiting for first execution..."); + const startTime = Date.now(); + const maxWaitTime = 70_000; // 70 seconds max wait for first execution + + while (triggerCalls.length === 0 && Date.now() - startTime < maxWaitTime) { + await setTimeout(100); + } + + expect(triggerCalls.length).toBeGreaterThanOrEqual(1); + + // Verify the first execution + const firstExecution = triggerCalls[0]; + console.log("First execution verified, waiting for second execution..."); + + // Wait for the second execution (should happen ~1 minute after the first) + const secondExecutionStartTime = Date.now(); + const maxWaitForSecond = 80_000; // 80 seconds max wait for second execution + + while ( + triggerCalls.length < 2 && + Date.now() - secondExecutionStartTime < maxWaitForSecond + ) { + await setTimeout(100); + } + + expect(triggerCalls.length).toBeGreaterThanOrEqual(2); + + const secondExecution = triggerCalls[1]; + console.log("Second execution verified!"); + + // Give a small delay for database updates to complete before checking + await setTimeout(500); + + // Verify both executions have correct timing and distribution window behavior + for (let i = 0; i < 2; i++) { + const execution = triggerCalls[i]; + const expectedScheduleTime = execution.params.exactScheduleTime; + + if (expectedScheduleTime) { + // Calculate the distribution window (10 seconds before the scheduled time) + const distributionWindowStart = new Date(expectedScheduleTime); + distributionWindowStart.setSeconds(distributionWindowStart.getSeconds() - 10); + + console.log(`Execution ${i + 1}:`); + console.log(" Scheduled time:", expectedScheduleTime.toISOString()); + console.log(" Distribution window start:", distributionWindowStart.toISOString()); + console.log(" Actual execution time:", execution.executionTime.toISOString()); + + // Verify the callback was executed within the distribution window + expect(execution.executionTime.getTime()).toBeGreaterThanOrEqual( + distributionWindowStart.getTime() + ); + expect(execution.executionTime.getTime()).toBeLessThanOrEqual( + expectedScheduleTime.getTime() + ); + } + } + + // Verify the first execution parameters + expect(firstExecution.params).toEqual({ + taskIdentifier: "test-task", + environment: expect.objectContaining({ + id: environment.id, + type: "PRODUCTION", + project: expect.objectContaining({ + id: project.id, + name: "Test Project", + slug: "test-project", + }), + organization: expect.objectContaining({ + id: organization.id, + title: "Test Organization", + slug: "test-org", + }), + }), + payload: { + scheduleId: "sched_abc123", + type: "DECLARATIVE", + timestamp: actualNextExecution, + lastTimestamp: undefined, // First run has no lastTimestamp + externalId: "ext-123", + timezone: "UTC", + upcoming: expect.arrayContaining([expect.any(Date)]), + }, + scheduleInstanceId: scheduleInstance.id, + scheduleId: taskSchedule.id, + exactScheduleTime: actualNextExecution, + }); + + // Verify the second execution parameters + if (actualNextExecution) { + const expectedSecondExecution = new Date(actualNextExecution); + expectedSecondExecution.setMinutes(actualNextExecution.getMinutes() + 1); + + expect(secondExecution.params).toEqual({ + taskIdentifier: "test-task", + environment: expect.objectContaining({ + id: environment.id, + type: "PRODUCTION", + }), + payload: { + scheduleId: "sched_abc123", + type: "DECLARATIVE", + timestamp: expectedSecondExecution, + lastTimestamp: actualNextExecution, // Second run should have the first execution time as lastTimestamp + externalId: "ext-123", + timezone: "UTC", + upcoming: expect.arrayContaining([expect.any(Date)]), + }, + scheduleInstanceId: scheduleInstance.id, + scheduleId: taskSchedule.id, + exactScheduleTime: expectedSecondExecution, + }); + } + + // Verify database updates occurred after both executions + const updatedSchedule = await prisma.taskSchedule.findFirst({ + where: { id: taskSchedule.id }, + }); + expect(updatedSchedule?.lastRunTriggeredAt).toBeTruthy(); + expect(updatedSchedule?.lastRunTriggeredAt).toBeInstanceOf(Date); + + const finalInstance = await prisma.taskScheduleInstance.findFirst({ + where: { id: scheduleInstance.id }, + }); + + // After two executions, lastScheduledTimestamp should be the second execution time + if (actualNextExecution && secondExecution.params.exactScheduleTime) { + const secondExecutionTime = secondExecution.params.exactScheduleTime; + expect(finalInstance?.lastScheduledTimestamp).toEqual(secondExecutionTime); + + // The next scheduled timestamp should be 1 minute after the second execution + const expectedThirdExecution = new Date(secondExecutionTime); + expectedThirdExecution.setMinutes(secondExecutionTime.getMinutes() + 1); + expect(finalInstance?.nextScheduledTimestamp).toEqual(expectedThirdExecution); + } + } finally { + // Clean up: stop the worker + await engine.quit(); + } + } + ); +}); diff --git a/internal-packages/schedule-engine/test/setup.ts b/internal-packages/schedule-engine/test/setup.ts new file mode 100644 index 0000000000..b2bacd6baf --- /dev/null +++ b/internal-packages/schedule-engine/test/setup.ts @@ -0,0 +1,4 @@ +import { vi } from "vitest"; + +// Set extended timeout for container tests +vi.setConfig({ testTimeout: 60_000 }); diff --git a/internal-packages/schedule-engine/tsconfig.build.json b/internal-packages/schedule-engine/tsconfig.build.json new file mode 100644 index 0000000000..89c87a3dc6 --- /dev/null +++ b/internal-packages/schedule-engine/tsconfig.build.json @@ -0,0 +1,21 @@ +{ + "include": ["src/**/*.ts"], + "exclude": ["src/**/*.test.ts"], + "compilerOptions": { + "composite": true, + "target": "ES2020", + "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "outDir": "dist", + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "strict": true, + "declaration": true + } +} diff --git a/internal-packages/schedule-engine/tsconfig.json b/internal-packages/schedule-engine/tsconfig.json new file mode 100644 index 0000000000..af630abe1f --- /dev/null +++ b/internal-packages/schedule-engine/tsconfig.json @@ -0,0 +1,8 @@ +{ + "references": [{ "path": "./tsconfig.src.json" }, { "path": "./tsconfig.test.json" }], + "compilerOptions": { + "moduleResolution": "Node16", + "module": "Node16", + "customConditions": ["@triggerdotdev/source"] + } +} diff --git a/internal-packages/schedule-engine/tsconfig.src.json b/internal-packages/schedule-engine/tsconfig.src.json new file mode 100644 index 0000000000..0df3d2d222 --- /dev/null +++ b/internal-packages/schedule-engine/tsconfig.src.json @@ -0,0 +1,20 @@ +{ + "include": ["src/**/*.ts"], + "exclude": ["node_modules", "src/**/*.test.ts"], + "compilerOptions": { + "composite": true, + "target": "ES2020", + "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "strict": true, + "customConditions": ["@triggerdotdev/source"] + } +} diff --git a/internal-packages/schedule-engine/tsconfig.test.json b/internal-packages/schedule-engine/tsconfig.test.json new file mode 100644 index 0000000000..4c06c9f57b --- /dev/null +++ b/internal-packages/schedule-engine/tsconfig.test.json @@ -0,0 +1,21 @@ +{ + "include": ["src/**/*.test.ts"], + "references": [{ "path": "./tsconfig.src.json" }], + "compilerOptions": { + "composite": true, + "target": "ES2020", + "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "types": ["vitest/globals"], + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "strict": true, + "customConditions": ["@triggerdotdev/source"] + } +} diff --git a/internal-packages/schedule-engine/vitest.config.ts b/internal-packages/schedule-engine/vitest.config.ts new file mode 100644 index 0000000000..cce9bda541 --- /dev/null +++ b/internal-packages/schedule-engine/vitest.config.ts @@ -0,0 +1,14 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globals: true, + environment: "node", + setupFiles: ["./test/setup.ts"], + testTimeout: 30000, + hookTimeout: 30000, + }, + esbuild: { + target: "node18", + }, +}); diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index c07257fdd6..6a1618404b 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -203,6 +203,7 @@ export class SimpleQueue { item: parsedItem, job: parsedItem.job, timestamp, + availableJobs: Object.keys(this.schema), }); continue; } diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index f981698bfa..e167611c79 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -1,10 +1,10 @@ +import { createRedisClient, Redis, type RedisOptions } from "@internal/redis"; import { Attributes, Histogram, Meter, metrics, ObservableResult, - SemanticAttributes, SpanKind, startSpan, trace, @@ -14,13 +14,11 @@ import { import { Logger } from "@trigger.dev/core/logger"; import { calculateNextRetryDelay } from "@trigger.dev/core/v3"; import { type RetryOptions } from "@trigger.dev/core/v3/schemas"; -import { Redis, type RedisOptions } from "@internal/redis"; -import { z } from "zod"; -import { AnyQueueItem, SimpleQueue } from "./queue.js"; +import { shutdownManager } from "@trigger.dev/core/v3/serverOnly"; import { nanoid } from "nanoid"; import pLimit from "p-limit"; -import { createRedisClient } from "@internal/redis"; -import { shutdownManager } from "@trigger.dev/core/v3/serverOnly"; +import { z } from "zod"; +import { AnyQueueItem, SimpleQueue } from "./queue.js"; export type WorkerCatalog = { [key: string]: { @@ -34,13 +32,17 @@ type QueueCatalogFromWorkerCatalog = { [K in keyof Catalog]: Catalog[K]["schema"]; }; -type JobHandler = (params: { +export type JobHandlerParams = { id: string; payload: z.infer; visibilityTimeoutMs: number; attempt: number; deduplicationKey?: string; -}) => Promise; +}; + +export type JobHandler = ( + params: JobHandlerParams +) => Promise; export type WorkerConcurrencyOptions = { workers?: number; diff --git a/packages/rsc/src/package.json b/packages/rsc/src/package.json new file mode 100644 index 0000000000..3dbc1ca591 --- /dev/null +++ b/packages/rsc/src/package.json @@ -0,0 +1,3 @@ +{ + "type": "module" +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 159927b32b..be39db8422 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -251,6 +251,9 @@ importers: '@internal/run-engine': specifier: workspace:* version: link:../../internal-packages/run-engine + '@internal/schedule-engine': + specifier: workspace:* + version: link:../../internal-packages/schedule-engine '@internal/tracing': specifier: workspace:* version: link:../../internal-packages/tracing @@ -1050,6 +1053,43 @@ importers: specifier: 6.0.1 version: 6.0.1 + internal-packages/schedule-engine: + dependencies: + '@internal/redis': + specifier: workspace:* + version: link:../redis + '@internal/tracing': + specifier: workspace:* + version: link:../tracing + '@trigger.dev/core': + specifier: workspace:* + version: link:../../packages/core + '@trigger.dev/database': + specifier: workspace:* + version: link:../database + '@trigger.dev/redis-worker': + specifier: workspace:* + version: link:../../packages/redis-worker + cron-parser: + specifier: ^4.9.0 + version: 4.9.0 + cronstrue: + specifier: ^2.50.0 + version: 2.61.0 + nanoid: + specifier: 3.3.8 + version: 3.3.8 + zod: + specifier: 3.23.8 + version: 3.23.8 + devDependencies: + '@internal/testcontainers': + specifier: workspace:* + version: link:../testcontainers + rimraf: + specifier: 6.0.1 + version: 6.0.1 + internal-packages/testcontainers: dependencies: '@clickhouse/client': @@ -10715,7 +10755,7 @@ packages: '@types/react-dom': optional: true dependencies: - '@babel/runtime': 7.27.4 + '@babel/runtime': 7.27.0 '@radix-ui/primitive': 1.0.1 '@radix-ui/react-compose-refs': 1.0.1(@types/react@18.2.69)(react@18.2.0) '@radix-ui/react-primitive': 1.0.3(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0) @@ -10795,7 +10835,7 @@ packages: '@types/react': optional: true dependencies: - '@babel/runtime': 7.27.4 + '@babel/runtime': 7.27.0 '@types/react': 18.2.69 react: 18.2.0 dev: false @@ -11152,7 +11192,7 @@ packages: '@types/react-dom': optional: true dependencies: - '@babel/runtime': 7.27.4 + '@babel/runtime': 7.27.0 '@radix-ui/react-primitive': 1.0.3(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0) '@types/react': 18.2.69 '@types/react-dom': 18.2.7 @@ -21101,6 +21141,11 @@ packages: hasBin: true dev: false + /cronstrue@2.61.0: + resolution: {integrity: sha512-ootN5bvXbIQI9rW94+QsXN5eROtXWwew6NkdGxIRpS/UFWRggL0G5Al7a9GTBFEsuvVhJ2K3CntIIVt7L2ILhA==} + hasBin: true + dev: false + /cross-env@7.0.3: resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==} engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'} diff --git a/references/hello-world/src/trigger/schedule.ts b/references/hello-world/src/trigger/schedule.ts index 29c363d64c..c11c91ab7a 100644 --- a/references/hello-world/src/trigger/schedule.ts +++ b/references/hello-world/src/trigger/schedule.ts @@ -2,7 +2,8 @@ import { schedules } from "@trigger.dev/sdk/v3"; export const simpleSchedule = schedules.task({ id: "simple-schedule", - cron: "0 0 * * *", + // Every other minute + cron: "*/2 * * * *", run: async (payload, { ctx }) => { return { message: "Hello, world!",