Skip to content

Commit 08837c8

Browse files
committed
fix #1650 - add support for Buffer in some commands, add GET_BUFFER command
1 parent 1413a69 commit 08837c8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+300
-227
lines changed

lib/client.spec.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@ describe('Client', () => {
195195
assert.equal(await client.sendCommand(['PING']), 'PONG');
196196
});
197197

198+
itWithClient(TestRedisServers.OPEN, 'bufferMode', async client => {
199+
assert.deepEqual(
200+
await client.sendCommand(['PING'], undefined, true),
201+
Buffer.from('PONG')
202+
);
203+
});
204+
198205
describe('AbortController', () => {
199206
before(function () {
200207
if (!global.AbortController) {

lib/client.ts

Lines changed: 40 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import RedisSocket, { RedisSocketOptions } from './socket';
22
import RedisCommandsQueue, { PubSubListener, PubSubSubscribeCommands, PubSubUnsubscribeCommands, QueueCommandOptions } from './commands-queue';
3-
import COMMANDS from './commands';
3+
import COMMANDS, { TransformArgumentsReply } from './commands';
44
import { RedisCommand, RedisModules, RedisReply } from './commands';
55
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
66
import EventEmitter from 'events';
@@ -62,12 +62,10 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
6262
): Promise<ReturnType<typeof command['transformReply']>> {
6363
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
6464

65-
const reply = command.transformReply(
66-
await this.#sendCommand(redisArgs, options),
67-
redisArgs.preserve
65+
return command.transformReply(
66+
await this.#sendCommand(redisArgs, options, command.BUFFER_MODE),
67+
redisArgs.preserve,
6868
);
69-
70-
return reply;
7169
}
7270

7371
static async #scriptsExecutor(
@@ -77,12 +75,10 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
7775
): Promise<typeof script['transformArguments']> {
7876
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
7977

80-
const reply = script.transformReply(
81-
await this.executeScript(script, redisArgs, options),
78+
return script.transformReply(
79+
await this.executeScript(script, redisArgs, options, script.BUFFER_MODE),
8280
redisArgs.preserve
8381
);
84-
85-
return reply;
8682
}
8783

8884
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
@@ -182,10 +178,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
182178
}
183179

184180
#initiateQueue(): RedisCommandsQueue {
185-
return new RedisCommandsQueue(
186-
this.#options?.commandsQueueMaxLength,
187-
encodedCommands => this.#socket.write(encodedCommands)
188-
);
181+
return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength);
189182
}
190183

191184
#legacyMode(): void {
@@ -299,54 +292,72 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
299292

300293
QUIT(): Promise<void> {
301294
return this.#socket.quit(() => {
302-
const promise = this.#queue.addEncodedCommand(encodeCommand(['QUIT']));
295+
const promise = this.#queue.addCommand(['QUIT']);
303296
this.#tick();
304297
return promise;
305298
});
306299
}
307300

308301
quit = this.QUIT;
309302

310-
sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
311-
return this.#sendCommand(args, options);
303+
sendCommand<T = RedisReply>(args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<T> {
304+
return this.#sendCommand(args, options, bufferMode);
312305
}
313306

314307
// using `#sendCommand` cause `sendCommand` is overwritten in legacy mode
315-
#sendCommand<T = RedisReply>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
316-
return this.sendEncodedCommand(encodeCommand(args), options);
317-
}
318-
319-
async sendEncodedCommand<T = RedisReply>(encodedCommand: string, options?: ClientCommandOptions): Promise<T> {
308+
async #sendCommand<T = RedisReply>(args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<T> {
320309
if (!this.#socket.isOpen) {
321310
throw new ClientClosedError();
322311
}
323312

324313
if (options?.isolated) {
325314
return this.executeIsolated(isolatedClient =>
326-
isolatedClient.sendEncodedCommand(encodedCommand, {
315+
isolatedClient.sendCommand(args, {
327316
...options,
328317
isolated: false
329318
})
330319
);
331320
}
332321

333-
const promise = this.#queue.addEncodedCommand<T>(encodedCommand, options);
322+
const promise = this.#queue.addCommand<T>(args, options, bufferMode);
334323
this.#tick();
335324
return await promise;
336325
}
337326

327+
#tick(): void {
328+
if (!this.#socket.isSocketExists) {
329+
return;
330+
}
331+
332+
this.#socket.cork();
333+
334+
while (true) {
335+
const args = this.#queue.getCommandToSend();
336+
if (args === undefined) break;
337+
338+
let writeResult;
339+
for (const toWrite of encodeCommand(args)) {
340+
writeResult = this.#socket.write(toWrite);
341+
}
342+
343+
if (!writeResult) {
344+
break;
345+
}
346+
}
347+
}
348+
338349
executeIsolated<T>(fn: (client: RedisClientType<M, S>) => T | Promise<T>): Promise<T> {
339350
return this.#isolationPool.use(fn);
340351
}
341352

342-
async executeScript(script: RedisLuaScript, args: Array<string>, options?: ClientCommandOptions): Promise<ReturnType<typeof script['transformReply']>> {
353+
async executeScript(script: RedisLuaScript, args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<ReturnType<typeof script['transformReply']>> {
343354
try {
344355
return await this.#sendCommand([
345356
'EVALSHA',
346357
script.SHA1,
347358
script.NUMBER_OF_KEYS.toString(),
348359
...args
349-
], options);
360+
], options, bufferMode);
350361
} catch (err: any) {
351362
if (!err?.message?.startsWith?.('NOSCRIPT')) {
352363
throw err;
@@ -357,14 +368,14 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
357368
script.SCRIPT,
358369
script.NUMBER_OF_KEYS.toString(),
359370
...args
360-
], options);
371+
], options, bufferMode);
361372
}
362373
}
363374

364375
#multiExecutor(commands: Array<MultiQueuedCommand>, chainId?: symbol): Promise<Array<RedisReply>> {
365376
const promise = Promise.all(
366-
commands.map(({encodedCommand}) => {
367-
return this.#queue.addEncodedCommand(encodedCommand, RedisClient.commandOptions({
377+
commands.map(({ args }) => {
378+
return this.#queue.addCommand(args, RedisClient.commandOptions({
368379
chainId
369380
}));
370381
})
@@ -438,31 +449,6 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
438449
await this.#isolationPool.drain();
439450
await this.#isolationPool.clear();
440451
}
441-
442-
#isTickQueued = false;
443-
444-
#tick(): void {
445-
const {chunkRecommendedSize} = this.#socket;
446-
if (!chunkRecommendedSize) {
447-
return;
448-
}
449-
450-
if (!this.#isTickQueued && this.#queue.waitingToBeSentCommandsLength < chunkRecommendedSize) {
451-
queueMicrotask(() => this.#tick());
452-
this.#isTickQueued = true;
453-
return;
454-
}
455-
456-
const isBuffering = this.#queue.executeChunk(chunkRecommendedSize);
457-
if (isBuffering === true) {
458-
this.#socket.once('drain', () => this.#tick());
459-
} else if (isBuffering === false) {
460-
this.#tick();
461-
return;
462-
}
463-
464-
this.#isTickQueued = false;
465-
}
466452
}
467453

468454
extendWithDefaultCommands(RedisClient, RedisClient.commandsExecutor);

lib/cluster-slots.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
172172
return value.client;
173173
}
174174

175-
getClient(firstKey?: string, isReadonly?: boolean): RedisClientType<M, S> {
175+
getClient(firstKey?: string | Buffer, isReadonly?: boolean): RedisClientType<M, S> {
176176
if (!firstKey) {
177177
return this.#getRandomClient();
178178
}

lib/cluster.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import { RedisCommand, RedisModules } from './commands';
1+
import { RedisCommand, RedisModules, TransformArgumentsReply } from './commands';
22
import RedisClient, { ClientCommandOptions, RedisClientType, WithPlugins } from './client';
33
import { RedisSocketOptions } from './socket';
44
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
55
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
66
import { extendWithModulesAndScripts, extendWithDefaultCommands, transformCommandArguments } from './commander';
77
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
88
import { EventEmitter } from 'events';
9+
import cluster from 'cluster';
910

1011
export interface RedisClusterOptions<M = RedisModules, S = RedisLuaScripts> {
1112
rootNodes: Array<RedisSocketOptions>;
@@ -19,7 +20,7 @@ export type RedisClusterType<M extends RedisModules, S extends RedisLuaScripts>
1920
WithPlugins<M, S> & RedisCluster;
2021

2122
export default class RedisCluster<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
22-
static #extractFirstKey(command: RedisCommand, originalArgs: Array<unknown>, redisArgs: Array<string>): string | undefined {
23+
static #extractFirstKey(command: RedisCommand, originalArgs: Array<unknown>, redisArgs: TransformArgumentsReply): string | Buffer | undefined {
2324
if (command.FIRST_KEY_INDEX === undefined) {
2425
return undefined;
2526
} else if (typeof command.FIRST_KEY_INDEX === 'number') {
@@ -41,7 +42,8 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
4142
RedisCluster.#extractFirstKey(command, args, redisArgs),
4243
command.IS_READ_ONLY,
4344
redisArgs,
44-
options
45+
options,
46+
command.BUFFER_MODE
4547
),
4648
redisArgs.preserve
4749
);
@@ -100,22 +102,23 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
100102
}
101103

102104
async sendCommand<C extends RedisCommand>(
103-
firstKey: string | undefined,
105+
firstKey: string | Buffer | undefined,
104106
isReadonly: boolean | undefined,
105-
args: Array<string>,
107+
args: TransformArgumentsReply,
106108
options?: ClientCommandOptions,
109+
bufferMode?: boolean,
107110
redirections = 0
108111
): Promise<ReturnType<C['transformReply']>> {
109112
const client = this.#slots.getClient(firstKey, isReadonly);
110113

111114
try {
112-
return await client.sendCommand(args, options);
115+
return await client.sendCommand(args, options, bufferMode);
113116
} catch (err: any) {
114117
const shouldRetry = await this.#handleCommandError(err, client, redirections);
115118
if (shouldRetry === true) {
116-
return this.sendCommand(firstKey, isReadonly, args, options, redirections + 1);
119+
return this.sendCommand(firstKey, isReadonly, args, options, bufferMode, redirections + 1);
117120
} else if (shouldRetry) {
118-
return shouldRetry.sendCommand(args, options);
121+
return shouldRetry.sendCommand(args, options, bufferMode);
119122
}
120123

121124
throw err;
@@ -125,7 +128,7 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
125128
async executeScript(
126129
script: RedisLuaScript,
127130
originalArgs: Array<unknown>,
128-
redisArgs: Array<string>,
131+
redisArgs: TransformArgumentsReply,
129132
options?: ClientCommandOptions,
130133
redirections = 0
131134
): Promise<ReturnType<typeof script['transformReply']>> {
@@ -135,13 +138,13 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
135138
);
136139

137140
try {
138-
return await client.executeScript(script, redisArgs, options);
141+
return await client.executeScript(script, redisArgs, options, script.BUFFER_MODE);
139142
} catch (err: any) {
140143
const shouldRetry = await this.#handleCommandError(err, client, redirections);
141144
if (shouldRetry === true) {
142145
return this.executeScript(script, originalArgs, redisArgs, options, redirections + 1);
143146
} else if (shouldRetry) {
144-
return shouldRetry.executeScript(script, redisArgs, options);
147+
return shouldRetry.executeScript(script, redisArgs, options, script.BUFFER_MODE);
145148
}
146149

147150
throw err;
@@ -181,8 +184,8 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
181184
const client = this.#slots.getClient(routing);
182185

183186
return Promise.all(
184-
commands.map(({encodedCommand}) => {
185-
return client.sendEncodedCommand(encodedCommand, RedisClient.commandOptions({
187+
commands.map(({ args }) => {
188+
return client.sendCommand(args, RedisClient.commandOptions({
186189
chainId
187190
}));
188191
})

lib/commander.spec.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,43 @@ import { strict as assert } from 'assert';
22
import { describe } from 'mocha';
33
import { encodeCommand } from './commander';
44

5+
function encodeCommandToString(...args: Parameters<typeof encodeCommand>): string {
6+
const arr = [];
7+
for (const item of encodeCommand(...args)) {
8+
arr.push(item.toString());
9+
}
10+
11+
return arr.join('');
12+
}
13+
514
describe('Commander', () => {
615
describe('encodeCommand (see #1628)', () => {
716
it('1 byte', () => {
817
assert.equal(
9-
encodeCommand(['a', 'z']),
18+
encodeCommandToString(['a', 'z']),
1019
'*2\r\n$1\r\na\r\n$1\r\nz\r\n'
1120
);
1221
});
1322

1423
it('2 bytes', () => {
1524
assert.equal(
16-
encodeCommand(['א', 'ת']),
25+
encodeCommandToString(['א', 'ת']),
1726
'*2\r\n$2\r\nא\r\n$2\r\nת\r\n'
1827
);
1928
});
2029

2130
it('4 bytes', () => {
2231
assert.equal(
23-
encodeCommand(['🐣', '🐤']),
32+
encodeCommandToString(['🐣', '🐤']),
2433
'*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n'
2534
);
2635
});
36+
37+
it('with a buffer', () => {
38+
assert.equal(
39+
encodeCommandToString([Buffer.from('string')]),
40+
'*1\r\n$6\r\nstring\r\n'
41+
);
42+
});
2743
});
2844
});

lib/commander.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import COMMANDS, { RedisCommand, RedisModules, TransformArgumentsReply } from './commands';
33
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
44
import { CommandOptions, isCommandOptions } from './command-options';
5+
import { off } from 'process';
56

67
type Instantiable<T = any> = new(...args: Array<any>) => T;
78

@@ -94,16 +95,15 @@ export function transformCommandArguments<T = unknown>(
9495
};
9596
}
9697

97-
export function encodeCommand(args: Array<string>): string {
98-
const encoded = [
99-
`*${args.length}`,
100-
`$${Buffer.byteLength(args[0]).toString()}`,
101-
args[0]
102-
];
98+
const DELIMITER = '\r\n';
10399

104-
for (let i = 1; i < args.length; i++) {
105-
encoded.push(`$${Buffer.byteLength(args[i]).toString()}`, args[i]);
106-
}
100+
export function* encodeCommand(args: TransformArgumentsReply): IterableIterator<string | Buffer> {
101+
yield `*${args.length}${DELIMITER}`;
107102

108-
return encoded.join('\r\n') + '\r\n';
103+
for (const arg of args) {
104+
const byteLength = typeof arg === 'string' ? Buffer.byteLength(arg): arg.length;
105+
yield `$${byteLength.toString()}${DELIMITER}`;
106+
yield arg;
107+
yield DELIMITER;
108+
}
109109
}

0 commit comments

Comments
 (0)