Skip to content

Commit 9453373

Browse files
committed
fix: simplify event buffer
1 parent 49a4f5b commit 9453373

File tree

8 files changed

+785
-1866
lines changed

8 files changed

+785
-1866
lines changed

packages/db/src/buffers/event-buffer.test.ts

Lines changed: 160 additions & 565 deletions
Large diffs are not rendered by default.

packages/db/src/buffers/event-buffer.ts

Lines changed: 101 additions & 812 deletions
Large diffs are not rendered by default.

packages/db/src/clickhouse/client.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,15 @@ export const TABLE_NAMES = {
5656
event_property_values_mv: 'event_property_values_mv',
5757
cohort_events_mv: 'cohort_events_mv',
5858
sessions: 'sessions',
59+
events_imports: 'events_imports',
5960
};
6061

6162
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
6263
max_open_connections: 30,
63-
request_timeout: 60000,
64+
request_timeout: 300000,
6465
keep_alive: {
6566
enabled: true,
66-
idle_socket_ttl: 8000,
67+
idle_socket_ttl: 60000,
6768
},
6869
compression: {
6970
request: true,
@@ -132,7 +133,27 @@ export const ch = new Proxy(originalCh, {
132133
const value = Reflect.get(target, property, receiver);
133134

134135
if (property === 'insert') {
135-
return (...args: any[]) => withRetry(() => value.apply(target, args));
136+
return (...args: any[]) =>
137+
withRetry(() => {
138+
args[0].clickhouse_settings = {
139+
// Allow bigger HTTP payloads/time to stream rows
140+
async_insert: 1,
141+
wait_for_async_insert: 1,
142+
// Increase insert timeouts and buffer sizes for large batches
143+
max_execution_time: 300,
144+
max_insert_block_size: '500000',
145+
max_http_get_redirects: '0',
146+
// Ensure JSONEachRow stays efficient
147+
input_format_parallel_parsing: 1,
148+
// Keep long-running inserts/queries from idling out at proxies by sending progress headers
149+
send_progress_in_http_headers: 1,
150+
http_headers_progress_interval_ms: '50000',
151+
// Ensure server holds the connection until the query is finished
152+
wait_end_of_query: 1,
153+
...args[0].clickhouse_settings,
154+
};
155+
return value.apply(target, args);
156+
});
136157
}
137158

138159
return value;

packages/db/src/services/event.service.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ export async function getEventList(options: GetEventListOptions) {
510510
sb.select.model = 'model';
511511
}
512512
if (select.duration) {
513-
sb.select.duration = 'duration';
513+
sb.select.duration = `${getDurationSql()} as duration`;
514514
}
515515
if (select.path) {
516516
sb.select.path = 'path';
@@ -570,7 +570,7 @@ export async function getEventList(options: GetEventListOptions) {
570570
custom(sb);
571571
}
572572

573-
console.log('getSql()', getSql());
573+
console.log('getSql() ----> ', getSql());
574574

575575
const data = await getEvents(getSql(), {
576576
profile: select.profile ?? true,
@@ -1016,3 +1016,6 @@ class EventService {
10161016
}
10171017

10181018
export const eventService = new EventService(ch);
1019+
1020+
export const getDurationSql = (field = 'created_at') =>
1021+
`dateDiff('millisecond', ${field}, leadInFrame(toNullable(${field}), 1, NULL) OVER (PARTITION BY session_id ORDER BY ${field} ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING))`;

0 commit comments

Comments
 (0)