Skip to content

Commit a6a7c46

Browse files
committed
Implement s2 chunk deduping
1 parent 93927c8 commit a6a7c46

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ export class SSEStreamSubscription implements StreamSubscription {
259259
// Reset retry count on successful connection
260260
this.retryCount = 0;
261261

262+
const seenIds = new Set<string>();
263+
262264
const stream = response.body
263265
.pipeThrough(new TextDecoderStream())
264266
.pipeThrough(new EventSourceParserStream())
@@ -287,9 +289,15 @@ export class SSEStreamSubscription implements StreamSubscription {
287289
for (const record of data.records) {
288290
this.lastEventId = record.seq_num.toString();
289291

292+
const parsedBody = safeParseJSON(record.body) as { data: unknown; id: string };
293+
if (seenIds.has(parsedBody.id)) {
294+
continue;
295+
}
296+
seenIds.add(parsedBody.id);
297+
290298
chunkController.enqueue({
291299
id: record.seq_num.toString(),
292-
chunk: safeParseJSON(record.body),
300+
chunk: parsedBody.data,
293301
timestamp: record.timestamp,
294302
});
295303
}

packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { S2, AppendRecord, BatchTransform } from "@s2-dev/streamstore";
22
import { StreamsWriter } from "./types.js";
3+
import { nanoid } from "nanoid";
34

45
export type StreamsWriterV2Options<T = any> = {
56
basin: string;
@@ -140,7 +141,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
140141
return;
141142
}
142143
// Convert each chunk to JSON string and wrap in AppendRecord
143-
controller.enqueue(AppendRecord.make(JSON.stringify(chunk)));
144+
controller.enqueue(AppendRecord.make(JSON.stringify({ data: chunk, id: nanoid(7) })));
144145
},
145146
})
146147
)

0 commit comments

Comments
 (0)