Ability to perform bulk run backfills #2304

Merged: 3 commits, Jul 23, 2025
39 changes: 39 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -763,6 +763,45 @@ const EnvironmentSchema = z.object({
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
ADMIN_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
ADMIN_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
ADMIN_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

ADMIN_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
ADMIN_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
ADMIN_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
ADMIN_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
ADMIN_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
ADMIN_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
ADMIN_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
ADMIN_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
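Each `ADMIN_WORKER_REDIS_*` setting above falls back to the corresponding shared `REDIS_*` variable when it isn't set explicitly. A minimal standalone sketch (not part of the PR) of that zod fallback pattern:

```ts
import { z } from "zod";

// Illustration of the fallback: ADMIN_WORKER_REDIS_HOST inherits REDIS_HOST unless it is
// set explicitly; the TLS flag achieves the same effect with a plain default.
const AdminRedisSchema = z.object({
  ADMIN_WORKER_REDIS_HOST: z
    .string()
    .optional()
    .transform((v) => v ?? process.env.REDIS_HOST),
  ADMIN_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
});

// With only REDIS_HOST=redis.internal in the environment, the admin worker still gets a host.
const adminRedis = AdminRedisSchema.parse(process.env);
console.log(adminRedis.ADMIN_WORKER_REDIS_HOST);
```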
12 changes: 1 addition & 11 deletions apps/webapp/app/routes/admin.api.v1.feature-flags.ts
@@ -1,17 +1,7 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import {
makeSetFlags,
setFlags,
FeatureFlagCatalogSchema,
validateAllFeatureFlags,
validatePartialFeatureFlags,
makeSetMultipleFlags,
} from "~/v3/featureFlags.server";
import { z } from "zod";
import { makeSetMultipleFlags, validatePartialFeatureFlags } from "~/v3/featureFlags.server";

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
@@ -0,0 +1,68 @@
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { adminWorker } from "~/v3/services/adminWorker.server";

const Body = z.object({
from: z.coerce.date(),
to: z.coerce.date(),
batchSize: z.number().optional(),
delayIntervalMs: z.number().optional(),
});

const Params = z.object({
batchId: z.string(),
});

const DEFAULT_BATCH_SIZE = 500;
const DEFAULT_DELAY_INTERVAL_MS = 1000;

export async function action({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

const { batchId } = Params.parse(params);

try {
const body = await request.json();

const { from, to, batchSize, delayIntervalMs } = Body.parse(body);

await adminWorker.enqueue({
job: "admin.backfillRunsToReplication",
payload: {
from,
to,
batchSize: batchSize ?? DEFAULT_BATCH_SIZE,
delayIntervalMs: delayIntervalMs ?? DEFAULT_DELAY_INTERVAL_MS,
},
id: batchId,
});

return json({
success: true,
id: batchId,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
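A hedged usage sketch for the new backfill endpoint. The exact route path isn't visible in this diff, so the URL below is a placeholder; the request must carry a personal access token belonging to an admin user, and `batchSize` / `delayIntervalMs` are optional (defaulting to 500 and 1000 ms):

```ts
// Placeholder URL — substitute the real admin route; "my-backfill-1" is the batchId path param.
const res = await fetch("https://app.example.com/admin/api/v1/<backfill-route>/my-backfill-1", {
  method: "POST",
  headers: {
    Authorization: "Bearer tr_pat_...", // admin user's personal access token
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    from: "2025-01-01T00:00:00Z",
    to: "2025-02-01T00:00:00Z",
    // batchSize and delayIntervalMs are optional; they default to 500 and 1000 ms.
  }),
});

console.log(await res.json()); // expected shape: { success: true, id: "my-backfill-1" }
```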
@@ -0,0 +1,45 @@
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { adminWorker } from "~/v3/services/adminWorker.server";

const Params = z.object({
batchId: z.string(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

const { batchId } = Params.parse(params);

try {
await adminWorker.cancel(batchId);

return json({
success: true,
id: batchId,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
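And the matching cancel call, again with a placeholder path, which stops the backfill job from re-enqueuing itself:

```ts
// Placeholder URL — substitute the real admin route; the batchId must match the one used to enqueue.
const res = await fetch(
  "https://app.example.com/admin/api/v1/<backfill-route>/my-backfill-1/cancel",
  {
    method: "POST",
    headers: { Authorization: "Bearer tr_pat_..." }, // admin user's personal access token
  }
);

console.log(await res.json()); // expected shape: { success: true, id: "my-backfill-1" }
```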
92 changes: 92 additions & 0 deletions apps/webapp/app/services/runsBackfiller.server.ts
@@ -0,0 +1,92 @@
import { Tracer } from "@opentelemetry/api";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import { RunsReplicationService } from "~/services/runsReplicationService.server";
import { startSpan } from "~/v3/tracing.server";
import { FINAL_RUN_STATUSES } from "../v3/taskStatus";
import { Logger } from "@trigger.dev/core/logger";

export class RunsBackfillerService {
private readonly prisma: PrismaClientOrTransaction;
private readonly runsReplicationInstance: RunsReplicationService;
private readonly tracer: Tracer;
private readonly logger: Logger;

constructor(opts: {
prisma: PrismaClientOrTransaction;
runsReplicationInstance: RunsReplicationService;
tracer: Tracer;
logLevel?: "log" | "error" | "warn" | "info" | "debug";
}) {
this.prisma = opts.prisma;
this.runsReplicationInstance = opts.runsReplicationInstance;
this.tracer = opts.tracer;
this.logger = new Logger("RunsBackfillerService", opts.logLevel ?? "debug");
}

public async call({
from,
to,
cursor,
batchSize,
}: {
from: Date;
to: Date;
cursor?: string;
batchSize?: number;
}): Promise<string | undefined> {
return await startSpan(this.tracer, "RunsBackfillerService.call()", async (span) => {
span.setAttribute("from", from.toISOString());
span.setAttribute("to", to.toISOString());
span.setAttribute("cursor", cursor ?? "");
span.setAttribute("batchSize", batchSize ?? 0);

const runs = await this.prisma.taskRun.findMany({
where: {
createdAt: {
gte: from,
lte: to,
},
status: {
in: FINAL_RUN_STATUSES,
},
...(cursor ? { id: { gt: cursor } } : {}),
},
orderBy: {
id: "asc",
},
take: batchSize,
});

if (runs.length === 0) {
this.logger.info("No runs to backfill", { from, to, cursor });

return;
}

this.logger.info("Backfilling runs", {
from,
to,
cursor,
batchSize,
runCount: runs.length,
firstCreatedAt: runs[0].createdAt,
lastCreatedAt: runs[runs.length - 1].createdAt,
});

await this.runsReplicationInstance.backfill(runs);

const lastRun = runs[runs.length - 1];

this.logger.info("Backfilled runs", {
from,
to,
cursor,
batchSize,
lastRunId: lastRun.id,
});

// Return the last run ID to continue from
return lastRun.id;
});
}
}
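`call()` processes one page of finished runs and returns the last run id as a cursor, or `undefined` once the window is exhausted. In the PR the admin worker drives this loop by re-enqueuing itself with a delay (see adminWorker.server.ts below); a minimal sketch of driving it directly, with import paths mirroring those used in adminWorker.server.ts, would look roughly like this:

```ts
import { $replica } from "~/db.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import { tracer } from "~/v3/tracer.server";
import { RunsBackfillerService } from "~/services/runsBackfiller.server";

// The replication instance can be absent (the admin worker checks for this), so guard first.
if (runsReplicationInstance) {
  const service = new RunsBackfillerService({
    prisma: $replica,
    runsReplicationInstance,
    tracer,
  });

  let cursor: string | undefined = undefined;
  do {
    // Each call backfills up to batchSize runs and returns the id of the last run it handled.
    cursor = await service.call({
      from: new Date("2025-01-01T00:00:00Z"),
      to: new Date("2025-02-01T00:00:00Z"),
      cursor,
      batchSize: 500,
    });
  } while (cursor);
}
```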
102 changes: 102 additions & 0 deletions apps/webapp/app/v3/services/adminWorker.server.ts
@@ -0,0 +1,102 @@
import { Logger } from "@trigger.dev/core/logger";
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import { singleton } from "~/utils/singleton";
import { tracer } from "../tracer.server";
import { $replica } from "~/db.server";
import { RunsBackfillerService } from "../../services/runsBackfiller.server";

function initializeWorker() {
const redisOptions = {
keyPrefix: "admin:worker:",
host: env.ADMIN_WORKER_REDIS_HOST,
port: env.ADMIN_WORKER_REDIS_PORT,
username: env.ADMIN_WORKER_REDIS_USERNAME,
password: env.ADMIN_WORKER_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.ADMIN_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
};

logger.debug(`👨‍🏭 Initializing admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}`);

const worker = new RedisWorker({
name: "admin-worker",
redisOptions,
catalog: {
"admin.backfillRunsToReplication": {
schema: z.object({
from: z.coerce.date(),
to: z.coerce.date(),
cursor: z.string().optional(),
batchSize: z.coerce.number().int().default(500),
delayIntervalMs: z.coerce.number().int().default(1000),
}),
visibilityTimeoutMs: 60_000 * 15, // 15 minutes
retry: {
maxAttempts: 5,
},
},
},
concurrency: {
workers: env.ADMIN_WORKER_CONCURRENCY_WORKERS,
tasksPerWorker: env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER,
limit: env.ADMIN_WORKER_CONCURRENCY_LIMIT,
},
pollIntervalMs: env.ADMIN_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL,
shutdownTimeoutMs: env.ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS,
logger: new Logger("AdminWorker", env.ADMIN_WORKER_LOG_LEVEL),
jobs: {
"admin.backfillRunsToReplication": async ({ payload, id }) => {
if (!runsReplicationInstance) {
logger.error("Runs replication instance not found");
return;
}

const service = new RunsBackfillerService({
prisma: $replica,
runsReplicationInstance: runsReplicationInstance,
tracer: tracer,
});

const cursor = await service.call({
from: payload.from,
to: payload.to,
cursor: payload.cursor,
batchSize: payload.batchSize,
});

if (cursor) {
await worker.enqueue({
job: "admin.backfillRunsToReplication",
payload: {
from: payload.from,
to: payload.to,
cursor,
batchSize: payload.batchSize,
delayIntervalMs: payload.delayIntervalMs,
},
id,
availableAt: new Date(Date.now() + payload.delayIntervalMs),
cancellationKey: id,
});
}
},
},
});

if (env.ADMIN_WORKER_ENABLED === "true") {
logger.debug(
`👨‍🏭 Starting admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}, pollInterval = ${env.ADMIN_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ADMIN_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ADMIN_WORKER_CONCURRENCY_LIMIT}`
);

worker.start();
}

return worker;
}

export const adminWorker = singleton("adminWorker", initializeWorker);