Skip to content

Commit 82920ae

Browse files
committed
fix #1764 - fix PubSub resubscribe
1 parent 7202f35 commit 82920ae

File tree

2 files changed

+119
-67
lines changed

2 files changed

+119
-67
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,9 @@ export default class RedisCommandsQueue {
294294
}
295295
resolve();
296296
},
297-
reject: () => {
297+
reject: err => {
298298
pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
299-
reject();
299+
reject(err);
300300
}
301301
});
302302
});
@@ -307,11 +307,32 @@ export default class RedisCommandsQueue {
307307
return;
308308
}
309309

310-
// TODO: acl error on one channel/pattern will reject the whole command
311-
return Promise.all([
312-
this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]),
313-
this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()])
314-
]);
310+
this.#pubSubState.subscribed = 0;
311+
312+
const promises = [],
313+
{ channels, patterns } = this.#pubSubState.listeners;
314+
315+
if (channels.size) {
316+
promises.push(
317+
this.#pushPubSubCommand(
318+
PubSubSubscribeCommands.SUBSCRIBE,
319+
[...channels.keys()]
320+
)
321+
);
322+
}
323+
324+
if (patterns.size) {
325+
promises.push(
326+
this.#pushPubSubCommand(
327+
PubSubSubscribeCommands.PSUBSCRIBE,
328+
[...patterns.keys()]
329+
)
330+
);
331+
}
332+
333+
if (promises.length) {
334+
return Promise.all(promises);
335+
}
315336
}
316337

317338
getCommandToSend(): RedisCommandArguments | undefined {

packages/client/lib/client/index.spec.ts

Lines changed: 91 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -560,73 +560,104 @@ describe('Client', () => {
560560
);
561561
}, GLOBAL.SERVERS.OPEN);
562562

563-
testUtils.testWithClient('PubSub', async publisher => {
564-
function assertStringListener(message: string, channel: string) {
565-
assert.ok(typeof message === 'string');
566-
assert.ok(typeof channel === 'string');
567-
}
563+
describe('PubSub', () => {
564+
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
565+
function assertStringListener(message: string, channel: string) {
566+
assert.ok(typeof message === 'string');
567+
assert.ok(typeof channel === 'string');
568+
}
568569

569-
function assertBufferListener(message: Buffer, channel: Buffer) {
570-
assert.ok(Buffer.isBuffer(message));
571-
assert.ok(Buffer.isBuffer(channel));
572-
}
570+
function assertBufferListener(message: Buffer, channel: Buffer) {
571+
assert.ok(Buffer.isBuffer(message));
572+
assert.ok(Buffer.isBuffer(channel));
573+
}
573574

574-
const subscriber = publisher.duplicate();
575+
const subscriber = publisher.duplicate();
576+
577+
await subscriber.connect();
578+
579+
try {
580+
const channelListener1 = spy(assertBufferListener),
581+
channelListener2 = spy(assertStringListener),
582+
patternListener = spy(assertStringListener);
583+
584+
await Promise.all([
585+
subscriber.subscribe('channel', channelListener1, true),
586+
subscriber.subscribe('channel', channelListener2),
587+
subscriber.pSubscribe('channel*', patternListener)
588+
]);
589+
await Promise.all([
590+
waitTillBeenCalled(channelListener1),
591+
waitTillBeenCalled(channelListener2),
592+
waitTillBeenCalled(patternListener),
593+
publisher.publish(Buffer.from('channel'), Buffer.from('message'))
594+
]);
595+
596+
assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel')));
597+
assert.ok(channelListener2.calledOnceWithExactly('message', 'channel'));
598+
assert.ok(patternListener.calledOnceWithExactly('message', 'channel'));
599+
600+
await subscriber.unsubscribe('channel', channelListener1, true);
601+
await Promise.all([
602+
waitTillBeenCalled(channelListener2),
603+
waitTillBeenCalled(patternListener),
604+
publisher.publish('channel', 'message')
605+
]);
606+
assert.ok(channelListener1.calledOnce);
607+
assert.ok(channelListener2.calledTwice);
608+
assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel'));
609+
assert.ok(patternListener.calledTwice);
610+
assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel'));
611+
await subscriber.unsubscribe('channel');
612+
await Promise.all([
613+
waitTillBeenCalled(patternListener),
614+
publisher.publish('channel', 'message')
615+
]);
616+
assert.ok(channelListener1.calledOnce);
617+
assert.ok(channelListener2.calledTwice);
618+
assert.ok(patternListener.calledThrice);
619+
assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel'));
620+
await subscriber.pUnsubscribe();
621+
await publisher.publish('channel', 'message');
622+
assert.ok(channelListener1.calledOnce);
623+
assert.ok(channelListener2.calledTwice);
624+
assert.ok(patternListener.calledThrice);
625+
// should be able to send commands when unsubsribed from all channels (see #1652)
626+
await assert.doesNotReject(subscriber.ping());
627+
} finally {
628+
await subscriber.disconnect();
629+
}
630+
}, GLOBAL.SERVERS.OPEN);
575631

576-
await subscriber.connect();
632+
testUtils.testWithClient('should resubscribe', async publisher => {
633+
const subscriber = publisher.duplicate();
577634

578-
try {
579-
const channelListener1 = spy(assertBufferListener),
580-
channelListener2 = spy(assertStringListener),
581-
patternListener = spy(assertStringListener);
635+
await subscriber.connect();
582636

583-
await Promise.all([
584-
subscriber.subscribe('channel', channelListener1, true),
585-
subscriber.subscribe('channel', channelListener2),
586-
subscriber.pSubscribe('channel*', patternListener)
587-
]);
588-
await Promise.all([
589-
waitTillBeenCalled(channelListener1),
590-
waitTillBeenCalled(channelListener2),
591-
waitTillBeenCalled(patternListener),
592-
publisher.publish(Buffer.from('channel'), Buffer.from('message'))
593-
]);
637+
try {
638+
const listener = spy();
639+
await subscriber.subscribe('channel', listener);
640+
641+
subscriber.on('error', err => {
642+
console.error('subscriber err', err.message);
643+
});
594644

595-
assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel')));
596-
assert.ok(channelListener2.calledOnceWithExactly('message', 'channel'));
597-
assert.ok(patternListener.calledOnceWithExactly('message', 'channel'));
645+
await Promise.all([
646+
once(subscriber, 'error'),
647+
publisher.sendCommand(['CLIENT', 'KILL', 'SKIPME', 'yes'])
648+
]);
598649

599-
await subscriber.unsubscribe('channel', channelListener1, true);
600-
await Promise.all([
601-
waitTillBeenCalled(channelListener2),
602-
waitTillBeenCalled(patternListener),
603-
publisher.publish('channel', 'message')
604-
]);
605-
assert.ok(channelListener1.calledOnce);
606-
assert.ok(channelListener2.calledTwice);
607-
assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel'));
608-
assert.ok(patternListener.calledTwice);
609-
assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel'));
610-
await subscriber.unsubscribe('channel');
611-
await Promise.all([
612-
waitTillBeenCalled(patternListener),
613-
publisher.publish('channel', 'message')
614-
]);
615-
assert.ok(channelListener1.calledOnce);
616-
assert.ok(channelListener2.calledTwice);
617-
assert.ok(patternListener.calledThrice);
618-
assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel'));
619-
await subscriber.pUnsubscribe();
620-
await publisher.publish('channel', 'message');
621-
assert.ok(channelListener1.calledOnce);
622-
assert.ok(channelListener2.calledTwice);
623-
assert.ok(patternListener.calledThrice);
624-
// should be able to send commands when unsubsribed from all channels (see #1652)
625-
await assert.doesNotReject(subscriber.ping());
626-
} finally {
627-
await subscriber.disconnect();
628-
}
629-
}, GLOBAL.SERVERS.OPEN);
650+
await once(subscriber, 'ready');
651+
652+
await Promise.all([
653+
waitTillBeenCalled(listener),
654+
publisher.publish('channel', 'message')
655+
]);
656+
} finally {
657+
await subscriber.disconnect();
658+
}
659+
}, GLOBAL.SERVERS.OPEN);
660+
});
630661

631662
testUtils.testWithClient('ConnectionTimeoutError', async client => {
632663
const promise = assert.rejects(client.connect(), ConnectionTimeoutError),

0 commit comments

Comments
 (0)