Skip to content

Commit 23b6513

Browse files
authored
New RESP2 parser (#1899)
* parser * a new RESP parser :) * clean code * fix simple string and bulk string cursor * performance improvements * change typescript compiler target * do not use stream.Transform * Update decoder.ts * fix for 1d09acb * improve integer performance * revert 1d09acb * improve RESP2 decoder performance * improve performance * improve encode performance * remove unused import * upgrade benchmark deps * clean code * fix socket error handlers, reset parser on error * fix #2080 - reset pubSubState on socket error * reset decoder on socket error * fix pubsub * fix "RedisSocketInitiator" * fix returnStringsAsBuffers * fix merge
1 parent b1a0b48 commit 23b6513

21 files changed

+703
-157
lines changed

benchmark/package-lock.json

Lines changed: 14 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { strict as assert } from 'assert';
2+
import BufferComposer from './buffer';
3+
4+
describe('Buffer Composer', () => {
5+
const composer = new BufferComposer();
6+
7+
it('should compose two buffers', () => {
8+
composer.write(Buffer.from([0]));
9+
assert.deepEqual(
10+
composer.end(Buffer.from([1])),
11+
Buffer.from([0, 1])
12+
);
13+
});
14+
});
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { Composer } from './interface';
2+
3+
export default class BufferComposer implements Composer<Buffer> {
4+
private chunks: Array<Buffer> = [];
5+
6+
write(buffer: Buffer): void {
7+
this.chunks.push(buffer);
8+
}
9+
10+
end(buffer: Buffer): Buffer {
11+
this.write(buffer);
12+
return Buffer.concat(this.chunks.splice(0));
13+
}
14+
15+
reset() {
16+
this.chunks = [];
17+
}
18+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export interface Composer<T> {
2+
write(buffer: Buffer): void;
3+
4+
end(buffer: Buffer): T;
5+
6+
reset(): void;
7+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { strict as assert } from 'assert';
2+
import StringComposer from './string';
3+
4+
describe('String Composer', () => {
5+
const composer = new StringComposer();
6+
7+
it('should compose two strings', () => {
8+
composer.write(Buffer.from([0]));
9+
assert.deepEqual(
10+
composer.end(Buffer.from([1])),
11+
Buffer.from([0, 1]).toString()
12+
);
13+
});
14+
});
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { StringDecoder } from 'string_decoder';
2+
import { Composer } from './interface';
3+
4+
export default class StringComposer implements Composer<string> {
5+
private decoder = new StringDecoder();
6+
7+
private string = '';
8+
9+
write(buffer: Buffer): void {
10+
this.string += this.decoder.write(buffer);
11+
}
12+
13+
end(buffer: Buffer): string {
14+
const string = this.string + this.decoder.end(buffer);
15+
this.string = '';
16+
return string;
17+
}
18+
19+
reset() {
20+
this.string = '';
21+
}
22+
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import { strict as assert } from 'assert';
2+
import { SinonSpy, spy } from 'sinon';
3+
import RESP2Decoder from './decoder';
4+
import { ErrorReply } from '../../errors';
5+
6+
interface DecoderAndSpies {
7+
decoder: RESP2Decoder;
8+
returnStringsAsBuffersSpy: SinonSpy;
9+
onReplySpy: SinonSpy;
10+
}
11+
12+
function createDecoderAndSpies(returnStringsAsBuffers: boolean): DecoderAndSpies {
13+
const returnStringsAsBuffersSpy = spy(() => returnStringsAsBuffers),
14+
onReplySpy = spy();
15+
16+
return {
17+
decoder: new RESP2Decoder({
18+
returnStringsAsBuffers: returnStringsAsBuffersSpy,
19+
onReply: onReplySpy
20+
}),
21+
returnStringsAsBuffersSpy,
22+
onReplySpy
23+
};
24+
}
25+
26+
function writeChunks(stream: RESP2Decoder, buffer: Buffer) {
27+
let i = 0;
28+
while (i < buffer.length) {
29+
stream.write(buffer.slice(i, ++i));
30+
}
31+
}
32+
33+
type Replies = Array<Array<unknown>>;
34+
35+
interface TestsOptions {
36+
toWrite: Buffer;
37+
returnStringsAsBuffers: boolean;
38+
replies: Replies;
39+
}
40+
41+
function generateTests({
42+
toWrite,
43+
returnStringsAsBuffers,
44+
replies
45+
}: TestsOptions): void {
46+
it('single chunk', () => {
47+
const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
48+
createDecoderAndSpies(returnStringsAsBuffers);
49+
decoder.write(toWrite);
50+
assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
51+
testReplies(onReplySpy, replies);
52+
});
53+
54+
it('multiple chunks', () => {
55+
const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
56+
createDecoderAndSpies(returnStringsAsBuffers);
57+
writeChunks(decoder, toWrite);
58+
assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
59+
testReplies(onReplySpy, replies);
60+
});
61+
}
62+
63+
function testReplies(spy: SinonSpy, replies: Replies): void {
64+
if (!replies) {
65+
assert.equal(spy.callCount, 0);
66+
return;
67+
}
68+
69+
assert.equal(spy.callCount, replies.length);
70+
for (const [i, reply] of replies.entries()) {
71+
assert.deepEqual(
72+
spy.getCall(i).args,
73+
reply
74+
);
75+
}
76+
}
77+
78+
describe('RESP2Parser', () => {
79+
describe('Simple String', () => {
80+
describe('as strings', () => {
81+
generateTests({
82+
toWrite: Buffer.from('+OK\r\n'),
83+
returnStringsAsBuffers: false,
84+
replies: [['OK']]
85+
});
86+
});
87+
88+
describe('as buffers', () => {
89+
generateTests({
90+
toWrite: Buffer.from('+OK\r\n'),
91+
returnStringsAsBuffers: true,
92+
replies: [[Buffer.from('OK')]]
93+
});
94+
});
95+
});
96+
97+
describe('Error', () => {
98+
generateTests({
99+
toWrite: Buffer.from('-ERR\r\n'),
100+
returnStringsAsBuffers: false,
101+
replies: [[new ErrorReply('ERR')]]
102+
});
103+
});
104+
105+
describe('Integer', () => {
106+
describe('-1', () => {
107+
generateTests({
108+
toWrite: Buffer.from(':-1\r\n'),
109+
returnStringsAsBuffers: false,
110+
replies: [[-1]]
111+
});
112+
});
113+
114+
describe('0', () => {
115+
generateTests({
116+
toWrite: Buffer.from(':0\r\n'),
117+
returnStringsAsBuffers: false,
118+
replies: [[0]]
119+
});
120+
});
121+
});
122+
123+
describe('Bulk String', () => {
124+
describe('null', () => {
125+
generateTests({
126+
toWrite: Buffer.from('$-1\r\n'),
127+
returnStringsAsBuffers: false,
128+
replies: [[null]]
129+
});
130+
});
131+
132+
describe('as strings', () => {
133+
generateTests({
134+
toWrite: Buffer.from('$2\r\naa\r\n'),
135+
returnStringsAsBuffers: false,
136+
replies: [['aa']]
137+
});
138+
});
139+
140+
describe('as buffers', () => {
141+
generateTests({
142+
toWrite: Buffer.from('$2\r\naa\r\n'),
143+
returnStringsAsBuffers: true,
144+
replies: [[Buffer.from('aa')]]
145+
});
146+
});
147+
});
148+
149+
describe('Array', () => {
150+
describe('null', () => {
151+
generateTests({
152+
toWrite: Buffer.from('*-1\r\n'),
153+
returnStringsAsBuffers: false,
154+
replies: [[null]]
155+
});
156+
});
157+
158+
const arrayBuffer = Buffer.from(
159+
'*5\r\n' +
160+
'+OK\r\n' +
161+
'-ERR\r\n' +
162+
':0\r\n' +
163+
'$1\r\na\r\n' +
164+
'*0\r\n'
165+
);
166+
167+
describe('as strings', () => {
168+
generateTests({
169+
toWrite: arrayBuffer,
170+
returnStringsAsBuffers: false,
171+
replies: [[[
172+
'OK',
173+
new ErrorReply('ERR'),
174+
0,
175+
'a',
176+
[]
177+
]]]
178+
});
179+
});
180+
181+
describe('as buffers', () => {
182+
generateTests({
183+
toWrite: arrayBuffer,
184+
returnStringsAsBuffers: true,
185+
replies: [[[
186+
Buffer.from('OK'),
187+
new ErrorReply('ERR'),
188+
0,
189+
Buffer.from('a'),
190+
[]
191+
]]]
192+
});
193+
});
194+
});
195+
});

0 commit comments

Comments
 (0)