Skip to content

Commit cf24a09

Browse files
committed
Pre-sort batch items by primary key before sending to ClickHouse to improve insert performance
1 parent 7aedb96 commit cf24a09

File tree

1 file changed

+28
-6
lines changed

1 file changed

+28
-6
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
2-
import { RedisOptions } from "@internal/redis";
2+
import { type RedisOptions } from "@internal/redis";
33
import {
44
LogicalReplicationClient,
55
type MessageDelete,
@@ -8,14 +8,13 @@ import {
88
type PgoutputMessage,
99
} from "@internal/replication";
1010
import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
11-
import { Logger, LogLevel } from "@trigger.dev/core/logger";
11+
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
1212
import { tryCatch } from "@trigger.dev/core/utils";
1313
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
14-
import { TaskRun } from "@trigger.dev/database";
14+
import { type TaskRun } from "@trigger.dev/database";
1515
import { nanoid } from "nanoid";
1616
import EventEmitter from "node:events";
1717
import pLimit from "p-limit";
18-
import { logger } from "./logger.server";
1918
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
2019

2120
interface TransactionEvent<T = any> {
@@ -130,6 +129,7 @@ export class RunsReplicationService {
130129
flushInterval: options.flushIntervalMs ?? 100,
131130
maxConcurrency: options.maxFlushConcurrency ?? 100,
132131
callback: this.#flushBatch.bind(this),
132+
// we can do some pre-merging to reduce the amount of data we need to send to clickhouse
133133
mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => {
134134
const merged = new Map<string, TaskRunInsert>();
135135

@@ -487,11 +487,33 @@ export class RunsReplicationService {
487487

488488
const taskRunInserts = preparedInserts
489489
.map(({ taskRunInsert }) => taskRunInsert)
490-
.filter(Boolean);
490+
.filter(Boolean)
491+
// batch inserts in clickhouse are more performant if the items
492+
// are pre-sorted by the primary key
493+
.sort((a, b) => {
494+
if (a.organization_id !== b.organization_id) {
495+
return a.organization_id < b.organization_id ? -1 : 1;
496+
}
497+
if (a.project_id !== b.project_id) {
498+
return a.project_id < b.project_id ? -1 : 1;
499+
}
500+
if (a.environment_id !== b.environment_id) {
501+
return a.environment_id < b.environment_id ? -1 : 1;
502+
}
503+
if (a.created_at !== b.created_at) {
504+
return a.created_at - b.created_at;
505+
}
506+
return a.run_id < b.run_id ? -1 : 1;
507+
});
491508

492509
const payloadInserts = preparedInserts
493510
.map(({ payloadInsert }) => payloadInsert)
494-
.filter(Boolean);
511+
.filter(Boolean)
512+
// batch inserts in clickhouse are more performant if the items
513+
// are pre-sorted by the primary key
514+
.sort((a, b) => {
515+
return a.run_id < b.run_id ? -1 : 1;
516+
});
495517

496518
span.setAttribute("task_run_inserts", taskRunInserts.length);
497519
span.setAttribute("payload_inserts", payloadInserts.length);

0 commit comments

Comments
 (0)