Skip to content

Commit 3b1bad2

Browse files
leibaleSimon Prickett
and
Simon Prickett
authored
Add support for sharded PubSub (#2373)
* refactor pubsub, add support for sharded pub sub * run tests in redis 7 only, fix PUBSUB SHARDCHANNELS test * add some comments and fix some bugs * PubSubType, not PubSubTypes 🤦‍♂️ * remove test.txt * fix some bugs, add tests * add some tests * fix #2345 - allow PING in PubSub mode (remove client side validation) * remove .only * revert changes in cluster/index.ts * fix tests minimum version * handle server sunsubscribe * add 'sharded-channel-moved' event to docs, improve the events section in the main README (fix #2302) * exit "resubscribe" if pubsub not active * Update commands-queue.ts * Release [email protected] * WIP * use `node:util` instead of `node:util/types` (to support node 14) * run PubSub resharding test with Redis 7+ * fix inconsistency in live resharding test * add some tests * fix iterateAllNodes when starting from a replica * fix iterateAllNodes random * fix slotNodesIterator * fix slotNodesIterator * clear pubSubNode when node in use * wait for all nodes cluster state to be ok before testing * `cluster.minimizeConections` tests * `client.reconnectStrategry = false | 0` tests * sharded pubsub + cluster 🎉 * add minimum version to sharded pubsub tests * add cluster sharded pubsub live reshard test, use stable dockers for tests, make sure to close pubsub clients when a node disconnects from the cluster * fix "ssubscribe & sunsubscribe" test * lock search docker to 2.4.9 * change numberOfMasters default to 2 * use edge for bloom * add tests * add back getMasters and getSlotMaster as deprecated functions * add some tests * fix reconnect strategy + docs * sharded pubsub docs * Update pub-sub.md * some jsdoc, docs, cluster topology test * clean pub-sub docs Co-authored-by: Simon Prickett <[email protected]> * reconnect startegy docs and bug fix Co-authored-by: Simon Prickett <[email protected]> * refine jsdoc and some docs Co-authored-by: Simon Prickett <[email protected]> * I'm stupid * fix cluster topology test * fix cluster topology test * Update README.md * Update clustering.md * Update pub-sub.md Co-authored-by: Simon Prickett <[email protected]>
1 parent e75a5db commit 3b1bad2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2357
-710
lines changed

README.md

+12-49
Original file line numberDiff line numberDiff line change
@@ -166,47 +166,7 @@ To learn more about isolated execution, check out the [guide](./docs/isolated-ex
166166

167167
### Pub/Sub
168168

169-
Subscribing to a channel requires a dedicated stand-alone connection. You can easily get one by `.duplicate()`ing an existing Redis connection.
170-
171-
```typescript
172-
const subscriber = client.duplicate();
173-
174-
await subscriber.connect();
175-
```
176-
177-
Once you have one, simply subscribe and unsubscribe as needed:
178-
179-
```typescript
180-
await subscriber.subscribe('channel', (message) => {
181-
console.log(message); // 'message'
182-
});
183-
184-
await subscriber.pSubscribe('channe*', (message, channel) => {
185-
console.log(message, channel); // 'message', 'channel'
186-
});
187-
188-
await subscriber.unsubscribe('channel');
189-
190-
await subscriber.pUnsubscribe('channe*');
191-
```
192-
193-
Publish a message on a channel:
194-
195-
```typescript
196-
await publisher.publish('channel', 'message');
197-
```
198-
199-
There is support for buffers as well:
200-
201-
```typescript
202-
await subscriber.subscribe('channel', (message) => {
203-
console.log(message); // <Buffer 6d 65 73 73 61 67 65>
204-
}, true);
205-
206-
await subscriber.pSubscribe('channe*', (message, channel) => {
207-
console.log(message, channel); // <Buffer 6d 65 73 73 61 67 65>, <Buffer 63 68 61 6e 6e 65 6c>
208-
}, true);
209-
```
169+
See the [Pub/Sub overview](./docs/pub-sub.md).
210170

211171
### Scan Iterator
212172

@@ -373,15 +333,18 @@ Check out the [Clustering Guide](./docs/clustering.md) when using Node Redis to
373333

374334
The Node Redis client class is an Nodejs EventEmitter and it emits an event each time the network status changes:
375335

376-
| Event name | Scenes | Arguments to be passed to the listener |
377-
|----------------|-------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
378-
| `connect` | The client is initiating a connection to the server. | _No argument_ |
379-
| `ready` | The client successfully initiated the connection to the server. | _No argument_ |
380-
| `end` | The client disconnected the connection to the server via `.quit()` or `.disconnect()`. | _No argument_ |
381-
| `error` | When a network error has occurred, such as unable to connect to the server or the connection closed unexpectedly. | 1 argument: The error object, such as `SocketClosedUnexpectedlyError: Socket closed unexpectedly` or `Error: connect ECONNREFUSED [IP]:[PORT]` |
382-
| `reconnecting` | The client is trying to reconnect to the server. | _No argument_ |
336+
| Name | When | Listener arguments |
337+
|-------------------------|------------------------------------------------------------------------------------|------------------------------------------------------------|
338+
| `connect` | Initiating a connection to the server | *No arguments* |
339+
| `ready` | Client is ready to use | *No arguments* |
340+
| `end` | Connection has been closed (via `.quit()` or `.disconnect()`) | *No arguments* |
341+
| `error` | An error has occurred—usually a network issue such as "Socket closed unexpectedly" | `(error: Error)` |
342+
| `reconnecting` | Client is trying to reconnect to the server | *No arguments* |
343+
| `sharded-channel-moved` | See [here](./docs/pub-sub.md#sharded-channel-moved-event) | See [here](./docs/pub-sub.md#sharded-channel-moved-event) |
344+
345+
> :warning: You **MUST** listen to `error` events. If a client doesn't have at least one `error` listener registered and an `error` occurs, that error will be thrown and the Node.js process will exit. See the [`EventEmitter` docs](https://nodejs.org/api/events.html#events_error_events) for more details.
383346
384-
The client will not emit [any other events](./docs/v3-to-v4.md#all-the-removed-events) beyond those listed above.
347+
> The client will not emit [any other events](./docs/v3-to-v4.md#all-the-removed-events) beyond those listed above.
385348
386349
## Supported Redis versions
387350

docs/client-configuration.md

+14-22
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
| socket.reconnectStrategy | `retries => Math.min(retries * 50, 500)` | A function containing the [Reconnect Strategy](#reconnect-strategy) logic |
1616
| username | | ACL username ([see ACL guide](https://redis.io/topics/acl)) |
1717
| password | | ACL password or the old "--requirepass" password |
18-
| name | | Connection name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) |
18+
| name | | Client name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) |
1919
| database | | Redis database number (see [`SELECT`](https://redis.io/commands/select) command) |
2020
| modules | | Included [Redis Modules](../README.md#packages) |
2121
| scripts | | Script definitions (see [Lua Scripts](../README.md#lua-scripts)) |
@@ -25,30 +25,22 @@
2525
| readonly | `false` | Connect in [`READONLY`](https://redis.io/commands/readonly) mode |
2626
| legacyMode | `false` | Maintain some backwards compatibility (see the [Migration Guide](./v3-to-v4.md)) |
2727
| isolationPoolOptions | | See the [Isolated Execution Guide](./isolated-execution.md) |
28-
| pingInterval | | Send `PING` command at interval (in ms). Useful with "[Azure Cache for Redis](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout)" |
28+
| pingInterval | | Send `PING` command at interval (in ms). Useful with ["Azure Cache for Redis"](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout) |
2929

3030
## Reconnect Strategy
3131

32-
When a network error occurs the client will automatically try to reconnect, following a default linear strategy (the more attempts, the more waiting before trying to reconnect).
32+
When the socket closes unexpectedly (without calling `.quit()`/`.disconnect()`), the client uses `reconnectStrategy` to decide what to do. The following values are supported:
33+
1. `false` -> do not reconnect, close the client and flush the command queue.
34+
2. `number` -> wait for `X` milliseconds before reconnecting.
35+
3. `(retries: number, cause: Error) => false | number | Error` -> `number` is the same as configuring a `number` directly, `Error` is the same as `false`, but with a custom error.
3336

34-
This strategy can be overridden by providing a `socket.reconnectStrategy` option during the client's creation.
37+
By default the strategy is `Math.min(retries * 50, 500)`, but it can be overwritten like so:
3538

36-
The `socket.reconnectStrategy` is a function that:
37-
38-
- Receives the number of retries attempted so far.
39-
- Returns `number | Error`:
40-
- `number`: wait time in milliseconds prior to attempting a reconnect.
41-
- `Error`: closes the client and flushes internal command queues.
42-
43-
The example below shows the default `reconnectStrategy` and how to override it.
44-
45-
```typescript
46-
import { createClient } from 'redis';
47-
48-
const client = createClient({
49-
socket: {
50-
reconnectStrategy: (retries) => Math.min(retries * 50, 500)
51-
}
39+
```javascript
40+
createClient({
41+
socket: {
42+
reconnectStrategy: retries => Math.min(retries * 50, 1000)
43+
}
5244
});
5345
```
5446

@@ -60,7 +52,7 @@ To enable TLS, set `socket.tls` to `true`. Below are some basic examples.
6052
6153
### Create a SSL client
6254

63-
```typescript
55+
```javascript
6456
createClient({
6557
socket: {
6658
tls: true,
@@ -72,7 +64,7 @@ createClient({
7264

7365
### Create a SSL client using a self-signed certificate
7466

75-
```typescript
67+
```javascript
7668
createClient({
7769
socket: {
7870
tls: true,

docs/clustering.md

+28-9
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const value = await cluster.get('key');
3535
| rootNodes | | An array of root nodes that are part of the cluster, which will be used to get the cluster topology. Each element in the array is a client configuration object. There is no need to specify every node in the cluster, 3 should be enough to reliably connect and obtain the cluster configuration from the server |
3636
| defaults | | The default configuration values for every client in the cluster. Use this for example when specifying an ACL user to connect with |
3737
| useReplicas | `false` | When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes |
38+
| minimizeConnections | `false` | When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes. Useful for short-term or Pub/Sub-only connections. |
3839
| maxCommandRedirections | `16` | The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors |
3940
| nodeAddressMap | | Defines the [node address mapping](#node-address-map) |
4041
| modules | | Included [Redis Modules](../README.md#packages) |
@@ -59,27 +60,45 @@ createCluster({
5960

6061
## Node Address Map
6162

62-
A node address map is required when a Redis cluster is configured with addresses that are inaccessible by the machine running the Redis client.
63-
This is a mapping of addresses and ports, with the values being the accessible address/port combination. Example:
63+
A mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to.
64+
Useful when the cluster is running on a different network to the client.
6465

6566
```javascript
67+
const rootNodes = [{
68+
url: 'external-host-1.io:30001'
69+
}, {
70+
url: 'external-host-2.io:30002'
71+
}];
72+
73+
// Use either a static mapping:
6674
createCluster({
67-
rootNodes: [{
68-
url: 'external-host-1.io:30001'
69-
}, {
70-
url: 'external-host-2.io:30002'
71-
}],
75+
rootNodes,
7276
nodeAddressMap: {
7377
'10.0.0.1:30001': {
74-
host: 'external-host-1.io',
78+
host: 'external-host.io',
7579
port: 30001
7680
},
7781
'10.0.0.2:30002': {
78-
host: 'external-host-2.io',
82+
host: 'external-host.io',
7983
port: 30002
8084
}
8185
}
8286
});
87+
88+
// or create the mapping dynamically, as a function:
89+
createCluster({
90+
rootNodes,
91+
nodeAddressMap(address) {
92+
const indexOfDash = address.lastIndexOf('-'),
93+
indexOfDot = address.indexOf('.', indexOfDash),
94+
indexOfColons = address.indexOf(':', indexOfDot);
95+
96+
return {
97+
host: `external-host-${address.substring(indexOfDash + 1, indexOfDot)}.io`,
98+
port: Number(address.substring(indexOfColons + 1))
99+
};
100+
}
101+
});
83102
```
84103

85104
> This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that.

docs/pub-sub.md

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Pub/Sub
2+
3+
The Pub/Sub API is implemented by `RedisClient` and `RedisCluster`.
4+
5+
## Pub/Sub with `RedisClient`
6+
7+
Pub/Sub requires a dedicated stand-alone client. You can easily get one by `.duplicate()`ing an existing `RedisClient`:
8+
9+
```typescript
10+
const subscriber = client.duplicate();
11+
subscribe.on('error', err => console.error(err));
12+
await subscriber.connect();
13+
```
14+
15+
When working with a `RedisCluster`, this is handled automatically for you.
16+
17+
### `sharded-channel-moved` event
18+
19+
`RedisClient` emits the `sharded-channel-moved` event when the ["cluster slot"](https://redis.io/docs/reference/cluster-spec/#key-distribution-model) of a subscribed [Sharded Pub/Sub](https://redis.io/docs/manual/pubsub/#sharded-pubsub) channel has been moved to another shard.
20+
21+
The event listener signature is as follows:
22+
```typescript
23+
(
24+
channel: string,
25+
listeners: {
26+
buffers: Set<Listener>;
27+
strings: Set<Listener>;
28+
}
29+
)`.
30+
```
31+
32+
## Subscribing
33+
34+
```javascript
35+
const listener = (message, channel) => console.log(message, channel);
36+
await client.subscribe('channel', listener);
37+
await client.pSubscribe('channe*', listener);
38+
// Use sSubscribe for sharded Pub/Sub:
39+
await client.sSubscribe('channel', listener);
40+
```
41+
42+
## Publishing
43+
44+
```javascript
45+
await client.publish('channel', 'message');
46+
// Use sPublish for sharded Pub/Sub:
47+
await client.sPublish('channel', 'message');
48+
```
49+
50+
## Unsubscribing
51+
52+
The code below unsubscribes all listeners from all channels.
53+
54+
```javascript
55+
await client.unsubscribe();
56+
await client.pUnsubscribe();
57+
// Use sUnsubscribe for sharded Pub/Sub:
58+
await client.sUnsubscribe();
59+
```
60+
61+
To unsubscribe from specific channels:
62+
63+
```javascript
64+
await client.unsubscribe('channel');
65+
await client.unsubscribe(['1', '2']);
66+
```
67+
68+
To unsubscribe a specific listener:
69+
70+
```javascript
71+
await client.unsubscribe('channel', listener);
72+
```
73+
74+
## Buffers
75+
76+
Publishing and subscribing using `Buffer`s is also supported:
77+
78+
```javascript
79+
await subscriber.subscribe('channel', message => {
80+
console.log(message); // <Buffer 6d 65 73 73 61 67 65>
81+
}, true); // true = subscribe in `Buffer` mode.
82+
83+
await subscriber.publish(Buffer.from('channel'), Buffer.from('message'));
84+
```
85+
86+
> NOTE: Buffers and strings are supported both for the channel name and the message. You can mix and match these as desired.

0 commit comments

Comments
 (0)