Improved schedule engine #2174

Open
wants to merge 6 commits into
base: main

Conversation

ericallam
Member

No description provided.


changeset-bot bot commented Jun 13, 2025

⚠️ No Changeset found

Latest commit: 177f59d

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Contributor

coderabbitai bot commented Jun 13, 2025

Walkthrough

This update introduces a comprehensive scheduling engine refactor and migration across the codebase. A new internal package, @internal/schedule-engine, is added, providing a centralized ScheduleEngine class for managing scheduled tasks, their execution, and distributed timing using Redis and Prisma. The webapp is updated to use this engine, with new environment variables for schedule worker and engine configuration. Legacy scheduling services (TriggerScheduledTaskService, RegisterNextTaskScheduleInstanceService) are removed and replaced with direct calls to the ScheduleEngine. Worker job enqueuing is refactored to use a unified commonWorker interface. New utility functions and tests for distributed scheduling are introduced, and extensive integration tests validate the new engine's lifecycle and timing. Type definitions, documentation, and build/test configurations for the new package are included.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 14

🔭 Outside diff range comments (5)
internal-packages/run-engine/src/engine/types.ts (1)

128-137: 💡 Verification agent

🧩 Analysis chain

Temporal field proliferation – clarify semantics of createdAt vs queuedAt / queueTimestamp

TriggerParams now exposes three very similar timestamps:

  • createdAt
  • queuedAt
  • queueTimestamp

Without a clear contract, callers may set only one of them or confuse their meaning, causing subtle ordering bugs in metrics or deduplication.

Suggest either:

-  createdAt?: Date;
+  /**
+   * Absolute creation time of the *task run* record in the DB.
+   * Prefer `queuedAt` for queue-ing semantics and `queueTimestamp` for
+   * external-system hand-off.  Documented here to avoid ambiguity.
+   */
+  createdAt?: Date;

and add JSDoc for the other two, or fold them into a single canonical field.

Run this to locate all writes to these properties:


🏁 Script executed:

#!/bin/bash
rg -n --pretty $'(createdAt|queuedAt|queueTimestamp)\\s*:' | head

Length of output: 296


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check for queuedAt and queueTimestamp in TriggerParams type
rg -n "queuedAt" internal-packages/run-engine/src/engine/types.ts || echo "no matches for queuedAt"
rg -n "queueTimestamp" internal-packages/run-engine/src/engine/types.ts || echo "no matches for queueTimestamp"

Length of output: 209


Enhance JSDoc for Temporal Fields in TriggerParams

The three timestamp fields in TriggerParams (createdAt, queuedAt, and queueTimestamp) currently lack documentation and have subtly different meanings. Please add clear JSDoc comments to each to prevent misuse:

• File: internal-packages/run-engine/src/engine/types.ts
Lines: ~108–137

Suggested diff:

 interface TriggerParams {
+  /**
+   * Absolute creation time of the task-run record in the database.
+   * Use only for DB-side ordering; not for queue sequencing.
+   */
   createdAt?: Date;
+  /**
+   * Time when this run was enqueued in our internal scheduler.
+   * Use for internal queue ordering and deduplication.
+   */
   queuedAt?: Date;
+  /**
+   * Time when the run was handed off to an external system or broker.
+   * Use for external-system metrics and hand-off tracing.
+   */
   queueTimestamp?: Date;
   machine?: MachinePresetName;
   workerId?: string;
   runnerId?: string;
   releaseConcurrency?: boolean;
   runChainState?: RunChainState;
   scheduleId?: string;
   scheduleInstanceId?: string;
 }
apps/webapp/app/v3/services/triggerTaskV1.server.ts (1)

367-372: 🛠️ Refactor suggestion

Same validation gap for options.queueTimestamp

Duplicate the sanity check here (or extract a shared util) to avoid divergent behaviours between V1 and the new engine path.

internal-packages/schedule-engine/tsconfig.test.json (1)

1-22: ⚠️ Potential issue

include misses the actual test folder

Integration tests live in test/, but "include": ["src/**/*.test.ts"] only covers unit tests inside src/.
vitest will still run the file, yet the compiler won’t type-check it, defeating the purpose of this config.

-  "include": ["src/**/*.test.ts"],
+  "include": ["src/**/*.test.ts", "test/**/*.ts"],

Add the glob (or move the tests) to ensure they’re compiled.

apps/webapp/app/v3/services/timeoutDeployment.server.ts (1)

46-63: 🛠️ Refactor suggestion

tx parameter is now unused – drop or use

enqueue() still accepts tx?: PrismaClientOrTransaction but never references it after the refactor to commonWorker.

-static async enqueue(
-  deploymentId: string,
-  fromStatus: string,
-  errorMessage: string,
-  runAt: Date,
-  tx?: PrismaClientOrTransaction
-) {
+static async enqueue(
+  deploymentId: string,
+  fromStatus: string,
+  errorMessage: string,
+  runAt: Date,
+) {

Keeping an unused parameter risks confusion and lint warnings. If transactional enqueue is still required, wire it through commonWorker.enqueue; otherwise remove it.

apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts (1)

93-106: ⚠️ Potential issue

Breaking change: parameter order flipped

enqueue() used to be (options, tx, runAt?).
The new signature (options, runAt?, tx?) silently swaps runAt and tx. Existing call-sites will now pass a Prisma client where a Date is expected, resulting in an invalid availableAt and no transactional context.

Either restore the original ordering or add an overload while deprecating the old one:

-static async enqueue(
-  options: CancelDevSessionRunsServiceOptions,
-  runAt?: Date,
-  tx?: PrismaClientOrTransaction
-) {
+// Preserve old order
+static async enqueue(
+  options: CancelDevSessionRunsServiceOptions,
+  tx?: PrismaClientOrTransaction,
+  runAt?: Date,
+) {

Remember to prefix _tx if it remains unused.
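
As an illustration of how both argument orders could be tolerated during a migration, the sketch below normalises the two trailing arguments by inspecting their runtime type. Everything here is hypothetical: `TxLike` is a stand-in for `PrismaClientOrTransaction`, and `normalizeEnqueueArgs` is not part of the PR.

```typescript
// Stand-in for PrismaClientOrTransaction; the real type comes from Prisma.
interface TxLike {
  readonly kind: "tx";
}

interface NormalizedEnqueueArgs {
  runAt?: Date;
  tx?: TxLike;
}

// Accepts (tx, runAt) or (runAt, tx) and returns one stable shape,
// so old and new call-sites resolve to the same values.
function normalizeEnqueueArgs(
  first?: Date | TxLike,
  second?: Date | TxLike
): NormalizedEnqueueArgs {
  const result: NormalizedEnqueueArgs = {};
  for (const arg of [first, second]) {
    if (arg === undefined) continue;
    if (arg instanceof Date) {
      result.runAt = arg;
    } else {
      result.tx = arg;
    }
  }
  return result;
}
```

A runtime check like this is only a stop-gap; restoring the original order or updating every call-site remains the cleaner fix.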

♻️ Duplicate comments (3)
apps/webapp/app/presenters/v3/RunListPresenter.server.ts (1)

182-186: Same clampToNow duplication as in NextRunListPresenter

Please pull this helper from a shared util to keep behaviour consistent across all call-sites and cut maintenance overhead.

internal-packages/schedule-engine/tsconfig.build.json (1)

6-8: Same DOM lib concern as in tsconfig.src.json

Consider trimming the lib array here as well to keep the build output consistent with the runtime environment.

apps/webapp/app/v3/services/triggerTaskV1.server.ts (1)

442-443: Conditionally include createdAt

Apply the same conditional spread pattern here to avoid sending explicit undefined to Prisma.

🧹 Nitpick comments (31)
apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts (2)

192-195: Utility duplication – extract clampToNow

clampToNow is now copy-pasted in at least two presenters. Move it to a small shared util (e.g. ~/utils/date.ts) to avoid divergence and ease testing.

-function clampToNow(date: Date): Date {
-  const now = new Date();
-  return date > now ? now : date;
-}
+import { clampToNow } from "~/utils/date";
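
A minimal sketch of what the shared helper might look like once extracted. The `~/utils/date.ts` location is the reviewer's example, and the optional `now` parameter is an assumption added here so the function can be tested with a fixed clock; the presenters' copies take only `date`.

```typescript
// Hypothetical shared helper (the review suggests a module such as ~/utils/date.ts).
// Returns `date` unchanged unless it lies after `now`, in which case `now` is returned.
function clampToNow(date: Date, now: Date = new Date()): Date {
  return date > now ? now : date;
}
```

Both presenters could then import the same function, so a later change to the clamping rule lands in one place.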

206-208: Minor: avoid double Date → number → Date conversions

time.from / time.to are Date objects. You call clampToNow(time.to) (returns Date) and then immediately .getTime(). Consider inlining to keep it obvious:

- to: time.to ? clampToNow(time.to).getTime() : undefined,
+ to: time.to ? Math.min(time.to.getTime(), Date.now()) : undefined,

Purely cosmetic; feel free to skip.

apps/webapp/app/presenters/v3/RunListPresenter.server.ts (1)

291-294: Performance & readability: calculate once

clampToNow(time.to) is evaluated during SQL template construction; for large batches this inside-template computation is negligible, but calculating once improves clarity:

-  time.to
-    ? Prisma.sql`AND tr."createdAt" <= ${clampToNow(time.to).toISOString()}::timestamp`
+  time.to
+    ? Prisma.sql`AND tr."createdAt" <= ${
+        clampToNow(time.to).toISOString()
+      }::timestamp`

Even better after extracting util as suggested above.

internal-packages/schedule-engine/test/setup.ts (1)

1-4: Consider moving global vitest config to vitest.config.* instead of runtime mutation

vi.setConfig({ testTimeout: 60_000 }); works, but:

  1. It executes after test file import order, which can be flaky if other files rely on the timeout earlier.
  2. A vitest.config.ts (or <root>/vitest.setup.ts referenced via setupFiles) is the canonical place.

If you keep this, at least prefix the filename with setupTests. to make its intent clearer.

apps/webapp/app/v3/services/triggerTask.server.ts (1)

34-36: Consider surfacing docs / validation for the new options

queueTimestamp and overrideCreatedAt are accepted here, but nothing verifies that:

  • queueTimestamp ≤ overrideCreatedAt (or vice-versa depending on intent)
  • both are sensible (e.g. not in the future or > now + max skew)

A short JSDoc comment or runtime guard would prevent subtle clock-skew bugs.
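
A runtime guard along these lines could look as follows. This is a sketch under assumptions: `validateTimestamps` and the five-minute skew limit are invented for illustration, and the ordering check between the two fields is left out because the intended direction is not stated in the review.

```typescript
interface TimestampOptions {
  queueTimestamp?: Date;
  overrideCreatedAt?: Date;
}

// Maximum tolerated clock skew; the 5-minute value is an arbitrary assumption.
const MAX_FUTURE_SKEW_MS = 5 * 60 * 1000;

// Returns a list of problems; an empty list means both fields look sane.
function validateTimestamps(opts: TimestampOptions, now: Date = new Date()): string[] {
  const problems: string[] = [];
  const limit = now.getTime() + MAX_FUTURE_SKEW_MS;

  for (const key of ["queueTimestamp", "overrideCreatedAt"] as const) {
    const value = opts[key];
    if (value === undefined) continue;
    if (Number.isNaN(value.getTime())) {
      problems.push(`${key} is not a valid date`);
    } else if (value.getTime() > limit) {
      problems.push(`${key} is too far in the future`);
    }
  }
  return problems;
}
```

Returning a list rather than throwing lets the caller decide whether to reject the request or only log a warning.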

apps/webapp/app/v3/services/createBackgroundWorker.server.ts (1)

28-29: Import is used only for side-effectful calls – keep tree-shaking friendly

scheduleEngine is imported at module top level but only referenced inside syncDeclarativeSchedules.
For clarity / bundler friendliness you could move the import next to its usage:

-import { scheduleEngine } from "../scheduleEngine.server";
+// defer import to keep the main bundle lean
+import { scheduleEngine } from "../scheduleEngine.server";

(Not critical; feel free to ignore if build size is not a concern.)

internal-packages/schedule-engine/tsconfig.json (1)

1-8: Minor: missing "strict": true & rootDir can hurt DX

Since this is a new package, enabling compiler strictness early saves bugs later:

   "compilerOptions": {
+    "strict": true,
     "moduleResolution": "Node16",
     "module": "Node16",
     "customConditions": ["@triggerdotdev/source"]
   }

Also add "rootDir": "./src" to keep emitted paths tidy.

internal-packages/schedule-engine/tsconfig.src.json (1)

6-8: Unnecessary DOM libs increase type-check noise

The library list includes DOM* libs even though the package runs exclusively in Node. This can:

  • pull in thousands of irrelevant types
  • mask accidental browser-only API usage

-    "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
+    "lib": ["ES2020"],

Unless you intentionally reference browser globals inside the engine, dropping them will speed up builds and tighten type safety.

apps/webapp/app/runEngine/services/triggerTask.server.ts (1)

311-312: Passing createdAt as undefined clutters the Prisma payload

createdAt is always present in the object, even when no override is supplied. Prisma accepts undefined, but it still bloats query text and hides intent.

Consider making the property conditional:

-                  createdAt: options.overrideCreatedAt,
+                  ...(options.overrideCreatedAt
+                    ? { createdAt: options.overrideCreatedAt }
+                    : {}),

Keeps the payload minimal and future-proof if Prisma tightens strict-null-checks.
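
The difference between the two forms can be seen in isolation. In this sketch `buildRunData` and `TriggerOptions` are hypothetical stand-ins for the real service code; the point is only that the conditional spread omits the key entirely instead of setting it to undefined.

```typescript
interface TriggerOptions {
  overrideCreatedAt?: Date;
}

// Builds a data payload; createdAt is only present when an override exists.
function buildRunData(friendlyId: string, options: TriggerOptions): Record<string, unknown> {
  return {
    friendlyId,
    ...(options.overrideCreatedAt ? { createdAt: options.overrideCreatedAt } : {}),
  };
}
```

With no override, `"createdAt" in buildRunData("run_1", {})` is false, whereas the direct assignment would leave the key present with an undefined value.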

internal-packages/schedule-engine/src/engine/workerCatalog.ts (1)

3-14: Explicit typing & as const would safeguard the catalog

scheduleWorkerCatalog is currently an untyped object literal. Down-stream code will lose IntelliSense and accidental property changes won’t be caught.

-export const scheduleWorkerCatalog = {
+import type { WorkerCatalog } from "@trigger.dev/worker"; // hypothetical shared type
+
+export const scheduleWorkerCatalog: WorkerCatalog = {
   "schedule.triggerScheduledTask": {
     schema: z.object({
       instanceId: z.string(),
       exactScheduleTime: z.coerce.date(),
     }),

Optionally append as const after the object to freeze the keys.

This minor change prevents typos (e.g., "schedule.triggerSchedueldTask") from compiling.

apps/webapp/app/services/email.server.ts (1)

95-101: Return the enqueue promise to surface failures

scheduleEmail currently awaits the enqueue but returns void, swallowing any value the worker library might provide (job id, etc.). Returning the promise enables callers to react or log the job id.

-export async function scheduleEmail(data: DeliverEmail, delay?: { seconds: number }) {
+export async function scheduleEmail(
+  data: DeliverEmail,
+  delay?: { seconds: number }
+): Promise<void> {
   const availableAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined;
-  await commonWorker.enqueue({
+  return commonWorker.enqueue({
     job: "scheduleEmail",
     payload: data,
     availableAt,
   });
 }

Also consider validating delay.seconds >= 0 to avoid accidental past dates.
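
One possible shape for that validation, pulled into a small helper so it can be tested without a worker. `resolveAvailableAt` is a hypothetical name and the `nowMs` parameter is an assumption added for deterministic tests.

```typescript
// Hypothetical helper: turns an optional delay into an availableAt timestamp.
// Throws when the delay is negative or not a finite number.
function resolveAvailableAt(
  delay?: { seconds: number },
  nowMs: number = Date.now()
): Date | undefined {
  if (!delay) return undefined;
  if (!Number.isFinite(delay.seconds) || delay.seconds < 0) {
    throw new RangeError(
      `delay.seconds must be a non-negative finite number, got ${delay.seconds}`
    );
  }
  return new Date(nowMs + delay.seconds * 1000);
}
```

scheduleEmail could then pass the result straight through as availableAt.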

internal-packages/schedule-engine/src/engine/distributedScheduling.ts (1)

21-26: Small clarity nit

Math.pow(2, 31) can be replaced with the more idiomatic 2 ** 31, which is easier to read and avoids an extra function call.

apps/webapp/app/v3/commonWorker.server.ts (1)

48-141: Catalog definition is getting unwieldy

Hard-coding dozens of job specs with identical visibilityTimeoutMs and very similar retry policies makes maintenance painful. Consider extracting helpers:

const defaultJob = <T>(schema: T, retry = 5) => ({
  schema,
  visibilityTimeoutMs: 60_000,
  retry: { maxAttempts: retry },
});

Then:

catalog: {
  "v3.resumeBatchRun": defaultJob(z.object({ batchRunId: z.string() })),
  ...
}

This trims ~150 LOC and reduces copy-paste errors.

internal-packages/schedule-engine/README.md (2)

51-58: Minor wording & brevity nit

“from executing at exactly the same moment”

-Prevents thundering herd issues by distributing executions across time windows
+Prevents thundering-herd issues by spreading executions across a time window
🧰 Tools
🪛 LanguageTool

[style] ~51-~51: ‘exactly the same’ might be wordy. Consider a shorter alternative.
Context: ...t all scheduled tasks from executing at exactly the same moment: ```typescript import { calcula...

(EN_WORDINESS_PREMIUM_EXACTLY_THE_SAME)


101-105: Dangling colon renders as loose punctuation

A colon at the start of a bullet occasionally shows up in some markdown renderers:

-- `redis`: Redis connection configuration
++ `redis` – Redis connection configuration
🧰 Tools
🪛 LanguageTool

[uncategorized] ~101-~101: Loose punctuation mark.
Context: ...these configuration options: - prisma: PrismaClient instance - redis: Redis ...

(UNLIKELY_OPENING_PUNCTUATION)

apps/webapp/app/v3/services/timeoutDeployment.server.ts (1)

53-62: Consider enqueueOnce to avoid duplicate timeout jobs

Timeouts are idempotent; using plain enqueue can queue duplicates if enqueue is called multiple times before the first job is processed.

- await commonWorker.enqueue({
+ await commonWorker.enqueueOnce({
   id: `timeoutDeployment:${deploymentId}`,

The unique id is already provided, so switching to enqueueOnce protects against accidental duplication at no extra cost.
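
To make the intended difference concrete, here is a toy in-memory model of id-based deduplication. It is not the redis-worker API and makes no claim about how enqueueOnce is implemented there; `InMemoryQueue`, `ack`, and `enqueuedCount` are invented for this sketch.

```typescript
interface QueuedJob {
  id: string;
  job: string;
  payload: unknown;
}

// Toy in-memory queue; an illustration only, not the redis-worker API.
class InMemoryQueue {
  private readonly pending = new Map<string, QueuedJob>();
  private readonly history: QueuedJob[] = [];

  // Plain enqueue: every call is recorded, duplicates included.
  enqueue(item: QueuedJob): void {
    this.history.push(item);
    this.pending.set(item.id, item);
  }

  // Enqueue-once: ignored while a job with the same id is still pending.
  enqueueOnce(item: QueuedJob): boolean {
    if (this.pending.has(item.id)) return false;
    this.enqueue(item);
    return true;
  }

  // Marks a job as processed so its id can be used again.
  ack(id: string): void {
    this.pending.delete(id);
  }

  get enqueuedCount(): number {
    return this.history.length;
  }
}
```

In this model, two rapid enqueueOnce calls with the same id record one job, while two enqueue calls record two.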

apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (1)

107-116: Use enqueueOnce for id-based deduplication

As with the timeout service, the job ID is deterministic (v3.executeTasksWaitingForDeploy:${backgroundWorkerId}).
Switching to enqueueOnce guarantees exactly-once semantics during bursts (e.g., multiple deploy hooks firing).

- return await commonWorker.enqueue({
+ return await commonWorker.enqueueOnce({
apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts (1)

60-69: Unused tx parameter – silence linter or drop it

The tx argument is no longer forwarded to commonWorker.enqueue, leaving it unused and triggering the @typescript-eslint/no-unused-vars rule in most code-bases.

-static async enqueue(attemptId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
+static async enqueue(
+  attemptId: string,
+  _tx: PrismaClientOrTransaction, // keep positional compatibility but silence linter
+  runAt?: Date
+) {

If transactions are no longer required here, consider making the parameter optional and prefixing with an underscore (or removing it entirely) across call-sites to avoid confusion.

apps/webapp/app/runEngine/services/batchTrigger.server.ts (1)

316-321: tx parameter kept but never used

#enqueueBatchTaskRun() still accepts a tx parameter yet the value is ignored after the switch to commonWorker.enqueue. Either remove the parameter or rename it to _tx to clarify that it is intentionally unused:

-async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
+async #enqueueBatchTaskRun(options: BatchProcessingOptions, _tx?: PrismaClientOrTransaction) {

Cleaning this up avoids false expectations that the call will be wrapped in a transaction.

apps/webapp/app/v3/services/resumeTaskDependency.server.ts (1)

157-171: Unused tx argument

Similar to other services, tx is accepted but not used after the move to commonWorker.enqueue.

-static async enqueue(
-  dependencyId: string,
-  sourceTaskAttemptId: string,
-  tx: PrismaClientOrTransaction,
-  runAt?: Date
-) {
+static async enqueue(
+  dependencyId: string,
+  sourceTaskAttemptId: string,
+  _tx: PrismaClientOrTransaction,
+  runAt?: Date
+) {

This prevents linter warnings and signals that transactional support has been removed.

apps/webapp/test/distributedScheduling.test.ts (1)

57-88: Potential flakiness in uniform-distribution assertion

The “should distribute work evenly” test uses 1 000 samples and fails if fewer than 80 % of 30 buckets receive at least one hit.
While unlikely, the deterministic hash could still cluster enough samples to dip below the 24-bucket threshold, making the test flaky.

Consider loosening the assertion or increasing the sample size:

-const testCount = 1000;
-
-expect(bucketsWithItems).toBeGreaterThan(distributionWindow * 0.8);
+const testCount = 5_000; // reduces variance
+
+expect(bucketsWithItems).toBeGreaterThan(distributionWindow * 0.7);

This keeps the spirit of the check while minimising spurious CI failures.
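
The bucket-coverage measurement behind this assertion can be sketched independently of the engine. The hash below (FNV-1a) and the helper names are assumptions chosen for brevity; the PR's calculateDistributedExecutionTime may hash differently.

```typescript
// FNV-1a 32-bit hash; chosen for brevity, the PR may use a different hash.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Maps an id to a whole-second bucket in [0, windowSeconds).
function bucketFor(id: string, windowSeconds: number): number {
  return fnv1a(id) % windowSeconds;
}

// Counts how many distinct buckets receive at least one id.
function countOccupiedBuckets(ids: string[], windowSeconds: number): number {
  return new Set(ids.map((id) => bucketFor(id, windowSeconds))).size;
}
```

Because the mapping is deterministic, a fixed list of ids always yields the same count; run-to-run variance only appears when the test generates its ids randomly.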

apps/webapp/app/v3/services/resumeBatchRun.server.ts (1)

2-2: Unused import side-effect

Switching to commonWorker makes the PrismaClientOrTransaction argument in enqueue() obsolete. Consider pruning that parameter from the method signature (and its call-sites) to avoid dead code and confusion.

apps/webapp/app/v3/scheduleEngine.server.ts (1)

81-116: Error is swallowed – add structured logging

The catch only returns { success:false } but drops stack traces; production debugging will be painful.

-      } catch (error) {
-        return {
-          success: false,
-          error: error instanceof Error ? error.message : String(error),
-        };
+      } catch (error) {
+        logger.error("scheduleEngine.onTriggerScheduledTask failed", { error });
+        return {
+          success: false,
+          error: error instanceof Error ? error.message : String(error),
+        };
       }
apps/webapp/app/v3/services/batchTriggerV3.server.ts (2)

478-540: Inconsistent logging & IDs reference “V2”

This v3 service logs under BatchTriggerV2 and builds job IDs with BatchTriggerV2Service.process.
Rename to BatchTriggerV3 to avoid future grep/debug headaches.

- id: `BatchTriggerV2Service.process:${options.batchId}:${options.processingId}`,
+ id: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`,

Same for every logger.debug message using [BatchTriggerV2] ….


895-899: tx parameter is never used

#enqueueBatchTaskRun still accepts tx but ignores it. Drop the argument or forward it to a transactional queue helper; leaving it dangling is misleading.

internal-packages/schedule-engine/test/scheduleEngine.test.ts (1)

121-123: Avoid brittle exact-equality on Date

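expect(actualNextExecution).toEqual(expectedExecutionTime) can flake when the two values differ by a few ms.
Compare the millisecond timestamps (getTime()) within a ±1 s window instead; note that toBeCloseTo operates on numbers, not Date objects.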
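
A tolerance check of this kind can be expressed with a small helper that compares millisecond timestamps. `datesWithin` is a hypothetical name, not something defined in the PR.

```typescript
// Returns true when two dates are within toleranceMs of each other (inclusive).
function datesWithin(actual: Date, expected: Date, toleranceMs: number): boolean {
  return Math.abs(actual.getTime() - expected.getTime()) <= toleranceMs;
}
```

In the test this would read `expect(datesWithin(actualNextExecution, expectedExecutionTime, 1_000)).toBe(true)`, which keeps the tolerance explicit in milliseconds.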

internal-packages/schedule-engine/src/engine/types.ts (1)

27-30: Constrain logLevel via a union

logLevel?: string accepts invalid values at runtime. Consider

logLevel?: "log" | "error" | "warn" | "info" | "debug";

This mirrors the enum you use elsewhere and prevents typos.

apps/webapp/app/v3/services/upsertTaskSchedule.server.ts (2)

108-128: Parallelise instance creation / registration

The loop issues a Prisma insert then immediately waits for scheduleEngine.registerNextTaskScheduleInstance.
With many environments this becomes O(n) sequential I/O. Consider:

await Promise.all(
  options.environments.map(async (environmentId) => {
    const instance = await this._prisma.taskScheduleInstance.create({});
    await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
  })
);

This keeps semantic order while cutting latency substantially.


206-218: Typo & readability

instancesToDeleted → instancesToDelete. Minor, but avoiding grammatical slips helps future readers.

internal-packages/schedule-engine/src/engine/index.ts (1)

96-106: Side-effects in constructor hinder testability

Spawning the worker inside the constructor (this.worker.start()) means merely importing / instantiating the class connects to Redis and spawns threads, complicating unit tests and life-cycle management.
Expose an explicit start() method instead; call it from your singleton wrapper.
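
A minimal sketch of the suggested shape, assuming a worker object with start and stop methods. `WorkerLike` and `LazyEngine` are illustrative names only and do not mirror the real ScheduleEngine constructor or options.

```typescript
// Minimal worker shape assumed for this sketch.
interface WorkerLike {
  start(): void;
  stop(): Promise<void>;
}

class LazyEngine {
  private readonly worker: WorkerLike;
  private started = false;

  // The constructor only stores dependencies; nothing is started here.
  constructor(worker: WorkerLike) {
    this.worker = worker;
  }

  // Idempotent: calling start() twice starts the worker only once.
  start(): void {
    if (this.started) return;
    this.started = true;
    this.worker.start();
  }

  // Stops the worker if it was started; otherwise does nothing.
  async quit(): Promise<void> {
    if (!this.started) return;
    this.started = false;
    await this.worker.stop();
  }

  get isStarted(): boolean {
    return this.started;
  }
}
```

Unit tests can then construct the engine with a fake worker and never touch Redis, while the singleton wrapper calls start() once at boot.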

apps/webapp/app/env.server.ts (1)

686-695: Clarify overlapping concurrency env-vars

The trio
SCHEDULE_WORKER_CONCURRENCY_WORKERS,
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER, and
SCHEDULE_WORKER_CONCURRENCY_LIMIT
overlap but the codebase only reads concurrency (workers × tasks). Document which flags are authoritative or drop the unused ones to avoid misconfiguration.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a5e7f3 and 177f59d.

⛔ Files ignored due to path filters (2)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
  • references/hello-world/src/trigger/schedule.ts is excluded by !references/**
📒 Files selected for processing (49)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts (2 hunks)
  • apps/webapp/app/presenters/v3/RunListPresenter.server.ts (2 hunks)
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts (2 hunks)
  • apps/webapp/app/runEngine/services/triggerTask.server.ts (1 hunks)
  • apps/webapp/app/services/email.server.ts (2 hunks)
  • apps/webapp/app/services/runsRepository.server.ts (1 hunks)
  • apps/webapp/app/services/worker.server.ts (19 hunks)
  • apps/webapp/app/v3/commonWorker.server.ts (3 hunks)
  • apps/webapp/app/v3/scheduleEngine.server.ts (1 hunks)
  • apps/webapp/app/v3/services/batchTriggerV3.server.ts (3 hunks)
  • apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts (2 hunks)
  • apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts (2 hunks)
  • apps/webapp/app/v3/services/changeCurrentDeployment.server.ts (1 hunks)
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts (3 hunks)
  • apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts (1 hunks)
  • apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (2 hunks)
  • apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts (0 hunks)
  • apps/webapp/app/v3/services/resumeBatchRun.server.ts (2 hunks)
  • apps/webapp/app/v3/services/resumeTaskDependency.server.ts (2 hunks)
  • apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts (2 hunks)
  • apps/webapp/app/v3/services/retryAttempt.server.ts (2 hunks)
  • apps/webapp/app/v3/services/timeoutDeployment.server.ts (2 hunks)
  • apps/webapp/app/v3/services/triggerScheduledTask.server.ts (0 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (1 hunks)
  • apps/webapp/app/v3/services/triggerTaskV1.server.ts (2 hunks)
  • apps/webapp/app/v3/services/upsertTaskSchedule.server.ts (4 hunks)
  • apps/webapp/app/v3/utils/distributedScheduling.server.ts (1 hunks)
  • apps/webapp/package.json (1 hunks)
  • apps/webapp/test/distributedScheduling.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (2 hunks)
  • internal-packages/run-engine/src/engine/types.ts (1 hunks)
  • internal-packages/schedule-engine/README.md (1 hunks)
  • internal-packages/schedule-engine/package.json (1 hunks)
  • internal-packages/schedule-engine/src/engine/distributedScheduling.ts (1 hunks)
  • internal-packages/schedule-engine/src/engine/index.ts (1 hunks)
  • internal-packages/schedule-engine/src/engine/scheduleCalculation.ts (1 hunks)
  • internal-packages/schedule-engine/src/engine/types.ts (1 hunks)
  • internal-packages/schedule-engine/src/engine/workerCatalog.ts (1 hunks)
  • internal-packages/schedule-engine/src/index.ts (1 hunks)
  • internal-packages/schedule-engine/test/scheduleEngine.test.ts (1 hunks)
  • internal-packages/schedule-engine/test/setup.ts (1 hunks)
  • internal-packages/schedule-engine/tsconfig.build.json (1 hunks)
  • internal-packages/schedule-engine/tsconfig.json (1 hunks)
  • internal-packages/schedule-engine/tsconfig.src.json (1 hunks)
  • internal-packages/schedule-engine/tsconfig.test.json (1 hunks)
  • internal-packages/schedule-engine/vitest.config.ts (1 hunks)
  • packages/redis-worker/src/worker.ts (1 hunks)
  • packages/rsc/src/package.json (1 hunks)
💤 Files with no reviewable changes (2)
  • apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts
  • apps/webapp/app/v3/services/triggerScheduledTask.server.ts
🧰 Additional context used
🧬 Code Graph Analysis (15)
apps/webapp/app/v3/services/changeCurrentDeployment.server.ts (1)
apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (1)
  • ExecuteTasksWaitingForDeployService (8-117)
apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts (1)
apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (1)
  • ExecuteTasksWaitingForDeployService (8-117)
apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts (1)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
apps/webapp/app/services/email.server.ts (2)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
internal-packages/emails/src/index.tsx (1)
  • data (80-122)
apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (1)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
apps/webapp/app/v3/services/timeoutDeployment.server.ts (3)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
apps/webapp/app/db.server.ts (1)
  • PrismaClientOrTransaction (21-21)
apps/webapp/app/services/worker.server.ts (1)
  • workerQueue (373-373)
apps/webapp/app/v3/services/cancelTaskAttemptDependencies.server.ts (1)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
packages/redis-worker/src/worker.ts (2)
internal-packages/schedule-engine/src/engine/index.ts (1)
  • JobHandlerParams (217-225)
internal-packages/zod-worker/src/index.ts (1)
  • K (581-660)
apps/webapp/app/v3/services/retryAttempt.server.ts (1)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
internal-packages/schedule-engine/src/engine/types.ts (4)
internal-packages/schedule-engine/src/index.ts (3)
  • TriggerScheduledTaskCallback (5-5)
  • ScheduleEngineOptions (3-3)
  • TriggerScheduleParams (4-4)
apps/webapp/app/db.server.ts (1)
  • PrismaClient (228-228)
internal-packages/redis/src/index.ts (1)
  • RedisOptions (4-4)
internal-packages/tracing/src/index.ts (2)
  • Tracer (14-14)
  • Meter (16-16)
apps/webapp/app/v3/services/resumeTaskDependency.server.ts (1)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
apps/webapp/test/distributedScheduling.test.ts (1)
apps/webapp/app/v3/utils/distributedScheduling.server.ts (1)
  • calculateDistributedExecutionTime (6-31)
apps/webapp/app/v3/services/cancelDevSessionRuns.server.ts (2)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
apps/webapp/app/v3/services/batchTriggerV3.server.ts (1)
  • options (894-900)
apps/webapp/app/v3/services/resumeBatchRun.server.ts (1)
apps/webapp/app/v3/commonWorker.server.ts (1)
  • commonWorker (279-279)
apps/webapp/app/v3/services/createBackgroundWorker.server.ts (1)
apps/webapp/app/v3/scheduleEngine.server.ts (1)
  • scheduleEngine (11-11)
🪛 LanguageTool
internal-packages/schedule-engine/README.md

[style] ~51-~51: ‘exactly the same’ might be wordy. Consider a shorter alternative.
Context: ...t all scheduled tasks from executing at exactly the same moment: ```typescript import { calcula...

(EN_WORDINESS_PREMIUM_EXACTLY_THE_SAME)


[uncategorized] ~101-~101: Loose punctuation mark.
Context: ...these configuration options: - prisma: PrismaClient instance - redis: Redis ...

(UNLIKELY_OPENING_PUNCTUATION)

⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (16)
packages/rsc/src/package.json (1)

1-3: Minimal package.json LGTM, but double-check directory purpose

Declaring "type": "module" is fine and consistent with the rest of the repo.
Make sure:

  1. No other package.json higher in the packages/rsc tree overrides this.
  2. Tooling that consumes this folder (ts-node, vitest, etc.) expects ESM.

If both are already covered, nothing else to do here.

apps/webapp/package.json (1)

53-57: Workspace dependency added – ensure pipeline & lockfile are updated

"@internal/schedule-engine": "workspace:*" looks correct.
Please run the package-manager install (pnpm install / npm install -w) and commit the updated lockfile so CI picks it up.

apps/webapp/app/v3/services/changeCurrentDeployment.server.ts (1)

87-87: Call site now matches new enqueue signature – all good

ExecuteTasksWaitingForDeployService.enqueue now only accepts (backgroundWorkerId, runAt?).
This update removes the stale Prisma argument and compiles cleanly.

apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts (1)

164-166: LGTM – signature change reflected correctly

ExecuteTasksWaitingForDeployService.enqueue no longer requires the Prisma client, and the call site has been updated accordingly.

apps/webapp/app/v3/services/createBackgroundWorker.server.ts (1)

571-573: Error propagation unchanged but worth revisiting

scheduleEngine.registerNextTaskScheduleInstance is awaited inside this tight loop.
If that call throws (e.g. transient Redis issue) the entire worker-creation flow aborts and rolls back the DB transaction – the same behaviour the removed service had, but now the failure surface is broader (Redis, distributed locks…).

Consider wrapping in a retry helper or catching and logging non-fatal errors to avoid making deployments brittle.

Also applies to: 603-604
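
A minimal sketch of the retry helper mentioned above, assuming a plain exponential backoff is acceptable here. The name `withRetry` and its parameters are hypothetical and not part of the PR; the wrapped operation would be the `registerNextTaskScheduleInstance` call inside the loop.

```typescript
// Hypothetical helper (not from the PR): retries an async operation with
// exponential backoff and rethrows the last error once attempts run out.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Delay doubles on each failure: baseDelayMs, 2x, 4x, ...
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }

  throw lastError;
}
```

Whether a failed registration should still abort the surrounding transaction after the final attempt is a separate decision; the helper only smooths over transient failures.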

internal-packages/schedule-engine/vitest.config.ts (1)

11-13: Confirm the esbuild target is aligned with runtime

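target: "node18" is fine today. Note that the esbuild target only controls syntax down-levelling, not runtime APIs: globals such as fetch and structuredClone are already available in Node 18 and are unaffected by this setting. If the runtime matrix has moved to Node 20 (current LTS), bumping the target merely avoids unnecessary syntax transforms. Worth double-checking the production containers.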

internal-packages/schedule-engine/src/index.ts (1)

1-6: Public entry-point looks good

The minimalist barrel file is clear and follows the Node16/ESM convention that the compiler will emit .js files, so the explicit .js extension is appropriate. No further concerns.

apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts (1)

8-9: Import change looks correct

Switching to the new commonWorker aligns with the broader refactor.

apps/webapp/app/services/email.server.ts (1)

7-8: Import update acknowledged

Migrating to commonWorker is consistent with the rest of the PR.

internal-packages/schedule-engine/package.json (1)

8-14: Non-standard export key

"@triggerdotdev/source" inside exports isn’t a recognised condition and may break resolution in Node ≥16. Verify that tools relying on this key actually support it; otherwise use a documented condition such as "source" or "development".

apps/webapp/app/v3/commonWorker.server.ts (1)

188-195: Ensure numeric env values

pollIntervalMs, immediatePollIntervalMs, worker counts, etc., are passed straight from env.*. Confirm the Zod schema parses them to numbers; if they are still strings, RedisWorker will treat "5000" as a string and mis-handle arithmetic/timeouts.

If parsing isn’t already done in env.server.ts, coerce here:

pollIntervalMs: Number(env.COMMON_WORKER_POLL_INTERVAL),

apps/webapp/app/services/worker.server.ts (1)

238-246: Confirm finalAttempt semantics against worker implementation

finalAttempt is derived from job.attempts === job.max_attempts.
In graphile-worker the attempts counter already includes the current attempt, so the flag will be true while the job is still running, not after it has finally failed.
If scheduleEngine.triggerScheduledTask() internally interprets finalAttempt as “no more retries will follow”, this could lead to premature deletion / alerting.

await scheduleEngine.triggerScheduledTask({
  instanceId: payload.instanceId,
  finalAttempt: job.attempts === job.max_attempts, // ← verify semantics
});

Please double-check the contract and adjust to job.attempts + 1 === job.max_attempts (or similar) if the engine expects the next attempt to be the last.
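
To make the two readings concrete, here is a small sketch. It assumes, as the comment above states, that the `attempts` counter already includes the attempt in progress; the `JobCounters` shape and both function names are hypothetical.

```typescript
// Hypothetical minimal job shape; only the two counters matter here.
interface JobCounters {
  attempts: number; // assumed to already count the attempt in progress
  max_attempts: number;
}

// True while the last permitted attempt is running (no retry will follow).
function isFinalAttempt(job: JobCounters): boolean {
  return job.attempts >= job.max_attempts;
}

// True when exactly one more attempt remains after the current one.
function nextAttemptIsFinal(job: JobCounters): boolean {
  return job.attempts + 1 === job.max_attempts;
}
```

Which of the two the engine needs depends on whether it treats `finalAttempt` as "this attempt is the last" or "the next attempt is the last".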

apps/webapp/app/v3/services/timeoutDeployment.server.ts (1)

66-69: Acknowledge both queues symmetrically

dequeue is invoked on workerQueue, while ack is invoked on commonWorker.
If workerQueue.dequeue() fails (e.g., job not found) the commonWorker.ack() will still run, silently discarding the job in the new queue. Consider guarding with an existence check or catching errors to keep behaviour predictable during the migration window.

packages/redis-worker/src/worker.ts (1)

37-48: Type extraction looks good

Extracting JobHandlerParams makes handler definitions clearer and reusable; compile-time inference also improves.

apps/webapp/app/v3/services/resumeBatchRun.server.ts (1)

337-344: Verify availableAt semantics

commonWorker.enqueue often expects a Unix epoch (ms) whereas you’re passing a Date | undefined.
Double-check the worker’s API – if it expects a number, wrap with .getTime(); if it accepts Date, ignore this note.
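
If the worker's API turns out to expect a number, a small normalizer keeps call sites unchanged. This is a sketch under that assumption; `toEpochMs` and `toDate` are hypothetical helpers, not part of `commonWorker`.

```typescript
type Timestamp = Date | number | undefined;

// Normalizes a Date or epoch-millisecond value to epoch milliseconds.
function toEpochMs(value: Timestamp): number | undefined {
  if (value === undefined) return undefined;
  const ms = value instanceof Date ? value.getTime() : value;
  if (!Number.isFinite(ms)) {
    throw new RangeError("timestamp is not a finite value");
  }
  return ms;
}

// Normalizes in the other direction, for APIs that expect a Date.
function toDate(value: Timestamp): Date | undefined {
  const ms = toEpochMs(value);
  return ms === undefined ? undefined : new Date(ms);
}
```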

internal-packages/schedule-engine/src/engine/scheduleCalculation.ts (1)

22-30: LGTM – cron calculation is concise and correct

Comment on lines +8 to +10
testTimeout: 30000,
hookTimeout: 30000,
},

🛠️ Refactor suggestion

Avoid conflicting time-out definitions

testTimeout/hookTimeout are set to 30 s here, but ./test/setup.ts later overrides the default timeout to 60 s for container-heavy tests. The lower value configured here will win and may randomly fail the longer integration suite.

-    testTimeout: 30000,
-    hookTimeout: 30000,
+    // Allow the 60 s override in ./test/setup.ts to take effect
+    // testTimeout: 60000,
+    // hookTimeout: 60000,

Either bump these to 60 s or drop them entirely so the per-test override is the single source of truth.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    testTimeout: 30000,
-    hookTimeout: 30000,
+    // Allow the 60 s override in ./test/setup.ts to take effect
+    // testTimeout: 60000,
+    // hookTimeout: 60000,
     },
🤖 Prompt for AI Agents
In internal-packages/schedule-engine/vitest.config.ts around lines 8 to 10, the
testTimeout and hookTimeout are set to 30000 ms (30 s), which conflicts with the
60000 ms (60 s) timeout set later in ./test/setup.ts for container-heavy tests.
To fix this, either increase both testTimeout and hookTimeout here to 60000 ms
to match the later override or remove these timeout settings entirely so that
the per-test override in setup.ts is the single source of truth.

Comment on lines +304 to 308
options.queueTimestamp ??
(parentRun && body.options?.resumeParentOnCompletion
? parentRun.queueTimestamp ?? undefined
: undefined,
: undefined),
runChainState,

⚠️ Potential issue

options.queueTimestamp needs validation

The new override is accepted blindly. If a caller passes an invalid value (a string, an Invalid Date, a timestamp years in the future, etc.), it will persist to the DB and break chronological queries and metrics.

Add a narrow guard before assignment:

-                  queueTimestamp:
-                    options.queueTimestamp ??
+                  queueTimestamp:
+                    (options.queueTimestamp instanceof Date
+                      ? options.queueTimestamp
+                      : undefined) ??
                     (parentRun && body.options?.resumeParentOnCompletion
                       ? parentRun.queueTimestamp ?? undefined
                       : undefined),

or run it through a zod schema shared with the client.
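
A slightly fuller guard could also reject Invalid Date values and far-future timestamps, which `instanceof Date` alone lets through. The sketch below is illustrative: the function name and the one-minute tolerance are assumptions, not values from the PR.

```typescript
// Assumed tolerance for clock skew between caller and server.
const MAX_FUTURE_SKEW_MS = 60_000;

// Returns the timestamp only if it is a valid Date that is not
// unreasonably far in the future; otherwise returns undefined so the
// caller's `??` fallback takes over.
function sanitizeQueueTimestamp(
  value: unknown,
  now: Date = new Date()
): Date | undefined {
  if (!(value instanceof Date)) return undefined;

  const ms = value.getTime();
  if (Number.isNaN(ms)) return undefined; // Invalid Date
  if (ms > now.getTime() + MAX_FUTURE_SKEW_MS) return undefined;

  return value;
}
```

It would slot into the existing expression as `sanitizeQueueTimestamp(options.queueTimestamp) ?? (...)`, leaving the parent-run fallback untouched.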

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 queueTimestamp:
-  options.queueTimestamp ??
+  (options.queueTimestamp instanceof Date
+    ? options.queueTimestamp
+    : undefined) ??
   (parentRun && body.options?.resumeParentOnCompletion
     ? parentRun.queueTimestamp ?? undefined
     : undefined),
 runChainState,
🤖 Prompt for AI Agents
In apps/webapp/app/runEngine/services/triggerTask.server.ts around lines 304 to
308, the code assigns options.queueTimestamp without validation, risking invalid
dates being saved and breaking chronological logic. Add a validation step before
assignment to ensure options.queueTimestamp is a valid date and within an
acceptable range (not a future date far beyond now). You can implement this by
checking the type and range explicitly or by validating it through a shared zod
schema with the client, then only assign if valid; otherwise, fallback to the
existing logic.

Comment on lines +5 to +8
schema: z.object({
instanceId: z.string(),
exactScheduleTime: z.coerce.date(),
}),

🛠️ Refactor suggestion

Potential silent date parsing failures

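z.coerce.date() passes the input through new Date(value) before validating it. Unparseable strings are rejected as invalid dates, but loosely formatted input still gets through: null coerces to the Unix epoch, numbers are read as epoch milliseconds, and non-ISO strings are parsed in an engine-dependent way. Consider a stricter guard: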

exactScheduleTime: z
  .string()
  .datetime({ offset: true }) // stricter ISO-8601 with timezone
  .transform((v) => new Date(v)),

to fail fast on malformed payloads.
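
For comparison, the same fail-fast behaviour can be sketched without zod. The regular expression below is a simplified approximation of an ISO-8601 datetime with an explicit offset, and `parseStrictIsoDate` is a hypothetical name.

```typescript
// Simplified pattern: YYYY-MM-DDTHH:MM:SS, optional fraction, then Z or ±HH:MM.
const ISO_WITH_OFFSET =
  /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$/;

// Accepts only strings in the format above; anything else throws.
function parseStrictIsoDate(value: unknown): Date {
  if (typeof value !== "string" || !ISO_WITH_OFFSET.test(value)) {
    throw new TypeError(
      `Expected an ISO-8601 datetime with offset, got: ${String(value)}`
    );
  }

  const parsed = new Date(value);
  // The pattern checks shape only, so out-of-range fields (month 13, etc.)
  // are caught here.
  if (Number.isNaN(parsed.getTime())) {
    throw new TypeError(`Not a valid calendar datetime: ${value}`);
  }
  return parsed;
}
```

Non-string values, date-only strings, and strings without a timezone are all rejected, so a malformed payload fails before a job handler sees it.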

🤖 Prompt for AI Agents
In internal-packages/schedule-engine/src/engine/workerCatalog.ts around lines 5
to 8, replace the use of z.coerce.date() for exactScheduleTime with a stricter
validation by first validating the value as a string with z.string().datetime({
offset: true }) to enforce ISO-8601 format with timezone, then transform it to a
Date object. This change ensures that loosely formatted or non-string values
fail validation immediately instead of being coerced into an unintended Date.

Comment on lines 26 to +34
 static async enqueue(runId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
-  return await workerQueue.enqueue(
-    "v3.retryAttempt",
-    {
+  return await commonWorker.enqueue({
+    id: `retryAttempt:${runId}`,
+    job: "v3.retryAttempt",
+    payload: {
       runId,
     },
-    {
-      tx,
-      runAt,
-      jobKey: `retryAttempt:${runId}`,
-    }
-  );
+    availableAt: runAt,
+  });

🛠️ Refactor suggestion

Unused tx parameter

enqueue keeps the tx: PrismaClientOrTransaction argument but never uses it after the worker refactor, which can trigger unused-parameter lint/TS warnings and confuses callers.

-  static async enqueue(runId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
+  static async enqueue(runId: string, runAt?: Date) {

Update call-sites accordingly.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-  static async enqueue(runId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
+  static async enqueue(runId: string, runAt?: Date) {
     return await commonWorker.enqueue({
       id: `retryAttempt:${runId}`,
       job: "v3.retryAttempt",
       payload: {
         runId,
       },
       availableAt: runAt,
     });
🤖 Prompt for AI Agents
In apps/webapp/app/v3/services/retryAttempt.server.ts between lines 26 and 34,
the enqueue method has an unused parameter tx of type PrismaClientOrTransaction
that is no longer needed after the worker refactor. Remove the tx parameter from
the method signature and update all call-sites to stop passing this argument to
avoid linter and TypeScript warnings and reduce confusion.

Comment on lines +50 to +67
const engine = new ScheduleEngine({
prisma,
logLevel: env.SCHEDULE_ENGINE_LOG_LEVEL,
redis: {
host: env.SCHEDULE_WORKER_REDIS_HOST ?? "localhost",
port: env.SCHEDULE_WORKER_REDIS_PORT ?? 6379,
username: env.SCHEDULE_WORKER_REDIS_USERNAME,
password: env.SCHEDULE_WORKER_REDIS_PASSWORD,
keyPrefix: "schedule:",
enableAutoPipelining: true,
...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
worker: {
concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
disabled: env.SCHEDULE_WORKER_ENABLED === "0",
},

🛠️ Refactor suggestion

Coerce env values to numbers

env.SCHEDULE_WORKER_REDIS_PORT, SCHEDULE_WORKER_CONCURRENCY_LIMIT, etc. originate from process.env, so they are strings unless the env schema already coerces them.
If it doesn't, pass them through Number() (or zod's z.coerce.number()) before feeding them to the redis / worker configs to prevent silent mis-configuration.
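
A bare Number() call turns a malformed value into NaN without complaint, so a small wrapper that validates the result may be safer. The helper below is a hypothetical sketch, not code from the PR.

```typescript
// Parses a numeric environment value, falling back when it is unset and
// throwing when it is set but not a finite number.
function envNumber(
  raw: string | undefined,
  fallback: number,
  name = "value"
): number {
  if (raw === undefined || raw.trim() === "") return fallback;

  const parsed = Number(raw);
  if (!Number.isFinite(parsed)) {
    throw new Error(`${name} must be numeric, got "${raw}"`);
  }
  return parsed;
}

// Example: resolve a Redis port from a raw string with a default of 6379.
const exampleRedisPort = envNumber("6380", 6379, "SCHEDULE_WORKER_REDIS_PORT");
```

Failing at startup on a value like "63 79" is usually preferable to handing NaN to the Redis client.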

🤖 Prompt for AI Agents
In apps/webapp/app/v3/scheduleEngine.server.ts around lines 50 to 67,
environment variables like SCHEDULE_WORKER_REDIS_PORT and
SCHEDULE_WORKER_CONCURRENCY_LIMIT are used directly but they are strings from
process.env. Convert these string values to numbers using Number() before
assigning them to the redis and worker configuration properties to ensure
correct types and prevent misconfiguration.

Comment on lines +8 to +9
environment: any; // The full environment object from the database with all relationships
payload: {

🛠️ Refactor suggestion

Avoid any – model the Environment shape

environment: any defeats the purpose of the strongly-typed contracts you’re introducing. Define an EnvironmentPayload interface (even if partial) or use unknown + explicit narrowing at call-sites.
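
The unknown + narrowing route might look like the sketch below. The two fields are purely illustrative assumptions; the real environment record has its own shape, which would need to be modelled from the Prisma schema.

```typescript
// Hypothetical partial shape; field names are illustrative only.
interface EnvironmentPayload {
  id: string;
  type: string;
}

// Type guard that narrows an unknown value to EnvironmentPayload.
function isEnvironmentPayload(value: unknown): value is EnvironmentPayload {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return typeof candidate.id === "string" && typeof candidate.type === "string";
}

// Call sites narrow explicitly instead of trusting `any`.
function describeEnvironment(environment: unknown): string {
  if (!isEnvironmentPayload(environment)) {
    throw new TypeError("environment does not match the expected shape");
  }
  return `${environment.type}:${environment.id}`;
}
```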

🤖 Prompt for AI Agents
In internal-packages/schedule-engine/src/engine/types.ts at lines 8 to 9,
replace the type of the environment property from any to a properly defined
interface named EnvironmentPayload that models the expected shape of the
environment object. Define this interface with the relevant fields you expect
from the database, or alternatively use unknown and perform explicit type checks
where environment is used. This will enforce strong typing and improve type
safety.

Comment on lines +42 to +44
this.tracer = options.tracer ?? (startSpan as any).tracer;
this.meter = options.meter ?? getMeter("schedule-engine");


🛠️ Refactor suggestion

Unsafe tracer fallback

Casting startSpan to any to pluck .tracer bypasses type-safety and will break if the util implementation changes.
Prefer passing an explicit getTracer() helper or making tracer a required option in production builds.

🤖 Prompt for AI Agents
In internal-packages/schedule-engine/src/engine/index.ts around lines 42 to 44,
avoid using an unsafe fallback by casting startSpan to any to access .tracer, as
this bypasses type safety and risks breakage if the implementation changes.
Instead, refactor the code to require an explicit tracer option or provide a
dedicated getTracer() helper function to obtain the tracer, ensuring type safety
and reliability in production builds.

Comment on lines +606 to +615
try {
await this.worker.enqueue({
id: `scheduled-task-instance:${instanceId}`,
job: "schedule.triggerScheduledTask",
payload: {
instanceId,
exactScheduleTime,
},
availableAt: distributedExecutionTime,
});

⚠️ Potential issue

Job-ID collision risk

id: `scheduled-task-instance:${instanceId}` is reused for every run of the same schedule instance. If the previous run’s job hasn’t been pruned yet, enqueue may reject or overwrite, silently dropping executions.
Include the scheduled timestamp (or an incrementing counter) in the ID:

- id: `scheduled-task-instance:${instanceId}`,
+ id: `scheduled-task-instance:${instanceId}:${exactScheduleTime.getTime()}`,

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 try {
   await this.worker.enqueue({
-    id: `scheduled-task-instance:${instanceId}`,
+    id: `scheduled-task-instance:${instanceId}:${exactScheduleTime.getTime()}`,
     job: "schedule.triggerScheduledTask",
     payload: {
       instanceId,
       exactScheduleTime,
     },
     availableAt: distributedExecutionTime,
   });
🤖 Prompt for AI Agents
In internal-packages/schedule-engine/src/engine/index.ts around lines 606 to
615, the job ID uses only the instanceId, causing potential collisions if
multiple runs of the same instance are enqueued before pruning. To fix this,
modify the job ID to include the scheduled timestamp or a unique incrementing
counter along with the instanceId to ensure each job ID is unique and prevent
silent overwrites or rejections.
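
The ID scheme suggested above can be isolated in a small pure function, which also makes the uniqueness property easy to test. `buildScheduledJobId` is a hypothetical name; only the ID format comes from the suggestion.

```typescript
// Builds a job ID that is unique per (instance, scheduled time) pair, yet
// stable if the same occurrence is enqueued twice.
function buildScheduledJobId(instanceId: string, exactScheduleTime: Date): string {
  return `scheduled-task-instance:${instanceId}:${exactScheduleTime.getTime()}`;
}

// Two consecutive occurrences of the same instance get distinct IDs.
const firstRun = buildScheduledJobId("inst_1", new Date(Date.UTC(2025, 5, 13, 10, 0)));
const secondRun = buildScheduledJobId("inst_1", new Date(Date.UTC(2025, 5, 13, 10, 5)));
```

Because the ID is deterministic, re-enqueuing the same occurrence still maps to the same job, which matters if the queue relies on IDs for deduplication.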
