diff --git a/packages/utils/src/stream-to-ma-conn.ts b/packages/utils/src/stream-to-ma-conn.ts index 94efb4cf55..47ba0d91c3 100644 --- a/packages/utils/src/stream-to-ma-conn.ts +++ b/packages/utils/src/stream-to-ma-conn.ts @@ -18,13 +18,21 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect const { sink, source } = stream const log = logger.forComponent('libp2p:stream:converter') + let closedRead = false + let closedWrite = false + const mapSource = (async function * () { - for await (const list of source) { - if (list instanceof Uint8Array) { - yield list - } else { - yield * list + try { + for await (const list of source) { + if (list instanceof Uint8Array) { + yield list + } else { + yield * list + } } + } finally { + closedRead = true + close() } }()) @@ -32,7 +40,6 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect async sink (source) { try { await sink(source) - close() } catch (err: any) { // If aborted we can safely ignore if (err.type !== 'aborted') { @@ -41,23 +48,31 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect // destroyed. There's nothing to do here except log the error & return. log(err) } + } finally { + closedWrite = true + close() } }, source: mapSource, remoteAddr, timeline: { open: Date.now(), close: undefined }, async close (options?: AbortOptions) { - close() + close(true) await stream.close(options) }, abort (err: Error): void { - close() + close(true) stream.abort(err) } } - function close (): void { - if (maConn.timeline.close == null) { + function close (force?: boolean): void { + if (force === true) { + closedRead = true + closedWrite = true + } + + if (closedRead && closedWrite && maConn.timeline.close == null) { maConn.timeline.close = Date.now() } }