From f695a507e9ecacbcdae05da707fc6bdae61c92ef Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 16 Nov 2023 08:00:30 +0000 Subject: [PATCH] fix: close maconn stream after reading/writing Ensure the .close timeline property is set correctly after the sink/source for the stream have both finished. --- packages/utils/src/stream-to-ma-conn.ts | 35 ++++++++++++++++++------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/packages/utils/src/stream-to-ma-conn.ts b/packages/utils/src/stream-to-ma-conn.ts index 94efb4cf55..47ba0d91c3 100644 --- a/packages/utils/src/stream-to-ma-conn.ts +++ b/packages/utils/src/stream-to-ma-conn.ts @@ -18,13 +18,21 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect const { sink, source } = stream const log = logger.forComponent('libp2p:stream:converter') + let closedRead = false + let closedWrite = false + const mapSource = (async function * () { - for await (const list of source) { - if (list instanceof Uint8Array) { - yield list - } else { - yield * list + try { + for await (const list of source) { + if (list instanceof Uint8Array) { + yield list + } else { + yield * list + } } + } finally { + closedRead = true + close() } }()) @@ -32,7 +40,6 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect async sink (source) { try { await sink(source) - close() } catch (err: any) { // If aborted we can safely ignore if (err.type !== 'aborted') { @@ -41,23 +48,31 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect // destroyed. There's nothing to do here except log the error & return. log(err) } + } finally { + closedWrite = true + close() } }, source: mapSource, remoteAddr, timeline: { open: Date.now(), close: undefined }, async close (options?: AbortOptions) { - close() + close(true) await stream.close(options) }, abort (err: Error): void { - close() + close(true) stream.abort(err) } } - function close (): void { - if (maConn.timeline.close == null) { + function close (force?: boolean): void { + if (force === true) { + closedRead = true + closedWrite = true + } + + if (closedRead && closedWrite && maConn.timeline.close == null) { maConn.timeline.close = Date.now() } }