Skip to content

Add support for sharded PubSub #2373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 57 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
5dfd673
refactor pubsub, add support for sharded pub sub
leibale Dec 11, 2022
af03ba0
run tests in redis 7 only, fix PUBSUB SHARDCHANNELS test
leibale Dec 11, 2022
0b69a25
add some comments and fix some bugs
leibale Dec 12, 2022
29e43c7
PubSubType, not PubSubTypes 🤦‍♂️
leibale Dec 12, 2022
f7ce533
remove test.txt
leibale Dec 12, 2022
b74f6d4
fix some bugs, add tests
leibale Dec 12, 2022
de12812
add some tests
leibale Dec 12, 2022
811b554
fix #2345 - allow PING in PubSub mode (remove client side validation)
leibale Dec 12, 2022
d3cbd0d
remove .only
leibale Dec 12, 2022
877e57a
revert changes in cluster/index.ts
leibale Dec 12, 2022
52465d5
fix tests minimum version
leibale Dec 12, 2022
b82fcf5
handle server sunsubscribe
leibale Dec 13, 2022
2aab6eb
add 'sharded-channel-moved' event to docs, improve the events section…
leibale Dec 13, 2022
9419fef
exit "resubscribe" if pubsub not active
leibale Dec 13, 2022
298697a
Update commands-queue.ts
leibale Dec 13, 2022
8ea2123
Merge branch 'master' of github.com:redis/node-redis into sharded-pubusb
leibale Dec 26, 2022
d4bb5f3
Release [email protected]
leibale Dec 26, 2022
59cea35
Merge branch 'master' of github.com:redis/node-redis into sharded-pubusb
leibale Dec 26, 2022
d7ec781
Merge branch 'sharded-pubusb' of github.com:leibale/node-redis into s…
leibale Jan 4, 2023
4d6c65a
WIP
leibale Jan 10, 2023
fa52217
use `node:util` instead of `node:util/types` (to support node 14)
leibale Jan 10, 2023
27e72a0
run PubSub resharding test with Redis 7+
leibale Jan 10, 2023
a08924f
fix inconsistency in live resharding test
leibale Jan 10, 2023
a0b5862
add some tests
leibale Jan 11, 2023
ba0ac0f
fix iterateAllNodes when starting from a replica
leibale Jan 11, 2023
d1246e6
fix iterateAllNodes random
leibale Jan 11, 2023
3ec4d27
fix slotNodesIterator
leibale Jan 11, 2023
236e3ad
fix slotNodesIterator
leibale Jan 11, 2023
4525b5e
clear pubSubNode when node in use
leibale Jan 11, 2023
da8f681
wait for all nodes cluster state to be ok before testing
leibale Jan 11, 2023
d91356d
`cluster.minimizeConections` tests
leibale Jan 12, 2023
b3fd1e9
`client.reconnectStrategry = false | 0` tests
leibale Jan 12, 2023
67a495e
sharded pubsub + cluster 🎉
leibale Jan 18, 2023
d572fba
add minimum version to sharded pubsub tests
leibale Jan 18, 2023
e5ced88
add cluster sharded pubsub live reshard test, use stable dockers for …
leibale Jan 23, 2023
e212630
Merge branch 'master' of github.com:redis/node-redis into sharded-pubsub
leibale Jan 23, 2023
783c1fd
fix "ssubscribe & sunsubscribe" test
leibale Jan 23, 2023
244e3c2
lock search docker to 2.4.9
leibale Jan 23, 2023
d2c360c
change numberOfMasters default to 2
leibale Jan 23, 2023
be0f287
use edge for bloom
leibale Jan 23, 2023
b007316
add tests
leibale Jan 23, 2023
c11d2ec
add back getMasters and getSlotMaster as deprecated functions
leibale Jan 23, 2023
97bf4bf
add some tests
leibale Jan 23, 2023
6e969fa
Merge branch 'master' of github.com:redis/node-redis into sharded-pubsub
leibale Jan 24, 2023
80eddda
fix reconnect strategy + docs
leibale Jan 24, 2023
7741fb4
sharded pubsub docs
leibale Jan 24, 2023
d316f5c
Update pub-sub.md
leibale Jan 24, 2023
9cb28f6
some jsdoc, docs, cluster topology test
leibale Jan 24, 2023
db899ad
clean pub-sub docs
leibale Jan 24, 2023
6c47a82
reconnect startegy docs and bug fix
leibale Jan 24, 2023
5392e0f
refine jsdoc and some docs
leibale Jan 24, 2023
47e2bfd
I'm stupid
leibale Jan 24, 2023
a936a0e
fix cluster topology test
leibale Jan 24, 2023
aa5376f
fix cluster topology test
leibale Jan 24, 2023
651b074
Update README.md
leibale Jan 25, 2023
dc96a38
Update clustering.md
leibale Jan 25, 2023
d4cf124
Update pub-sub.md
leibale Jan 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 12 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,47 +166,7 @@ To learn more about isolated execution, check out the [guide](./docs/isolated-ex

### Pub/Sub

Subscribing to a channel requires a dedicated stand-alone connection. You can easily get one by `.duplicate()`ing an existing Redis connection.

```typescript
const subscriber = client.duplicate();

await subscriber.connect();
```

Once you have one, simply subscribe and unsubscribe as needed:

```typescript
await subscriber.subscribe('channel', (message) => {
console.log(message); // 'message'
});

await subscriber.pSubscribe('channe*', (message, channel) => {
console.log(message, channel); // 'message', 'channel'
});

await subscriber.unsubscribe('channel');

await subscriber.pUnsubscribe('channe*');
```

Publish a message on a channel:

```typescript
await publisher.publish('channel', 'message');
```

There is support for buffers as well:

```typescript
await subscriber.subscribe('channel', (message) => {
console.log(message); // <Buffer 6d 65 73 73 61 67 65>
}, true);

await subscriber.pSubscribe('channe*', (message, channel) => {
console.log(message, channel); // <Buffer 6d 65 73 73 61 67 65>, <Buffer 63 68 61 6e 6e 65 6c>
}, true);
```
See the [Pub/Sub overview](./docs/pub-sub.md).

### Scan Iterator

Expand Down Expand Up @@ -373,15 +333,18 @@ Check out the [Clustering Guide](./docs/clustering.md) when using Node Redis to

The Node Redis client class is an Nodejs EventEmitter and it emits an event each time the network status changes:

| Event name | Scenes | Arguments to be passed to the listener |
|----------------|-------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
| `connect` | The client is initiating a connection to the server. | _No argument_ |
| `ready` | The client successfully initiated the connection to the server. | _No argument_ |
| `end` | The client disconnected the connection to the server via `.quit()` or `.disconnect()`. | _No argument_ |
| `error` | When a network error has occurred, such as unable to connect to the server or the connection closed unexpectedly. | 1 argument: The error object, such as `SocketClosedUnexpectedlyError: Socket closed unexpectedly` or `Error: connect ECONNREFUSED [IP]:[PORT]` |
| `reconnecting` | The client is trying to reconnect to the server. | _No argument_ |
| Name | When | Listener arguments |
|-------------------------|------------------------------------------------------------------------------------|------------------------------------------------------------|
| `connect` | Initiating a connection to the server | *No arguments* |
| `ready` | Client is ready to use | *No arguments* |
| `end` | Connection has been closed (via `.quit()` or `.disconnect()`) | *No arguments* |
| `error` | An error has occurred—usually a network issue such as "Socket closed unexpectedly" | `(error: Error)` |
| `reconnecting` | Client is trying to reconnect to the server | *No arguments* |
| `sharded-channel-moved` | See [here](./docs/pub-sub.md#sharded-channel-moved-event) | See [here](./docs/pub-sub.md#sharded-channel-moved-event) |

> :warning: You **MUST** listen to `error` events. If a client doesn't have at least one `error` listener registered and an `error` occurs, that error will be thrown and the Node.js process will exit. See the [`EventEmitter` docs](https://nodejs.org/api/events.html#events_error_events) for more details.

The client will not emit [any other events](./docs/v3-to-v4.md#all-the-removed-events) beyond those listed above.
> The client will not emit [any other events](./docs/v3-to-v4.md#all-the-removed-events) beyond those listed above.

## Supported Redis versions

Expand Down
36 changes: 14 additions & 22 deletions docs/client-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
| socket.reconnectStrategy | `retries => Math.min(retries * 50, 500)` | A function containing the [Reconnect Strategy](#reconnect-strategy) logic |
| username | | ACL username ([see ACL guide](https://redis.io/topics/acl)) |
| password | | ACL password or the old "--requirepass" password |
| name | | Connection name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) |
| name | | Client name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) |
| database | | Redis database number (see [`SELECT`](https://redis.io/commands/select) command) |
| modules | | Included [Redis Modules](../README.md#packages) |
| scripts | | Script definitions (see [Lua Scripts](../README.md#lua-scripts)) |
Expand All @@ -25,30 +25,22 @@
| readonly | `false` | Connect in [`READONLY`](https://redis.io/commands/readonly) mode |
| legacyMode | `false` | Maintain some backwards compatibility (see the [Migration Guide](./v3-to-v4.md)) |
| isolationPoolOptions | | See the [Isolated Execution Guide](./isolated-execution.md) |
| pingInterval | | Send `PING` command at interval (in ms). Useful with "[Azure Cache for Redis](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout)" |
| pingInterval | | Send `PING` command at interval (in ms). Useful with ["Azure Cache for Redis"](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout) |

## Reconnect Strategy

When a network error occurs the client will automatically try to reconnect, following a default linear strategy (the more attempts, the more waiting before trying to reconnect).
When the socket closes unexpectedly (without calling `.quit()`/`.disconnect()`), the client uses `reconnectStrategy` to decide what to do. The following values are supported:
1. `false` -> do not reconnect, close the client and flush the command queue.
2. `number` -> wait for `X` milliseconds before reconnecting.
3. `(retries: number, cause: Error) => false | number | Error` -> `number` is the same as configuring a `number` directly, `Error` is the same as `false`, but with a custom error.

This strategy can be overridden by providing a `socket.reconnectStrategy` option during the client's creation.
By default the strategy is `Math.min(retries * 50, 500)`, but it can be overwritten like so:

The `socket.reconnectStrategy` is a function that:

- Receives the number of retries attempted so far.
- Returns `number | Error`:
- `number`: wait time in milliseconds prior to attempting a reconnect.
- `Error`: closes the client and flushes internal command queues.

The example below shows the default `reconnectStrategy` and how to override it.

```typescript
import { createClient } from 'redis';

const client = createClient({
socket: {
reconnectStrategy: (retries) => Math.min(retries * 50, 500)
}
```javascript
createClient({
socket: {
reconnectStrategy: retries => Math.min(retries * 50, 1000)
}
});
```

Expand All @@ -60,7 +52,7 @@ To enable TLS, set `socket.tls` to `true`. Below are some basic examples.

### Create a SSL client

```typescript
```javascript
createClient({
socket: {
tls: true,
Expand All @@ -72,7 +64,7 @@ createClient({

### Create a SSL client using a self-signed certificate

```typescript
```javascript
createClient({
socket: {
tls: true,
Expand Down
37 changes: 28 additions & 9 deletions docs/clustering.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const value = await cluster.get('key');
| rootNodes | | An array of root nodes that are part of the cluster, which will be used to get the cluster topology. Each element in the array is a client configuration object. There is no need to specify every node in the cluster, 3 should be enough to reliably connect and obtain the cluster configuration from the server |
| defaults | | The default configuration values for every client in the cluster. Use this for example when specifying an ACL user to connect with |
| useReplicas | `false` | When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes |
| minimizeConnections | `false` | When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes. Useful for short-term or Pub/Sub-only connections. |
| maxCommandRedirections | `16` | The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors |
| nodeAddressMap | | Defines the [node address mapping](#node-address-map) |
| modules | | Included [Redis Modules](../README.md#packages) |
Expand All @@ -43,27 +44,45 @@ const value = await cluster.get('key');

## Node Address Map

A node address map is required when a Redis cluster is configured with addresses that are inaccessible by the machine running the Redis client.
This is a mapping of addresses and ports, with the values being the accessible address/port combination. Example:
A mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to.
Useful when the cluster is running on a different network to the client.

```javascript
const rootNodes = [{
url: 'external-host-1.io:30001'
}, {
url: 'external-host-2.io:30002'
}];

// Use either a static mapping:
createCluster({
rootNodes: [{
url: 'external-host-1.io:30001'
}, {
url: 'external-host-2.io:30002'
}],
rootNodes,
nodeAddressMap: {
'10.0.0.1:30001': {
host: 'external-host-1.io',
host: 'external-host.io',
port: 30001
},
'10.0.0.2:30002': {
host: 'external-host-2.io',
host: 'external-host.io',
port: 30002
}
}
});

// or create the mapping dynamically, as a function:
createCluster({
rootNodes,
nodeAddressMap(address) {
const indexOfDash = address.lastIndexOf('-'),
indexOfDot = address.indexOf('.', indexOfDash),
indexOfColons = address.indexOf(':', indexOfDot);

return {
host: `external-host-${address.substring(indexOfDash + 1, indexOfDot)}.io`,
port: Number(address.substring(indexOfColons + 1))
};
}
});
```

> This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that.
Expand Down
86 changes: 86 additions & 0 deletions docs/pub-sub.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Pub/Sub

The Pub/Sub API is implemented by `RedisClient` and `RedisCluster`.

## Pub/Sub with `RedisClient`

Pub/Sub requires a dedicated stand-alone client. You can easily get one by `.duplicate()`ing an existing `RedisClient`:

```typescript
const subscriber = client.duplicate();
subscribe.on('error', err => console.error(err));
await subscriber.connect();
```

When working with a `RedisCluster`, this is handled automatically for you.

### `sharded-channel-moved` event

`RedisClient` emits the `sharded-channel-moved` event when the ["cluster slot"](https://redis.io/docs/reference/cluster-spec/#key-distribution-model) of a subscribed [Sharded Pub/Sub](https://redis.io/docs/manual/pubsub/#sharded-pubsub) channel has been moved to another shard.

The event listener signature is as follows:
```typescript
(
channel: string,
listeners: {
buffers: Set<Listener>;
strings: Set<Listener>;
}
)`.
```

## Subscribing

```javascript
const listener = (message, channel) => console.log(message, channel);
await client.subscribe('channel', listener);
await client.pSubscribe('channe*', listener);
// Use sSubscribe for sharded Pub/Sub:
await client.sSubscribe('channel', listener);
```

## Publishing

```javascript
await client.publish('channel', 'message');
// Use sPublish for sharded Pub/Sub:
await client.sPublish('channel', 'message');
```

## Unsubscribing

The code below unsubscribes all listeners from all channels.

```javascript
await client.unsubscribe();
await client.pUnsubscribe();
// Use sUnsubscribe for sharded Pub/Sub:
await client.sUnsubscribe();
```

To unsubscribe from specific channels:

```javascript
await client.unsubscribe('channel');
await client.unsubscribe(['1', '2']);
```

To unsubscribe a specific listener:

```javascript
await client.unsubscribe('channel', listener);
```

## Buffers

Publishing and subscribing using `Buffer`s is also supported:

```javascript
await subscriber.subscribe('channel', message => {
console.log(message); // <Buffer 6d 65 73 73 61 67 65>
}, true); // true = subscribe in `Buffer` mode.

await subscriber.publish(Buffer.from('channel'), Buffer.from('message'));
```

> NOTE: Buffers and strings are supported both for the channel name and the message. You can mix and match these as desired.
Loading