Skip to content

Commit 177f59d

Browse files
committed
Move more of graphile worker jobs into Redis Worker (workerQueue -> commonWorker)
1 parent 1c640f2 commit 177f59d

16 files changed

+384
-345
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 58 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { env } from "~/env.server";
1414
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1515
import { logger } from "~/services/logger.server";
1616
import { getEntitlement } from "~/services/platform.v3.server";
17-
import { workerQueue } from "~/services/worker.server";
17+
import { commonWorker } from "~/v3/commonWorker.server";
1818
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
1919
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
2020
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";
@@ -244,88 +244,80 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
244244
}
245245
}
246246
} else {
247-
return await $transaction(this._prisma, async (tx) => {
248-
const batch = await tx.batchTaskRun.create({
249-
data: {
250-
id: BatchId.fromFriendlyId(batchId),
251-
friendlyId: batchId,
252-
runtimeEnvironmentId: environment.id,
253-
runCount: body.items.length,
254-
runIds: [],
255-
payload: payloadPacket.data,
256-
payloadType: payloadPacket.dataType,
257-
options,
258-
batchVersion: "runengine:v1",
259-
oneTimeUseToken: options.oneTimeUseToken,
260-
},
247+
const batch = await this._prisma.batchTaskRun.create({
248+
data: {
249+
id: BatchId.fromFriendlyId(batchId),
250+
friendlyId: batchId,
251+
runtimeEnvironmentId: environment.id,
252+
runCount: body.items.length,
253+
runIds: [],
254+
payload: payloadPacket.data,
255+
payloadType: payloadPacket.dataType,
256+
options,
257+
batchVersion: "runengine:v1",
258+
oneTimeUseToken: options.oneTimeUseToken,
259+
},
260+
});
261+
262+
if (body.parentRunId && body.resumeParentOnCompletion) {
263+
await this._engine.blockRunWithCreatedBatch({
264+
runId: RunId.fromFriendlyId(body.parentRunId),
265+
batchId: batch.id,
266+
environmentId: environment.id,
267+
projectId: environment.projectId,
268+
organizationId: environment.organizationId,
261269
});
270+
}
262271

263-
if (body.parentRunId && body.resumeParentOnCompletion) {
264-
await this._engine.blockRunWithCreatedBatch({
265-
runId: RunId.fromFriendlyId(body.parentRunId),
272+
switch (this._batchProcessingStrategy) {
273+
case "sequential": {
274+
await this.#enqueueBatchTaskRun({
266275
batchId: batch.id,
267-
environmentId: environment.id,
268-
projectId: environment.projectId,
269-
organizationId: environment.organizationId,
270-
tx,
276+
processingId: batchId,
277+
range: { start: 0, count: PROCESSING_BATCH_SIZE },
278+
attemptCount: 0,
279+
strategy: this._batchProcessingStrategy,
280+
parentRunId: body.parentRunId,
281+
resumeParentOnCompletion: body.resumeParentOnCompletion,
271282
});
272-
}
273283

274-
switch (this._batchProcessingStrategy) {
275-
case "sequential": {
276-
await this.#enqueueBatchTaskRun(
277-
{
284+
break;
285+
}
286+
case "parallel": {
287+
const ranges = Array.from({
288+
length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE),
289+
}).map((_, index) => ({
290+
start: index * PROCESSING_BATCH_SIZE,
291+
count: PROCESSING_BATCH_SIZE,
292+
}));
293+
294+
await Promise.all(
295+
ranges.map((range, index) =>
296+
this.#enqueueBatchTaskRun({
278297
batchId: batch.id,
279-
processingId: batchId,
280-
range: { start: 0, count: PROCESSING_BATCH_SIZE },
298+
processingId: `${index}`,
299+
range,
281300
attemptCount: 0,
282301
strategy: this._batchProcessingStrategy,
283302
parentRunId: body.parentRunId,
284303
resumeParentOnCompletion: body.resumeParentOnCompletion,
285-
},
286-
tx
287-
);
288-
289-
break;
290-
}
291-
case "parallel": {
292-
const ranges = Array.from({
293-
length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE),
294-
}).map((_, index) => ({
295-
start: index * PROCESSING_BATCH_SIZE,
296-
count: PROCESSING_BATCH_SIZE,
297-
}));
298-
299-
await Promise.all(
300-
ranges.map((range, index) =>
301-
this.#enqueueBatchTaskRun(
302-
{
303-
batchId: batch.id,
304-
processingId: `${index}`,
305-
range,
306-
attemptCount: 0,
307-
strategy: this._batchProcessingStrategy,
308-
parentRunId: body.parentRunId,
309-
resumeParentOnCompletion: body.resumeParentOnCompletion,
310-
},
311-
tx
312-
)
313-
)
314-
);
304+
})
305+
)
306+
);
315307

316-
break;
317-
}
308+
break;
318309
}
310+
}
319311

320-
return batch;
321-
});
312+
return batch;
322313
}
323314
}
324315

325316
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
326-
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
327-
tx,
328-
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
317+
await commonWorker.enqueue({
318+
id: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
319+
job: "runengine.processBatchTaskRun",
320+
payload: options,
329321
});
330322
}
331323

apps/webapp/app/services/email.server.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { SendEmailOptions } from "remix-auth-email-link";
44
import { redirect } from "remix-typedjson";
55
import { env } from "~/env.server";
66
import type { AuthUser } from "./authUser";
7-
import { workerQueue } from "./worker.server";
7+
import { commonWorker } from "~/v3/commonWorker.server";
88
import { logger } from "./logger.server";
99
import { singleton } from "~/utils/singleton";
1010
import { assertEmailAllowed } from "~/utils/email";
@@ -93,8 +93,12 @@ export async function sendPlainTextEmail(options: SendPlainTextOptions) {
9393
}
9494

9595
export async function scheduleEmail(data: DeliverEmail, delay?: { seconds: number }) {
96-
const runAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined;
97-
await workerQueue.enqueue("scheduleEmail", data, { runAt });
96+
const availableAt = delay ? new Date(Date.now() + delay.seconds * 1000) : undefined;
97+
await commonWorker.enqueue({
98+
job: "scheduleEmail",
99+
payload: data,
100+
availableAt,
101+
});
98102
}
99103

100104
export async function sendEmail(data: DeliverEmail) {

0 commit comments

Comments
 (0)