Skip to content

Commit f3d7083

Browse files
stainless-bot authored and RobertCraigie committed
fix: correctly decode multi-byte characters over multiple chunks
1 parent a40d359 commit f3d7083

File tree

3 files changed

+126
-40
lines changed

3 files changed

+126
-40
lines changed

src/internal/decoders/line.ts

Lines changed: 69 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -13,56 +13,62 @@ export class LineDecoder {
1313
static NEWLINE_CHARS = new Set(['\n', '\r']);
1414
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;
1515

16-
buffer: string[];
17-
trailingCR: boolean;
16+
buffer: Uint8Array;
17+
#carriageReturnIndex: number | null;
1818
textDecoder:
1919
| undefined
2020
| {
2121
decode(buffer: Uint8Array | ArrayBuffer): string;
2222
};
2323

2424
constructor() {
25-
this.buffer = [];
26-
this.trailingCR = false;
25+
this.buffer = new Uint8Array();
26+
this.#carriageReturnIndex = null;
2727
}
2828

2929
decode(chunk: Bytes): string[] {
30-
let text = this.decodeText(chunk);
31-
32-
if (this.trailingCR) {
33-
text = '\r' + text;
34-
this.trailingCR = false;
35-
}
36-
if (text.endsWith('\r')) {
37-
this.trailingCR = true;
38-
text = text.slice(0, -1);
39-
}
40-
41-
if (!text) {
30+
if (chunk == null) {
4231
return [];
4332
}
4433

45-
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
46-
let lines = text.split(LineDecoder.NEWLINE_REGEXP);
34+
const binaryChunk =
35+
chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
36+
: typeof chunk === 'string' ? new TextEncoder().encode(chunk)
37+
: chunk;
38+
39+
let newData = new Uint8Array(this.buffer.length + binaryChunk.length);
40+
newData.set(this.buffer);
41+
newData.set(binaryChunk, this.buffer.length);
42+
this.buffer = newData;
43+
44+
const lines: string[] = [];
45+
let patternIndex;
46+
while ((patternIndex = findNewlineIndex(this.buffer, this.#carriageReturnIndex)) != null) {
47+
if (patternIndex.carriage && this.#carriageReturnIndex == null) {
48+
// skip until we either get a corresponding `\n`, a new `\r` or nothing
49+
this.#carriageReturnIndex = patternIndex.index;
50+
continue;
51+
}
4752

48-
// if there is a trailing new line then the last entry will be an empty
49-
// string which we don't care about
50-
if (trailingNewline) {
51-
lines.pop();
52-
}
53+
// a pending \r was not immediately followed by \n (we got another \r, or \r + text + \n), so that \r ends a line on its own
54+
if (
55+
this.#carriageReturnIndex != null &&
56+
(patternIndex.index !== this.#carriageReturnIndex + 1 || patternIndex.carriage)
57+
) {
58+
lines.push(this.decodeText(this.buffer.slice(0, this.#carriageReturnIndex - 1)));
59+
this.buffer = this.buffer.slice(this.#carriageReturnIndex);
60+
this.#carriageReturnIndex = null;
61+
continue;
62+
}
5363

54-
if (lines.length === 1 && !trailingNewline) {
55-
this.buffer.push(lines[0]!);
56-
return [];
57-
}
64+
const endIndex =
65+
this.#carriageReturnIndex !== null ? patternIndex.preceding - 1 : patternIndex.preceding;
5866

59-
if (this.buffer.length > 0) {
60-
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
61-
this.buffer = [];
62-
}
67+
const line = this.decodeText(this.buffer.slice(0, endIndex));
68+
lines.push(line);
6369

64-
if (!trailingNewline) {
65-
this.buffer = [lines.pop() || ''];
70+
this.buffer = this.buffer.slice(patternIndex.index);
71+
this.#carriageReturnIndex = null;
6672
}
6773

6874
return lines;
@@ -106,13 +112,38 @@ export class LineDecoder {
106112
}
107113

108114
flush(): string[] {
109-
if (!this.buffer.length && !this.trailingCR) {
115+
if (!this.buffer.length) {
110116
return [];
111117
}
118+
return this.decode('\n');
119+
}
120+
}
112121

113-
const lines = [this.buffer.join('')];
114-
this.buffer = [];
115-
this.trailingCR = false;
116-
return lines;
122+
/**
123+
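* This function searches the buffer for the end patterns (\r or \n), starting
124+
* at `startIndex` (or 0 when it is `null`), and returns an object with the index
125+
* of the matched newline byte (`preceding`, usable as the exclusive end of the line), the index just after it (`index`), and whether the match was a `\r` (`carriage`). `null` is returned if no new line is found.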
126+
*
127+
* ```ts
128+
* findNewlineIndex(new TextEncoder().encode('abc\ndef'), null) -> { preceding: 3, index: 4, carriage: false }
129+
* ```
130+
*/
131+
function findNewlineIndex(
132+
buffer: Uint8Array,
133+
startIndex: number | null,
134+
): { preceding: number; index: number; carriage: boolean } | null {
135+
const newline = 0x0a; // \n
136+
const carriage = 0x0d; // \r
137+
138+
for (let i = startIndex ?? 0; i < buffer.length; i++) {
139+
if (buffer[i] === newline) {
140+
return { preceding: i, index: i + 1, carriage: false };
141+
}
142+
143+
if (buffer[i] === carriage) {
144+
return { preceding: i, index: i + 1, carriage: true };
145+
}
117146
}
147+
148+
return null;
118149
}

src/streaming.ts

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -358,13 +358,17 @@ class SSEDecoder {
358358
}
359359

360360
/** This is an internal helper function that's just used for testing */
361-
export function _decodeChunks(chunks: string[]): string[] {
361+
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
362362
const decoder = new LineDecoder();
363363
const lines: string[] = [];
364364
for (const chunk of chunks) {
365365
lines.push(...decoder.decode(chunk));
366366
}
367367

368+
if (flush) {
369+
lines.push(...decoder.flush());
370+
}
371+
368372
return lines;
369373
}
370374

tests/streaming.test.ts

Lines changed: 52 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
import { PassThrough } from 'stream';
22
import assert from 'assert';
33
import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
4+
import { LineDecoder } from 'openai/internal/decoders/line';
45

56
describe('line decoder', () => {
67
test('basic', () => {
@@ -9,8 +10,8 @@ describe('line decoder', () => {
910
});
1011

1112
test('basic with \\r', () => {
12-
// baz is not included because the line hasn't ended yet
1313
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
14+
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
1415
});
1516

1617
test('trailing new lines', () => {
@@ -28,6 +29,56 @@ describe('line decoder', () => {
2829
test('escaped new lines with \\r', () => {
2930
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
3031
});
32+
33+
test('\\r & \\n split across multiple chunks', () => {
34+
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
35+
});
36+
37+
test('single \\r', () => {
38+
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
39+
});
40+
41+
test('double \\r', () => {
42+
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
43+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
44+
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
45+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
46+
});
47+
48+
test('double \\r then \\r\\n', () => {
49+
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
50+
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
51+
});
52+
53+
test('double newline', () => {
54+
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
55+
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
56+
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
57+
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
58+
});
59+
60+
test('multi-byte characters across chunks', () => {
61+
const decoder = new LineDecoder();
62+
63+
// bytes taken from the string 'известни' and arbitrarily split
64+
// so that some multi-byte characters span multiple chunks
65+
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
66+
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
67+
expect(
68+
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
69+
).toHaveLength(0);
70+
71+
const decoded = decoder.decode(new Uint8Array([0xa]));
72+
expect(decoded).toEqual(['известни']);
73+
});
74+
75+
test('flushing trailing newlines', () => {
76+
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
77+
});
78+
79+
test('flushing empty buffer', () => {
80+
expect(decodeChunks([], { flush: true })).toEqual([]);
81+
});
3182
});
3283

3384
describe('streaming decoding', () => {

0 commit comments

Comments
 (0)