Skip to content

Commit 5ec8802

Browse files
committed
feature: add sharded support
1 parent bfefbde commit 5ec8802

File tree

5 files changed

+687
-269
lines changed

5 files changed

+687
-269
lines changed

lib/broadcast-operator.ts

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import debugModule from "debug";
2+
import { PacketType } from "socket.io-parser";
3+
import type {
4+
EventNames,
5+
EventParams,
6+
EventsMap,
7+
TypedEventBroadcaster,
8+
} from "./typed-events";
9+
import { UID, RESERVED_EVENTS, BroadcastFlags, BroadcastOptions } from "./util";
10+
11+
const debug = debugModule("socket.io-emitter");
12+
13+
/**
14+
* Request types, for messages between nodes
15+
*/
16+
17+
export enum RequestType {
18+
SOCKETS = 0,
19+
ALL_ROOMS = 1,
20+
REMOTE_JOIN = 2,
21+
REMOTE_LEAVE = 3,
22+
REMOTE_DISCONNECT = 4,
23+
REMOTE_FETCH = 5,
24+
SERVER_SIDE_EMIT = 6,
25+
}
26+
27+
export class BroadcastOperator<EmitEvents extends EventsMap>
28+
implements TypedEventBroadcaster<EmitEvents> {
29+
constructor(
30+
private readonly redisClient: any,
31+
private readonly broadcastOptions: BroadcastOptions,
32+
private readonly rooms: Set<string> = new Set<string>(),
33+
private readonly exceptRooms: Set<string> = new Set<string>(),
34+
private readonly flags: BroadcastFlags = {}
35+
) {}
36+
37+
/**
38+
* Targets a room when emitting.
39+
*
40+
* @param room
41+
* @return a new BroadcastOperator instance
42+
* @public
43+
*/
44+
public to(room: string | string[]): BroadcastOperator<EmitEvents> {
45+
const rooms = new Set(this.rooms);
46+
if (Array.isArray(room)) {
47+
room.forEach((r) => rooms.add(r));
48+
} else {
49+
rooms.add(room);
50+
}
51+
return new BroadcastOperator(
52+
this.redisClient,
53+
this.broadcastOptions,
54+
rooms,
55+
this.exceptRooms,
56+
this.flags
57+
);
58+
}
59+
60+
/**
61+
* Targets a room when emitting.
62+
*
63+
* @param room
64+
* @return a new BroadcastOperator instance
65+
* @public
66+
*/
67+
public in(room: string | string[]): BroadcastOperator<EmitEvents> {
68+
return this.to(room);
69+
}
70+
71+
/**
72+
* Excludes a room when emitting.
73+
*
74+
* @param room
75+
* @return a new BroadcastOperator instance
76+
* @public
77+
*/
78+
public except(room: string | string[]): BroadcastOperator<EmitEvents> {
79+
const exceptRooms = new Set(this.exceptRooms);
80+
if (Array.isArray(room)) {
81+
room.forEach((r) => exceptRooms.add(r));
82+
} else {
83+
exceptRooms.add(room);
84+
}
85+
return new BroadcastOperator(
86+
this.redisClient,
87+
this.broadcastOptions,
88+
this.rooms,
89+
exceptRooms,
90+
this.flags
91+
);
92+
}
93+
94+
/**
95+
* Sets the compress flag.
96+
*
97+
* @param compress - if `true`, compresses the sending data
98+
* @return a new BroadcastOperator instance
99+
* @public
100+
*/
101+
public compress(compress: boolean): BroadcastOperator<EmitEvents> {
102+
const flags = Object.assign({}, this.flags, { compress });
103+
return new BroadcastOperator(
104+
this.redisClient,
105+
this.broadcastOptions,
106+
this.rooms,
107+
this.exceptRooms,
108+
flags
109+
);
110+
}
111+
112+
/**
113+
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
114+
* receive messages (because of network slowness or other issues, or because they’re connected through long polling
115+
* and is in the middle of a request-response cycle).
116+
*
117+
* @return a new BroadcastOperator instance
118+
* @public
119+
*/
120+
public get volatile(): BroadcastOperator<EmitEvents> {
121+
const flags = Object.assign({}, this.flags, { volatile: true });
122+
return new BroadcastOperator(
123+
this.redisClient,
124+
this.broadcastOptions,
125+
this.rooms,
126+
this.exceptRooms,
127+
flags
128+
);
129+
}
130+
131+
/**
132+
* Emits to all clients.
133+
*
134+
* @return Always true
135+
* @public
136+
*/
137+
public emit<Ev extends EventNames<EmitEvents>>(
138+
ev: Ev,
139+
...args: EventParams<EmitEvents, Ev>
140+
): true {
141+
if (RESERVED_EVENTS.has(ev)) {
142+
throw new Error(`"${ev}" is a reserved event name`);
143+
}
144+
145+
// set up packet object
146+
const data = [ev, ...args];
147+
const packet = {
148+
type: PacketType.EVENT,
149+
data: data,
150+
nsp: this.broadcastOptions.nsp,
151+
};
152+
153+
const opts = {
154+
rooms: [...this.rooms],
155+
flags: this.flags,
156+
except: [...this.exceptRooms],
157+
};
158+
159+
const msg = this.broadcastOptions.parser.encode([UID, packet, opts]);
160+
let channel = this.broadcastOptions.broadcastChannel;
161+
if (this.rooms && this.rooms.size === 1) {
162+
channel += this.rooms.keys().next().value + "#";
163+
}
164+
165+
debug("publishing message to channel %s", channel);
166+
167+
this.redisClient.publish(channel, msg);
168+
169+
return true;
170+
}
171+
172+
/**
173+
* Makes the matching socket instances join the specified rooms
174+
*
175+
* @param rooms
176+
* @public
177+
*/
178+
public socketsJoin(rooms: string | string[]): void {
179+
const request = JSON.stringify({
180+
type: RequestType.REMOTE_JOIN,
181+
opts: {
182+
rooms: [...this.rooms],
183+
except: [...this.exceptRooms],
184+
},
185+
rooms: Array.isArray(rooms) ? rooms : [rooms],
186+
});
187+
188+
this.redisClient.publish(this.broadcastOptions.requestChannel, request);
189+
}
190+
191+
/**
192+
* Makes the matching socket instances leave the specified rooms
193+
*
194+
* @param rooms
195+
* @public
196+
*/
197+
public socketsLeave(rooms: string | string[]): void {
198+
const request = JSON.stringify({
199+
type: RequestType.REMOTE_LEAVE,
200+
opts: {
201+
rooms: [...this.rooms],
202+
except: [...this.exceptRooms],
203+
},
204+
rooms: Array.isArray(rooms) ? rooms : [rooms],
205+
});
206+
207+
this.redisClient.publish(this.broadcastOptions.requestChannel, request);
208+
}
209+
210+
/**
211+
* Makes the matching socket instances disconnect
212+
*
213+
* @param close - whether to close the underlying connection
214+
* @public
215+
*/
216+
public disconnectSockets(close: boolean = false): void {
217+
const request = JSON.stringify({
218+
type: RequestType.REMOTE_DISCONNECT,
219+
opts: {
220+
rooms: [...this.rooms],
221+
except: [...this.exceptRooms],
222+
},
223+
close,
224+
});
225+
226+
this.redisClient.publish(this.broadcastOptions.requestChannel, request);
227+
}
228+
}

0 commit comments

Comments
 (0)