Skip to content

Commit 34cbfbb

Browse files
fix: discard acknowledgements upon disconnection
Previously, getting disconnected while waiting for an acknowledgement would create a memory leak, as the acknowledgement was never received and the handler would stay in memory forever. This commit fixes the issue: - handlers that do accept an error as first argument, such as: * `socket.emit("test", (err, value) => { ... })` with `ackTimeout` option * `socket.timeout(5000).emit("test", (err, value) => { ... })` * `const value = await socket.emitWithAck("test")` will now properly resolve with an error and get discarded. - handlers that don't like `socket.emit("test", (value) => { ... });` will simply be discarded upon disconnection Note: the structure of the 'acks' attribute has been left untouched, in order to prevent any breaking change. Related: - #1546 - socketio/socket.io#4964
1 parent 8cfea8c commit 34cbfbb

File tree

2 files changed

+198
-22
lines changed

2 files changed

+198
-22
lines changed

lib/socket.ts

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,33 @@ export class Socket<
241241
private readonly _opts: SocketOptions;
242242

243243
private ids: number = 0;
244-
private acks: object = {};
244+
/**
245+
* A map containing acknowledgement handlers.
246+
*
247+
* The `withError` attribute is used to differentiate handlers that accept an error as first argument:
248+
*
249+
* - `socket.emit("test", (err, value) => { ... })` with `ackTimeout` option
250+
* - `socket.timeout(5000).emit("test", (err, value) => { ... })`
251+
* - `const value = await socket.emitWithAck("test")`
252+
*
253+
* From those that don't:
254+
*
255+
* - `socket.emit("test", (value) => { ... });`
256+
*
257+
* In the first case, the handlers will be called with an error when:
258+
*
259+
* - the timeout is reached
260+
* - the socket gets disconnected
261+
*
262+
* In the second case, the handlers will be simply discarded upon disconnection, since the client will never receive
263+
* an acknowledgement from the server.
264+
*
265+
* @private
266+
*/
267+
private acks: Record<
268+
string,
269+
((...args: any[]) => void) & { withError?: boolean }
270+
> = {};
245271
private flags: Flags = {};
246272
private subs?: Array<VoidFunction>;
247273
private _anyListeners: Array<(...args: any[]) => void>;
@@ -409,7 +435,7 @@ export class Socket<
409435
const id = this.ids++;
410436
debug("emitting packet with ack id %d", id);
411437

412-
const ack = args.pop() as Function;
438+
const ack = args.pop() as (...args: any[]) => void;
413439
this._registerAckCallback(id, ack);
414440
packet.id = id;
415441
}
@@ -438,7 +464,7 @@ export class Socket<
438464
/**
439465
* @private
440466
*/
441-
private _registerAckCallback(id: number, ack: Function) {
467+
private _registerAckCallback(id: number, ack: (...args: any[]) => void) {
442468
const timeout = this.flags.timeout ?? this._opts.ackTimeout;
443469
if (timeout === undefined) {
444470
this.acks[id] = ack;
@@ -458,11 +484,14 @@ export class Socket<
458484
ack.call(this, new Error("operation has timed out"));
459485
}, timeout);
460486

461-
this.acks[id] = (...args) => {
487+
const fn = (...args: any[]) => {
462488
// @ts-ignore
463489
this.io.clearTimeoutFn(timer);
464-
ack.apply(this, [null, ...args]);
490+
ack.apply(this, args);
465491
};
492+
fn.withError = true;
493+
494+
this.acks[id] = fn;
466495
}
467496

468497
/**
@@ -485,17 +514,12 @@ export class Socket<
485514
ev: Ev,
486515
...args: AllButLast<EventParams<EmitEvents, Ev>>
487516
): Promise<FirstArg<Last<EventParams<EmitEvents, Ev>>>> {
488-
// the timeout flag is optional
489-
const withErr =
490-
this.flags.timeout !== undefined || this._opts.ackTimeout !== undefined;
491517
return new Promise((resolve, reject) => {
492-
args.push((arg1, arg2) => {
493-
if (withErr) {
494-
return arg1 ? reject(arg1) : resolve(arg2);
495-
} else {
496-
return resolve(arg1);
497-
}
498-
});
518+
const fn = (arg1, arg2) => {
519+
return arg1 ? reject(arg1) : resolve(arg2);
520+
};
521+
fn.withError = true;
522+
args.push(fn);
499523
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>));
500524
});
501525
}
@@ -647,6 +671,30 @@ export class Socket<
647671
this.connected = false;
648672
delete this.id;
649673
this.emitReserved("disconnect", reason, description);
674+
this._clearAcks();
675+
}
676+
677+
/**
678+
* Clears the acknowledgement handlers upon disconnection, since the client will never receive an acknowledgement from
679+
* the server.
680+
*
681+
* @private
682+
*/
683+
private _clearAcks() {
684+
Object.keys(this.acks).forEach((id) => {
685+
const isBuffered = this.sendBuffer.some(
686+
(packet) => String(packet.id) === id
687+
);
688+
if (!isBuffered) {
689+
// note: handlers that do not accept an error as first argument are ignored here
690+
const ack = this.acks[id];
691+
delete this.acks[id];
692+
693+
if (ack.withError) {
694+
ack.call(this, new Error("socket has been disconnected"));
695+
}
696+
}
697+
});
650698
}
651699

652700
/**
@@ -756,20 +804,25 @@ export class Socket<
756804
}
757805

758806
/**
759-
* Called upon a server acknowlegement.
807+
* Called upon a server acknowledgement.
760808
*
761809
* @param packet
762810
* @private
763811
*/
764812
private onack(packet: Packet): void {
765813
const ack = this.acks[packet.id];
766-
if ("function" === typeof ack) {
767-
debug("calling ack %s with %j", packet.id, packet.data);
768-
ack.apply(this, packet.data);
769-
delete this.acks[packet.id];
770-
} else {
814+
if (typeof ack !== "function") {
771815
debug("bad ack %s", packet.id);
816+
return;
817+
}
818+
delete this.acks[packet.id];
819+
debug("calling ack %s with %j", packet.id, packet.data);
820+
// @ts-ignore FIXME ack is incorrectly inferred as 'never'
821+
if (ack.withError) {
822+
packet.data.unshift(null);
772823
}
824+
// @ts-ignore
825+
ack.apply(this, packet.data);
773826
}
774827

775828
/**

test/socket.ts

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,7 @@ describe("socket", () => {
651651
});
652652
});
653653

654-
it("should use the default value", () => {
654+
it("should use the default timeout value", () => {
655655
return wrap((done) => {
656656
const socket = io(BASE_URL + "/", {
657657
ackTimeout: 50,
@@ -663,5 +663,128 @@ describe("socket", () => {
663663
});
664664
});
665665
});
666+
667+
describe("acknowledgement upon disconnection", () => {
668+
it("should not ack upon disconnection (callback)", () => {
669+
return wrap((done) => {
670+
const socket = io(BASE_URL, {
671+
forceNew: true,
672+
});
673+
674+
socket.on("connect", () => {
675+
socket.emit("echo", "a", (_value) => {
676+
done(new Error("should not happen"));
677+
});
678+
679+
socket.disconnect();
680+
681+
// @ts-ignore property 'acks' is private
682+
expect(Object.keys(socket.acks).length).to.eql(0);
683+
684+
setTimeout(() => success(done, socket), 100);
685+
});
686+
});
687+
});
688+
689+
it("should ack with an error upon disconnection (callback & timeout)", () => {
690+
return wrap((done) => {
691+
const socket = io(BASE_URL, {
692+
forceNew: true,
693+
});
694+
695+
socket.on("connect", () => {
696+
socket.timeout(10_000).emit("echo", "a", (err) => {
697+
expect(err).to.be.an(Error);
698+
699+
// @ts-ignore property 'acks' is private
700+
expect(Object.keys(socket.acks).length).to.eql(0);
701+
702+
success(done, socket);
703+
});
704+
705+
socket.disconnect();
706+
});
707+
});
708+
});
709+
710+
it("should ack with an error upon disconnection (callback & ackTimeout)", () => {
711+
return wrap((done) => {
712+
const socket = io(BASE_URL, {
713+
forceNew: true,
714+
ackTimeout: 10_000,
715+
});
716+
717+
socket.on("connect", () => {
718+
socket.emit("echo", "a", (err) => {
719+
expect(err).to.be.an(Error);
720+
721+
success(done, socket);
722+
});
723+
724+
socket.disconnect();
725+
});
726+
});
727+
});
728+
729+
it("should ack with an error upon disconnection (promise)", () => {
730+
return wrap((done) => {
731+
const socket = io(BASE_URL, {
732+
forceNew: true,
733+
});
734+
735+
socket.on("connect", () => {
736+
socket.emitWithAck("echo", "a").catch((err) => {
737+
expect(err).to.be.an(Error);
738+
739+
success(done, socket);
740+
});
741+
742+
socket.disconnect();
743+
});
744+
});
745+
});
746+
747+
it("should ack with an error upon disconnection (promise & timeout)", () => {
748+
return wrap((done) => {
749+
const socket = io(BASE_URL, {
750+
forceNew: true,
751+
});
752+
753+
socket.on("connect", () => {
754+
socket
755+
.timeout(10_000)
756+
.emitWithAck("echo", "a")
757+
.catch((err) => {
758+
expect(err).to.be.an(Error);
759+
760+
success(done, socket);
761+
});
762+
763+
socket.disconnect();
764+
});
765+
});
766+
});
767+
768+
it("should not discard an unsent ack (callback)", () => {
769+
return wrap((done) => {
770+
const socket = io(BASE_URL, {
771+
forceNew: true,
772+
});
773+
774+
socket.once("connect", () => {
775+
socket.disconnect();
776+
777+
// the packet will be buffered and sent upon reconnection
778+
socket.emit("echo", "a", (value) => {
779+
expect(value).to.eql("a");
780+
781+
success(done, socket);
782+
});
783+
784+
setTimeout(() => socket.connect(), 100);
785+
});
786+
});
787+
});
788+
});
666789
});
667790
});

0 commit comments

Comments
 (0)