Skip to content

Commit ffb45a4

Browse files
committed
Ability to perform bulk run backfills
1 parent b447a80 commit ffb45a4

File tree

10 files changed

+661
-18
lines changed

10 files changed

+661
-18
lines changed

apps/webapp/app/env.server.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,45 @@ const EnvironmentSchema = z.object({
763763
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
764764
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
765765

766+
ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
767+
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
768+
ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
769+
ADMIN_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
770+
ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
771+
ADMIN_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
772+
ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
773+
ADMIN_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
774+
775+
ADMIN_WORKER_REDIS_HOST: z
776+
.string()
777+
.optional()
778+
.transform((v) => v ?? process.env.REDIS_HOST),
779+
ADMIN_WORKER_REDIS_READER_HOST: z
780+
.string()
781+
.optional()
782+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
783+
ADMIN_WORKER_REDIS_READER_PORT: z.coerce
784+
.number()
785+
.optional()
786+
.transform(
787+
(v) =>
788+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
789+
),
790+
ADMIN_WORKER_REDIS_PORT: z.coerce
791+
.number()
792+
.optional()
793+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
794+
ADMIN_WORKER_REDIS_USERNAME: z
795+
.string()
796+
.optional()
797+
.transform((v) => v ?? process.env.REDIS_USERNAME),
798+
ADMIN_WORKER_REDIS_PASSWORD: z
799+
.string()
800+
.optional()
801+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
802+
ADMIN_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
803+
ADMIN_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
804+
766805
ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
767806
ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
768807
ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),

apps/webapp/app/routes/admin.api.v1.feature-flags.ts

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4-
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
5-
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
6-
import {
7-
makeSetFlags,
8-
setFlags,
9-
FeatureFlagCatalogSchema,
10-
validateAllFeatureFlags,
11-
validatePartialFeatureFlags,
12-
makeSetMultipleFlags,
13-
} from "~/v3/featureFlags.server";
14-
import { z } from "zod";
4+
import { makeSetMultipleFlags, validatePartialFeatureFlags } from "~/v3/featureFlags.server";
155

166
export async function action({ request }: ActionFunctionArgs) {
177
// Next authenticate the request
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { adminWorker } from "~/v3/services/adminWorker.server";
6+
7+
const Body = z.object({
8+
from: z.coerce.date(),
9+
to: z.coerce.date(),
10+
batchSize: z.number().optional(),
11+
});
12+
13+
const Params = z.object({
14+
batchId: z.string(),
15+
});
16+
17+
const DEFAULT_BATCH_SIZE = 500;
18+
19+
export async function action({ request, params }: ActionFunctionArgs) {
20+
// Next authenticate the request
21+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
22+
23+
if (!authenticationResult) {
24+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
25+
}
26+
27+
const user = await prisma.user.findUnique({
28+
where: {
29+
id: authenticationResult.userId,
30+
},
31+
});
32+
33+
if (!user) {
34+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
35+
}
36+
37+
if (!user.admin) {
38+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
39+
}
40+
41+
const { batchId } = Params.parse(params);
42+
43+
try {
44+
const body = await request.json();
45+
46+
const { from, to, batchSize } = Body.parse(body);
47+
48+
await adminWorker.enqueue({
49+
job: "admin.backfillRunsToReplication",
50+
payload: {
51+
from,
52+
to,
53+
batchSize: batchSize ?? DEFAULT_BATCH_SIZE,
54+
},
55+
id: batchId,
56+
});
57+
58+
return json({
59+
success: true,
60+
id: batchId,
61+
});
62+
} catch (error) {
63+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
64+
}
65+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { adminWorker } from "~/v3/services/adminWorker.server";
6+
7+
const Params = z.object({
8+
batchId: z.string(),
9+
});
10+
11+
export async function action({ request, params }: ActionFunctionArgs) {
12+
// Next authenticate the request
13+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
14+
15+
if (!authenticationResult) {
16+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
17+
}
18+
19+
const user = await prisma.user.findUnique({
20+
where: {
21+
id: authenticationResult.userId,
22+
},
23+
});
24+
25+
if (!user) {
26+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
27+
}
28+
29+
if (!user.admin) {
30+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
31+
}
32+
33+
const { batchId } = Params.parse(params);
34+
35+
try {
36+
await adminWorker.cancel(batchId);
37+
38+
return json({
39+
success: true,
40+
id: batchId,
41+
});
42+
} catch (error) {
43+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
44+
}
45+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import { Tracer } from "@opentelemetry/api";
2+
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
3+
import { RunsReplicationService } from "~/services/runsReplicationService.server";
4+
import { startSpan } from "~/v3/tracing.server";
5+
import { FINAL_RUN_STATUSES } from "../v3/taskStatus";
6+
import { Logger } from "@trigger.dev/core/logger";
7+
8+
export class RunsBackfillerService {
9+
private readonly prisma: PrismaClientOrTransaction;
10+
private readonly runsReplicationInstance: RunsReplicationService;
11+
private readonly tracer: Tracer;
12+
private readonly logger: Logger;
13+
14+
constructor(opts: {
15+
prisma: PrismaClientOrTransaction;
16+
runsReplicationInstance: RunsReplicationService;
17+
tracer: Tracer;
18+
logLevel?: "log" | "error" | "warn" | "info" | "debug";
19+
}) {
20+
this.prisma = opts.prisma;
21+
this.runsReplicationInstance = opts.runsReplicationInstance;
22+
this.tracer = opts.tracer;
23+
this.logger = new Logger("RunsBackfillerService", opts.logLevel ?? "debug");
24+
}
25+
26+
public async call({
27+
from,
28+
to,
29+
cursor,
30+
batchSize,
31+
}: {
32+
from: Date;
33+
to: Date;
34+
cursor?: string;
35+
batchSize?: number;
36+
}): Promise<string | undefined> {
37+
return await startSpan(this.tracer, "RunsBackfillerService.call()", async (span) => {
38+
span.setAttribute("from", from.toISOString());
39+
span.setAttribute("to", to.toISOString());
40+
span.setAttribute("cursor", cursor ?? "");
41+
span.setAttribute("batchSize", batchSize ?? 0);
42+
43+
const runs = await this.prisma.taskRun.findMany({
44+
where: {
45+
createdAt: {
46+
gte: from,
47+
lte: to,
48+
},
49+
status: {
50+
in: FINAL_RUN_STATUSES,
51+
},
52+
...(cursor ? { id: { gt: cursor } } : {}),
53+
},
54+
orderBy: {
55+
id: "asc",
56+
},
57+
take: batchSize,
58+
});
59+
60+
if (runs.length === 0) {
61+
this.logger.info("No runs to backfill", { from, to, cursor });
62+
63+
return;
64+
}
65+
66+
this.logger.info("Backfilling runs", {
67+
from,
68+
to,
69+
cursor,
70+
batchSize,
71+
runCount: runs.length,
72+
firstCreatedAt: runs[0].createdAt,
73+
lastCreatedAt: runs[runs.length - 1].createdAt,
74+
});
75+
76+
await this.runsReplicationInstance.backfill(runs);
77+
78+
const lastRun = runs[runs.length - 1];
79+
80+
this.logger.info("Backfilled runs", {
81+
from,
82+
to,
83+
cursor,
84+
batchSize,
85+
lastRunId: lastRun.id,
86+
});
87+
88+
// Return the last run ID to continue from
89+
return lastRun.id;
90+
});
91+
}
92+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { Logger } from "@trigger.dev/core/logger";
2+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
7+
import { singleton } from "~/utils/singleton";
8+
import { tracer } from "../tracer.server";
9+
import { $replica } from "~/db.server";
10+
import { RunsBackfillerService } from "../../services/runsBackfiller.server";
11+
12+
function initializeWorker() {
13+
const redisOptions = {
14+
keyPrefix: "admin:worker:",
15+
host: env.ADMIN_WORKER_REDIS_HOST,
16+
port: env.ADMIN_WORKER_REDIS_PORT,
17+
username: env.ADMIN_WORKER_REDIS_USERNAME,
18+
password: env.ADMIN_WORKER_REDIS_PASSWORD,
19+
enableAutoPipelining: true,
20+
...(env.ADMIN_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
21+
};
22+
23+
logger.debug(`👨‍🏭 Initializing admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}`);
24+
25+
const worker = new RedisWorker({
26+
name: "admin-worker",
27+
redisOptions,
28+
catalog: {
29+
"admin.backfillRunsToReplication": {
30+
schema: z.object({
31+
from: z.coerce.date(),
32+
to: z.coerce.date(),
33+
cursor: z.string().optional(),
34+
batchSize: z.coerce.number().int().default(500),
35+
}),
36+
visibilityTimeoutMs: 60_000 * 15, // 15 minutes
37+
retry: {
38+
maxAttempts: 5,
39+
},
40+
},
41+
},
42+
concurrency: {
43+
workers: env.ADMIN_WORKER_CONCURRENCY_WORKERS,
44+
tasksPerWorker: env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER,
45+
limit: env.ADMIN_WORKER_CONCURRENCY_LIMIT,
46+
},
47+
pollIntervalMs: env.ADMIN_WORKER_POLL_INTERVAL,
48+
immediatePollIntervalMs: env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL,
49+
shutdownTimeoutMs: env.ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS,
50+
logger: new Logger("AdminWorker", env.ADMIN_WORKER_LOG_LEVEL),
51+
jobs: {
52+
"admin.backfillRunsToReplication": async ({ payload, id }) => {
53+
if (!runsReplicationInstance) {
54+
logger.error("Runs replication instance not found");
55+
return;
56+
}
57+
58+
const service = new RunsBackfillerService({
59+
prisma: $replica,
60+
runsReplicationInstance: runsReplicationInstance,
61+
tracer: tracer,
62+
});
63+
64+
const cursor = await service.call({
65+
from: payload.from,
66+
to: payload.to,
67+
cursor: payload.cursor,
68+
batchSize: payload.batchSize,
69+
});
70+
71+
if (cursor) {
72+
await worker.enqueue({
73+
job: "admin.backfillRunsToReplication",
74+
payload: {
75+
from: payload.from,
76+
to: payload.to,
77+
cursor,
78+
batchSize: payload.batchSize,
79+
},
80+
id,
81+
availableAt: new Date(Date.now() + 1000),
82+
cancellationKey: id,
83+
});
84+
}
85+
},
86+
},
87+
});
88+
89+
if (env.ADMIN_WORKER_ENABLED === "true") {
90+
logger.debug(
91+
`👨‍🏭 Starting admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}, pollInterval = ${env.ADMIN_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ADMIN_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ADMIN_WORKER_CONCURRENCY_LIMIT}`
92+
);
93+
94+
worker.start();
95+
}
96+
97+
return worker;
98+
}
99+
100+
export const adminWorker = singleton("adminWorker", initializeWorker);

0 commit comments

Comments
 (0)