diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index aeaea7a046..60badb2ebc 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -1,5 +1,5 @@
 import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
-import { RedisOptions } from "@internal/redis";
+import { type RedisOptions } from "@internal/redis";
 import {
   LogicalReplicationClient,
   type MessageDelete,
@@ -8,14 +8,13 @@ import {
   type PgoutputMessage,
 } from "@internal/replication";
 import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
-import { Logger, LogLevel } from "@trigger.dev/core/logger";
+import { Logger, type LogLevel } from "@trigger.dev/core/logger";
 import { tryCatch } from "@trigger.dev/core/utils";
 import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
-import { TaskRun } from "@trigger.dev/database";
+import { type TaskRun } from "@trigger.dev/database";
 import { nanoid } from "nanoid";
 import EventEmitter from "node:events";
 import pLimit from "p-limit";
-import { logger } from "./logger.server";
 import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
 
 interface TransactionEvent {
@@ -64,6 +63,9 @@ type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update
 
 export type RunsReplicationServiceEvents = {
   message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
+  batchFlushed: [
+    { flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
+  ];
 };
 
 export class RunsReplicationService {
@@ -130,6 +132,29 @@ export class RunsReplicationService {
      flushInterval: options.flushIntervalMs ?? 100,
      maxConcurrency: options.maxFlushConcurrency ?? 100,
      callback: this.#flushBatch.bind(this),
+      // we can do some pre-merging to reduce the amount of data we need to send to clickhouse
+      mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => {
+        const merged = new Map<string, TaskRunInsert>();
+
+        for (const item of existingBatch) {
+          const key = `${item.event}_${item.run.id}`;
+          merged.set(key, item);
+        }
+
+        for (const item of newBatch) {
+          const key = `${item.event}_${item.run.id}`;
+          const existingItem = merged.get(key);
+
+          // Keep the run with the higher version (latest)
+          // and take the last occurrence for that version.
+          // Items originating from the same DB transaction have the same version.
+          if (!existingItem || item._version >= existingItem._version) {
+            merged.set(key, item);
+          }
+        }
+
+        return Array.from(merged.values());
+      },
      logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"),
      tracer: options.tracer,
    });
@@ -467,11 +492,33 @@ export class RunsReplicationService {
 
      const taskRunInserts = preparedInserts
        .map(({ taskRunInsert }) => taskRunInsert)
-        .filter(Boolean);
+        .filter(Boolean)
+        // batch inserts in clickhouse are more performant if the items
+        // are pre-sorted by the primary key
+        .sort((a, b) => {
+          if (a.organization_id !== b.organization_id) {
+            return a.organization_id < b.organization_id ? -1 : 1;
+          }
+          if (a.project_id !== b.project_id) {
+            return a.project_id < b.project_id ? -1 : 1;
+          }
+          if (a.environment_id !== b.environment_id) {
+            return a.environment_id < b.environment_id ? -1 : 1;
+          }
+          if (a.created_at !== b.created_at) {
+            return a.created_at - b.created_at;
+          }
+          return a.run_id < b.run_id ? -1 : 1;
+        });
 
      const payloadInserts = preparedInserts
        .map(({ payloadInsert }) => payloadInsert)
-        .filter(Boolean);
+        .filter(Boolean)
+        // batch inserts in clickhouse are more performant if the items
+        // are pre-sorted by the primary key
+        .sort((a, b) => {
+          return a.run_id < b.run_id ? -1 : 1;
+        });
 
      span.setAttribute("task_run_inserts", taskRunInserts.length);
      span.setAttribute("payload_inserts", payloadInserts.length);
@@ -519,6 +566,8 @@ export class RunsReplicationService {
        taskRunInserts: taskRunInserts.length,
        payloadInserts: payloadInserts.length,
      });
+
+      this.events.emit("batchFlushed", { flushId, taskRunInserts, payloadInserts });
    });
  }
 
@@ -825,12 +874,13 @@ export type ConcurrentFlushSchedulerConfig<T> = {
  flushInterval: number;
  maxConcurrency?: number;
  callback: (flushId: string, batch: T[]) => Promise<void>;
+  mergeBatch?: (existingBatch: T[], newBatch: T[]) => T[];
  tracer?: Tracer;
  logger?: Logger;
 };
 
 export class ConcurrentFlushScheduler<T> {
-  private currentBatch: T[]; // Adjust the type according to your data structure
+  private currentBatch: T[];
  private readonly BATCH_SIZE: number;
  private readonly flushInterval: number;
  private readonly MAX_CONCURRENCY: number;
@@ -855,7 +905,10 @@ export class ConcurrentFlushScheduler<T> {
  }
 
  addToBatch(items: T[]): void {
-    this.currentBatch = this.currentBatch.concat(items);
+    this.currentBatch = this.config.mergeBatch
+      ? this.config.mergeBatch(this.currentBatch, items)
+      : this.currentBatch.concat(items);
+
    this.#flushNextBatchIfNeeded();
  }
 
diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts
index cb04867f93..e08b579738 100644
--- a/apps/webapp/test/runsReplicationService.part2.test.ts
+++ b/apps/webapp/test/runsReplicationService.part2.test.ts
@@ -754,11 +754,6 @@ describe("RunsReplicationService (part 2/2)", () => {
      expect(queryError).toBeNull();
      expect(result?.length).toBe(10);
 
-      console.log("Data", {
-        runsData,
-        result,
-      });
-
      // Check a few random runs for correctness
      for (let i = 0; i < 9; i++) {
        const expected = runsData[i];
@@ -783,4 +778,343 @@ describe("RunsReplicationService (part 2/2)", () => {
      await runsReplicationService.stop();
    }
  );
+
+  containerTest(
+    "should merge duplicate event+run.id combinations keeping the latest version",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
+
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication-merge-batch",
+      });
+
+      const runsReplicationService = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-merge-batch",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 10, // Higher batch size to test merging
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-merge-batch", "info"),
+      });
+
+      // Listen to batchFlushed events to verify merging
+      const batchFlushedEvents: Array<{
+        flushId: string;
+        taskRunInserts: any[];
+        payloadInserts: any[];
+      }> = [];
+
+      runsReplicationService.events.on("batchFlushed", (event) => {
+        batchFlushedEvents.push(event);
+      });
+
+      await runsReplicationService.start();
+
+      const organization = await prisma.organization.create({
+        data: {
+          title: "test-merge-batch",
+          slug: "test-merge-batch",
+        },
+      });
+
+      const project = await prisma.project.create({
+        data: {
+          name: "test-merge-batch",
+          slug: "test-merge-batch",
+          organizationId: organization.id,
+          externalRef: "test-merge-batch",
+        },
+      });
+
+      const runtimeEnvironment = await prisma.runtimeEnvironment.create({
+        data: {
+          slug: "test-merge-batch",
+          type: "DEVELOPMENT",
+          projectId: project.id,
+          organizationId: organization.id,
+          apiKey: "test-merge-batch",
+          pkApiKey: "test-merge-batch",
+          shortcode: "test-merge-batch",
+        },
+      });
+
+      // Create a run and rapidly update it multiple times.
+      // This should create multiple events for the same run that get merged
+      const run = await prisma.taskRun.create({
+        data: {
+          friendlyId: `run_merge_${Date.now()}`,
+          taskIdentifier: "my-task-merge",
+          payload: JSON.stringify({ version: 1 }),
+          payloadType: "application/json",
+          traceId: `merge-${Date.now()}`,
+          spanId: `merge-${Date.now()}`,
+          queue: "test-merge-batch",
+          runtimeEnvironmentId: runtimeEnvironment.id,
+          projectId: project.id,
+          organizationId: organization.id,
+          environmentType: "DEVELOPMENT",
+          engine: "V2",
+          status: "PENDING_VERSION",
+        },
+      });
+
+      await prisma.taskRun.update({
+        where: { id: run.id },
+        data: { status: "DEQUEUED" },
+      });
+      await prisma.taskRun.update({
+        where: { id: run.id },
+        data: { status: "EXECUTING" },
+      });
+      await prisma.taskRun.update({
+        where: { id: run.id },
+        data: { status: "PAUSED" },
+      });
+      await prisma.taskRun.update({
+        where: { id: run.id },
+        data: { status: "EXECUTING" },
+      });
+      await prisma.taskRun.update({
+        where: { id: run.id },
+        data: { status: "COMPLETED_SUCCESSFULLY" },
+      });
+
+      await setTimeout(1000);
+
+      expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2);
+      expect(batchFlushedEvents?.[0].taskRunInserts[0]).toEqual(
+        expect.objectContaining({
+          run_id: run.id,
+          status: "PENDING_VERSION",
+        })
+      );
+      expect(batchFlushedEvents?.[0].taskRunInserts[1]).toEqual(
+        expect.objectContaining({
+          run_id: run.id,
+          status: "COMPLETED_SUCCESSFULLY",
+        })
+      );
+
+      await runsReplicationService.stop();
+    }
+  );
+
+  containerTest(
+    "should sort batch inserts according to table schema ordering for optimal performance",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
+
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication-sorting",
+      });
+
+      const runsReplicationService = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-sorting",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 10,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-sorting", "info"),
+      });
+
+      // Listen to batchFlushed events to verify sorting
+      const batchFlushedEvents: Array<{
+        flushId: string;
+        taskRunInserts: any[];
+        payloadInserts: any[];
+      }> = [];
+
+      runsReplicationService.events.on("batchFlushed", (event) => {
+        batchFlushedEvents.push(event);
+      });
+
+      await runsReplicationService.start();
+
+      // Create two organizations to test sorting by organization_id
+      const org1 = await prisma.organization.create({
+        data: { title: "org-z", slug: "org-z" },
slug: "org-z" }, + }); + + const org2 = await prisma.organization.create({ + data: { title: "org-a", slug: "org-a" }, + }); + + const project1 = await prisma.project.create({ + data: { + name: "test-sorting-z", + slug: "test-sorting-z", + organizationId: org1.id, + externalRef: "test-sorting-z", + }, + }); + + const project2 = await prisma.project.create({ + data: { + name: "test-sorting-a", + slug: "test-sorting-a", + organizationId: org2.id, + externalRef: "test-sorting-a", + }, + }); + + const env1 = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-sorting-z", + type: "DEVELOPMENT", + projectId: project1.id, + organizationId: org1.id, + apiKey: "test-sorting-z", + pkApiKey: "test-sorting-z", + shortcode: "test-sorting-z", + }, + }); + + const env2 = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-sorting-a", + type: "DEVELOPMENT", + projectId: project2.id, + organizationId: org2.id, + apiKey: "test-sorting-a", + pkApiKey: "test-sorting-a", + shortcode: "test-sorting-a", + }, + }); + + const now = Date.now(); + + const run1 = await prisma.taskRun.create({ + data: { + friendlyId: `run_sort_org_z_${now}`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "z" }), + payloadType: "application/json", + traceId: `sort-z-${now}`, + spanId: `sort-z-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env1.id, + projectId: project1.id, + organizationId: org1.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now + 2000), + }, + }); + await prisma.taskRun.update({ + where: { id: run1.id }, + data: { status: "DEQUEUED" }, + }); + + await prisma.taskRun.create({ + data: { + friendlyId: `run_sort_org_a_${now}`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "a" }), + payloadType: "application/json", + traceId: `sort-a-${now}`, + spanId: `sort-a-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env2.id, + projectId: project2.id, + organizationId: org2.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now + 1000), + }, + }); + + await prisma.taskRun.create({ + data: { + friendlyId: `run_sort_org_a_${now}_2`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "a" }), + payloadType: "application/json", + traceId: `sort-a-${now}`, + spanId: `sort-a-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env2.id, + projectId: project2.id, + organizationId: org2.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now), + }, + }); + + await setTimeout(1000); + + expect(batchFlushedEvents[0]?.taskRunInserts.length).toBeGreaterThan(1); + expect(batchFlushedEvents[0]?.payloadInserts.length).toBeGreaterThan(1); + + // Verify sorting order: organization_id, project_id, environment_id, created_at, run_id + for (let i = 1; i < batchFlushedEvents[0]?.taskRunInserts.length; i++) { + const prev = batchFlushedEvents[0]?.taskRunInserts[i - 1]; + const curr = batchFlushedEvents[0]?.taskRunInserts[i]; + + const prevKey = [ + prev.organization_id, + prev.project_id, + prev.environment_id, + prev.created_at, + prev.run_id, + ]; + const currKey = [ + curr.organization_id, + curr.project_id, + curr.environment_id, + curr.created_at, + curr.run_id, + ]; + + const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]); + if (keysAreEqual) { + // Also valid order + continue; + } + + // Compare tuples lexicographically + let isCorrectOrder = false; + for (let j = 0; j < prevKey.length; 
+          if (prevKey[j] < currKey[j]) {
+            isCorrectOrder = true;
+            break;
+          }
+          if (prevKey[j] > currKey[j]) {
+            isCorrectOrder = false;
+            break;
+          }
+          // If equal, continue to next field
+        }
+
+        expect(isCorrectOrder).toBeTruthy();
+      }
+
+      // Verify payloadInserts are also sorted by run_id
+      for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) {
+        const prev = batchFlushedEvents[0]?.payloadInserts[i - 1];
+        const curr = batchFlushedEvents[0]?.payloadInserts[i];
+        expect(prev.run_id <= curr.run_id).toBeTruthy();
+      }
+
+      await runsReplicationService.stop();
+    }
+  );
 });
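
Reviewer note: a minimal standalone sketch of the keyed, last-write-wins merge that the new mergeBatch hook implements, for inspecting the dedup semantics in isolation. The Row type and sample values are hypothetical stand-ins for TaskRunInsert; only the `${event}_${run.id}` key scheme and the >= version comparison mirror the diff above.

type Row = { event: "insert" | "update"; runId: string; _version: bigint; status: string };

// Deduplicate by event + run id, keeping the row with the highest _version.
// Ties (rows from the same DB transaction share a version) resolve to the
// last occurrence because the comparison uses >= rather than >.
function mergeBatches(existing: Row[], incoming: Row[]): Row[] {
  const merged = new Map<string, Row>();
  for (const item of existing) {
    merged.set(`${item.event}_${item.runId}`, item);
  }
  for (const item of incoming) {
    const key = `${item.event}_${item.runId}`;
    const prev = merged.get(key);
    if (!prev || item._version >= prev._version) {
      merged.set(key, item);
    }
  }
  return Array.from(merged.values());
}

// Three updates to one run collapse into a single row carrying the final status,
// which is what the merge test above asserts via batchFlushed.
const collapsed = mergeBatches(
  [{ event: "update", runId: "run_1", _version: 1n, status: "DEQUEUED" }],
  [
    { event: "update", runId: "run_1", _version: 2n, status: "EXECUTING" },
    { event: "update", runId: "run_1", _version: 3n, status: "COMPLETED_SUCCESSFULLY" },
  ]
);
// collapsed: one row, with status "COMPLETED_SUCCESSFULLY"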
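
Similarly, a hedged sketch of how a non-test consumer might subscribe to the new batchFlushed event, e.g. to publish flush-size metrics. The recordMetric helper is hypothetical; the event name and payload shape come from RunsReplicationServiceEvents in the diff.

// Hypothetical metrics hook; only the event signature is taken from the diff above.
declare function recordMetric(name: string, value: number, tags?: Record<string, string>): void;

function observeFlushes(service: RunsReplicationService): void {
  service.events.on("batchFlushed", ({ flushId, taskRunInserts, payloadInserts }) => {
    recordMetric("runs_replication.task_run_inserts", taskRunInserts.length, { flushId });
    recordMetric("runs_replication.payload_inserts", payloadInserts.length, { flushId });
  });
}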