@@ -282,109 +282,109 @@ export default class RedisClusterSlots<
282282 this . pubSubNode ?. client . _pause ( ) ;
283283 }
284284
285- // const destinationAddress = `${event.destination. host}:${ event.destination.port}`;
286- // let destinationNode = this.nodeByAddress.get(destinationAddress) ;
287- // let destinationShard: Shard<M, F, S, RESP, TYPE_MAPPING> ;
288-
289- // // 2. Create new Master
290- // // TODO create new pubsubnode if needed
291- // if(!destinationNode) {
292- // const promises: Promise<unknown>[] = [];
293- // destinationNode = this.#initiateSlotNode({ host: event.destination. host, port: event.destination. port, id: 'asdff' }, false, true, new Set(), promises);
294- // await Promise.all(promises);
295- // // 2.1 Pause
296- // destinationNode.client?._pause();
297- // // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
298- // destinationShard = {
299- // master: destinationNode
300- // };
301- // } else {
302- // // In case destination node existed, this means there was a Shard already, so its best if we can find it.
303- // const existingShard = this.slots.find(shard => shard.master.host === event.destination. host && shard.master.port === event.destination. port);
304- // if(!existingShard) {
305- // dbgMaintenance("Could not find shard");
306- // throw new Error('Could not find shard');
307- // }
308- // destinationShard = existingShard;
309- // }
310-
311- // // 3. Soft update shards.
312- // // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard
313- // const movingSlots = new Set<number>();
314- // for(const range of event.ranges ) {
315- // if(typeof range === 'number') {
316- // this.slots[range] = destinationShard;
317- // movingSlots.add(range)
318- // } else {
319- // for (let slot = range[0]; slot <= range[1]; slot++) {
320- // this.slots[slot] = destinationShard;
321- // movingSlots.add(slot)
322- // }
323- // }
324- // }
325-
326- // // 4. For all affected clients (normal, pubsub, spubsub):
327- // // 4.1 Wait for inflight commands to complete
328- // const inflightPromises: Promise<void>[] = [];
329- // //Normal
330- // inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete());
331- // //Sharded pubsub
332- // if('pubSub' in sourceNode) {
333- // inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete());
334- // }
335- // //Regular pubsub
336- // if (this.pubSubNode?.address === sourceAddress) {
337- // inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete());
338- // }
339- // await Promise.all(inflightPromises);
340-
341-
342- // // 4.2 Extract commands, channels, sharded channels
343- // // TODO dont forget to extract channels and resubscribe
344- // const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined;
345- // if(sourceStillHasSlots) {
346- // const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots);
347- // // 5. Prepend extracted commands, chans
348- // //TODO pubsub, spubsub
349- // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
350-
351- // //unpause source node clients
352- // sourceNode.client?._unpause();
353- // if('pubSub' in sourceNode) {
354- // sourceNode.pubSub?.client._unpause();
355- // }
356- // //TODO pubSubNode?
357- // } else {
358-
359- // const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands();
360- // // 5. Prepend extracted commands, chans
361- // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
362- // if('pubSub' in destinationNode) {
363- // // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots);
364- // //TODO resubscribe. Might need to throw an event for cluster to do the job
365- // }
366- // //TODO pubSubNode?
367-
368- // //Cleanup
369- // this.masters = this.masters.filter(master => master.address !== sourceAddress);
370- // //not sure if needed, since there should be no replicas in RE
371- // this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress);
372- // this.nodeByAddress.delete(sourceAddress);
373- // //TODO pubSubNode?
374-
375- // // 4.3 Kill because no slots are pointing to it anymore
376- // await sourceNode.client?.close()
377- // if('pubSub' in sourceNode) {
378- // await sourceNode.pubSub?.client.close();
379- // }
380- // //TODO pubSubNode?
381- // }
382-
383- // // 5.1 Unpause
384- // destinationNode.client?._unpause();
385- // if('pubSub' in destinationNode) {
386- // destinationNode.pubSub?.client._unpause();
387- // }
285+ for ( const { host, port , slots } of event . destinations ) {
286+ const destinationAddress = ` ${ host } : ${ port } ` ;
287+ let destinationNode = this . nodeByAddress . get ( destinationAddress ) ;
288+ let destinationShard : Shard < M , F , S , RESP , TYPE_MAPPING > ;
289+ // 2. Create new Master
290+ // TODO create new pubsubnode if needed
291+ if ( ! destinationNode ) {
292+ const promises : Promise < unknown > [ ] = [ ] ;
293+ destinationNode = this . #initiateSlotNode( { host : host , port : port , id : 'asdff' } , false , true , new Set ( ) , promises ) ;
294+ await Promise . all ( promises ) ;
295+ // 2.1 Pause
296+ destinationNode . client ?. _pause ( ) ;
297+ // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
298+ destinationShard = {
299+ master : destinationNode
300+ } ;
301+ } else {
302+ // In case destination node existed, this means there was a Shard already, so its best if we can find it.
303+ const existingShard = this . slots . find ( shard => shard . master . host === host && shard . master . port === port ) ;
304+ if ( ! existingShard ) {
305+ dbgMaintenance ( "Could not find shard" ) ;
306+ throw new Error ( 'Could not find shard' ) ;
307+ }
308+ destinationShard = existingShard ;
309+ }
310+ // 3. Soft update shards.
311+ // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard
312+ const movingSlots = new Set < number > ( ) ;
313+ for ( const slot of slots ) {
314+ if ( typeof slot === 'number' ) {
315+ this . slots [ slot ] = destinationShard ;
316+ movingSlots . add ( slot )
317+ } else {
318+ for ( let s = slot [ 0 ] ; s <= slot [ 1 ] ; s ++ ) {
319+ this . slots [ s ] = destinationShard ;
320+ movingSlots . add ( s )
321+ }
322+ }
323+ }
324+
325+ // 4. For all affected clients (normal, pubsub, spubsub):
326+ // 4.1 Wait for inflight commands to complete
327+ const inflightPromises : Promise < void > [ ] = [ ] ;
328+ //Normal
329+ inflightPromises . push ( sourceNode . client ! . _getQueue ( ) . waitForInflightCommandsToComplete ( ) ) ;
330+ //Sharded pubsub
331+ if ( 'pubSub' in sourceNode ) {
332+ inflightPromises . push ( sourceNode . pubSub ! . client . _getQueue ( ) . waitForInflightCommandsToComplete ( ) ) ;
333+ }
334+ //Regular pubsub
335+ if ( this . pubSubNode ?. address === sourceAddress ) {
336+ inflightPromises . push ( this . pubSubNode ?. client . _getQueue ( ) . waitForInflightCommandsToComplete ( ) ) ;
337+ }
338+ await Promise . all ( inflightPromises ) ;
339+
340+
341+ // 4.2 Extract commands, channels, sharded channels
342+ // TODO dont forget to extract channels and resubscribe
343+ const sourceStillHasSlots = this . slots . find ( slot => slot . master . address === sourceAddress ) !== undefined ;
344+ if ( sourceStillHasSlots ) {
345+ const normalCommandsToMove = sourceNode . client ! . _getQueue ( ) . extractCommandsForSlots ( movingSlots ) ;
346+ // 5. Prepend extracted commands, chans
347+ //TODO pubsub, spubsub
348+ destinationNode . client ?. _getQueue ( ) . prependCommandsToWrite ( normalCommandsToMove ) ;
349+
350+ //unpause source node clients
351+ sourceNode . client ?. _unpause ( ) ;
352+ if ( 'pubSub' in sourceNode ) {
353+ sourceNode . pubSub ?. client . _unpause ( ) ;
354+ }
355+ //TODO pubSubNode?
356+ } else {
357+
358+ const normalCommandsToMove = sourceNode . client ! . _getQueue ( ) . getAllCommands ( ) ;
359+ // 5. Prepend extracted commands, chans
360+ destinationNode . client ?. _getQueue ( ) . prependCommandsToWrite ( normalCommandsToMove ) ;
361+ if ( 'pubSub' in destinationNode ) {
362+ // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots);
363+ //TODO resubscribe. Might need to throw an event for cluster to do the job
364+ }
365+ //TODO pubSubNode?
366+
367+ //Cleanup
368+ this . masters = this . masters . filter ( master => master . address !== sourceAddress ) ;
369+ //not sure if needed, since there should be no replicas in RE
370+ this . replicas = this . replicas . filter ( replica => replica . address !== sourceAddress ) ;
371+ this . nodeByAddress . delete ( sourceAddress ) ;
372+ //TODO pubSubNode?
373+
374+ // 4.3 Kill because no slots are pointing to it anymore
375+ await sourceNode . client ?. close ( )
376+ if ( 'pubSub' in sourceNode ) {
377+ await sourceNode . pubSub ?. client . close ( ) ;
378+ }
379+ //TODO pubSubNode?
380+ }
381+
382+ // 5.1 Unpause
383+ destinationNode . client ?. _unpause ( ) ;
384+ if ( 'pubSub' in destinationNode ) {
385+ destinationNode . pubSub ?. client . _unpause ( ) ;
386+ }
387+ }
388388
389389 }
390390
0 commit comments