11import { abortableSource } from 'abortable-iterator'
22import { type Pushable , pushable } from 'it-pushable'
33import defer , { type DeferredPromise } from 'p-defer'
4+ import { raceSignal } from 'race-signal'
45import { Uint8ArrayList } from 'uint8arraylist'
56import { CodeError } from '../errors.js'
67import type { Direction , ReadStatus , Stream , StreamStatus , StreamTimeline , WriteStatus } from '../connection/index.js'
78import type { AbortOptions } from '../index.js'
89import type { Source } from 'it-stream-types'
910
11+ // copied from @libp 2p/logger to break a circular dependency
1012interface Logger {
1113 ( formatter : any , ...args : any [ ] ) : void
1214 error : ( formatter : any , ...args : any [ ] ) => void
@@ -16,6 +18,7 @@ interface Logger {
1618
1719const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
1820const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'
21+ const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000
1922
2023export interface AbstractStreamInit {
2124 /**
@@ -68,6 +71,12 @@ export interface AbstractStreamInit {
6871 * connection when closing the writable end of the stream. (default: 500)
6972 */
7073 closeTimeout ?: number
74+
75+ /**
76+ * After the stream sink has closed, a limit on how long it takes to send
77+ * a close-write message to the remote peer.
78+ */
79+ sendCloseWriteTimeout ?: number
7180}
7281
7382function isPromise ( res ?: any ) : res is Promise < void > {
@@ -94,6 +103,7 @@ export abstract class AbstractStream implements Stream {
94103 private readonly onCloseWrite ?: ( ) => void
95104 private readonly onReset ?: ( ) => void
96105 private readonly onAbort ?: ( err : Error ) => void
106+ private readonly sendCloseWriteTimeout : number
97107
98108 protected readonly log : Logger
99109
@@ -113,6 +123,7 @@ export abstract class AbstractStream implements Stream {
113123 this . timeline = {
114124 open : Date . now ( )
115125 }
126+ this . sendCloseWriteTimeout = init . sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT
116127
117128 this . onEnd = init . onEnd
118129 this . onCloseRead = init ?. onCloseRead
@@ -128,7 +139,6 @@ export abstract class AbstractStream implements Stream {
128139 this . log . trace ( 'source ended' )
129140 }
130141
131- this . readStatus = 'closed'
132142 this . onSourceEnd ( err )
133143 }
134144 } )
@@ -173,11 +183,19 @@ export abstract class AbstractStream implements Stream {
173183 }
174184 }
175185
176- this . log . trace ( 'sink finished reading from source' )
177- this . writeStatus = 'done'
186+ this . log . trace ( 'sink finished reading from source, write status is "%s"' , this . writeStatus )
187+
188+ if ( this . writeStatus === 'writing' ) {
189+ this . writeStatus = 'closing'
190+
191+ this . log . trace ( 'send close write to remote' )
192+ await this . sendCloseWrite ( {
193+ signal : AbortSignal . timeout ( this . sendCloseWriteTimeout )
194+ } )
195+
196+ this . writeStatus = 'closed'
197+ }
178198
179- this . log . trace ( 'sink calling closeWrite' )
180- await this . closeWrite ( options )
181199 this . onSinkEnd ( )
182200 } catch ( err : any ) {
183201 this . log . trace ( 'sink ended with error, calling abort with error' , err )
@@ -196,6 +214,7 @@ export abstract class AbstractStream implements Stream {
196214 }
197215
198216 this . timeline . closeRead = Date . now ( )
217+ this . readStatus = 'closed'
199218
200219 if ( err != null && this . endErr == null ) {
201220 this . endErr = err
@@ -207,6 +226,10 @@ export abstract class AbstractStream implements Stream {
207226 this . log . trace ( 'source and sink ended' )
208227 this . timeline . close = Date . now ( )
209228
229+ if ( this . status !== 'aborted' && this . status !== 'reset' ) {
230+ this . status = 'closed'
231+ }
232+
210233 if ( this . onEnd != null ) {
211234 this . onEnd ( this . endErr )
212235 }
@@ -221,6 +244,7 @@ export abstract class AbstractStream implements Stream {
221244 }
222245
223246 this . timeline . closeWrite = Date . now ( )
247+ this . writeStatus = 'closed'
224248
225249 if ( err != null && this . endErr == null ) {
226250 this . endErr = err
@@ -232,6 +256,10 @@ export abstract class AbstractStream implements Stream {
232256 this . log . trace ( 'sink and source ended' )
233257 this . timeline . close = Date . now ( )
234258
259+ if ( this . status !== 'aborted' && this . status !== 'reset' ) {
260+ this . status = 'closed'
261+ }
262+
235263 if ( this . onEnd != null ) {
236264 this . onEnd ( this . endErr )
237265 }
@@ -266,16 +294,16 @@ export abstract class AbstractStream implements Stream {
266294 const readStatus = this . readStatus
267295 this . readStatus = 'closing'
268296
269- if ( readStatus === 'ready' ) {
270- this . log . trace ( 'ending internal source queue' )
271- this . streamSource . end ( )
272- }
273-
274297 if ( this . status !== 'reset' && this . status !== 'aborted' && this . timeline . closeRead == null ) {
275298 this . log . trace ( 'send close read to remote' )
276299 await this . sendCloseRead ( options )
277300 }
278301
302+ if ( readStatus === 'ready' ) {
303+ this . log . trace ( 'ending internal source queue' )
304+ this . streamSource . end ( )
305+ }
306+
279307 this . log . trace ( 'closed readable end of stream' )
280308 }
281309
@@ -286,33 +314,26 @@ export abstract class AbstractStream implements Stream {
286314
287315 this . log . trace ( 'closing writable end of stream with starting write status "%s"' , this . writeStatus )
288316
289- const writeStatus = this . writeStatus
290-
291317 if ( this . writeStatus === 'ready' ) {
292318 this . log . trace ( 'sink was never sunk, sink an empty array' )
293- await this . sink ( [ ] )
294- }
295319
296- this . writeStatus = 'closing'
320+ await raceSignal ( this . sink ( [ ] ) , options . signal )
321+ }
297322
298- if ( writeStatus === 'writing' ) {
323+ if ( this . writeStatus === 'writing' ) {
299324 // stop reading from the source passed to `.sink` in the microtask queue
300325 // - this lets any data queued by the user in the current tick get read
301326 // before we exit
302327 await new Promise ( ( resolve , reject ) => {
303328 queueMicrotask ( ( ) => {
304329 this . log . trace ( 'aborting source passed to .sink' )
305330 this . sinkController . abort ( )
306- this . sinkEnd . promise . then ( resolve , reject )
331+ raceSignal ( this . sinkEnd . promise , options . signal )
332+ . then ( resolve , reject )
307333 } )
308334 } )
309335 }
310336
311- if ( this . status !== 'reset' && this . status !== 'aborted' && this . timeline . closeWrite == null ) {
312- this . log . trace ( 'send close write to remote' )
313- await this . sendCloseWrite ( options )
314- }
315-
316337 this . writeStatus = 'closed'
317338
318339 this . log . trace ( 'closed writable end of stream' )
@@ -357,6 +378,7 @@ export abstract class AbstractStream implements Stream {
357378 const err = new CodeError ( 'stream reset' , ERR_STREAM_RESET )
358379
359380 this . status = 'reset'
381+ this . timeline . reset = Date . now ( )
360382 this . _closeSinkAndSource ( err )
361383 this . onReset ?.( )
362384 }
@@ -423,7 +445,7 @@ export abstract class AbstractStream implements Stream {
423445 return
424446 }
425447
426- this . log . trace ( 'muxer destroyed' )
448+ this . log . trace ( 'stream destroyed' )
427449
428450 this . _closeSinkAndSource ( )
429451 }
0 commit comments