From dc8949e3905bccd689b819cf320ce512e415c908 Mon Sep 17 00:00:00 2001 From: chad Date: Fri, 3 Mar 2023 16:25:16 -0500 Subject: [PATCH 1/3] fix: Remove stream from registry after it's closed --- src/connection/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connection/index.ts b/src/connection/index.ts index 953ca3bcb1..b645f1c7a6 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -136,7 +136,7 @@ export class ConnectionImpl implements Connection { * Remove stream registry after it is closed */ removeStream (id: string) { - + this.streams.splice(this.streams.findIndex(s => s.id === id), 1) } /** From 94d1c4f49ff322e0c558a0a40090bc3487d95251 Mon Sep 17 00:00:00 2001 From: chad Date: Thu, 9 Mar 2023 12:49:50 -0500 Subject: [PATCH 2/3] fix: handle case where stream doesn't exist in array + added tests (#1612) --- src/connection/index.ts | 7 ++++- src/upgrader.ts | 2 +- test/dialing/direct.node.ts | 56 +++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/connection/index.ts b/src/connection/index.ts index b645f1c7a6..d8646e657c 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -130,13 +130,18 @@ export class ConnectionImpl implements Connection { */ addStream (stream: Stream) { stream.stat.direction = 'inbound' + this.streams.push(stream) } /** * Remove stream registry after it is closed */ removeStream (id: string) { - this.streams.splice(this.streams.findIndex(s => s.id === id), 1) + const indexToRemove = this.streams.findIndex(s => s.id === id) + if (indexToRemove === -1) { + return + } + this.streams.splice(indexToRemove, 1) } /** diff --git a/src/upgrader.ts b/src/upgrader.ts index 436e1f2d03..8c3410c8b0 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -416,7 +416,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg }, // Run anytime a stream closes onStreamEnd: muxedStream => { - connection?.removeStream(muxedStream.id) + connection.removeStream(muxedStream.id) } }) diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index 956d75bb7d..d44f8189b3 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -335,6 +335,62 @@ describe('libp2p.dialer (direct, TCP)', () => { expect(dialerDialSpy.callCount).to.be.greaterThan(0) }) + it('should remove a stream from the connection when it closes', async () => { + libp2p = await createLibp2pNode({ + peerId, + transports: [ + tcp() + ], + streamMuxers: [ + mplex() + ], + connectionEncryption: [ + plaintext() + ] + }) + + await libp2p.start() + + await libp2p.components.peerStore.addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs()) + const connection = await libp2p.dial(remotePeerId) + + // Create local to remote streams + const stream = await connection.newStream('/echo/1.0.0') + + connection.addStream(stream) + expect(connection.streams).to.include(stream) + + connection.removeStream(stream.id) + expect(connection.streams).to.not.include(stream.id) + }) + + it('should only close an existing stream', async () => { + libp2p = await createLibp2pNode({ + peerId, + transports: [ + tcp() + ], + streamMuxers: [ + mplex() + ], + connectionEncryption: [ + plaintext() + ] + }) + + await libp2p.start() + + await libp2p.components.peerStore.addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs()) + const connection = await libp2p.dial(remotePeerId) + + // Create local to remote streams + const stream = await connection.newStream('/echo/1.0.0') + connection.addStream(stream) + connection.removeStream('non-existent-stream-id') + + expect(connection.streams).to.include(stream) + }) + it('should close all streams when the connection closes', async () => { libp2p = await createLibp2pNode({ peerId, From 4e423cb371254e4a803f04a4813cef9c6ec84212 Mon Sep 17 00:00:00 2001 From: chad Date: Wed, 15 Mar 2023 16:58:59 -0500 Subject: [PATCH 3/3] chore: added return type for linting on removeStream --- src/connection/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connection/index.ts b/src/connection/index.ts index b3cfe8024f..272c89259b 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -136,7 +136,7 @@ export class ConnectionImpl implements Connection { /** * Remove stream registry after it is closed */ - removeStream (id: string) { + removeStream (id: string): void { const indexToRemove = this.streams.findIndex(s => s.id === id) if (indexToRemove === -1) { return