diff --git a/src/connection/index.ts b/src/connection/index.ts index 7d73e6130e..272c89259b 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -130,13 +130,18 @@ export class ConnectionImpl implements Connection { */ addStream (stream: Stream): void { stream.stat.direction = 'inbound' + this.streams.push(stream) } /** * Remove stream registry after it is closed */ removeStream (id: string): void { - + const indexToRemove = this.streams.findIndex(s => s.id === id) + if (indexToRemove === -1) { + return + } + this.streams.splice(indexToRemove, 1) } /** diff --git a/src/upgrader.ts b/src/upgrader.ts index 1b290f0dc6..0432bec403 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -417,7 +417,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg }, // Run anytime a stream closes onStreamEnd: muxedStream => { - connection?.removeStream(muxedStream.id) + connection.removeStream(muxedStream.id) } }) diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index eb3d74d6e8..36eb9989f7 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -335,6 +335,62 @@ describe('libp2p.dialer (direct, TCP)', () => { expect(dialerDialSpy.callCount).to.be.greaterThan(0) }) + it('should remove a stream from the connection when it closes', async () => { + libp2p = await createLibp2pNode({ + peerId, + transports: [ + tcp() + ], + streamMuxers: [ + mplex() + ], + connectionEncryption: [ + plaintext() + ] + }) + + await libp2p.start() + + await libp2p.components.peerStore.addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs()) + const connection = await libp2p.dial(remotePeerId) + + // Create local to remote streams + const stream = await connection.newStream('/echo/1.0.0') + + connection.addStream(stream) + expect(connection.streams).to.include(stream) + + connection.removeStream(stream.id) + expect(connection.streams).to.not.include(stream.id) + }) + + it('should only close an existing stream', async () => { + libp2p = await createLibp2pNode({ + peerId, + transports: [ + tcp() + ], + streamMuxers: [ + mplex() + ], + connectionEncryption: [ + plaintext() + ] + }) + + await libp2p.start() + + await libp2p.components.peerStore.addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs()) + const connection = await libp2p.dial(remotePeerId) + + // Create local to remote streams + const stream = await connection.newStream('/echo/1.0.0') + connection.addStream(stream) + connection.removeStream('non-existent-stream-id') + + expect(connection.streams).to.include(stream) + }) + it('should close all streams when the connection closes', async () => { libp2p = await createLibp2pNode({ peerId,