@@ -39,9 +39,29 @@ export enum PubSubUnsubscribeCommands {
39
39
PUNSUBSCRIBE = 'PUNSUBSCRIBE'
40
40
}
41
41
42
- export type PubSubListener = ( message : string , channel : string ) => unknown ;
42
+ type PubSubArgumentTypes = Buffer | string ;
43
43
44
- export type PubSubListenersMap = Map < string , Set < PubSubListener > > ;
44
+ export type PubSubListener <
45
+ BUFFER_MODE extends boolean = false ,
46
+ T = BUFFER_MODE extends true ? Buffer : string
47
+ > = ( message : T , channel : T ) => unknown ;
48
+
49
+ interface PubSubListeners {
50
+ buffers : Set < PubSubListener < true > > ;
51
+ strings : Set < PubSubListener < false > > ;
52
+ }
53
+
54
+ type PubSubListenersMap = Map < string , PubSubListeners > ;
55
+
56
+ interface PubSubState {
57
+ subscribing : number ;
58
+ subscribed : number ;
59
+ unsubscribing : number ;
60
+ listeners : {
61
+ channels : PubSubListenersMap ;
62
+ patterns : PubSubListenersMap ;
63
+ } ;
64
+ }
45
65
46
66
export default class RedisCommandsQueue {
47
67
static #flushQueue< T extends CommandWaitingForReply > ( queue : LinkedList < T > , err : Error ) : void {
@@ -50,10 +70,20 @@ export default class RedisCommandsQueue {
50
70
}
51
71
}
52
72
53
- static #emitPubSubMessage( listeners : Set < PubSubListener > , message : string , channel : string ) : void {
54
- for ( const listener of listeners ) {
73
+ static #emitPubSubMessage( listenersMap : PubSubListenersMap , message : Buffer , channel : Buffer , pattern ?: Buffer ) : void {
74
+ const keyString = ( pattern || channel ) . toString ( ) ,
75
+ listeners = listenersMap . get ( keyString ) ! ;
76
+ for ( const listener of listeners . buffers ) {
55
77
listener ( message , channel ) ;
56
78
}
79
+
80
+ if ( ! listeners . strings . size ) return ;
81
+
82
+ const messageString = message . toString ( ) ,
83
+ channelString = pattern ? channel . toString ( ) : keyString ;
84
+ for ( const listener of listeners . strings ) {
85
+ listener ( messageString , channelString ) ;
86
+ }
57
87
}
58
88
59
89
readonly #maxLength: number | null | undefined ;
@@ -62,41 +92,43 @@ export default class RedisCommandsQueue {
62
92
63
93
readonly #waitingForReply = new LinkedList < CommandWaitingForReply > ( ) ;
64
94
65
- readonly #pubSubState = {
66
- subscribing : 0 ,
67
- subscribed : 0 ,
68
- unsubscribing : 0
69
- } ;
95
+ #pubSubState: PubSubState | undefined ;
70
96
71
- readonly #pubSubListeners = {
72
- channels : < PubSubListenersMap > new Map ( ) ,
73
- patterns : < PubSubListenersMap > new Map ( )
97
+ static readonly #PUB_SUB_MESSAGES = {
98
+ message : Buffer . from ( 'message' ) ,
99
+ pMessage : Buffer . from ( 'pmessage' ) ,
100
+ subscribe : Buffer . from ( 'subscribe' ) ,
101
+ pSubscribe : Buffer . from ( 'psubscribe' ) ,
102
+ unsubscribe : Buffer . from ( 'unsunscribe' ) ,
103
+ pUnsubscribe : Buffer . from ( 'punsubscribe' )
74
104
} ;
75
105
76
106
readonly #parser = new RedisParser ( {
77
107
returnReply : ( reply : unknown ) => {
78
- if ( ( this . #pubSubState. subscribing || this . #pubSubState. subscribed ) && Array . isArray ( reply ) ) {
79
- switch ( reply [ 0 ] ) {
80
- case 'message' :
81
- return RedisCommandsQueue . #emitPubSubMessage(
82
- this . #pubSubListeners. channels . get ( reply [ 1 ] ) ! ,
83
- reply [ 2 ] ,
84
- reply [ 1 ]
85
- ) ;
86
-
87
- case 'pmessage' :
88
- return RedisCommandsQueue . #emitPubSubMessage(
89
- this . #pubSubListeners. patterns . get ( reply [ 1 ] ) ! ,
90
- reply [ 3 ] ,
91
- reply [ 2 ]
92
- ) ;
93
-
94
- case 'subscribe' :
95
- case 'psubscribe' :
96
- if ( -- this . #waitingForReply. head ! . value . channelsCounter ! === 0 ) {
97
- this . #shiftWaitingForReply( ) . resolve ( ) ;
98
- }
99
- return ;
108
+ if ( this . #pubSubState && Array . isArray ( reply ) ) {
109
+ if ( RedisCommandsQueue . #PUB_SUB_MESSAGES. message . equals ( reply [ 0 ] ) ) {
110
+ return RedisCommandsQueue . #emitPubSubMessage(
111
+ this . #pubSubState. listeners . channels ,
112
+ reply [ 2 ] ,
113
+ reply [ 1 ]
114
+ ) ;
115
+ } else if ( RedisCommandsQueue . #PUB_SUB_MESSAGES. pMessage . equals ( reply [ 0 ] ) ) {
116
+ return RedisCommandsQueue . #emitPubSubMessage(
117
+ this . #pubSubState. listeners . patterns ,
118
+ reply [ 3 ] ,
119
+ reply [ 2 ] ,
120
+ reply [ 1 ]
121
+ ) ;
122
+ } else if (
123
+ RedisCommandsQueue . #PUB_SUB_MESSAGES. subscribe . equals ( reply [ 0 ] ) ||
124
+ RedisCommandsQueue . #PUB_SUB_MESSAGES. pSubscribe . equals ( reply [ 0 ] ) ||
125
+ RedisCommandsQueue . #PUB_SUB_MESSAGES. unsubscribe . equals ( reply [ 0 ] ) ||
126
+ RedisCommandsQueue . #PUB_SUB_MESSAGES. pUnsubscribe . equals ( reply [ 0 ] )
127
+ ) {
128
+ if ( -- this . #waitingForReply. head ! . value . channelsCounter ! === 0 ) {
129
+ this . #shiftWaitingForReply( ) . resolve ( ) ;
130
+ }
131
+ return ;
100
132
}
101
133
}
102
134
@@ -112,7 +144,7 @@ export default class RedisCommandsQueue {
112
144
}
113
145
114
146
addCommand < T = RedisCommandRawReply > ( args : RedisCommandArguments , options ?: QueueCommandOptions , bufferMode ?: boolean ) : Promise < T > {
115
- if ( this . #pubSubState. subscribing || this . #pubSubState . subscribed ) {
147
+ if ( this . #pubSubState) {
116
148
return Promise . reject ( new Error ( 'Cannot send commands in PubSub mode' ) ) ;
117
149
} else if ( this . #maxLength && this . #waitingToBeSent. length + this . #waitingForReply. length >= this . #maxLength) {
118
150
return Promise . reject ( new Error ( 'The queue is full' ) ) ;
@@ -126,7 +158,7 @@ export default class RedisCommandsQueue {
126
158
chainId : options ?. chainId ,
127
159
bufferMode,
128
160
resolve,
129
- reject,
161
+ reject
130
162
} ) ;
131
163
132
164
if ( options ?. signal ) {
@@ -153,17 +185,41 @@ export default class RedisCommandsQueue {
153
185
} ) ;
154
186
}
155
187
156
- subscribe ( command : PubSubSubscribeCommands , channels : string | Array < string > , listener : PubSubListener ) : Promise < void > {
157
- const channelsToSubscribe : Array < string > = [ ] ,
158
- listeners = command === PubSubSubscribeCommands . SUBSCRIBE ? this . #pubSubListeners. channels : this . #pubSubListeners. patterns ;
188
+ #initiatePubSubState( ) : PubSubState {
189
+ return this . #pubSubState ??= {
190
+ subscribed : 0 ,
191
+ subscribing : 0 ,
192
+ unsubscribing : 0 ,
193
+ listeners : {
194
+ channels : new Map ( ) ,
195
+ patterns : new Map ( )
196
+ }
197
+ } ;
198
+ }
199
+
200
+ subscribe < T extends boolean > (
201
+ command : PubSubSubscribeCommands ,
202
+ channels : PubSubArgumentTypes | Array < PubSubArgumentTypes > ,
203
+ listener : PubSubListener < T > ,
204
+ bufferMode ?: T
205
+ ) : Promise < void > {
206
+ const pubSubState = this . #initiatePubSubState( ) ,
207
+ channelsToSubscribe : Array < PubSubArgumentTypes > = [ ] ,
208
+ listenersMap = command === PubSubSubscribeCommands . SUBSCRIBE ? pubSubState . listeners . channels : pubSubState . listeners . patterns ;
159
209
for ( const channel of ( Array . isArray ( channels ) ? channels : [ channels ] ) ) {
160
- if ( listeners . has ( channel ) ) {
161
- listeners . get ( channel ) ! . add ( listener ) ;
162
- continue ;
210
+ const channelString = typeof channel === 'string' ? channel : channel . toString ( ) ;
211
+ let listeners = listenersMap . get ( channelString ) ;
212
+ if ( ! listeners ) {
213
+ listeners = {
214
+ buffers : new Set ( ) ,
215
+ strings : new Set ( )
216
+ } ;
217
+ listenersMap . set ( channelString , listeners ) ;
218
+ channelsToSubscribe . push ( channel ) ;
163
219
}
164
220
165
- listeners . set ( channel , new Set ( [ listener ] ) ) ;
166
- channelsToSubscribe . push ( channel ) ;
221
+ // https://github.com/microsoft/TypeScript/issues/23132
222
+ ( bufferMode ? listeners . buffers : listeners . strings ) . add ( listener as any ) ;
167
223
}
168
224
169
225
if ( ! channelsToSubscribe . length ) {
@@ -173,8 +229,20 @@ export default class RedisCommandsQueue {
173
229
return this . #pushPubSubCommand( command , channelsToSubscribe ) ;
174
230
}
175
231
176
- unsubscribe ( command : PubSubUnsubscribeCommands , channels ?: string | Array < string > , listener ?: PubSubListener ) : Promise < void > {
177
- const listeners = command === PubSubUnsubscribeCommands . UNSUBSCRIBE ? this . #pubSubListeners. channels : this . #pubSubListeners. patterns ;
232
+ unsubscribe < T extends boolean > (
233
+ command : PubSubUnsubscribeCommands ,
234
+ channels ?: string | Array < string > ,
235
+ listener ?: PubSubListener < T > ,
236
+ bufferMode ?: T
237
+ ) : Promise < void > {
238
+ if ( ! this . #pubSubState) {
239
+ return Promise . resolve ( ) ;
240
+ }
241
+
242
+ const listeners = command === PubSubUnsubscribeCommands . UNSUBSCRIBE ?
243
+ this . #pubSubState. listeners . channels :
244
+ this . #pubSubState. listeners . patterns ;
245
+
178
246
if ( ! channels ) {
179
247
const size = listeners . size ;
180
248
listeners . clear ( ) ;
@@ -183,13 +251,16 @@ export default class RedisCommandsQueue {
183
251
184
252
const channelsToUnsubscribe = [ ] ;
185
253
for ( const channel of ( Array . isArray ( channels ) ? channels : [ channels ] ) ) {
186
- const set = listeners . get ( channel ) ;
187
- if ( ! set ) continue ;
254
+ const sets = listeners . get ( channel ) ;
255
+ if ( ! sets ) continue ;
188
256
189
- let shouldUnsubscribe = ! listener ;
257
+ let shouldUnsubscribe ;
190
258
if ( listener ) {
191
- set . delete ( listener ) ;
192
- shouldUnsubscribe = set . size === 0 ;
259
+ // https://github.com/microsoft/TypeScript/issues/23132
260
+ ( bufferMode ? sets . buffers : sets . strings ) . delete ( listener as any ) ;
261
+ shouldUnsubscribe = ! sets . buffers . size && ! sets . strings . size ;
262
+ } else {
263
+ shouldUnsubscribe = true ;
193
264
}
194
265
195
266
if ( shouldUnsubscribe ) {
@@ -205,11 +276,12 @@ export default class RedisCommandsQueue {
205
276
return this . #pushPubSubCommand( command , channelsToUnsubscribe ) ;
206
277
}
207
278
208
- #pushPubSubCommand( command : PubSubSubscribeCommands | PubSubUnsubscribeCommands , channels : number | Array < string > ) : Promise < void > {
279
+ #pushPubSubCommand( command : PubSubSubscribeCommands | PubSubUnsubscribeCommands , channels : number | Array < PubSubArgumentTypes > ) : Promise < void > {
209
280
return new Promise ( ( resolve , reject ) => {
210
- const isSubscribe = command === PubSubSubscribeCommands . SUBSCRIBE || command === PubSubSubscribeCommands . PSUBSCRIBE ,
281
+ const pubSubState = this . #initiatePubSubState( ) ,
282
+ isSubscribe = command === PubSubSubscribeCommands . SUBSCRIBE || command === PubSubSubscribeCommands . PSUBSCRIBE ,
211
283
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing' ,
212
- commandArgs : Array < string > = [ command ] ;
284
+ commandArgs : Array < PubSubArgumentTypes > = [ command ] ;
213
285
214
286
let channelsCounter : number ;
215
287
if ( typeof channels === 'number' ) { // unsubscribe only
@@ -219,35 +291,41 @@ export default class RedisCommandsQueue {
219
291
channelsCounter = channels . length ;
220
292
}
221
293
222
- this . # pubSubState[ inProgressKey ] += channelsCounter ;
294
+ pubSubState [ inProgressKey ] += channelsCounter ;
223
295
224
296
this . #waitingToBeSent. push ( {
225
297
args : commandArgs ,
226
298
channelsCounter,
299
+ bufferMode : true ,
227
300
resolve : ( ) => {
228
- this . #pubSubState[ inProgressKey ] -= channelsCounter ;
229
- this . #pubSubState. subscribed += channelsCounter * ( isSubscribe ? 1 : - 1 ) ;
301
+ pubSubState [ inProgressKey ] -= channelsCounter ;
302
+ if ( isSubscribe ) {
303
+ pubSubState . subscribed += channelsCounter ;
304
+ } else {
305
+ pubSubState . subscribed -= channelsCounter ;
306
+ if ( ! pubSubState . subscribed && ! pubSubState . subscribing && ! pubSubState . subscribed ) {
307
+ this . #pubSubState = undefined ;
308
+ }
309
+ }
230
310
resolve ( ) ;
231
311
} ,
232
312
reject : ( ) => {
233
- this . # pubSubState[ inProgressKey ] -= channelsCounter ;
313
+ pubSubState [ inProgressKey ] -= channelsCounter * ( isSubscribe ? 1 : - 1 ) ;
234
314
reject ( ) ;
235
315
}
236
316
} ) ;
237
317
} ) ;
238
318
}
239
319
240
320
resubscribe ( ) : Promise < any > | undefined {
241
- if ( ! this . #pubSubState. subscribed && ! this . #pubSubState . subscribing ) {
321
+ if ( ! this . #pubSubState) {
242
322
return ;
243
323
}
244
324
245
- this . #pubSubState. subscribed = this . #pubSubState. subscribing = 0 ;
246
-
247
325
// TODO: acl error on one channel/pattern will reject the whole command
248
326
return Promise . all ( [
249
- this . #pushPubSubCommand( PubSubSubscribeCommands . SUBSCRIBE , [ ...this . #pubSubListeners . channels . keys ( ) ] ) ,
250
- this . #pushPubSubCommand( PubSubSubscribeCommands . PSUBSCRIBE , [ ...this . #pubSubListeners . patterns . keys ( ) ] )
327
+ this . #pushPubSubCommand( PubSubSubscribeCommands . SUBSCRIBE , [ ...this . #pubSubState . listeners . channels . keys ( ) ] ) ,
328
+ this . #pushPubSubCommand( PubSubSubscribeCommands . PSUBSCRIBE , [ ...this . #pubSubState . listeners . patterns . keys ( ) ] )
251
329
] ) ;
252
330
}
253
331
@@ -269,7 +347,10 @@ export default class RedisCommandsQueue {
269
347
}
270
348
271
349
parseResponse ( data : Buffer ) : void {
272
- this . #parser. setReturnBuffers ( ! ! this . #waitingForReply. head ?. value . bufferMode ) ;
350
+ this . #parser. setReturnBuffers (
351
+ ! ! this . #waitingForReply. head ?. value . bufferMode ||
352
+ ! ! this . #pubSubState?. subscribed
353
+ ) ;
273
354
this . #parser. execute ( data ) ;
274
355
}
275
356
0 commit comments