Skip to content

Commit 58faa1d

Browse files
feat: add support for the ioredis package
Related: #2
1 parent 65d0539 commit 58faa1d

File tree

5 files changed

+306
-48
lines changed

5 files changed

+306
-48
lines changed

lib/adapter.ts

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import type { PrivateSessionId, Session } from "socket.io-adapter";
22
import { decode, encode } from "@msgpack/msgpack";
3-
import { commandOptions } from "redis";
43
import {
54
ClusterAdapter,
65
ClusterAdapterOptions,
76
ClusterMessage,
87
MessageType,
98
} from "./cluster-adapter";
109
import debugModule from "debug";
11-
import { hasBinary } from "./util";
10+
import { hasBinary, XADD, XREAD } from "./util";
1211

1312
const debug = debugModule("socket.io-redis-streams-adapter");
1413

@@ -61,23 +60,15 @@ export function createAdapter(
6160
);
6261
let offset = "$";
6362
let polling = false;
63+
let shouldClose = false;
6464

6565
async function poll() {
6666
try {
67-
let response = await redisClient.xRead(
68-
commandOptions({
69-
isolated: true,
70-
}),
71-
[
72-
{
73-
key: options.streamName,
74-
id: offset,
75-
},
76-
],
77-
{
78-
COUNT: options.readCount,
79-
BLOCK: 5000,
80-
}
67+
let response = await XREAD(
68+
redisClient,
69+
options.streamName,
70+
offset,
71+
options.readCount
8172
);
8273

8374
if (response) {
@@ -98,7 +89,7 @@ export function createAdapter(
9889
debug("something went wrong while consuming the stream: %s", e.message);
9990
}
10091

101-
if (namespaceToAdapters.size > 0 && redisClient.isOpen) {
92+
if (namespaceToAdapters.size > 0 && !shouldClose) {
10293
poll();
10394
} else {
10495
polling = false;
@@ -111,6 +102,7 @@ export function createAdapter(
111102

112103
if (!polling) {
113104
polling = true;
105+
shouldClose = false;
114106
poll();
115107
}
116108

@@ -119,6 +111,10 @@ export function createAdapter(
119111
adapter.close = () => {
120112
namespaceToAdapters.delete(nsp.name);
121113

114+
if (namespaceToAdapters.size === 0) {
115+
shouldClose = true;
116+
}
117+
122118
defaultClose.call(adapter);
123119
};
124120

@@ -141,17 +137,11 @@ class RedisStreamsAdapter extends ClusterAdapter {
141137
override doPublish(message: ClusterMessage) {
142138
debug("publishing %o", message);
143139

144-
return this.#redisClient.xAdd(
140+
return XADD(
141+
this.#redisClient,
145142
this.#opts.streamName,
146-
"*",
147143
RedisStreamsAdapter.encode(message),
148-
{
149-
TRIM: {
150-
strategy: "MAXLEN",
151-
strategyModifier: "~",
152-
threshold: this.#opts.maxLen,
153-
},
154-
}
144+
this.#opts.maxLen
155145
);
156146
}
157147

lib/util.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { randomBytes } from "crypto";
2+
import { commandOptions } from "redis";
23

34
export function hasBinary(obj: any, toJSON?: boolean): boolean {
45
if (!obj || typeof obj !== "object") {
@@ -34,3 +35,93 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean {
3435
export function randomId() {
3536
return randomBytes(8).toString("hex");
3637
}
38+
39+
/**
40+
* Whether the client comes from the `redis` package
41+
*
42+
* @param redisClient
43+
*
44+
* @see https://github.com/redis/node-redis
45+
*/
46+
function isRedisV4Client(redisClient: any) {
47+
return typeof redisClient.sSubscribe === "function";
48+
}
49+
50+
/**
51+
* @see https://redis.io/commands/xread/
52+
*/
53+
export function XREAD(
54+
redisClient: any,
55+
streamName: string,
56+
offset: string,
57+
readCount: number
58+
) {
59+
if (isRedisV4Client(redisClient)) {
60+
return redisClient.xRead(
61+
commandOptions({
62+
isolated: true,
63+
}),
64+
[
65+
{
66+
key: streamName,
67+
id: offset,
68+
},
69+
],
70+
{
71+
COUNT: readCount,
72+
BLOCK: 5000,
73+
}
74+
);
75+
} else {
76+
return redisClient
77+
.xread("BLOCK", 100, "COUNT", readCount, "STREAMS", streamName, offset)
78+
.then((results) => {
79+
if (results === null) {
80+
return null;
81+
}
82+
return [
83+
{
84+
messages: results[0][1].map((result) => {
85+
const id = result[0];
86+
const inlineValues = result[1];
87+
const message = {};
88+
for (let i = 0; i < inlineValues.length; i += 2) {
89+
message[inlineValues[i]] = inlineValues[i + 1];
90+
}
91+
return {
92+
id,
93+
message,
94+
};
95+
}),
96+
},
97+
];
98+
});
99+
}
100+
}
101+
102+
/**
103+
* @see https://redis.io/commands/xadd/
104+
*/
105+
export function XADD(
106+
redisClient: any,
107+
streamName: string,
108+
payload: any,
109+
maxLenThreshold: number
110+
) {
111+
if (isRedisV4Client(redisClient)) {
112+
return redisClient.xAdd(streamName, "*", payload, {
113+
TRIM: {
114+
strategy: "MAXLEN",
115+
strategyModifier: "~",
116+
threshold: maxLenThreshold,
117+
},
118+
});
119+
} else {
120+
const args = [streamName, "MAXLEN", "~", maxLenThreshold, "*"];
121+
Object.keys(payload).forEach((k) => {
122+
args.push(k, payload[k]);
123+
});
124+
125+
return redisClient.xadd.call(redisClient, args);
126+
}
127+
}

0 commit comments

Comments
 (0)