From 7aedb96f77b7d693d62a25f6fbbeed88fa36a867 Mon Sep 17 00:00:00 2001 From: myftija Date: Thu, 24 Jul 2025 09:43:44 +0200 Subject: [PATCH 1/6] Premerge run batch before sending it to clickhouse --- .../services/runsReplicationService.server.ts | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index aeaea7a046..9d276db167 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -130,6 +130,26 @@ export class RunsReplicationService { flushInterval: options.flushIntervalMs ?? 100, maxConcurrency: options.maxFlushConcurrency ?? 100, callback: this.#flushBatch.bind(this), + mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => { + const merged = new Map(); + + for (const item of existingBatch) { + const key = `${item.event}_${item.run.id}`; + merged.set(key, item); + } + + for (const item of newBatch) { + const key = `${item.event}_${item.run.id}`; + const existingItem = merged.get(key); + + // keep the run with the higher version (latest) + if (!existingItem || item._version > existingItem._version) { + merged.set(key, item); + } + } + + return Array.from(merged.values()); + }, logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"), tracer: options.tracer, }); @@ -825,12 +845,13 @@ export type ConcurrentFlushSchedulerConfig = { flushInterval: number; maxConcurrency?: number; callback: (flushId: string, batch: T[]) => Promise; + mergeBatch?: (existingBatch: T[], newBatch: T[]) => T[]; tracer?: Tracer; logger?: Logger; }; export class ConcurrentFlushScheduler { - private currentBatch: T[]; // Adjust the type according to your data structure + private currentBatch: T[]; private readonly BATCH_SIZE: number; private readonly flushInterval: number; private readonly MAX_CONCURRENCY: number; @@ -855,7 +876,10 @@ export class ConcurrentFlushScheduler { } addToBatch(items: T[]): void { - this.currentBatch = this.currentBatch.concat(items); + this.currentBatch = this.config.mergeBatch + ? this.config.mergeBatch(this.currentBatch, items) + : this.currentBatch.concat(items); + this.#flushNextBatchIfNeeded(); } From cf24a09aa69040829fd1d3edd2e1a475fd44e693 Mon Sep 17 00:00:00 2001 From: myftija Date: Thu, 24 Jul 2025 09:52:21 +0200 Subject: [PATCH 2/6] Pre-order batch items before sending to clickhouse in favor of performance --- .../services/runsReplicationService.server.ts | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 9d276db167..4e76995846 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -1,5 +1,5 @@ import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse"; -import { RedisOptions } from "@internal/redis"; +import { type RedisOptions } from "@internal/redis"; import { LogicalReplicationClient, type MessageDelete, @@ -8,14 +8,13 @@ import { type PgoutputMessage, } from "@internal/replication"; import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing"; -import { Logger, LogLevel } from "@trigger.dev/core/logger"; +import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import { tryCatch } from "@trigger.dev/core/utils"; import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization"; -import { TaskRun } from "@trigger.dev/database"; +import { type TaskRun } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import EventEmitter from "node:events"; import pLimit from "p-limit"; -import { logger } from "./logger.server"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; interface TransactionEvent { @@ -130,6 +129,7 @@ export class RunsReplicationService { flushInterval: options.flushIntervalMs ?? 100, maxConcurrency: options.maxFlushConcurrency ?? 100, callback: this.#flushBatch.bind(this), + // we can do some pre-merging to reduce the amount of data we need to send to clickhouse mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => { const merged = new Map(); @@ -487,11 +487,33 @@ export class RunsReplicationService { const taskRunInserts = preparedInserts .map(({ taskRunInsert }) => taskRunInsert) - .filter(Boolean); + .filter(Boolean) + // batch inserts in clickhouse are more performant if the items + // are pre-sorted by the primary key + .sort((a, b) => { + if (a.organization_id !== b.organization_id) { + return a.organization_id < b.organization_id ? -1 : 1; + } + if (a.project_id !== b.project_id) { + return a.project_id < b.project_id ? -1 : 1; + } + if (a.environment_id !== b.environment_id) { + return a.environment_id < b.environment_id ? -1 : 1; + } + if (a.created_at !== b.created_at) { + return a.created_at - b.created_at; + } + return a.run_id < b.run_id ? -1 : 1; + }); const payloadInserts = preparedInserts .map(({ payloadInsert }) => payloadInsert) - .filter(Boolean); + .filter(Boolean) + // batch inserts in clickhouse are more performant if the items + // are pre-sorted by the primary key + .sort((a, b) => { + return a.run_id < b.run_id ? -1 : 1; + }); span.setAttribute("task_run_inserts", taskRunInserts.length); span.setAttribute("payload_inserts", payloadInserts.length); From fdaae622d5a4416151fa1384afaf0b6288c9e3e4 Mon Sep 17 00:00:00 2001 From: myftija Date: Thu, 24 Jul 2025 10:48:49 +0200 Subject: [PATCH 3/6] existing --- .../test/runsReplicationService.part2.test.ts | 304 ++++++++++++++++++ 1 file changed, 304 insertions(+) diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts index cb04867f93..8bf89c1f3c 100644 --- a/apps/webapp/test/runsReplicationService.part2.test.ts +++ b/apps/webapp/test/runsReplicationService.part2.test.ts @@ -783,4 +783,308 @@ describe("RunsReplicationService (part 2/2)", () => { await runsReplicationService.stop(); } ); + + containerTest( + "should merge duplicate event+run.id combinations keeping the latest version", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public.\"TaskRun\" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-merge-batch", + }); + + const runsReplicationService = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication-merge-batch", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 10, // Higher batch size to test merging + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logger: new Logger("runs-replication-merge-batch", "info"), + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { + title: "test-merge-batch", + slug: "test-merge-batch", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test-merge-batch", + slug: "test-merge-batch", + organizationId: organization.id, + externalRef: "test-merge-batch", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-merge-batch", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test-merge-batch", + pkApiKey: "test-merge-batch", + shortcode: "test-merge-batch", + }, + }); + + // Create a run and rapidly update it multiple times in a transaction + // This should create multiple events for the same run that get merged + const [taskRun] = await prisma.$transaction(async (tx) => { + const run = await tx.taskRun.create({ + data: { + friendlyId: `run_merge_${Date.now()}`, + taskIdentifier: "my-task-merge", + payload: JSON.stringify({ version: 1 }), + payloadType: "application/json", + traceId: `merge-${Date.now()}`, + spanId: `merge-${Date.now()}`, + queue: "test-merge-batch", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + }, + }); + + await tx.taskRun.update({ + where: { id: run.id }, + data: { status: "EXECUTING", payload: JSON.stringify({ version: 2 }) }, + }); + + await tx.taskRun.update({ + where: { id: run.id }, + data: { status: "COMPLETED_SUCCESSFULLY", payload: JSON.stringify({ version: 3 }) }, + }); + + return [run]; + }); + + // Wait for replication + await setTimeout(2000); + + // Query ClickHouse for the run using FINAL + const queryRuns = clickhouse.reader.query({ + name: "runs-replication-merge-batch", + query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", + schema: z.any(), + params: z.object({ run_id: z.string() }), + }); + + const [queryError, result] = await queryRuns({ run_id: taskRun.id }); + + expect(queryError).toBeNull(); + expect(result?.length).toBe(1); + + // Should have the final status from the last update + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: taskRun.id, + status: "COMPLETED_SUCCESSFULLY", + }) + ); + + // Check payload was also updated to latest version + const queryPayloads = clickhouse.reader.query({ + name: "runs-replication-merge-batch", + query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}", + schema: z.any(), + params: z.object({ run_id: z.string() }), + }); + + const [payloadError, payloadResult] = await queryPayloads({ run_id: taskRun.id }); + + expect(payloadError).toBeNull(); + expect(payloadResult?.length).toBe(1); + expect(payloadResult?.[0]).toEqual( + expect.objectContaining({ + run_id: taskRun.id, + payload: expect.objectContaining({ + data: { version: 3 }, + }), + }) + ); + + await runsReplicationService.stop(); + } + ); + + containerTest( + "should sort batch inserts according to table schema ordering for optimal performance", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public.\"TaskRun\" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-sorting", + }); + + const runsReplicationService = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication-sorting", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 10, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logger: new Logger("runs-replication-sorting", "info"), + }); + + await runsReplicationService.start(); + + // Create two organizations to test sorting by organization_id + const org1 = await prisma.organization.create({ + data: { title: "org-z", slug: "org-z" }, + }); + + const org2 = await prisma.organization.create({ + data: { title: "org-a", slug: "org-a" }, + }); + + const project1 = await prisma.project.create({ + data: { + name: "test-sorting-z", + slug: "test-sorting-z", + organizationId: org1.id, + externalRef: "test-sorting-z", + }, + }); + + const project2 = await prisma.project.create({ + data: { + name: "test-sorting-a", + slug: "test-sorting-a", + organizationId: org2.id, + externalRef: "test-sorting-a", + }, + }); + + const env1 = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-sorting-z", + type: "DEVELOPMENT", + projectId: project1.id, + organizationId: org1.id, + apiKey: "test-sorting-z", + pkApiKey: "test-sorting-z", + shortcode: "test-sorting-z", + }, + }); + + const env2 = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-sorting-a", + type: "DEVELOPMENT", + projectId: project2.id, + organizationId: org2.id, + apiKey: "test-sorting-a", + pkApiKey: "test-sorting-a", + shortcode: "test-sorting-a", + }, + }); + + const now = Date.now(); + + // Create runs in reverse alphabetical order by organization + // The sorting should put org2 (org-a) before org1 (org-z) + const [run1, run2] = await prisma.$transaction(async (tx) => { + const run1 = await tx.taskRun.create({ + data: { + friendlyId: `run_sort_org_z_${now}`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "z" }), + payloadType: "application/json", + traceId: `sort-z-${now}`, + spanId: `sort-z-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env1.id, + projectId: project1.id, + organizationId: org1.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now + 100), // Later timestamp + }, + }); + + const run2 = await tx.taskRun.create({ + data: { + friendlyId: `run_sort_org_a_${now}`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "a" }), + payloadType: "application/json", + traceId: `sort-a-${now}`, + spanId: `sort-a-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env2.id, + projectId: project2.id, + organizationId: org2.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now), // Earlier timestamp + }, + }); + + return [run1, run2]; + }); + + // Wait for replication + await setTimeout(2000); + + // Query ClickHouse for both runs + const queryRuns = clickhouse.reader.query({ + name: "runs-replication-sorting", + query: `SELECT run_id, organization_id, project_id, environment_id, created_at, friendly_id + FROM trigger_dev.task_runs_v2 FINAL + WHERE run_id IN ({run_id_1:String}, {run_id_2:String}) + ORDER BY organization_id, project_id, environment_id, created_at, run_id`, + schema: z.any(), + params: z.object({ run_id_1: z.string(), run_id_2: z.string() }), + }); + + const [queryError, result] = await queryRuns({ run_id_1: run1.id, run_id_2: run2.id }); + + expect(queryError).toBeNull(); + expect(result?.length).toBe(2); + + // Due to sorting, org2 (org-a) should come first even though it was created second + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: run2.id, + organization_id: org2.id, + friendly_id: `run_sort_org_a_${now}`, + }) + ); + + expect(result?.[1]).toEqual( + expect.objectContaining({ + run_id: run1.id, + organization_id: org1.id, + friendly_id: `run_sort_org_z_${now}`, + }) + ); + + await runsReplicationService.stop(); + } + ); }); From 3400d80aae92c57107b701d5a527e7ce967200a9 Mon Sep 17 00:00:00 2001 From: myftija Date: Thu, 24 Jul 2025 14:31:19 +0200 Subject: [PATCH 4/6] Emit event on batch flushes --- apps/webapp/app/services/runsReplicationService.server.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 4e76995846..8803d19dbd 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -63,6 +63,9 @@ type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update export type RunsReplicationServiceEvents = { message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }]; + batchFlushed: [ + { flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] } + ]; }; export class RunsReplicationService { @@ -561,6 +564,8 @@ export class RunsReplicationService { taskRunInserts: taskRunInserts.length, payloadInserts: payloadInserts.length, }); + + this.events.emit("batchFlushed", { flushId, taskRunInserts, payloadInserts }); }); } From 2f565c30446c84730bca812a8119e481cb14ba95 Mon Sep 17 00:00:00 2001 From: myftija Date: Thu, 24 Jul 2025 14:31:58 +0200 Subject: [PATCH 5/6] When merging batches, keep the last occurrence items with the same version --- apps/webapp/app/services/runsReplicationService.server.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 8803d19dbd..60badb2ebc 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -145,8 +145,10 @@ export class RunsReplicationService { const key = `${item.event}_${item.run.id}`; const existingItem = merged.get(key); - // keep the run with the higher version (latest) - if (!existingItem || item._version > existingItem._version) { + // Keep the run with the higher version (latest) + // and take the last occurrence for that version. + // Items originating from the same DB transaction have the same version. + if (!existingItem || item._version >= existingItem._version) { merged.set(key, item); } } From 7293c10d17a8c675d68324098c5d298ae55f373a Mon Sep 17 00:00:00 2001 From: myftija Date: Thu, 24 Jul 2025 14:32:17 +0200 Subject: [PATCH 6/6] Add a couple of tests --- .../test/runsReplicationService.part2.test.ts | 316 ++++++++++-------- 1 file changed, 173 insertions(+), 143 deletions(-) diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts index 8bf89c1f3c..e08b579738 100644 --- a/apps/webapp/test/runsReplicationService.part2.test.ts +++ b/apps/webapp/test/runsReplicationService.part2.test.ts @@ -754,11 +754,6 @@ describe("RunsReplicationService (part 2/2)", () => { expect(queryError).toBeNull(); expect(result?.length).toBe(10); - console.log("Data", { - runsData, - result, - }); - // Check a few random runs for correctness for (let i = 0; i < 9; i++) { const expected = runsData[i]; @@ -810,6 +805,17 @@ describe("RunsReplicationService (part 2/2)", () => { logger: new Logger("runs-replication-merge-batch", "info"), }); + // Listen to batchFlushed events to verify merging + const batchFlushedEvents: Array<{ + flushId: string; + taskRunInserts: any[]; + payloadInserts: any[]; + }> = []; + + runsReplicationService.events.on("batchFlushed", (event) => { + batchFlushedEvents.push(event); + }); + await runsReplicationService.start(); const organization = await prisma.organization.create({ @@ -842,80 +848,57 @@ describe("RunsReplicationService (part 2/2)", () => { // Create a run and rapidly update it multiple times in a transaction // This should create multiple events for the same run that get merged - const [taskRun] = await prisma.$transaction(async (tx) => { - const run = await tx.taskRun.create({ - data: { - friendlyId: `run_merge_${Date.now()}`, - taskIdentifier: "my-task-merge", - payload: JSON.stringify({ version: 1 }), - payloadType: "application/json", - traceId: `merge-${Date.now()}`, - spanId: `merge-${Date.now()}`, - queue: "test-merge-batch", - runtimeEnvironmentId: runtimeEnvironment.id, - projectId: project.id, - organizationId: organization.id, - environmentType: "DEVELOPMENT", - engine: "V2", - status: "PENDING", - }, - }); - - await tx.taskRun.update({ - where: { id: run.id }, - data: { status: "EXECUTING", payload: JSON.stringify({ version: 2 }) }, - }); - - await tx.taskRun.update({ - where: { id: run.id }, - data: { status: "COMPLETED_SUCCESSFULLY", payload: JSON.stringify({ version: 3 }) }, - }); - - return [run]; + const run = await prisma.taskRun.create({ + data: { + friendlyId: `run_merge_${Date.now()}`, + taskIdentifier: "my-task-merge", + payload: JSON.stringify({ version: 1 }), + payloadType: "application/json", + traceId: `merge-${Date.now()}`, + spanId: `merge-${Date.now()}`, + queue: "test-merge-batch", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING_VERSION", + }, }); - - // Wait for replication - await setTimeout(2000); - - // Query ClickHouse for the run using FINAL - const queryRuns = clickhouse.reader.query({ - name: "runs-replication-merge-batch", - query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}", - schema: z.any(), - params: z.object({ run_id: z.string() }), + await prisma.taskRun.update({ + where: { id: run.id }, + data: { status: "DEQUEUED" }, + }); + await prisma.taskRun.update({ + where: { id: run.id }, + data: { status: "EXECUTING" }, + }); + await prisma.taskRun.update({ + where: { id: run.id }, + data: { status: "PAUSED" }, + }); + await prisma.taskRun.update({ + where: { id: run.id }, + data: { status: "EXECUTING" }, + }); + await prisma.taskRun.update({ + where: { id: run.id }, + data: { status: "COMPLETED_SUCCESSFULLY" }, }); - const [queryError, result] = await queryRuns({ run_id: taskRun.id }); - - expect(queryError).toBeNull(); - expect(result?.length).toBe(1); + await setTimeout(1000); - // Should have the final status from the last update - expect(result?.[0]).toEqual( + expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2); + expect(batchFlushedEvents?.[0].taskRunInserts[0]).toEqual( expect.objectContaining({ - run_id: taskRun.id, - status: "COMPLETED_SUCCESSFULLY", + run_id: run.id, + status: "PENDING_VERSION", }) ); - - // Check payload was also updated to latest version - const queryPayloads = clickhouse.reader.query({ - name: "runs-replication-merge-batch", - query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}", - schema: z.any(), - params: z.object({ run_id: z.string() }), - }); - - const [payloadError, payloadResult] = await queryPayloads({ run_id: taskRun.id }); - - expect(payloadError).toBeNull(); - expect(payloadResult?.length).toBe(1); - expect(payloadResult?.[0]).toEqual( + expect(batchFlushedEvents?.[0].taskRunInserts[1]).toEqual( expect.objectContaining({ - run_id: taskRun.id, - payload: expect.objectContaining({ - data: { version: 3 }, - }), + run_id: run.id, + status: "COMPLETED_SUCCESSFULLY", }) ); @@ -949,6 +932,17 @@ describe("RunsReplicationService (part 2/2)", () => { logger: new Logger("runs-replication-sorting", "info"), }); + // Listen to batchFlushed events to verify sorting + const batchFlushedEvents: Array<{ + flushId: string; + taskRunInserts: any[]; + payloadInserts: any[]; + }> = []; + + runsReplicationService.events.on("batchFlushed", (event) => { + batchFlushedEvents.push(event); + }); + await runsReplicationService.start(); // Create two organizations to test sorting by organization_id @@ -1004,85 +998,121 @@ describe("RunsReplicationService (part 2/2)", () => { const now = Date.now(); - // Create runs in reverse alphabetical order by organization - // The sorting should put org2 (org-a) before org1 (org-z) - const [run1, run2] = await prisma.$transaction(async (tx) => { - const run1 = await tx.taskRun.create({ - data: { - friendlyId: `run_sort_org_z_${now}`, - taskIdentifier: "my-task-sort", - payload: JSON.stringify({ org: "z" }), - payloadType: "application/json", - traceId: `sort-z-${now}`, - spanId: `sort-z-${now}`, - queue: "test-sorting", - runtimeEnvironmentId: env1.id, - projectId: project1.id, - organizationId: org1.id, - environmentType: "DEVELOPMENT", - engine: "V2", - status: "PENDING", - createdAt: new Date(now + 100), // Later timestamp - }, - }); - - const run2 = await tx.taskRun.create({ - data: { - friendlyId: `run_sort_org_a_${now}`, - taskIdentifier: "my-task-sort", - payload: JSON.stringify({ org: "a" }), - payloadType: "application/json", - traceId: `sort-a-${now}`, - spanId: `sort-a-${now}`, - queue: "test-sorting", - runtimeEnvironmentId: env2.id, - projectId: project2.id, - organizationId: org2.id, - environmentType: "DEVELOPMENT", - engine: "V2", - status: "PENDING", - createdAt: new Date(now), // Earlier timestamp - }, - }); - - return [run1, run2]; + const run1 = await prisma.taskRun.create({ + data: { + friendlyId: `run_sort_org_z_${now}`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "z" }), + payloadType: "application/json", + traceId: `sort-z-${now}`, + spanId: `sort-z-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env1.id, + projectId: project1.id, + organizationId: org1.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now + 2000), + }, + }); + await prisma.taskRun.update({ + where: { id: run1.id }, + data: { status: "DEQUEUED" }, }); - // Wait for replication - await setTimeout(2000); - - // Query ClickHouse for both runs - const queryRuns = clickhouse.reader.query({ - name: "runs-replication-sorting", - query: `SELECT run_id, organization_id, project_id, environment_id, created_at, friendly_id - FROM trigger_dev.task_runs_v2 FINAL - WHERE run_id IN ({run_id_1:String}, {run_id_2:String}) - ORDER BY organization_id, project_id, environment_id, created_at, run_id`, - schema: z.any(), - params: z.object({ run_id_1: z.string(), run_id_2: z.string() }), + await prisma.taskRun.create({ + data: { + friendlyId: `run_sort_org_a_${now}`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "a" }), + payloadType: "application/json", + traceId: `sort-a-${now}`, + spanId: `sort-a-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env2.id, + projectId: project2.id, + organizationId: org2.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now + 1000), + }, }); - const [queryError, result] = await queryRuns({ run_id_1: run1.id, run_id_2: run2.id }); + await prisma.taskRun.create({ + data: { + friendlyId: `run_sort_org_a_${now}_2`, + taskIdentifier: "my-task-sort", + payload: JSON.stringify({ org: "a" }), + payloadType: "application/json", + traceId: `sort-a-${now}`, + spanId: `sort-a-${now}`, + queue: "test-sorting", + runtimeEnvironmentId: env2.id, + projectId: project2.id, + organizationId: org2.id, + environmentType: "DEVELOPMENT", + engine: "V2", + status: "PENDING", + createdAt: new Date(now), + }, + }); - expect(queryError).toBeNull(); - expect(result?.length).toBe(2); + await setTimeout(1000); - // Due to sorting, org2 (org-a) should come first even though it was created second - expect(result?.[0]).toEqual( - expect.objectContaining({ - run_id: run2.id, - organization_id: org2.id, - friendly_id: `run_sort_org_a_${now}`, - }) - ); + expect(batchFlushedEvents[0]?.taskRunInserts.length).toBeGreaterThan(1); + expect(batchFlushedEvents[0]?.payloadInserts.length).toBeGreaterThan(1); + + // Verify sorting order: organization_id, project_id, environment_id, created_at, run_id + for (let i = 1; i < batchFlushedEvents[0]?.taskRunInserts.length; i++) { + const prev = batchFlushedEvents[0]?.taskRunInserts[i - 1]; + const curr = batchFlushedEvents[0]?.taskRunInserts[i]; + + const prevKey = [ + prev.organization_id, + prev.project_id, + prev.environment_id, + prev.created_at, + prev.run_id, + ]; + const currKey = [ + curr.organization_id, + curr.project_id, + curr.environment_id, + curr.created_at, + curr.run_id, + ]; + + const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]); + if (keysAreEqual) { + // Also valid order + continue; + } + + // Compare tuples lexicographically + let isCorrectOrder = false; + for (let j = 0; j < prevKey.length; j++) { + if (prevKey[j] < currKey[j]) { + isCorrectOrder = true; + break; + } + if (prevKey[j] > currKey[j]) { + isCorrectOrder = false; + break; + } + // If equal, continue to next field + } + + expect(isCorrectOrder).toBeTruthy(); + } - expect(result?.[1]).toEqual( - expect.objectContaining({ - run_id: run1.id, - organization_id: org1.id, - friendly_id: `run_sort_org_z_${now}`, - }) - ); + // Verify payloadInserts are also sorted by run_id + for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) { + const prev = batchFlushedEvents[0]?.payloadInserts[i - 1]; + const curr = batchFlushedEvents[0]?.payloadInserts[i]; + expect(prev.run_id <= curr.run_id).toBeTruthy(); + } await runsReplicationService.stop(); }