diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 0df5b4ae..9634b38c 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -1,13 +1,5 @@ import { getRedisCache } from '@openpanel/redis'; -import { - afterAll, - beforeAll, - beforeEach, - describe, - expect, - it, - vi, -} from 'vitest'; +import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { ch } from '../clickhouse/client'; // Mock transformEvent to avoid circular dependency with buffers -> services -> buffers @@ -68,8 +60,8 @@ describe('EventBuffer with real Redis', () => { eventBuffer = new EventBuffer(); }); - it('keeps a single screen_view pending until a subsequent event arrives', async () => { - const screenView = { + it('adds events to buffer', async () => { + const event = { project_id: 'p1', profile_id: 'u1', session_id: 'session_a', @@ -77,13 +69,10 @@ describe('EventBuffer with real Redis', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(screenView); - - // Not eligible for processing yet (only 1 event in session) - await eventBuffer.processBuffer(); + await eventBuffer.add(event); - const sessionKey = `event_buffer:session:${screenView.session_id}`; - const events = await redis.lrange(sessionKey, 0, -1); + const bufferKey = 'event-buffer'; + const events = await redis.lrange(bufferKey, 0, -1); expect(events.length).toBe(1); expect(JSON.parse(events[0]!)).toMatchObject({ session_id: 'session_a', @@ -91,25 +80,20 @@ describe('EventBuffer with real Redis', () => { }); }); - it('processes two screen_view events and leaves only the last one pending', async () => { - const t0 = Date.now(); - const first = { + it('processes events from buffer and inserts into ClickHouse', async () => { + const event1 = { project_id: 'p1', - profile_id: 'u1', - session_id: 'session_b', - name: 'screen_view', - created_at: new Date(t0).toISOString(), + name: 'event1', + created_at: new Date().toISOString(), } as any; - const second = { + const event2 = { project_id: 'p1', - profile_id: 'u1', - session_id: 'session_b', - name: 'screen_view', - created_at: new Date(t0 + 1000).toISOString(), + name: 'event2', + created_at: new Date(Date.now() + 1000).toISOString(), } as any; - await eventBuffer.add(first); - await eventBuffer.add(second); + await eventBuffer.add(event1); + await eventBuffer.add(event2); const insertSpy = vi .spyOn(ch, 'insert') @@ -117,248 +101,123 @@ describe('EventBuffer with real Redis', () => { await eventBuffer.processBuffer(); - // First screen_view should be flushed to ClickHouse, second should remain pending in Redis - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [ - { - ...first, - duration: 1000, - }, - ], - }); + // Both events should be flushed to ClickHouse + expect(insertSpy).toHaveBeenCalled(); + const insertCall = insertSpy.mock.calls[0]![0] as any; + expect(insertCall.format).toBe('JSONEachRow'); + expect(insertCall.table).toBe('events'); + expect(insertCall.values).toHaveLength(2); + expect(insertCall.values?.[0]).toMatchObject({ name: 'event1' }); + expect(insertCall.values?.[1]).toMatchObject({ name: 'event2' }); + + // Buffer should be empty after processing + const bufferKey = 'event-buffer'; + const storedEvents = await redis.lrange(bufferKey, 0, -1); + expect(storedEvents.length).toBe(0); - const sessionKey = `event_buffer:session:${first.session_id}`; - const storedEvents = await 
redis.lrange(sessionKey, 0, -1); - expect(storedEvents.length).toBe(1); - const remaining = JSON.parse(storedEvents[0]!); - expect(remaining).toMatchObject({ - session_id: 'session_b', - name: 'screen_view', - created_at: second.created_at, - }); + insertSpy.mockRestore(); }); - it('clears session when a session_end event arrives', async () => { - const t0 = Date.now(); - const first = { + it('sorts events by creation time before inserting', async () => { + const event1 = { project_id: 'p1', - profile_id: 'u1', - session_id: 'session_c', - name: 'screen_view', - created_at: new Date(t0).toISOString(), + name: 'event1', + created_at: new Date(Date.now() + 2000).toISOString(), } as any; - const end = { + const event2 = { project_id: 'p1', - profile_id: 'u1', - session_id: 'session_c', - name: 'session_end', - created_at: new Date(t0 + 1000).toISOString(), + name: 'event2', + created_at: new Date(Date.now() + 1000).toISOString(), } as any; - - await eventBuffer.add(first); - await eventBuffer.add(end); - - const insertSpy = vi - .spyOn(ch, 'insert') - .mockResolvedValue(undefined as any); - - await eventBuffer.processBuffer(); - - // Both events should be flushed, leaving no pending session events - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [first, end], - }); - const sessionKey = `event_buffer:session:${first.session_id}`; - const storedEvents = await redis.lrange(sessionKey, 0, -1); - expect(storedEvents.length).toBe(0); - }); - - it('queues and processes non-session events in regular queue', async () => { - const event = { - project_id: 'p2', - name: 'custom_event', + const event3 = { + project_id: 'p1', + name: 'event3', created_at: new Date().toISOString(), } as any; - await eventBuffer.add(event); - - // Should be in regular queue - const regularQueueKey = 'event_buffer:regular_queue'; - expect(await redis.llen(regularQueueKey)).toBe(1); - - // Buffer counter should reflect outstanding = 1 - expect(await eventBuffer.getBufferSize()).toBe(1); + await eventBuffer.add(event1); + await eventBuffer.add(event2); + await eventBuffer.add(event3); const insertSpy = vi .spyOn(ch, 'insert') .mockResolvedValueOnce(undefined as any); + await eventBuffer.processBuffer(); - // Regular queue should be trimmed - expect(await redis.llen(regularQueueKey)).toBe(0); + // Events should be sorted by created_at expect(insertSpy).toHaveBeenCalled(); + const insertCall = insertSpy.mock.calls[0]![0] as any; + expect(insertCall.format).toBe('JSONEachRow'); + expect(insertCall.table).toBe('events'); + expect(insertCall.values).toHaveLength(3); + expect(insertCall.values[0]).toMatchObject({ name: 'event3' }); + expect(insertCall.values[1]).toMatchObject({ name: 'event2' }); + expect(insertCall.values[2]).toMatchObject({ name: 'event1' }); - // Buffer counter back to 0 - expect(await eventBuffer.getBufferSize()).toBe(0); + insertSpy.mockRestore(); }); - it('adds session to ready set at 2 events and removes it when < 2 events remain', async () => { - const s = 'session_ready'; - const e1 = { - project_id: 'p3', - profile_id: 'u3', - session_id: s, - name: 'screen_view', - created_at: new Date().toISOString(), - } as any; - const e2 = { - ...e1, - created_at: new Date(Date.now() + 1000).toISOString(), - } as any; - - await eventBuffer.add(e1); - - // One event -> not ready - expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); - - await eventBuffer.add(e2); - - // Two events -> ready - expect(await redis.zscore('event_buffer:ready_sessions', 
s)).not.toBeNull(); - + it('handles empty buffer gracefully', async () => { const insertSpy = vi .spyOn(ch, 'insert') .mockResolvedValueOnce(undefined as any); + await eventBuffer.processBuffer(); - // After processing with one pending left, session should be REMOVED from ready set - // It will be re-added when the next event arrives - expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); - expect(insertSpy).toHaveBeenCalled(); + // Should not insert anything + expect(insertSpy).not.toHaveBeenCalled(); - // But the session and its data should still exist - const sessionKey = `event_buffer:session:${s}`; - const remaining = await redis.lrange(sessionKey, 0, -1); - expect(remaining.length).toBe(1); // One pending event - expect( - await redis.zscore('event_buffer:sessions_sorted', s), - ).not.toBeNull(); // Still in sorted set + insertSpy.mockRestore(); }); - it('sets last screen_view key and clears it on session_end', async () => { - const projectId = 'p4'; - const profileId = 'u4'; - const sessionId = 'session_last'; - const lastKey = `session:last_screen_view:${projectId}:${profileId}`; - - const view = { - project_id: projectId, - profile_id: profileId, - session_id: sessionId, - name: 'screen_view', - created_at: new Date().toISOString(), - } as any; - - await eventBuffer.add(view); - - // Should be set in Redis - expect(await redis.get(lastKey)).not.toBeNull(); + it('respects EVENT_BUFFER_BATCH_SIZE', async () => { + const prev = process.env.EVENT_BUFFER_BATCH_SIZE; + process.env.EVENT_BUFFER_BATCH_SIZE = '2'; + const eb = new EventBuffer(); - const end = { - project_id: projectId, - profile_id: profileId, - session_id: sessionId, - name: 'session_end', - created_at: new Date(Date.now() + 1000).toISOString(), - } as any; + const events = Array.from({ length: 5 }, (_, i) => ({ + project_id: 'p1', + name: `event${i}`, + created_at: new Date(Date.now() + i * 100).toISOString(), + })); - await eventBuffer.add(end); + for (const event of events) { + await eb.add(event as any); + } const insertSpy = vi .spyOn(ch, 'insert') - .mockResolvedValueOnce(undefined as any); - await eventBuffer.processBuffer(); - - // Key should be deleted by session_end - expect(await redis.get(lastKey)).toBeNull(); - expect(insertSpy).toHaveBeenCalled(); - }); - - it('getLastScreenView works for profile and session queries', async () => { - const projectId = 'p5'; - const profileId = 'u5'; - const sessionId = 'session_glsv'; - - const view = { - project_id: projectId, - profile_id: profileId, - session_id: sessionId, - name: 'screen_view', - created_at: new Date().toISOString(), - } as any; - - await eventBuffer.add(view); - - const byProfile = await eventBuffer.getLastScreenView({ - projectId, - profileId, - }); - - if (!byProfile) { - throw new Error('byProfile is null'); - } - - expect(byProfile.name).toBe('screen_view'); - - const bySession = await eventBuffer.getLastScreenView({ - projectId, - sessionId, - }); - - if (!bySession) { - throw new Error('bySession is null'); - } - - expect(bySession.name).toBe('screen_view'); - }); + .mockResolvedValue(undefined as any); - it('buffer counter reflects pending after processing 2 screen_view events', async () => { - const sessionId = 'session_counter'; - const a = { - project_id: 'p6', - profile_id: 'u6', - session_id: sessionId, - name: 'screen_view', - created_at: new Date().toISOString(), - } as any; - const b = { - ...a, - created_at: new Date(Date.now() + 1000).toISOString(), - } as any; + await eb.processBuffer(); - await eventBuffer.add(a); 
- await eventBuffer.add(b); + // Only first 2 events should be processed (batchSize = 2) + expect(insertSpy).toHaveBeenCalledOnce(); + const insertCall = insertSpy.mock.calls[0]![0] as any; + expect(insertCall.format).toBe('JSONEachRow'); + expect(insertCall.table).toBe('events'); + expect(insertCall.values).toHaveLength(2); + expect(insertCall.values[0]).toMatchObject({ name: 'event0' }); + expect(insertCall.values[1]).toMatchObject({ name: 'event1' }); - // Counter counts enqueued items - expect(await eventBuffer.getBufferSize()).toBeGreaterThanOrEqual(2); + // 3 events should remain in buffer + const bufferKey = 'event-buffer'; + const remaining = await redis.lrange(bufferKey, 0, -1); + expect(remaining.length).toBe(3); - const insertSpy = vi - .spyOn(ch, 'insert') - .mockResolvedValueOnce(undefined as any); - await eventBuffer.processBuffer(); + // Restore env + if (prev === undefined) delete process.env.EVENT_BUFFER_BATCH_SIZE; + else process.env.EVENT_BUFFER_BATCH_SIZE = prev; - // One pending screen_view left -> counter should be 1 - expect(await eventBuffer.getBufferSize()).toBe(1); - expect(insertSpy).toHaveBeenCalled(); + insertSpy.mockRestore(); }); it('inserts in chunks according to EVENT_BUFFER_CHUNK_SIZE', async () => { const prev = process.env.EVENT_BUFFER_CHUNK_SIZE; + const prevBatch = process.env.EVENT_BUFFER_BATCH_SIZE; process.env.EVENT_BUFFER_CHUNK_SIZE = '1'; + process.env.EVENT_BUFFER_BATCH_SIZE = '10'; // High enough to not trigger auto-flush const eb = new EventBuffer(); const e1 = { @@ -372,398 +231,134 @@ describe('EventBuffer with real Redis', () => { created_at: new Date(Date.now() + 1).toISOString(), } as any; - await eb.add(e1); - await eb.add(e2); - const insertSpy = vi .spyOn(ch, 'insert') .mockResolvedValue(undefined as any); + await eb.add(e1); + await eb.add(e2); + await eb.processBuffer(); // With chunk size 1 and two events, insert should be called twice - expect(insertSpy.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(insertSpy).toHaveBeenCalledTimes(2); // Restore env if (prev === undefined) delete process.env.EVENT_BUFFER_CHUNK_SIZE; else process.env.EVENT_BUFFER_CHUNK_SIZE = prev; + if (prevBatch === undefined) delete process.env.EVENT_BUFFER_BATCH_SIZE; + else process.env.EVENT_BUFFER_BATCH_SIZE = prevBatch; + + insertSpy.mockRestore(); }); - it('counts active visitors after adding an event with profile', async () => { - const e = { + it('tracks active visitors after adding an event with profile', async () => { + const event = { project_id: 'p7', profile_id: 'u7', name: 'custom', created_at: new Date().toISOString(), } as any; - await eventBuffer.add(e); + await eventBuffer.add(event); const count = await eventBuffer.getActiveVisitorCount('p7'); expect(count).toBeGreaterThanOrEqual(1); }); - it('batches pending session updates (respects cap) during processBuffer', async () => { - const prev = process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE; - process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE = '3'; + it('getBufferSize returns correct count', async () => { + const prev = process.env.EVENT_BUFFER_BATCH_SIZE; + process.env.EVENT_BUFFER_BATCH_SIZE = '1000'; // High enough to prevent auto-flush const eb = new EventBuffer(); - // Create many sessions each with 2 screen_view events → leaves 1 pending per session - const numSessions = 10; - const base = Date.now(); - - for (let i = 0; i < numSessions; i++) { - const sid = `batch_s_${i}`; - const e1 = { - project_id: 'p8', - profile_id: `u${i}`, - session_id: sid, - name: 
'screen_view', - created_at: new Date(base + i * 10).toISOString(), - } as any; - const e2 = { - ...e1, - created_at: new Date(base + i * 10 + 1).toISOString(), - } as any; - await eb.add(e1); - await eb.add(e2); - } - - const insertSpy = vi - .spyOn(ch, 'insert') - .mockResolvedValue(undefined as any); - const evalSpy = vi.spyOn(redis as any, 'eval'); - - await eb.processBuffer(); - - // Only consider eval calls for batchUpdateSessionsScript (3 keys now: ready, sorted, counter) - const batchEvalCalls = evalSpy.mock.calls.filter( - (call) => call[1] === 3 && call[4] === 'event_buffer:total_count', - ); - - const expectedCalls = Math.ceil(numSessions / 3); - expect(batchEvalCalls.length).toBeGreaterThanOrEqual(expectedCalls); - - function countSessionsInEvalCall(args: any[]): number { - let idx = 5; // ARGV starts after: script, numKeys, key1, key2, key3 - let count = 0; - while (idx < args.length) { - if (idx + 3 >= args.length) break; - const pendingCount = Number.parseInt(String(args[idx + 3]), 10); - idx += 4 + Math.max(0, pendingCount); - count += 1; - } - return count; - } - - for (const call of batchEvalCalls) { - expect(call[1]).toBe(3); - expect(call[2]).toBe('event_buffer:ready_sessions'); - expect(call[3]).toBe('event_buffer:sessions_sorted'); - expect(call[4]).toBe('event_buffer:total_count'); - - const sessionsInThisCall = countSessionsInEvalCall(call.slice(0)); - expect(sessionsInThisCall).toBeLessThanOrEqual(3); - expect(sessionsInThisCall).toBeGreaterThan(0); - } - - expect(insertSpy).toHaveBeenCalled(); - - // Restore env - if (prev === undefined) - delete process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE; - else process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE = prev; - - evalSpy.mockRestore(); - insertSpy.mockRestore(); - }); - - it('flushes a lone session_end and clears the session list', async () => { - const s = 'session_only_end'; - const end = { - project_id: 'p9', - profile_id: 'u9', - session_id: s, - name: 'session_end', + const event1 = { + project_id: 'p1', + name: 'event1', + created_at: new Date().toISOString(), + } as any; + const event2 = { + project_id: 'p1', + name: 'event2', created_at: new Date().toISOString(), } as any; - const eb = new EventBuffer(); - await eb.add(end); + expect(await eb.getBufferSize()).toBe(0); - // Should be considered ready even though only 1 event (session_end) const insertSpy = vi .spyOn(ch, 'insert') - .mockResolvedValueOnce(undefined as any); - - await eb.processBuffer(); - - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [end], - }); - - const sessionKey = `event_buffer:session:${s}`; - const remaining = await redis.lrange(sessionKey, 0, -1); - expect(remaining.length).toBe(0); - - insertSpy.mockRestore(); - }); - - it('flushes ALL screen_views when session_end arrives (no pending events)', async () => { - const t0 = Date.now(); - const s = 'session_multi_end'; - const view1 = { - project_id: 'p10', - profile_id: 'u10', - session_id: s, - name: 'screen_view', - created_at: new Date(t0).toISOString(), - } as any; - const view2 = { - ...view1, - created_at: new Date(t0 + 1000).toISOString(), - } as any; - const view3 = { - ...view1, - created_at: new Date(t0 + 2000).toISOString(), - } as any; - const end = { - ...view1, - name: 'session_end', - created_at: new Date(t0 + 3000).toISOString(), - } as any; + .mockResolvedValue(undefined as any); - const eb = new EventBuffer(); - await eb.add(view1); - await eb.add(view2); - await eb.add(view3); - await 
eb.add(end); + await eb.add(event1); + expect(await eb.getBufferSize()).toBe(1); - const insertSpy = vi - .spyOn(ch, 'insert') - .mockResolvedValueOnce(undefined as any); + await eb.add(event2); + expect(await eb.getBufferSize()).toBe(2); await eb.processBuffer(); - // All 4 events should be flushed (3 screen_views + session_end) - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [view1, view2, view3, end], - }); - - // Session should be completely empty and removed - const sessionKey = `event_buffer:session:${s}`; - const remaining = await redis.lrange(sessionKey, 0, -1); - expect(remaining.length).toBe(0); + expect(await eb.getBufferSize()).toBe(0); - // Session should be removed from both sorted sets - expect(await redis.zscore('event_buffer:sessions_sorted', s)).toBeNull(); - expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + // Restore env + if (prev === undefined) delete process.env.EVENT_BUFFER_BATCH_SIZE; + else process.env.EVENT_BUFFER_BATCH_SIZE = prev; insertSpy.mockRestore(); }); - it('re-adds session to ready_sessions when new event arrives after processing', async () => { - const t0 = Date.now(); - const s = 'session_continued'; - const view1 = { - project_id: 'p11', - profile_id: 'u11', - session_id: s, - name: 'screen_view', - created_at: new Date(t0).toISOString(), - } as any; - const view2 = { - ...view1, - created_at: new Date(t0 + 1000).toISOString(), - } as any; - - const eb = new EventBuffer(); - await eb.add(view1); - await eb.add(view2); - - const insertSpy = vi - .spyOn(ch, 'insert') - .mockResolvedValue(undefined as any); - - // First processing: flush view1, keep view2 pending - await eb.processBuffer(); - - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [{ ...view1, duration: 1000 }], - }); - - // Session should be REMOVED from ready_sessions (only 1 event left) - expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); - - // Add a third screen_view - this should re-add to ready_sessions - const view3 = { - ...view1, - created_at: new Date(t0 + 2000).toISOString(), - } as any; - await eb.add(view3); - - // NOW it should be back in ready_sessions (2 events again) - expect(await redis.zscore('event_buffer:ready_sessions', s)).not.toBeNull(); - - insertSpy.mockClear(); - - // Second processing: should process view2 (now has duration), keep view3 pending - await eb.processBuffer(); - - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [{ ...view2, duration: 1000 }], - }); - - // Session should be REMOVED again (only 1 event left) - expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + it('bulkAdd adds multiple events atomically', async () => { + const events = Array.from({ length: 3 }, (_, i) => ({ + project_id: 'p1', + name: `event${i}`, + created_at: new Date(Date.now() + i * 100).toISOString(), + })) as any[]; - const sessionKey = `event_buffer:session:${s}`; - const remaining = await redis.lrange(sessionKey, 0, -1); - expect(remaining.length).toBe(1); - expect(JSON.parse(remaining[0]!)).toMatchObject({ - session_id: s, - created_at: view3.created_at, - }); + await eventBuffer.bulkAdd(events); - insertSpy.mockRestore(); + const bufferKey = 'event-buffer'; + const storedEvents = await redis.lrange(bufferKey, 0, -1); + expect(storedEvents.length).toBe(3); }); - it('removes session from ready_sessions only when completely empty', async () => { - const t0 = Date.now(); 
- const s = 'session_complete'; - const view = { - project_id: 'p12', - profile_id: 'u12', - session_id: s, + it('handles events with all fields correctly', async () => { + const event = { + project_id: 'p1', + profile_id: 'u1', + session_id: 's1', + device_id: 'd1', name: 'screen_view', - created_at: new Date(t0).toISOString(), - } as any; - const end = { - ...view, - name: 'session_end', - created_at: new Date(t0 + 1000).toISOString(), + path: '/home', + origin: 'https://example.com', + duration: 5000, + properties: { key: 'value' }, + created_at: new Date().toISOString(), + country: 'US', + city: 'NYC', + region: 'NY', + os: 'macOS', + browser: 'Chrome', } as any; - const eb = new EventBuffer(); - await eb.add(view); - await eb.add(end); + await eventBuffer.add(event); const insertSpy = vi .spyOn(ch, 'insert') .mockResolvedValueOnce(undefined as any); - await eb.processBuffer(); + await eventBuffer.processBuffer(); - // Both events flushed, session empty - expect(insertSpy).toHaveBeenCalledWith({ - format: 'JSONEachRow', - table: 'events', - values: [view, end], + expect(insertSpy).toHaveBeenCalled(); + const insertCall = insertSpy.mock.calls[0]![0] as any; + expect(insertCall.format).toBe('JSONEachRow'); + expect(insertCall.table).toBe('events'); + expect(insertCall.values).toHaveLength(1); + expect(insertCall.values[0]).toMatchObject({ + project_id: 'p1', + profile_id: 'u1', + session_id: 's1', + name: 'screen_view', + path: '/home', }); - // NOW it should be removed from ready_sessions (because it's empty) - expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); - expect(await redis.zscore('event_buffer:sessions_sorted', s)).toBeNull(); - insertSpy.mockRestore(); }); - - it('getBufferSizeHeavy correctly counts events across many sessions in batches', async () => { - const eb = new EventBuffer(); - const numSessions = 250; // More than batch size (100) to test batching - const eventsPerSession = 3; - const numRegularEvents = 50; - - // Add session events (3 events per session) - for (let i = 0; i < numSessions; i++) { - const sessionId = `batch_session_${i}`; - for (let j = 0; j < eventsPerSession; j++) { - await eb.add({ - project_id: 'p_batch', - profile_id: `u_${i}`, - session_id: sessionId, - name: 'screen_view', - created_at: new Date(Date.now() + i * 100 + j * 10).toISOString(), - } as any); - } - } - - // Add regular queue events - for (let i = 0; i < numRegularEvents; i++) { - await eb.add({ - project_id: 'p_batch', - name: 'custom_event', - created_at: new Date().toISOString(), - } as any); - } - - // Get buffer size using heavy method - const bufferSize = await eb.getBufferSizeHeavy(); - - // Should count all events: (250 sessions × 3 events) + 50 regular events - const expectedSize = numSessions * eventsPerSession + numRegularEvents; - expect(bufferSize).toBe(expectedSize); - - // Verify sessions are properly tracked - const sessionCount = await redis.zcard('event_buffer:sessions_sorted'); - expect(sessionCount).toBe(numSessions); - - const regularQueueCount = await redis.llen('event_buffer:regular_queue'); - expect(regularQueueCount).toBe(numRegularEvents); - }); - - it('getBufferSizeHeavy handles empty buffer correctly', async () => { - const eb = new EventBuffer(); - - const bufferSize = await eb.getBufferSizeHeavy(); - - expect(bufferSize).toBe(0); - }); - - it('getBufferSizeHeavy handles only regular queue events', async () => { - const eb = new EventBuffer(); - const numEvents = 10; - - for (let i = 0; i < numEvents; i++) { - await eb.add({ - 
project_id: 'p_regular', - name: 'custom_event', - created_at: new Date().toISOString(), - } as any); - } - - const bufferSize = await eb.getBufferSizeHeavy(); - - expect(bufferSize).toBe(numEvents); - }); - - it('getBufferSizeHeavy handles only session events', async () => { - const eb = new EventBuffer(); - const numSessions = 5; - const eventsPerSession = 2; - - for (let i = 0; i < numSessions; i++) { - for (let j = 0; j < eventsPerSession; j++) { - await eb.add({ - project_id: 'p_sessions', - profile_id: `u_${i}`, - session_id: `session_${i}`, - name: 'screen_view', - created_at: new Date(Date.now() + i * 100 + j * 10).toISOString(), - } as any); - } - } - - const bufferSize = await eb.getBufferSizeHeavy(); - - expect(bufferSize).toBe(numSessions * eventsPerSession); - }); }); diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index e6793ca7..9bd91981 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -4,7 +4,6 @@ import { getRedisCache, getRedisPub, publishEvent, - runEvery, } from '@openpanel/redis'; import { ch } from '../clickhouse/client'; import { @@ -14,389 +13,61 @@ import { } from '../services/event.service'; import { BaseBuffer } from './base-buffer'; -/** - * - * Usuful redis commands: - * --------------------- - * - * Add empty session - * ZADD event_buffer:sessions_sorted 1710831600000 "test_empty_session" - * - * Get session events - * LRANGE event_buffer:session:test_empty_session 0 -1 - * - * Get session events count - * LLEN event_buffer:session:test_empty_session - * - * Get regular queue events - * LRANGE event_buffer:regular_queue 0 -1 - * - * Get regular queue count - * LLEN event_buffer:regular_queue - * - */ - export class EventBuffer extends BaseBuffer { - // Configurable limits - // How many days to keep buffered session metadata before cleanup - private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP - ? Number.parseFloat(process.env.EVENT_BUFFER_DAYS_TO_KEEP) - : 3; - // How many events we attempt to FETCH per flush cycle (split across sessions/non-sessions) - // Prefer new env EVENT_BUFFER_BATCH_SIZE; fallback to legacy EVENT_BUFFER_BATCH_SIZE private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) : 4000; - // How many events per insert chunk we send to ClickHouse (insert batch size) private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; - private updatePendingSessionsBatchSize = process.env - .EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE - ? Number.parseInt( - process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE, - 10, - ) - : 300; - - // Cap of how many ready sessions to scan per flush cycle (configurable via env) - private maxSessionsPerFlush = process.env.EVENT_BUFFER_MAX_SESSIONS_PER_FLUSH - ? Number.parseInt(process.env.EVENT_BUFFER_MAX_SESSIONS_PER_FLUSH, 10) - : 500; - - // Soft time budget per flush (ms) to avoid long lock holds - private flushTimeBudgetMs = process.env.EVENT_BUFFER_FLUSH_TIME_BUDGET_MS - ? 
Number.parseInt(process.env.EVENT_BUFFER_FLUSH_TIME_BUDGET_MS, 10) - : 1000; - - private minEventsInSession = 2; private activeVisitorsExpiration = 60 * 5; // 5 minutes + private lastScreenViewTTL = 60 * 60; // 1 hour - private sessionEvents = ['screen_view', 'session_end']; - - // LIST - Stores events without sessions - private regularQueueKey = 'event_buffer:regular_queue'; - - // SORTED SET - Tracks all active session IDs with their timestamps - private sessionSortedKey = 'event_buffer:sessions_sorted'; // sorted set of session IDs - - // SORTED SET - Tracks sessions that are ready for processing (have >= minEvents) - private readySessionsKey = 'event_buffer:ready_sessions'; - - // STRING - Tracks total buffer size incrementally - protected bufferCounterKey = 'event_buffer:total_count'; - - private readonly sessionKeyPrefix = 'event_buffer:session:'; - // LIST - Stores events for a given session - private getSessionKey(sessionId: string) { - return `${this.sessionKeyPrefix}${sessionId}`; - } - /** - * Optimized Lua script that processes ready sessions efficiently. - * Only fetches from sessions known to have >= minEvents. - * Limits the number of events fetched per session to avoid huge payloads. - */ - private readonly processReadySessionsScript = ` -local readySessionsKey = KEYS[1] -local sessionPrefix = KEYS[2] -local sessionsSortedKey = KEYS[3] -local maxSessions = tonumber(ARGV[1]) -local maxEventsPerSession = tonumber(ARGV[2]) -local startOffset = tonumber(ARGV[3]) or 0 - -local result = {} -local sessionsToRemove = {} - --- Get up to maxSessions ready sessions from window [startOffset, startOffset+maxSessions-1] -local stopIndex = startOffset + maxSessions - 1 -local sessionIds = redis.call('ZRANGE', readySessionsKey, startOffset, stopIndex) -local resultIndex = 1 - -for i, sessionId in ipairs(sessionIds) do - local sessionKey = sessionPrefix .. sessionId - local eventCount = redis.call('LLEN', sessionKey) - - if eventCount == 0 then - -- Session is empty, remove from both sets - table.insert(sessionsToRemove, sessionId) - else - -- Fetch limited number of events to avoid huge payloads - local eventsToFetch = math.min(eventCount, maxEventsPerSession) - local events = redis.call('LRANGE', sessionKey, 0, eventsToFetch - 1) - - result[resultIndex] = { - sessionId = sessionId, - events = events, - totalEventCount = eventCount - } - resultIndex = resultIndex + 1 - end -end - --- Clean up empty sessions from both ready set and sorted set -if #sessionsToRemove > 0 then - redis.call('ZREM', readySessionsKey, unpack(sessionsToRemove)) - redis.call('ZREM', sessionsSortedKey, unpack(sessionsToRemove)) - -- Also delete the empty session keys - for i, sessionId in ipairs(sessionsToRemove) do - redis.call('DEL', sessionPrefix .. sessionId) - end -end - -return cjson.encode(result) -`; - - /** - * Optimized atomic Lua script to update a session's list with pending events. - * Also manages the ready_sessions set and buffer counter. 
- * - * KEYS[1] = session key - * KEYS[2] = ready sessions key - * KEYS[3] = sessions sorted key - * KEYS[4] = buffer counter key - * ARGV[1] = sessionId - * ARGV[2] = snapshotCount (number of events that were present in our snapshot) - * ARGV[3] = pendingCount (number of pending events) - * ARGV[4] = minEventsInSession - * ARGV[5..(4+pendingCount)] = the pending event strings - */ - private readonly updateSessionScript = ` -local sessionKey = KEYS[1] -local readySessionsKey = KEYS[2] -local sessionsSortedKey = KEYS[3] -local bufferCounterKey = KEYS[4] -local sessionId = ARGV[1] -local snapshotCount = tonumber(ARGV[2]) -local pendingCount = tonumber(ARGV[3]) -local minEventsInSession = tonumber(ARGV[4]) - --- Trim the list to remove the processed (snapshot) events. -redis.call("LTRIM", sessionKey, snapshotCount, -1) - --- Re-insert the pending events at the head in their original order. -for i = pendingCount, 1, -1 do - redis.call("LPUSH", sessionKey, ARGV[i+4]) -end - -local newLength = redis.call("LLEN", sessionKey) - --- Update ready sessions set based on new length -if newLength == 0 then - -- Session is now empty, remove from both sets and delete key - redis.call("ZREM", readySessionsKey, sessionId) - redis.call("ZREM", sessionsSortedKey, sessionId) - redis.call("DEL", sessionKey) -elseif newLength >= minEventsInSession then - -- Session has enough events, keep/add it in ready_sessions - redis.call("ZADD", readySessionsKey, redis.call("TIME")[1], sessionId) -else - -- Session has events but < minEvents, remove from ready_sessions - -- It will be re-added when a new event arrives (via addEventScript) - redis.call("ZREM", readySessionsKey, sessionId) -end - --- Update buffer counter (decrement by processed events, increment by pending) -local counterChange = pendingCount - snapshotCount -if counterChange ~= 0 then - redis.call("INCRBY", bufferCounterKey, counterChange) -end - -return newLength -`; - - /** - * Optimized batch update script with counter and ready sessions management. - * KEYS[1] = ready sessions key - * KEYS[2] = sessions sorted key - * KEYS[3] = buffer counter key - * ARGV format: [minEventsInSession, sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] 
- */ - private readonly batchUpdateSessionsScript = ` -local readySessionsKey = KEYS[1] -local sessionsSortedKey = KEYS[2] -local bufferCounterKey = KEYS[3] -local minEventsInSession = tonumber(ARGV[1]) -local totalCounterChange = 0 - -local i = 2 -while i <= #ARGV do - local sessionKey = ARGV[i] - local sessionId = ARGV[i + 1] - local snapshotCount = tonumber(ARGV[i + 2]) - local pendingCount = tonumber(ARGV[i + 3]) - - -- Trim the list to remove processed events - redis.call("LTRIM", sessionKey, snapshotCount, -1) - - -- Re-insert pending events at the head in original order - if pendingCount > 0 then - -- Reinsert in original order: LPUSH requires reverse iteration - for j = pendingCount, 1, -1 do - redis.call("LPUSH", sessionKey, ARGV[i + 3 + j]) - end - end - - local newLength = redis.call("LLEN", sessionKey) - - -- Update ready sessions set based on new length - if newLength == 0 then - -- Session is now empty, remove from both sets and delete key - redis.call("ZREM", readySessionsKey, sessionId) - redis.call("ZREM", sessionsSortedKey, sessionId) - redis.call("DEL", sessionKey) - elseif newLength >= minEventsInSession then - -- Session has enough events, keep/add it in ready_sessions - redis.call("ZADD", readySessionsKey, redis.call("TIME")[1], sessionId) - else - -- Session has events but < minEvents, remove from ready_sessions - -- It will be re-added when a new event arrives (via addEventScript) - redis.call("ZREM", readySessionsKey, sessionId) - end - - -- Track counter change - totalCounterChange = totalCounterChange + (pendingCount - snapshotCount) - - i = i + 4 + pendingCount -end - --- Update buffer counter once -if totalCounterChange ~= 0 then - redis.call("INCRBY", bufferCounterKey, totalCounterChange) -end - -return "OK" -`; + private readonly redisKey = 'event-buffer'; + private redis: Redis; constructor() { super({ name: 'event', onFlush: async () => { await this.processBuffer(); - await this.tryCleanup(); }, }); + this.redis = getRedisCache(); } bulkAdd(events: IClickhouseEvent[]) { - const redis = getRedisCache(); - const multi = redis.multi(); + const multi = this.redis.multi(); for (const event of events) { this.add(event, multi); } return multi.exec(); } - /** - * Optimized Lua script for adding events with counter management. 
-   * KEYS[1] = session key (if session event)
-   * KEYS[2] = regular queue key
-   * KEYS[3] = sessions sorted key
-   * KEYS[4] = ready sessions key
-   * KEYS[5] = buffer counter key
-   * KEYS[6] = last event key (if screen_view)
-   * ARGV[1] = event JSON
-   * ARGV[2] = session_id
-   * ARGV[3] = event_name
-   * ARGV[4] = score (timestamp)
-   * ARGV[5] = minEventsInSession
-   * ARGV[6] = last event TTL (if screen_view)
-   */
-  private readonly addEventScript = `
-local sessionKey = KEYS[1]
-local regularQueueKey = KEYS[2]
-local sessionsSortedKey = KEYS[3]
-local readySessionsKey = KEYS[4]
-local bufferCounterKey = KEYS[5]
-local lastEventKey = KEYS[6]
-
-local eventJson = ARGV[1]
-local sessionId = ARGV[2]
-local eventName = ARGV[3]
-local score = tonumber(ARGV[4])
-local minEventsInSession = tonumber(ARGV[5])
-local lastEventTTL = tonumber(ARGV[6] or 0)
-
-local counterIncrement = 1
-
-if sessionId and sessionId ~= "" and (eventName == "screen_view" or eventName == "session_end") then
-  -- Add to session
-  redis.call("RPUSH", sessionKey, eventJson)
-  redis.call("ZADD", sessionsSortedKey, "NX", score, sessionId)
-
-  -- Check if session is now ready for processing
-  local sessionLength = redis.call("LLEN", sessionKey)
-  if sessionLength >= minEventsInSession or eventName == "session_end" then
-    redis.call("ZADD", readySessionsKey, score, sessionId)
-  end
-
-  -- Handle screen_view specific logic
-  if eventName == "screen_view" and lastEventKey ~= "" then
-    redis.call("SET", lastEventKey, eventJson, "EX", lastEventTTL)
-  elseif eventName == "session_end" and lastEventKey ~= "" then
-    redis.call("DEL", lastEventKey)
-  end
-else
-  -- Add to regular queue
-  redis.call("RPUSH", regularQueueKey, eventJson)
-end
-
--- Increment buffer counter
-redis.call("INCR", bufferCounterKey)
-
-return "OK"
-`;
-
-  /**
-   * Add an event into Redis.
-   * Uses optimized Lua script to reduce round trips and manage counters.
-   */
   async add(event: IClickhouseEvent, _multi?: ReturnType<Redis['multi']>) {
     try {
-      const redis = getRedisCache();
       const eventJson = JSON.stringify(event);
-      const multi = _multi || redis.multi();
-
-      const isSessionEvent =
-        event.session_id && this.sessionEvents.includes(event.name);
-
-      if (isSessionEvent) {
-        const sessionKey = this.getSessionKey(event.session_id);
-        const score = new Date(event.created_at || Date.now()).getTime();
-        const lastEventKey =
-          event.name === 'screen_view'
-            ? this.getLastEventKey({
-                projectId: event.project_id,
-                profileId: event.profile_id,
-              })
-            : event.name === 'session_end'
-              ? this.getLastEventKey({
-                  projectId: event.project_id,
-                  profileId: event.profile_id,
-                })
-              : '';
-
-        multi.eval(
-          this.addEventScript,
-          6,
-          sessionKey,
-          this.regularQueueKey,
-          this.sessionSortedKey,
-          this.readySessionsKey,
-          this.bufferCounterKey,
-          lastEventKey,
-          eventJson,
-          event.session_id,
-          event.name,
-          score.toString(),
-          this.minEventsInSession.toString(),
-          '3600', // 1 hour TTL for last event
-        );
-      } else {
-        // Non-session events go to regular queue
-        multi
-          .rpush(this.regularQueueKey, eventJson)
-          .incr(this.bufferCounterKey);
+      const multi = _multi || this.redis.multi();
+
+      multi.rpush(this.redisKey, eventJson).incr(this.bufferCounterKey);
+
+      // Store last screen_view for event enrichment
+      if (event.name === 'screen_view' && event.profile_id) {
+        const lastEventKey = this.getLastEventKey({
+          projectId: event.project_id,
+          profileId: event.profile_id,
+        });
+        multi.set(lastEventKey, eventJson, 'EX', this.lastScreenViewTTL);
+      }
+
+      // Clear last screen_view on session_end
+      if (event.name === 'session_end' && event.profile_id) {
+        const lastEventKey = this.getLastEventKey({
+          projectId: event.project_id,
+          profileId: event.profile_id,
+        });
+        multi.del(lastEventKey);
       }
 
       if (event.profile_id) {
@@ -412,195 +83,104 @@ return "OK"
       }
 
       await publishEvent('events', 'received', transformEvent(event));
+
+      // Check buffer length using counter
+      const bufferLength = await this.getBufferSize();
+
+      if (bufferLength >= this.batchSize) {
+        await this.tryFlush();
+      }
     } catch (error) {
       this.logger.error('Failed to add event to Redis buffer', { error });
     }
   }
 
-  private async getEligibleSessions(
-    startOffset: number,
-    maxEventsPerSession: number,
-    sessionsPerPage: number,
-  ) {
-    const sessionsSorted = await getRedisCache().eval(
-      this.processReadySessionsScript,
-      3, // number of KEYS
-      this.readySessionsKey,
-      this.sessionKeyPrefix,
-      this.sessionSortedKey,
-      sessionsPerPage.toString(),
-      maxEventsPerSession.toString(),
-      startOffset.toString(),
-    );
-
-    const parsed = getSafeJson<
-      Array<{
-        sessionId: string;
-        events: string[];
-        totalEventCount: number;
-      }>
-    >(sessionsSorted as string);
-
-    const sessions: Record<
-      string,
-      {
-        events: IClickhouseEvent[];
-        totalEventCount: number;
+  /**
+   * Retrieve the latest screen_view event for a given project/profile
+   * Used for event enrichment - inheriting properties from last screen view
+   */
+  public async getLastScreenView({
+    projectId,
+    profileId,
+  }: {
+    projectId: string;
+    profileId: string;
+  }): Promise<IServiceEvent | null>;
+  public async getLastScreenView({
+    projectId,
+    sessionId,
+  }: {
+    projectId: string;
+    sessionId: string;
+  }): Promise<IServiceEvent | null>;
+  public async getLastScreenView({
+    projectId,
+    profileId,
+    sessionId,
+  }: {
+    projectId: string;
+    profileId?: string;
+    sessionId?: string;
+  }): Promise<IServiceEvent | null> {
+    if (profileId) {
+      const redis = getRedisCache();
+      const eventStr = await redis.get(
+        this.getLastEventKey({ projectId, profileId }),
+      );
+      if (eventStr) {
+        const parsed = getSafeJson<IClickhouseEvent>(eventStr);
+        if (parsed) {
+          return transformEvent(parsed);
+        }
       }
-      > = {};
-
-    if (!parsed || !Array.isArray(parsed)) {
-      return sessions;
     }
 
-    for (const session of parsed) {
-      const events = session.events
-        .map((e) => getSafeJson<IClickhouseEvent>(e))
-        .filter((e): e is IClickhouseEvent => e !== null);
-
-      sessions[session.sessionId] = {
-        events,
-        totalEventCount: session.totalEventCount,
-      };
-    }
+    // sessionId lookup not supported in simplified version
+    // Events are processed immediately, no session-specific storage
+    return null;
+  }
 
-    return sessions;
-  }
 
+  private getLastEventKey({
+    projectId,
+    profileId,
+  }: {
+    projectId: string;
+    profileId: string;
+  }) {
+    return `session:last_screen_view:${projectId}:${profileId}`;
+  }
 
-  /**
-   * Process the Redis buffer.
-   *
-   * 1. Fetch events from two sources in parallel:
-   *    - Pick events from regular queue (batchSize / 2)
-   *    - Pick events from sessions (batchSize / 2).
-   *      This only have screen_view and session_end events
-   *
-   * 2. Process session events:
-   *    - For screen_view events, calculate duration if next event exists
-   *    - Last screen_view of each session remains pending
-   *    - All other events are marked for flushing
-   *
-   * 3. Process regular queue events:
-   *    - Inherit path/origin from last screen_view of same session if exists
-   *
-   * 4. Insert all flushable events into ClickHouse in chunks and publish notifications
-   *
-   * 5. Clean up processed events:
-   *    - For regular queue: LTRIM processed events
-   *    - For sessions: Update lists atomically via Lua script, preserving pending events
-   */
   async processBuffer() {
-    const redis = getRedisCache();
-    const eventsToClickhouse: IClickhouseEvent[] = [];
-    const pendingUpdates: Array<{
-      sessionId: string;
-      snapshotCount: number;
-      pending: IClickhouseEvent[];
-    }> = [];
-    const timer = {
-      fetchUnprocessedEvents: 0,
-      processSessionEvents: 0,
-      processRegularQueueEvents: 0,
-      insertEvents: 0,
-      updatePendingSessions: 0,
-    };
-
     try {
-      let now = performance.now();
-      // (A) Fetch no-session events once per run
-      const regularQueueEvents = await redis.lrange(
-        this.regularQueueKey,
+      // Get events from the start without removing them
+      const events = await this.redis.lrange(
+        this.redisKey,
         0,
-        Math.floor(this.batchSize / 2) - 1,
+        this.batchSize - 1,
       );
 
-      // (A2) Page through ready sessions within time and budget
-      let sessionBudget = Math.floor(this.batchSize / 2);
-      let startOffset = 0;
-      let totalSessionEventsFetched = 0;
-      while (sessionBudget > 0) {
-        if (performance.now() - now > this.flushTimeBudgetMs) {
-          this.logger.debug('Stopping session paging due to time budget');
-          break;
-        }
-
-        const sessionsPerPage = Math.min(
-          this.maxSessionsPerFlush,
-          Math.max(1, Math.floor(sessionBudget / 2)),
-        );
-        const perSessionBudget = Math.max(
-          2,
-          Math.floor(sessionBudget / sessionsPerPage),
-        );
-
-        const sessionsPage = await this.getEligibleSessions(
-          startOffset,
-          perSessionBudget,
-          sessionsPerPage,
-        );
-        const sessionIds = Object.keys(sessionsPage);
-        if (sessionIds.length === 0) {
-          break;
-        }
-
-        for (const sessionId of sessionIds) {
-          const sessionData = sessionsPage[sessionId]!;
-          const { flush, pending } = this.processSessionEvents(
-            sessionData.events,
-          );
-
-          if (flush.length > 0) {
-            eventsToClickhouse.push(...flush);
-          }
-
-          pendingUpdates.push({
-            sessionId,
-            snapshotCount: sessionData.events.length,
-            pending,
-          });
-
-          // Decrease budget by fetched events for this session window
-          sessionBudget -= sessionData.events.length;
-          totalSessionEventsFetched += sessionData.events.length;
-          if (sessionBudget <= 0) {
-            break;
-          }
-        }
-        startOffset += sessionsPerPage;
-      }
-
-      timer.processSessionEvents = performance.now() - now;
-      now = performance.now();
-
-      // (B) Process no-session events
-      for (const eventStr of regularQueueEvents) {
-        const event = getSafeJson<IClickhouseEvent>(eventStr);
-        if (event) {
-          eventsToClickhouse.push(event);
-        }
-      }
-
-      timer.processRegularQueueEvents = performance.now() - now;
-      now = performance.now();
-
-      if (eventsToClickhouse.length === 0) {
+      if (events.length === 0) {
        this.logger.debug('No events to process');
        return;
      }
 
-      // (C) Sort events by creation time.
+      const eventsToClickhouse = events
+        .map((e) => getSafeJson<IClickhouseEvent>(e))
+        .filter((e): e is IClickhouseEvent => e !== null);
+
+      // Sort events by creation time
       eventsToClickhouse.sort(
        (a, b) =>
          new Date(a.created_at || 0).getTime() -
          new Date(b.created_at || 0).getTime(),
      );
 
-      // (D) Insert events into ClickHouse in chunks
       this.logger.info('Inserting events into ClickHouse', {
        totalEvents: eventsToClickhouse.length,
        chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize),
      });
 
+      // Insert events into ClickHouse in chunks
       for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) {
        await ch.insert({
          table: 'events',
@@ -609,321 +189,30 @@
        });
      }
 
-      timer.insertEvents = performance.now() - now;
-      now = performance.now();
-
-      // (E) Publish "saved" events.
+      // Publish "saved" events
       const pubMulti = getRedisPub().multi();
       for (const event of eventsToClickhouse) {
        await publishEvent('events', 'saved', transformEvent(event), pubMulti);
      }
       await pubMulti.exec();
 
-      // (F) Only after successful processing, update Redis
-      const multi = redis.multi();
-
-      // Clean up no-session events and update counter
-      if (regularQueueEvents.length > 0) {
-        multi
-          .ltrim(this.regularQueueKey, regularQueueEvents.length, -1)
-          .decrby(this.bufferCounterKey, regularQueueEvents.length);
-      }
-
+      // Only remove events after successful insert and update counter
+      const multi = this.redis.multi();
+      multi
+        .ltrim(this.redisKey, events.length, -1)
+        .decrby(this.bufferCounterKey, events.length);
       await multi.exec();
 
-      // Process pending sessions in batches
-      await this.processPendingSessionsInBatches(redis, pendingUpdates);
-
-      timer.updatePendingSessions = performance.now() - now;
-
-      this.logger.info('Processed events from Redis buffer', {
-        batchSize: this.batchSize,
-        eventsToClickhouse: eventsToClickhouse.length,
-        pendingSessionUpdates: pendingUpdates.length,
-        sessionEventsFetched: totalSessionEventsFetched,
-        regularEvents: regularQueueEvents.length,
-        timer,
+      this.logger.info('Processed events from Redis buffer', {
+        count: eventsToClickhouse.length,
       });
     } catch (error) {
       this.logger.error('Error processing Redis buffer', { error });
     }
   }
 
-  /**
-   * Process a session's events.
-   *
-   * For each event in the session (in order):
-   * - If it is a screen_view, look for a subsequent event (screen_view or session_end)
-   *   to calculate its duration. If found, flush it; if not, leave it pending.
-   *
-   * Returns an object with two arrays:
-   *   flush: events to be sent to ClickHouse.
-   *   pending: events that remain in the Redis session list.
-   */
-  private processSessionEvents(events: IClickhouseEvent[]): {
-    flush: IClickhouseEvent[];
-    pending: IClickhouseEvent[];
-  } {
-    // Ensure events are sorted by created_at
-    events.sort(
-      (a, b) =>
-        new Date(a.created_at || 0).getTime() -
-        new Date(b.created_at || 0).getTime(),
-    );
-
-    const flush: IClickhouseEvent[] = [];
-    const pending: IClickhouseEvent[] = [];
-
-    // Check if session has ended - if so, flush everything
-    const hasSessionEnd = events.some((e) => e.name === 'session_end');
-
-    if (hasSessionEnd) {
-      flush.push(...events);
-      return { flush, pending: [] };
-    }
-
-    const findNextScreenView = (events: IClickhouseEvent[]) => {
-      return events.find((e) => e.name === 'screen_view');
-    };
-
-    for (let i = 0; i < events.length; i++) {
-      const event = events[i]!;
-      // For screen_view events, look for next event
-      const next = findNextScreenView(events.slice(i + 1));
-      if (next) {
-        event.duration =
-          new Date(next.created_at).getTime() -
-          new Date(event.created_at).getTime();
-        flush.push(event);
-      } else {
-        // Last screen_view with no next event - keep pending
-        pending.push(event);
-      }
-    }
-
-    return { flush, pending };
-  }
-
-  async tryCleanup() {
-    try {
-      await runEvery({
-        interval: 60 * 60 * 24,
-        fn: this.cleanup.bind(this),
-        key: `${this.name}-cleanup`,
-      });
-    } catch (error) {
-      this.logger.error('Failed to run cleanup', { error });
-    }
-  }
-
-  /**
-   * Cleanup old events from Redis.
-   * For each key (no-session and per-session), remove events older than the cutoff date.
-   */
-  async cleanup() {
-    const redis = getRedisCache();
-    const cutoffTime = Date.now() - 1000 * 60 * 60 * 24 * this.daysToKeep;
-
-    try {
-      const sessionCount = await redis.zcard(this.sessionSortedKey);
-      const batchSize = 1000;
-      let offset = 0;
-      let totalCleaned = 0;
-
-      this.logger.info('Starting cleanup of stale sessions', {
-        cutoffTime: new Date(cutoffTime),
-        totalSessions: sessionCount,
-      });
-
-      while (offset < sessionCount) {
-        // Get batch of session IDs with scores
-        const sessionIdsWithScores = await redis.zrange(
-          this.sessionSortedKey,
-          offset,
-          offset + batchSize - 1,
-          'WITHSCORES',
-        );
-
-        if (sessionIdsWithScores.length === 0) break;
-
-        const pipeline = redis.pipeline();
-        let staleSessions = 0;
-
-        // Process pairs of [sessionId, score]
-        for (let i = 0; i < sessionIdsWithScores.length; i += 2) {
-          const sessionId = sessionIdsWithScores[i];
-          const score = Number.parseInt(sessionIdsWithScores[i + 1] || '0', 10);
-
-          if (sessionId && score < cutoffTime) {
-            staleSessions++;
-            // Remove from both sorted sets and delete the session key
-            pipeline.zrem(this.sessionSortedKey, sessionId);
-            pipeline.zrem(this.readySessionsKey, sessionId);
-            pipeline.del(this.getSessionKey(sessionId));
-          }
-        }
-
-        if (staleSessions > 0) {
-          await pipeline.exec();
-          totalCleaned += staleSessions;
-          this.logger.info('Cleaned batch of stale sessions', {
-            batch: Math.floor(offset / batchSize) + 1,
-            cleanedInBatch: staleSessions,
-            totalCleaned,
-          });
-        }
-
-        offset += batchSize;
-      }
-
-      this.logger.info('Cleanup completed', { totalCleaned });
-    } catch (error) {
-      this.logger.error('Failed to cleanup stale sessions', { error });
-    }
-  }
-
-  /**
-   * Retrieve the latest screen_view event for a given project/profile or project/session
-   */
-  public async getLastScreenView({
-    projectId,
-    ...rest
-  }:
-    | {
-        projectId: string;
-        profileId: string;
-      }
-    | {
-        projectId: string;
-        sessionId: string;
-      }): Promise<IServiceEvent | null> {
-    if ('profileId' in rest) {
-      const redis = getRedisCache();
-      const eventStr = await redis.get(
-        this.getLastEventKey({ projectId, profileId: rest.profileId }),
-      );
-      if (eventStr) {
-        const parsed = getSafeJson<IClickhouseEvent>(eventStr);
-        if (parsed) {
-          return transformEvent(parsed);
-        }
-      }
-    }
-
-    if ('sessionId' in rest) {
-      const redis = getRedisCache();
-      const sessionKey = this.getSessionKey(rest.sessionId);
-      const lastEvent = await redis.lindex(sessionKey, -1);
-      if (lastEvent) {
-        const parsed = getSafeJson<IClickhouseEvent>(lastEvent);
-        if (parsed) {
-          return transformEvent(parsed);
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private getLastEventKey({
-    projectId,
-    profileId,
-  }: {
-    projectId: string;
-    profileId: string;
-  }) {
-    return `session:last_screen_view:${projectId}:${profileId}`;
-  }
-
-  private async processPendingSessionsInBatches(
-    redis: ReturnType<typeof getRedisCache>,
-    pendingUpdates: Array<{
-      sessionId: string;
-      snapshotCount: number;
-      pending: IClickhouseEvent[];
-    }>,
-  ) {
-    for (const batch of this.chunks(
-      pendingUpdates,
-      this.updatePendingSessionsBatchSize,
-    )) {
-      const batchArgs: string[] = [this.minEventsInSession.toString()];
-
-      for (const { sessionId, snapshotCount, pending } of batch) {
-        const sessionKey = this.getSessionKey(sessionId);
-        batchArgs.push(
-          sessionKey,
-          sessionId,
-          snapshotCount.toString(),
-          pending.length.toString(),
-          ...pending.map((e) => JSON.stringify(e)),
-        );
-      }
-
-      await redis.eval(
-        this.batchUpdateSessionsScript,
-        3, // KEYS: ready sessions, sessions sorted, buffer counter
-        this.readySessionsKey,
-        this.sessionSortedKey,
-        this.bufferCounterKey,
-        ...batchArgs,
-      );
-    }
-  }
-
-  public async getBufferSizeHeavy() {
-    // Fallback method for when counter is not available
-    const redis = getRedisCache();
-
-    // Get regular queue count
-    const regularQueueCount = await redis.llen(this.regularQueueKey);
-
-    // Get total number of sessions
-    const sessionCount = await redis.zcard(this.sessionSortedKey);
-
-    if (sessionCount === 0) {
-      return regularQueueCount;
-    }
-
-    // Process sessions in batches to avoid memory spikes
-    const batchSize = 1000;
-    let totalSessionEvents = 0;
-    let offset = 0;
-
-    while (offset < sessionCount) {
-      // Get batch of session IDs
-      const sessionIds = await redis.zrange(
-        this.sessionSortedKey,
-        offset,
-        offset + batchSize - 1,
-      );
-
-      if (sessionIds.length === 0) break;
-
-      // Queue up LLEN commands for this batch
-      const sessionPipeline = redis.pipeline();
-      for (const sessionId of sessionIds) {
-        sessionPipeline.llen(this.getSessionKey(sessionId));
-      }
-
-      // Execute pipeline for this batch
-      const sessionCounts = (await sessionPipeline.exec()) as [any, any][];
-
-      // Sum up counts from this batch
-      for (const [err, count] of sessionCounts) {
-        if (!err) {
-          totalSessionEvents += count;
-        }
-      }
-
-      offset += batchSize;
-    }
-
-    return regularQueueCount + totalSessionEvents;
-  }
-
-  public async getBufferSize() {
-    return this.getBufferSizeWithCounter(() => this.getBufferSizeHeavy());
+  async getBufferSize() {
+    return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
   }
 
   private async incrementActiveVisitorCount(
diff --git a/packages/db/src/clickhouse/client.ts b/packages/db/src/clickhouse/client.ts
index b363cb44..de267bbb 100644
--- a/packages/db/src/clickhouse/client.ts
+++ b/packages/db/src/clickhouse/client.ts
@@ -56,14 +56,15 @@ export const TABLE_NAMES = {
   event_property_values_mv: 'event_property_values_mv',
   cohort_events_mv: 'cohort_events_mv',
   sessions: 'sessions',
+  events_imports: 'events_imports',
 };
 
 export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
max_open_connections: 30, - request_timeout: 60000, + request_timeout: 300000, keep_alive: { enabled: true, - idle_socket_ttl: 8000, + idle_socket_ttl: 60000, }, compression: { request: true, @@ -132,7 +133,27 @@ export const ch = new Proxy(originalCh, { const value = Reflect.get(target, property, receiver); if (property === 'insert') { - return (...args: any[]) => withRetry(() => value.apply(target, args)); + return (...args: any[]) => + withRetry(() => { + args[0].clickhouse_settings = { + // Allow bigger HTTP payloads/time to stream rows + async_insert: 1, + wait_for_async_insert: 1, + // Increase insert timeouts and buffer sizes for large batches + max_execution_time: 300, + max_insert_block_size: '500000', + max_http_get_redirects: '0', + // Ensure JSONEachRow stays efficient + input_format_parallel_parsing: 1, + // Keep long-running inserts/queries from idling out at proxies by sending progress headers + send_progress_in_http_headers: 1, + http_headers_progress_interval_ms: '50000', + // Ensure server holds the connection until the query is finished + wait_end_of_query: 1, + ...args[0].clickhouse_settings, + }; + return value.apply(target, args); + }); } return value; diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 195fc887..6111bdb4 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -510,7 +510,7 @@ export async function getEventList(options: GetEventListOptions) { sb.select.model = 'model'; } if (select.duration) { - sb.select.duration = 'duration'; + sb.select.duration = `${getDurationSql()} as duration`; } if (select.path) { sb.select.path = 'path'; @@ -570,7 +570,7 @@ export async function getEventList(options: GetEventListOptions) { custom(sb); } - console.log('getSql()', getSql()); + console.log('getSql() ----> ', getSql()); const data = await getEvents(getSql(), { profile: select.profile ?? 
true, @@ -1016,3 +1016,6 @@ class EventService { } export const eventService = new EventService(ch); + +export const getDurationSql = (field = 'created_at') => + `dateDiff('millisecond', ${field}, leadInFrame(toNullable(${field}), 1, NULL) OVER (PARTITION BY session_id ORDER BY ${field} ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING))`; diff --git a/packages/db/src/services/overview.service.ts b/packages/db/src/services/overview.service.ts index e0897c17..a5df87bc 100644 --- a/packages/db/src/services/overview.service.ts +++ b/packages/db/src/services/overview.service.ts @@ -2,9 +2,14 @@ import { average, sum } from '@openpanel/common'; import { getCache } from '@openpanel/redis'; import { type IChartEventFilter, zTimeInterval } from '@openpanel/validation'; import { omit } from 'ramda'; +import sqlstring from 'sqlstring'; import { z } from 'zod'; -import { TABLE_NAMES, ch } from '../clickhouse/client'; -import { clix } from '../clickhouse/query-builder'; +import { + TABLE_NAMES, + ch, + chQuery, + formatClickhouseDate, +} from '../clickhouse/client'; import { getEventFiltersWhereClause } from './chart.service'; export const zGetMetricsInput = z.object({ @@ -91,55 +96,77 @@ export class OverviewService { return filters.some((filter) => filter.name === 'path' && filter.value); } - getTotalSessions({ - projectId, - startDate, - endDate, - filters, - timezone, - }: { - projectId: string; - startDate: string; - endDate: string; - filters: IChartEventFilter[]; - timezone: string; - }) { - const where = this.getRawWhereClause('sessions', filters); - const key = `total_sessions_${projectId}_${startDate}_${endDate}_${JSON.stringify(filters)}`; - - // Check if there's already a pending query for this key - const pendingQuery = this.pendingQueries.get(key); - if (pendingQuery) { - return pendingQuery.then((res) => res ?? 0); + private toStartOfInterval( + field: string, + interval: string, + timezone: string, + ): string { + const tzPart = timezone ? `, '${timezone}'` : ''; + switch (interval) { + case 'hour': + return `toStartOfHour(${field}${tzPart})`; + case 'day': + return `toStartOfDay(${field}${tzPart})`; + case 'week': + // toStartOfWeek(date, mode) - mode is UInt8, NOT timezone + // mode 0 = Sunday, mode 1 = Monday + // For timezone support, we need to convert to timezone first + if (timezone) { + return `toStartOfWeek(toTimeZone(${field}, '${timezone}'), 1)`; + } + return `toStartOfWeek(${field}, 1)`; + case 'month': + return `toStartOfMonth(${field}${tzPart})`; + case 'year': + return `toStartOfYear(${field}${tzPart})`; + default: + return `toStartOfDay(${field}${tzPart})`; } + } + + private toInterval(value: string, interval: string): string { + return `INTERVAL ${value} ${interval}`; + } - // Create new query promise and store it - const queryPromise = getCache(key, 15, async () => { - try { - const result = await clix(this.client, timezone) - .select<{ - total_sessions: number; - }>(['sum(sign) as total_sessions']) - .from(TABLE_NAMES.sessions, true) - .where('project_id', '=', projectId) - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .rawWhere(where) - .having('sum(sign)', '>', 0) - .execute(); - return result?.[0]?.total_sessions ?? 
0; - } catch (error) { - return 0; + private buildWhereClause( + type: 'events' | 'sessions', + filters: IChartEventFilter[], + ): string { + const mappedFilters = filters.map((item) => { + if (type === 'sessions') { + if (item.name === 'path') { + return { ...item, name: 'entry_path' }; + } + if (item.name === 'origin') { + return { ...item, name: 'entry_origin' }; + } + if (item.name.startsWith('properties.__query.utm_')) { + return { + ...item, + name: item.name.replace('properties.__query.utm_', 'utm_'), + }; + } } + return item; }); - this.pendingQueries.set(key, queryPromise); - return queryPromise; + const where = getEventFiltersWhereClause(mappedFilters); + return Object.values(where).filter(Boolean).join(' AND '); } - getMetrics({ + /** + * Get overview metrics and time series. + * + * Performance optimization (following Plausible's approach): + * - WITHOUT page filters: Query sessions table directly (much faster!) + * - WITH page filters: Query events table and join with sessions for bounce rate + * + * This optimization significantly improves performance because: + * 1. Sessions table is much smaller than events table + * 2. Sessions table already has pre-aggregated data (screen_view_count, duration, etc.) + * 3. When filtering by page, we must hit events table anyway to match specific pages + */ + async getMetrics({ projectId, filters, startDate, @@ -165,149 +192,105 @@ export class OverviewService { views_per_session: number; }[]; }> { - const where = this.getRawWhereClause('sessions', filters); - if (this.isPageFilter(filters)) { - // Session aggregation with bounce rates - const sessionAggQuery = clix(this.client, timezone) - .select([ - `${clix.toStartOf('created_at', interval, timezone)} AS date`, - 'round((countIf(is_bounce = 1 AND sign = 1) * 100.) 
/ countIf(sign = 1), 2) AS bounce_rate', - ]) - .from(TABLE_NAMES.sessions, true) - .where('sign', '=', 1) - .where('project_id', '=', projectId) - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .rawWhere(where) - .groupBy(['date']) - .rollup() - .orderBy('date', 'ASC'); - - // Overall unique visitors - const overallUniqueVisitorsQuery = clix(this.client, timezone) - .select([ - 'uniq(profile_id) AS unique_visitors', - 'uniq(session_id) AS total_sessions', - ]) - .from(TABLE_NAMES.events) - .where('project_id', '=', projectId) - .where('name', '=', 'screen_view') - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .rawWhere(this.getRawWhereClause('events', filters)); - - return clix(this.client, timezone) - .with('session_agg', sessionAggQuery) - .with( - 'overall_bounce_rate', - clix(this.client, timezone) - .select(['bounce_rate']) - .from('session_agg') - .where('date', '=', clix.datetime('1970-01-01 00:00:00')), - ) - .with( - 'daily_stats', - clix(this.client, timezone) - .select(['date', 'bounce_rate']) - .from('session_agg') - .where('date', '!=', clix.datetime('1970-01-01 00:00:00')), - ) - .with('overall_unique_visitors', overallUniqueVisitorsQuery) - .select<{ - date: string; - bounce_rate: number; - unique_visitors: number; - total_sessions: number; - avg_session_duration: number; - total_screen_views: number; - views_per_session: number; - overall_unique_visitors: number; - overall_total_sessions: number; - overall_bounce_rate: number; - }>([ - `${clix.toStartOf('e.created_at', interval)} AS date`, - 'ds.bounce_rate as bounce_rate', - 'uniq(e.profile_id) AS unique_visitors', - 'uniq(e.session_id) AS total_sessions', - 'round(avgIf(duration, duration > 0), 2) / 1000 AS _avg_session_duration', - 'if(isNaN(_avg_session_duration), 0, _avg_session_duration) AS avg_session_duration', - 'count(*) AS total_screen_views', - 'round((count(*) * 1.) / uniq(e.session_id), 2) AS views_per_session', - '(SELECT unique_visitors FROM overall_unique_visitors) AS overall_unique_visitors', - '(SELECT total_sessions FROM overall_unique_visitors) AS overall_total_sessions', - '(SELECT bounce_rate FROM overall_bounce_rate) AS overall_bounce_rate', - ]) - .from(`${TABLE_NAMES.events} AS e`) - .leftJoin( - 'daily_stats AS ds', - `${clix.toStartOf('e.created_at', interval)} = ds.date`, - ) - .where('e.project_id', '=', projectId) - .where('e.name', '=', 'screen_view') - .where('e.created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .rawWhere(this.getRawWhereClause('events', filters)) - .groupBy(['date', 'ds.bounce_rate']) - .orderBy('date', 'ASC') - .fill( - clix.toStartOf( - clix.datetime( - startDate, - ['month', 'week'].includes(interval) ? 
'toDate' : 'toDateTime', - ), - interval, + const hasPageFilter = this.isPageFilter(filters); + const startOfInterval = this.toStartOfInterval( + 'created_at', + interval, + timezone, + ); + + // Following Plausible: Use a lookback window for sessions + // Sessions can start before the period but have events within it + const sessionLookbackDays = 7; + const sessionLookbackStart = new Date(startDate); + sessionLookbackStart.setDate( + sessionLookbackStart.getDate() - sessionLookbackDays, + ); + + if (hasPageFilter) { + // WITH PAGE FILTER: use events table, join sessions for bounce rate + const eventsWhere = this.buildWhereClause('events', filters); + const sessionsWhere = this.buildWhereClause('sessions', filters); + + const sql = ` + WITH + -- Sessions that visited the filtered pages + filtered_sessions AS ( + SELECT DISTINCT + ${startOfInterval} AS date, + session_id, + profile_id + FROM ${TABLE_NAMES.events} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND name = 'screen_view' + ${eventsWhere ? `AND ${eventsWhere}` : ''} ), - clix.datetime( - endDate, - ['month', 'week'].includes(interval) ? 'toDate' : 'toDateTime', + -- Get session stats for filtered sessions + session_stats AS ( + SELECT + ${startOfInterval} AS date, + round(avgIf(duration, duration > 0 AND sign > 0) / 1000, 2) AS avg_session_duration + FROM ${TABLE_NAMES.sessions} s + INNER JOIN filtered_sessions fs ON s.id = fs.session_id + WHERE s.project_id = ${sqlstring.escape(projectId)} + AND s.created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND sign = 1 + GROUP BY date + WITH ROLLUP ), - clix.toInterval('1', interval), - ) - .transform({ - date: (item) => new Date(item.date).toISOString(), - }) - .execute() - .then((res) => { - const anyRowWithData = res.find( - (item) => - item.overall_bounce_rate !== null || - item.overall_total_sessions !== null || - item.overall_unique_visitors !== null, - ); - return { - metrics: { - bounce_rate: anyRowWithData?.overall_bounce_rate ?? 0, - unique_visitors: anyRowWithData?.overall_unique_visitors ?? 0, - total_sessions: anyRowWithData?.overall_total_sessions ?? 0, - avg_session_duration: average( - res.map((item) => item.avg_session_duration), - ), - total_screen_views: sum( - res.map((item) => item.total_screen_views), - ), - views_per_session: average( - res.map((item) => item.views_per_session), - ), - }, - series: res.map( - omit([ - 'overall_bounce_rate', - 'overall_unique_visitors', - 'overall_total_sessions', - ]), - ), - }; - }); - } + -- Bounce rate calculated separately with entry_path filter + bounce_stats AS ( + SELECT + ${startOfInterval} AS date, + round( + countIf(is_bounce = 1 AND sign = 1) * 100.0 + / nullIf(countIf(sign = 1), 0), + 2 + ) AS bounce_rate + FROM ${TABLE_NAMES.sessions} s + WHERE s.project_id = ${sqlstring.escape(projectId)} + AND s.created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND sign = 1 + ${sessionsWhere ? 
`AND ${sessionsWhere}` : ''}
+          GROUP BY date
+          WITH ROLLUP
+        )
+        SELECT
+          ${startOfInterval} AS date,
+          any(bs.bounce_rate) AS bounce_rate,
+          uniq(e.profile_id) AS unique_visitors,
+          uniq(e.session_id) AS total_sessions,
+          any(ss.avg_session_duration) AS avg_session_duration,
+          count(*) AS total_screen_views,
+          round(count(*) * 1.0 / nullIf(uniq(e.session_id), 0), 2) AS views_per_session
+        FROM ${TABLE_NAMES.events} e
+        LEFT JOIN session_stats ss ON ${startOfInterval} = ss.date
+        LEFT JOIN bounce_stats bs ON ${startOfInterval} = bs.date
+        WHERE e.project_id = ${sqlstring.escape(projectId)}
+          AND e.created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))})
+          AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))})
+          AND e.name = 'screen_view'
+          ${eventsWhere ? `AND ${eventsWhere}` : ''}
+        GROUP BY date
+        WITH ROLLUP
+        ORDER BY date ASC WITH FILL
+        FROM ${this.toStartOfInterval(
+          `toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))})`,
+          interval,
+          timezone,
+        )}
+        TO ${this.toStartOfInterval(
+          `toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))})`,
+          interval,
+          timezone,
+        )}
+        STEP ${this.toInterval('1', interval)}
+      `;

-    const query = clix(this.client, timezone)
-      .select<{
+      const res = await chQuery<{
         date: string;
         bounce_rate: number;
         unique_visitors: number;
@@ -315,94 +298,115 @@ export class OverviewService {
         avg_session_duration: number;
         total_screen_views: number;
         views_per_session: number;
-      }>([
-        `${clix.toStartOf('created_at', interval, timezone)} AS date`,
-        'round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) as bounce_rate',
-        'uniqIf(profile_id, sign > 0) AS unique_visitors',
-        'sum(sign) AS total_sessions',
-        'round(avgIf(duration, duration > 0 AND sign > 0), 2) / 1000 AS _avg_session_duration',
-        'if(isNaN(_avg_session_duration), 0, _avg_session_duration) AS avg_session_duration',
-        'sum(sign * screen_view_count) AS total_screen_views',
-        'round(sum(sign * screen_view_count) * 1.0 / sum(sign), 2) AS views_per_session',
-      ])
-      .from('sessions')
-      .where('created_at', 'BETWEEN', [
-        clix.datetime(startDate, 'toDateTime'),
-        clix.datetime(endDate, 'toDateTime'),
-      ])
-      .where('project_id', '=', projectId)
-      .rawWhere(where)
-      .groupBy(['date'])
-      .having('sum(sign)', '>', 0)
-      .rollup()
-      .orderBy('date', 'ASC')
-      .fill(
-        clix.toStartOf(
-          clix.datetime(
-            startDate,
-            ['month', 'week'].includes(interval) ? 'toDate' : 'toDateTime',
-          ),
-          interval,
-        ),
-        clix.datetime(
-          endDate,
-          ['month', 'week'].includes(interval) ? 'toDate' : 'toDateTime',
-        ),
-        clix.toInterval('1', interval),
-      )
-      .transform({
-        date: (item) => new Date(item.date).toISOString(),
-      });
-
-    return query.execute().then((res) => {
-      // First row is the rollup row containing the total values
+      }>(sql);
+
+      // WITH ROLLUP: First row is the aggregated totals, rest are time series
+      const rollupRow = res[0];
+      const series = res.slice(1).map((r) => ({
+        ...r,
+        date: new Date(r.date).toISOString(),
+      }));
+
       return {
         metrics: {
-          bounce_rate: res[0]?.bounce_rate ?? 0,
-          unique_visitors: res[0]?.unique_visitors ?? 0,
-          total_sessions: res[0]?.total_sessions ?? 0,
-          avg_session_duration: res[0]?.avg_session_duration ?? 0,
-          total_screen_views: res[0]?.total_screen_views ?? 0,
-          views_per_session: res[0]?.views_per_session ?? 0,
+          bounce_rate: rollupRow?.bounce_rate ?? 0,
+          unique_visitors: rollupRow?.unique_visitors ?? 0,
+          total_sessions: rollupRow?.total_sessions ?? 0,
+          avg_session_duration: rollupRow?.avg_session_duration ?? 0,
+          total_screen_views: rollupRow?.total_screen_views ?? 
0, + views_per_session: rollupRow?.views_per_session ?? 0, }, - series: res - .slice(1) - .map(omit(['overall_bounce_rate', 'overall_unique_visitors'])), + series: series.map((s) => ({ + ...s, + bounce_rate: s.bounce_rate ?? 0, + })), }; - }); - } + } - getRawWhereClause(type: 'events' | 'sessions', filters: IChartEventFilter[]) { - const where = getEventFiltersWhereClause( - filters.map((item) => { - if (type === 'sessions') { - if (item.name === 'path') { - return { ...item, name: 'entry_path' }; - } - if (item.name === 'origin') { - return { ...item, name: 'entry_origin' }; - } - if (item.name.startsWith('properties.__query.utm_')) { - return { - ...item, - name: item.name.replace('properties.__query.utm_', 'utm_'), - }; - } - return item; - } - return item; - }), - // .filter((item) => { - // if (this.isPageFilter(filters) && type === 'sessions') { - // return item.name !== 'entry_path' && item.name !== 'entry_origin'; - // } - // return true; - // }), - ); + // WITHOUT PAGE FILTER: use sessions table directly (much faster!) + // All data is pre-aggregated in sessions table, no need to touch events table + const sessionsWhere = this.buildWhereClause('sessions', filters); + + const sql = ` + SELECT + ${startOfInterval} AS date, + round( + sumIf(is_bounce * sign, sign = 1) * 100.0 + / nullIf(sumIf(sign, sign = 1), 0), + 2 + ) AS bounce_rate, + uniqIf(profile_id, sign > 0) AS unique_visitors, + sumIf(sign, sign = 1) AS total_sessions, + round( + avgIf(duration, duration > 0 AND sign > 0) / 1000, + 2 + ) AS avg_session_duration, + sumIf(screen_view_count * sign, sign = 1) AS total_screen_views, + round( + sumIf(screen_view_count * sign, sign = 1) * 1.0 + / nullIf(sumIf(sign, sign = 1), 0), + 2 + ) AS views_per_session + FROM ${TABLE_NAMES.sessions} + WHERE project_id = ${sqlstring.escape(projectId)} + -- Plausible pattern: 7-day lookback on session start for index optimization + AND created_at >= toDateTime(${sqlstring.escape(formatClickhouseDate(sessionLookbackStart))}) + AND ended_at >= toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND created_at <= toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + ${sessionsWhere ? `AND ${sessionsWhere}` : ''} + GROUP BY date + WITH ROLLUP + HAVING sum(sign) > 0 + ORDER BY date ASC WITH FILL + FROM ${this.toStartOfInterval( + `toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))})`, + interval, + timezone, + )} + TO ${this.toStartOfInterval( + `toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))})`, + interval, + timezone, + )} + STEP ${this.toInterval('1', interval)} + `; - return Object.values(where).join(' AND '); + const res = await chQuery<{ + date: string; + bounce_rate: number; + unique_visitors: number; + total_sessions: number; + avg_session_duration: number; + total_screen_views: number; + views_per_session: number; + }>(sql); + + // WITH ROLLUP: First row is the aggregated totals, rest are time series + const rollupRow = res[0]; + const series = res.slice(1).map((r) => ({ + ...r, + date: new Date(r.date).toISOString(), + })); + + return { + metrics: { + bounce_rate: rollupRow?.bounce_rate ?? 0, + unique_visitors: rollupRow?.unique_visitors ?? 0, + total_sessions: rollupRow?.total_sessions ?? 0, + avg_session_duration: rollupRow?.avg_session_duration ?? 0, + total_screen_views: rollupRow?.total_screen_views ?? 0, + views_per_session: rollupRow?.views_per_session ?? 0, + }, + series, + }; } + /** + * Get top pages with bounce rates. 
+ * + * Always queries events table for page-level metrics, + * then joins with sessions table for bounce rates. + */ async getTopPages({ projectId, filters, @@ -412,82 +416,86 @@ export class OverviewService { limit = 10, timezone, }: IGetTopPagesInput) { - const pageStatsQuery = clix(this.client, timezone) - .select([ - 'origin', - 'path', - `last_value(properties['__title']) as title`, - 'uniq(session_id) as count', - 'round(avg(duration)/1000, 2) as avg_duration', - ]) - .from(TABLE_NAMES.events, false) - .where('project_id', '=', projectId) - .where('name', '=', 'screen_view') - .where('path', '!=', '') - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .groupBy(['origin', 'path']) - .orderBy('count', 'DESC') - .limit(limit) - .offset((cursor - 1) * limit); - - const bounceStatsQuery = clix(this.client, timezone) - .select([ - 'entry_path', - 'entry_origin', - 'coalesce(round(countIf(is_bounce = 1 AND sign = 1) * 100.0 / countIf(sign = 1), 2), 0) as bounce_rate', - ]) - .from(TABLE_NAMES.sessions, true) - .where('sign', '=', 1) - .where('project_id', '=', projectId) - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .groupBy(['entry_path', 'entry_origin']); - - pageStatsQuery.rawWhere(this.getRawWhereClause('events', filters)); - bounceStatsQuery.rawWhere(this.getRawWhereClause('sessions', filters)); - - const mainQuery = clix(this.client, timezone) - .with('page_stats', pageStatsQuery) - .with('bounce_stats', bounceStatsQuery) - .select<{ - title: string; - origin: string; - path: string; - avg_duration: number; - bounce_rate: number; - sessions: number; - }>([ - 'p.title', - 'p.origin', - 'p.path', - 'p.avg_duration', - 'p.count as sessions', - 'b.bounce_rate', - ]) - .from('page_stats p', false) - .leftJoin( - 'bounce_stats b', - 'p.path = b.entry_path AND p.origin = b.entry_origin', - ) - .orderBy('sessions', 'DESC') - .limit(limit); - - const totalSessions = await this.getTotalSessions({ - projectId, - startDate, - endDate, - filters, - timezone, - }); + const eventsWhere = this.buildWhereClause('events', filters); + const sessionsWhere = this.buildWhereClause('sessions', filters); + const offset = (cursor - 1) * limit; - return mainQuery.execute(); + const sql = ` + WITH + -- Step 1: Calculate time spent on each page view (time to next event in session) + page_view_durations AS ( + SELECT + origin, + path, + session_id, + properties, + dateDiff('millisecond', created_at, leadInFrame(toNullable(created_at), 1, NULL) OVER (PARTITION BY session_id ORDER BY created_at ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) AS duration + FROM ${TABLE_NAMES.events} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND name = 'screen_view' + AND path != '' + ${eventsWhere ? 
`AND ${eventsWhere}` : ''} + ), + -- Step 2: Group by page (origin + path) to get stats + page_stats AS ( + SELECT + origin, + path, + anyLast(properties['__title']) AS title, + uniq(session_id) AS count, + round(avgIf(duration, duration > 0) / 1000, 2) AS avg_duration, + countIf(duration > 0) AS duration_count + FROM page_view_durations + GROUP BY origin, path + ORDER BY count DESC + LIMIT ${limit} OFFSET ${offset} + ), + bounce_stats AS ( + SELECT + entry_path, + entry_origin, + COALESCE(round(countIf(is_bounce = 1 AND sign = 1) * 100.0 / countIf(sign = 1), 2), 0) AS bounce_rate + FROM ${TABLE_NAMES.sessions} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND sign = 1 + ${sessionsWhere ? `AND ${sessionsWhere}` : ''} + GROUP BY entry_path, entry_origin + ) + SELECT + p.title, + p.origin, + p.path, + p.avg_duration, + p.count AS sessions, + COALESCE(b.bounce_rate, 0) AS bounce_rate + FROM page_stats p + LEFT JOIN bounce_stats b ON p.path = b.entry_path AND p.origin = b.entry_origin + ORDER BY sessions DESC + `; + + console.log('sql', sql); + + return chQuery<{ + title: string; + origin: string; + path: string; + avg_duration: number; + bounce_rate: number; + sessions: number; + }>(sql); } + /** + * Get top entry/exit pages. + * + * Optimization: + * - WITHOUT page filters: Query sessions table directly + * - WITH page filters: Join with events to filter sessions + */ async getTopEntryExit({ projectId, filters, @@ -498,93 +506,85 @@ export class OverviewService { limit = 10, timezone, }: IGetTopEntryExitInput) { - const where = this.getRawWhereClause('sessions', filters); - - const distinctSessionQuery = this.getDistinctSessions({ - projectId, - filters, - startDate, - endDate, - timezone, - }); - + const hasPageFilter = this.isPageFilter(filters); + const sessionsWhere = this.buildWhereClause('sessions', filters); const offset = (cursor - 1) * limit; - const query = clix(this.client, timezone) - .select<{ + if (hasPageFilter) { + // WITH PAGE FILTER: restrict to sessions that have matching events + const eventsWhere = this.buildWhereClause('events', filters); + + const sql = ` + WITH distinct_sessions AS ( + SELECT DISTINCT session_id + FROM ${TABLE_NAMES.events} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + ${eventsWhere ? `AND ${eventsWhere}` : ''} + ) + SELECT + ${mode}_origin AS origin, + ${mode}_path AS path, + round(avg(duration * sign) / 1000, 2) AS avg_duration, + round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate, + sum(sign) AS sessions + FROM ${TABLE_NAMES.sessions} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND id IN (SELECT session_id FROM distinct_sessions) + ${sessionsWhere ? 
`AND ${sessionsWhere}` : ''} + GROUP BY ${mode}_origin, ${mode}_path + HAVING sum(sign) > 0 + ORDER BY sessions DESC + LIMIT ${limit} OFFSET ${offset} + `; + + return chQuery<{ origin: string; path: string; avg_duration: number; bounce_rate: number; sessions: number; - }>([ - `${mode}_origin AS origin`, - `${mode}_path AS path`, - 'round(avg(duration * sign)/1000, 2) as avg_duration', - 'round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) as bounce_rate', - 'sum(sign) as sessions', - ]) - .from(TABLE_NAMES.sessions, true) - .where('project_id', '=', projectId) - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .rawWhere(where) - .groupBy([`${mode}_origin`, `${mode}_path`]) - .having('sum(sign)', '>', 0) - .orderBy('sessions', 'DESC') - .limit(limit) - .offset(offset); - - let mainQuery = query; - - if (this.isPageFilter(filters)) { - mainQuery = clix(this.client, timezone) - .with('distinct_sessions', distinctSessionQuery) - .merge(query) - .where( - 'id', - 'IN', - clix.exp('(SELECT session_id FROM distinct_sessions)'), - ); + }>(sql); } - const totalSessions = await this.getTotalSessions({ - projectId, - startDate, - endDate, - filters, - timezone, - }); - - return mainQuery.execute(); - } - - private getDistinctSessions({ - projectId, - filters, - startDate, - endDate, - timezone, - }: { - projectId: string; - filters: IChartEventFilter[]; - startDate: string; - endDate: string; - timezone: string; - }) { - return clix(this.client, timezone) - .select(['DISTINCT session_id']) - .from(TABLE_NAMES.events) - .where('project_id', '=', projectId) - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .rawWhere(this.getRawWhereClause('events', filters)); + // WITHOUT PAGE FILTER: direct query on sessions table + const sql = ` + SELECT + ${mode}_origin AS origin, + ${mode}_path AS path, + round(avg(duration * sign) / 1000, 2) AS avg_duration, + round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate, + sum(sign) AS sessions + FROM ${TABLE_NAMES.sessions} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + ${sessionsWhere ? `AND ${sessionsWhere}` : ''} + GROUP BY ${mode}_origin, ${mode}_path + HAVING sum(sign) > 0 + ORDER BY sessions DESC + LIMIT ${limit} OFFSET ${offset} + `; + + return chQuery<{ + origin: string; + path: string; + avg_duration: number; + bounce_rate: number; + sessions: number; + }>(sql); } + /** + * Get top generic dimensions (referrers, geo, devices, etc). 
+ * + * Optimization: + * - WITHOUT page filters: Query sessions table directly + * - WITH page filters: Join with events to filter sessions + */ async getTopGeneric({ projectId, filters, @@ -595,13 +595,9 @@ export class OverviewService { limit = 10, timezone, }: IGetTopGenericInput) { - const distinctSessionQuery = this.getDistinctSessions({ - projectId, - filters, - startDate, - endDate, - timezone, - }); + const hasPageFilter = this.isPageFilter(filters); + const sessionsWhere = this.buildWhereClause('sessions', filters); + const offset = (cursor - 1) * limit; const prefixColumn = (() => { switch (column) { @@ -617,61 +613,75 @@ export class OverviewService { return null; })(); - const offset = (cursor - 1) * limit; - - const query = clix(this.client, timezone) - .select<{ + const selectPrefix = prefixColumn ? `${prefixColumn} AS prefix,` : ''; + const groupByPrefix = prefixColumn ? `${prefixColumn},` : ''; + + if (hasPageFilter) { + // WITH PAGE FILTER: restrict to sessions that have matching events + const eventsWhere = this.buildWhereClause('events', filters); + + const sql = ` + WITH distinct_sessions AS ( + SELECT DISTINCT session_id + FROM ${TABLE_NAMES.events} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + ${eventsWhere ? `AND ${eventsWhere}` : ''} + ) + SELECT + ${selectPrefix} + nullIf(${column}, '') AS name, + sum(sign) AS sessions, + round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate, + round(avgIf(duration, duration > 0 AND sign > 0) / 1000, 2) AS avg_session_duration + FROM ${TABLE_NAMES.sessions} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + AND id IN (SELECT session_id FROM distinct_sessions) + ${sessionsWhere ? 
`AND ${sessionsWhere}` : ''} + GROUP BY ${groupByPrefix} ${column} + HAVING sum(sign) > 0 + ORDER BY sessions DESC + LIMIT ${limit} OFFSET ${offset} + `; + + return chQuery<{ prefix?: string; name: string; sessions: number; bounce_rate: number; avg_session_duration: number; - }>([ - prefixColumn && `${prefixColumn} as prefix`, - `nullIf(${column}, '') as name`, - 'sum(sign) as sessions', - 'round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate', - 'round(avgIf(duration, duration > 0 AND sign > 0), 2)/1000 AS avg_session_duration', - ]) - .from(TABLE_NAMES.sessions, true) - .where('project_id', '=', projectId) - .where('created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) - .groupBy([prefixColumn, column].filter(Boolean)) - .having('sum(sign)', '>', 0) - .orderBy('sessions', 'DESC') - .limit(limit) - .offset(offset); - - let mainQuery = query; - - if (this.isPageFilter(filters)) { - mainQuery = clix(this.client, timezone) - .with('distinct_sessions', distinctSessionQuery) - .merge(query) - .where( - 'id', - 'IN', - clix.exp('(SELECT session_id FROM distinct_sessions)'), - ); - } else { - mainQuery.rawWhere(this.getRawWhereClause('sessions', filters)); + }>(sql); } - const [res, totalSessions] = await Promise.all([ - mainQuery.execute(), - this.getTotalSessions({ - projectId, - startDate, - endDate, - filters, - timezone, - }), - ]); - - return res; + // WITHOUT PAGE FILTER: direct query on sessions table + const sql = ` + SELECT + ${selectPrefix} + nullIf(${column}, '') AS name, + sum(sign) AS sessions, + round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate, + round(avgIf(duration, duration > 0 AND sign > 0) / 1000, 2) AS avg_session_duration + FROM ${TABLE_NAMES.sessions} + WHERE project_id = ${sqlstring.escape(projectId)} + AND created_at BETWEEN toDateTime(${sqlstring.escape(formatClickhouseDate(startDate))}) + AND toDateTime(${sqlstring.escape(formatClickhouseDate(endDate))}) + ${sessionsWhere ? 
`AND ${sessionsWhere}` : ''} + GROUP BY ${groupByPrefix} ${column} + HAVING sum(sign) > 0 + ORDER BY sessions DESC + LIMIT ${limit} OFFSET ${offset} + `; + + return chQuery<{ + prefix?: string; + name: string; + sessions: number; + bounce_rate: number; + avg_session_duration: number; + }>(sql); } } diff --git a/packages/db/src/services/profile.service.ts b/packages/db/src/services/profile.service.ts index c9a49bf2..faf6b4b7 100644 --- a/packages/db/src/services/profile.service.ts +++ b/packages/db/src/services/profile.service.ts @@ -13,6 +13,7 @@ import { formatClickhouseDate, } from '../clickhouse/client'; import { createSqlBuilder } from '../sql-builder'; +import { getDurationSql } from './event.service'; export type IProfileMetrics = { lastSeen: string; @@ -39,15 +40,15 @@ export function getProfileMetrics(profileId: string, projectId: string) { screenViews AS ( SELECT count(*) as screenViews FROM ${TABLE_NAMES.events} WHERE name = 'screen_view' AND profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} ), - sessions AS ( + sessionsCount AS ( SELECT count(*) as sessions FROM ${TABLE_NAMES.events} WHERE name = 'session_start' AND profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} ), - duration AS ( + sessionDuration AS ( SELECT - round(avg(duration) / 1000 / 60, 2) as durationAvg, - round(quantilesExactInclusive(0.9)(duration)[1] / 1000 / 60, 2) as durationP90 - FROM ${TABLE_NAMES.events} - WHERE name = 'session_end' AND duration != 0 AND profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} + avg(duration) / 1000 as durationAvg, + quantilesExactInclusive(0.9)(duration)[1] / 1000 as durationP90 + FROM ${TABLE_NAMES.sessions} + WHERE profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} ), totalEvents AS ( SELECT count(*) as totalEvents FROM ${TABLE_NAMES.events} WHERE profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} @@ -59,7 +60,7 @@ export function getProfileMetrics(profileId: string, projectId: string) { SELECT round(avg(properties['__bounce'] = '1') * 100, 4) as bounceRate FROM ${TABLE_NAMES.events} WHERE name = 'session_end' AND profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} ), avgEventsPerSession AS ( - SELECT round((SELECT totalEvents FROM totalEvents) / nullIf((SELECT sessions FROM sessions), 0), 2) as avgEventsPerSession + SELECT round((SELECT totalEvents FROM totalEvents) / nullIf((SELECT sessions FROM sessionsCount), 0), 2) as avgEventsPerSession ), conversionEvents AS ( SELECT count(*) as conversionEvents FROM ${TABLE_NAMES.events} WHERE name NOT IN ('screen_view', 'session_start', 'session_end') AND profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)} @@ -67,17 +68,17 @@ export function getProfileMetrics(profileId: string, projectId: string) { avgTimeBetweenSessions AS ( SELECT CASE - WHEN (SELECT sessions FROM sessions) <= 1 THEN 0 - ELSE round(dateDiff('second', (SELECT firstSeen FROM firstSeen), (SELECT lastSeen FROM lastSeen)) / nullIf((SELECT sessions FROM sessions) - 1, 0), 1) + WHEN (SELECT sessions FROM sessionsCount) <= 1 THEN 0 + ELSE round(dateDiff('second', (SELECT firstSeen FROM firstSeen), (SELECT lastSeen FROM lastSeen)) / nullIf((SELECT sessions FROM sessionsCount) - 1, 0), 1) END as avgTimeBetweenSessions ) SELECT (SELECT lastSeen FROM lastSeen) as lastSeen, (SELECT firstSeen FROM 
firstSeen) as firstSeen, (SELECT screenViews FROM screenViews) as screenViews, - (SELECT sessions FROM sessions) as sessions, - (SELECT durationAvg FROM duration) as durationAvg, - (SELECT durationP90 FROM duration) as durationP90, + (SELECT sessions FROM sessionsCount) as sessions, + (SELECT durationAvg FROM sessionDuration) as durationAvg, + (SELECT durationP90 FROM sessionDuration) as durationP90, (SELECT totalEvents FROM totalEvents) as totalEvents, (SELECT uniqueDaysActive FROM uniqueDaysActive) as uniqueDaysActive, (SELECT bounceRate FROM bounceRate) as bounceRate, diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index 6fb30572..899ac669 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -212,7 +212,6 @@ export const chartRouter = createTRPCRouter({ 'origin', 'referrer', 'referrer_name', - 'duration', 'created_at', 'country', 'city', diff --git a/packages/trpc/src/routers/realtime.ts b/packages/trpc/src/routers/realtime.ts index 336f541e..8035f49a 100644 --- a/packages/trpc/src/routers/realtime.ts +++ b/packages/trpc/src/routers/realtime.ts @@ -8,6 +8,7 @@ import { clix, db, formatClickhouseDate, + getDurationSql, getEventList, } from '@openpanel/db'; @@ -66,7 +67,7 @@ export const realtimeRouter = createTRPCRouter({ 'origin', 'path', 'COUNT(*) as count', - 'round(avg(duration)/1000, 2) as avg_duration', + `round(avg(${getDurationSql()})/1000, 2) as avg_duration`, ]) .from(TABLE_NAMES.events) .where('project_id', '=', input.projectId) @@ -94,7 +95,7 @@ export const realtimeRouter = createTRPCRouter({ }>([ 'referrer_name', 'COUNT(*) as count', - 'round(avg(duration)/1000, 2) as avg_duration', + `round(avg(${getDurationSql()})/1000, 2) as avg_duration`, ]) .from(TABLE_NAMES.events) .where('project_id', '=', input.projectId) @@ -124,7 +125,7 @@ export const realtimeRouter = createTRPCRouter({ 'country', 'city', 'COUNT(*) as count', - 'round(avg(duration)/1000, 2) as avg_duration', + `round(avg(${getDurationSql()})/1000, 2) as avg_duration`, ]) .from(TABLE_NAMES.events) .where('project_id', '=', input.projectId)
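
A few self-contained sketches follow to illustrate the patterns this diff leans on. First, the insert wrapper from the ClickHouse client change: every `ch.insert()` call gets default `clickhouse_settings` merged in, with caller-supplied settings spread last so they win. A minimal sketch, assuming `@clickhouse/client` and an illustrative subset of the settings:

```ts
import { createClient } from '@clickhouse/client';

const originalCh = createClient({
  url: process.env.CLICKHOUSE_URL, // assumed config
});

// Proxy the client so insert() always carries safe defaults. The caller's
// clickhouse_settings are spread last and therefore override the defaults.
export const ch = new Proxy(originalCh, {
  get(target, property, receiver) {
    const value = Reflect.get(target, property, receiver);
    if (property === 'insert') {
      return (...args: any[]) => {
        args[0].clickhouse_settings = {
          async_insert: 1, // let the server batch small inserts
          wait_for_async_insert: 1, // ack only after the data is flushed
          ...args[0].clickhouse_settings,
        };
        return value.apply(target, args);
      };
    }
    return value;
  },
});
```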
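The stored `duration` column on events is replaced by a derived time-to-next-event. `getDurationSql()` expands to a window expression, and because window functions are evaluated after `GROUP BY`, ClickHouse generally rejects them directly inside an aggregate; `getTopPages` therefore materializes the expression in an inner query first and aggregates over that. A sketch of both pieces (the `events` table name and filter values here are illustrative):

```ts
// Same helper as in the diff, repeated here to make the expansion concrete.
const getDurationSql = (field = 'created_at') =>
  `dateDiff('millisecond', ${field}, leadInFrame(toNullable(${field}), 1, NULL) OVER (PARTITION BY session_id ORDER BY ${field} ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING))`;

// Compute the window expression per row in an inner query, then aggregate.
// The last event of a session has no successor, so leadInFrame yields NULL,
// dateDiff propagates it, and avgIf(duration, duration > 0) skips the row.
const avgDurationByPath = `
  SELECT path, round(avgIf(duration, duration > 0) / 1000, 2) AS avg_duration
  FROM (
    SELECT path, ${getDurationSql()} AS duration
    FROM events
    WHERE project_id = 'p1' AND name = 'screen_view'
  )
  GROUP BY path
`;
```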
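The Plausible-style lookback in `getMetrics`, as a standalone sketch with hypothetical dates: a session can start before the selected range yet still overlap it, so `created_at` is bounded by a 7-day lookback (cheap primary-key pruning) while `ended_at` anchors the session inside the range:

```ts
const startDate = '2024-01-08T00:00:00Z'; // hypothetical range
const endDate = '2024-01-15T00:00:00Z';

const sessionLookbackDays = 7;
const sessionLookbackStart = new Date(startDate);
sessionLookbackStart.setDate(
  sessionLookbackStart.getDate() - sessionLookbackDays,
);

// Resulting predicate (values shown unescaped for readability):
//   created_at >= '2024-01-01 00:00:00'   -- lookback: prunes granules by key
//   AND ended_at >= '2024-01-08 00:00:00' -- session overlaps the range
//   AND created_at <= '2024-01-15 00:00:00'
```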
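How the `WITH ROLLUP` result is consumed: the grand-total row carries the group key at its type default (the epoch for `DateTime`), so with `ORDER BY date ASC` it sorts first and `res[0]` can be read as the totals. A sketch with a reduced, hypothetical row type:

```ts
type MetricsRow = {
  date: string;
  unique_visitors: number;
  total_sessions: number;
};

// res[0] is the ROLLUP grand-total row (date = 1970-01-01 sorts first);
// everything after it is the per-interval series.
function splitRollup(res: MetricsRow[]) {
  const [rollupRow, ...rest] = res;
  return {
    metrics: {
      unique_visitors: rollupRow?.unique_visitors ?? 0,
      total_sessions: rollupRow?.total_sessions ?? 0,
    },
    series: rest.map((r) => ({ ...r, date: new Date(r.date).toISOString() })),
  };
}
```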
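What `toStartOfInterval` plus the `WITH FILL` tail expand to, for `interval = 'week'` and `timezone = 'Europe/Stockholm'` with hypothetical dates. This also shows the quirk the helper works around: the second argument of `toStartOfWeek` is the week mode (0 = Sunday, 1 = Monday), not a timezone, so the value is shifted with `toTimeZone` first:

```ts
// Hypothetical expansion; WITH FILL emits default-valued (zeroed) rows for
// weeks with no sessions, so the series never has gaps.
const sqlTail = `
  ORDER BY date ASC WITH FILL
  FROM toStartOfWeek(toTimeZone(toDateTime('2024-01-01 00:00:00'), 'Europe/Stockholm'), 1)
  TO toStartOfWeek(toTimeZone(toDateTime('2024-01-31 23:59:59'), 'Europe/Stockholm'), 1)
  STEP INTERVAL 1 week
`;
```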
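The `sign` arithmetic throughout the sessions queries is the CollapsingMergeTree pattern (an inference from the column usage; the table engine itself is not shown in this diff): updating a session writes a cancel row with `sign = -1` plus a fresh state row with `sign = 1`, and aggregates weight by `sign` so not-yet-collapsed pairs cancel out. A hypothetical standalone query in that style:

```ts
const topEntryPages = `
  SELECT
    entry_path,
    sum(sign) AS sessions,                         -- +1/-1 pairs cancel out
    sum(sign * screen_view_count) AS screen_views, -- weighted the same way
    round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate
  FROM sessions
  WHERE project_id = 'p1'
  GROUP BY entry_path
  HAVING sum(sign) > 0  -- drop groups whose rows fully cancelled
  ORDER BY sessions DESC
`;
```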
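Several of the ratios guard their denominator with `nullIf(x, 0)`: float division by zero in ClickHouse produces `inf`/`nan` rather than an error, so the guard turns the result into NULL instead, which the read side then coalesces to 0:

```ts
// nullIf(den, 0) makes the denominator NULL when it is 0, and the division
// propagates NULL instead of inf/nan.
const viewsPerSession = `
  round(count(*) * 1.0 / nullIf(uniq(session_id), 0), 2) AS views_per_session
`;
// Read side: row?.views_per_session ?? 0
```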
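Every interpolated value in the raw SQL strings goes through `sqlstring.escape`, which quotes and escapes a JavaScript value for direct interpolation (table names come from the trusted `TABLE_NAMES` map, not from user input):

```ts
import sqlstring from 'sqlstring';

sqlstring.escape('p1');          // "'p1'"
sqlstring.escape("x'; DROP --"); // "'x\'; DROP --'" (quote escaped)
sqlstring.escape(42);            // "42"
```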
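Finally, why the `sessions` CTE in `getProfileMetrics` became `sessionsCount`: once `sessionDuration` selects from the real sessions table, a CTE of the same name would shadow that table for the rest of the query. A hypothetical minimal reproduction:

```ts
const sql = `
  WITH sessions AS (
    SELECT count(*) AS sessions FROM events WHERE name = 'session_start'
  )
  -- This now resolves to the CTE above, not the sessions table, so
  -- session columns such as duration no longer exist here:
  SELECT avg(duration) FROM sessions
`;
```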