@@ -18,21 +18,28 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect
1818 const { sink, source } = stream
1919 const log = logger . forComponent ( 'libp2p:stream:converter' )
2020
21+ let closedRead = false
22+ let closedWrite = false
23+
2124 const mapSource = ( async function * ( ) {
22- for await ( const list of source ) {
23- if ( list instanceof Uint8Array ) {
24- yield list
25- } else {
26- yield * list
25+ try {
26+ for await ( const list of source ) {
27+ if ( list instanceof Uint8Array ) {
28+ yield list
29+ } else {
30+ yield * list
31+ }
2732 }
33+ } finally {
34+ closedRead = true
35+ close ( )
2836 }
2937 } ( ) )
3038
3139 const maConn : MultiaddrConnection = {
3240 async sink ( source ) {
3341 try {
3442 await sink ( source )
35- close ( )
3643 } catch ( err : any ) {
3744 // If aborted we can safely ignore
3845 if ( err . type !== 'aborted' ) {
@@ -41,23 +48,31 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect
4148 // destroyed. There's nothing to do here except log the error & return.
4249 log ( err )
4350 }
51+ } finally {
52+ closedWrite = true
53+ close ( )
4454 }
4555 } ,
4656 source : mapSource ,
4757 remoteAddr,
4858 timeline : { open : Date . now ( ) , close : undefined } ,
4959 async close ( options ?: AbortOptions ) {
50- close ( )
60+ close ( true )
5161 await stream . close ( options )
5262 } ,
5363 abort ( err : Error ) : void {
54- close ( )
64+ close ( true )
5565 stream . abort ( err )
5666 }
5767 }
5868
59- function close ( ) : void {
60- if ( maConn . timeline . close == null ) {
69+ function close ( force ?: boolean ) : void {
70+ if ( force === true ) {
71+ closedRead = true
72+ closedWrite = true
73+ }
74+
75+ if ( closedRead && closedWrite && maConn . timeline . close == null ) {
6176 maConn . timeline . close = Date . now ( )
6277 }
6378 }
0 commit comments