11import { noise } from '@chainsafe/libp2p-noise'
22import { type Transport , symbol , type CreateListenerOptions , type DialOptions , type Listener } from '@libp2p/interface/transport'
33import { logger } from '@libp2p/logger'
4- import { peerIdFromString } from '@libp2p/peer-id'
5- import { type Multiaddr , protocols , type AbortOptions } from '@multiformats/multiaddr'
6- import { bases , digest } from 'multiformats/basics'
7- import { Uint8ArrayList } from 'uint8arraylist'
8- import type { Connection , Direction , MultiaddrConnection , Stream } from '@libp2p/interface/connection'
4+ import { type Multiaddr , type AbortOptions } from '@multiformats/multiaddr'
5+ import { webtransportBiDiStreamToStream } from './stream.js'
6+ import { inertDuplex } from './utils/inert-duplex.js'
7+ import { isSubset } from './utils/is-subset.js'
8+ import { parseMultiaddr } from './utils/parse-multiaddr.js'
9+ import type { Connection , MultiaddrConnection , Stream } from '@libp2p/interface/connection'
910import type { PeerId } from '@libp2p/interface/peer-id'
1011import type { StreamMuxerFactory , StreamMuxerInit , StreamMuxer } from '@libp2p/interface/stream-muxer'
11- import type { Duplex , Source } from 'it-stream-types'
12+ import type { Source } from 'it-stream-types'
1213import type { MultihashDigest } from 'multiformats/hashes/interface'
1314
1415declare global {
@@ -17,300 +18,6 @@ declare global {
1718
1819const log = logger ( 'libp2p:webtransport' )
1920
20- // @ts -expect-error - Not easy to combine these types.
21- const multibaseDecoder = Object . values ( bases ) . map ( b => b . decoder ) . reduce ( ( d , b ) => d . or ( b ) )
22-
23- function decodeCerthashStr ( s : string ) : MultihashDigest {
24- return digest . decode ( multibaseDecoder . decode ( s ) )
25- }
26-
27- // Duplex that does nothing. Needed to fulfill the interface
28- function inertDuplex ( ) : Duplex < any , any , any > {
29- return {
30- source : {
31- [ Symbol . asyncIterator ] ( ) {
32- return {
33- async next ( ) {
34- // This will never resolve
35- return new Promise ( ( ) => { } )
36- }
37- }
38- }
39- } ,
40- sink : async ( source : Source < any > ) => {
41- // This will never resolve
42- return new Promise ( ( ) => { } )
43- }
44- }
45- }
46-
47- async function webtransportBiDiStreamToStream ( bidiStream : any , streamId : string , direction : Direction , activeStreams : Stream [ ] , onStreamEnd : undefined | ( ( s : Stream ) => void ) ) : Promise < Stream > {
48- const writer = bidiStream . writable . getWriter ( )
49- const reader = bidiStream . readable . getReader ( )
50- await writer . ready
51-
52- function cleanupStreamFromActiveStreams ( ) : void {
53- const index = activeStreams . findIndex ( s => s === stream )
54- if ( index !== - 1 ) {
55- activeStreams . splice ( index , 1 )
56- stream . timeline . close = Date . now ( )
57- onStreamEnd ?.( stream )
58- }
59- }
60-
61- let writerClosed = false
62- let readerClosed = false ;
63- ( async function ( ) {
64- const err : Error | undefined = await writer . closed . catch ( ( err : Error ) => err )
65- if ( err != null ) {
66- const msg = err . message
67- if ( ! ( msg . includes ( 'aborted by the remote server' ) || msg . includes ( 'STOP_SENDING' ) ) ) {
68- log . error ( `WebTransport writer closed unexpectedly: streamId=${ streamId } err=${ err . message } ` )
69- }
70- }
71- writerClosed = true
72- if ( writerClosed && readerClosed ) {
73- cleanupStreamFromActiveStreams ( )
74- }
75- } ) ( ) . catch ( ( ) => {
76- log . error ( 'WebTransport failed to cleanup closed stream' )
77- } ) ;
78-
79- ( async function ( ) {
80- const err : Error | undefined = await reader . closed . catch ( ( err : Error ) => err )
81- if ( err != null ) {
82- log . error ( `WebTransport reader closed unexpectedly: streamId=${ streamId } err=${ err . message } ` )
83- }
84- readerClosed = true
85- if ( writerClosed && readerClosed ) {
86- cleanupStreamFromActiveStreams ( )
87- }
88- } ) ( ) . catch ( ( ) => {
89- log . error ( 'WebTransport failed to cleanup closed stream' )
90- } )
91-
92- let sinkSunk = false
93- const stream : Stream = {
94- id : streamId ,
95- status : 'open' ,
96- writeStatus : 'ready' ,
97- readStatus : 'ready' ,
98- abort ( err : Error ) {
99- if ( ! writerClosed ) {
100- writer . abort ( )
101- writerClosed = true
102- }
103- stream . abort ( err )
104- readerClosed = true
105-
106- this . status = 'aborted'
107- this . writeStatus = 'closed'
108- this . readStatus = 'closed'
109-
110- this . timeline . reset =
111- this . timeline . close =
112- this . timeline . closeRead =
113- this . timeline . closeWrite = Date . now ( )
114-
115- cleanupStreamFromActiveStreams ( )
116- } ,
117- async close ( options ?: AbortOptions ) {
118- this . status = 'closing'
119-
120- await Promise . all ( [
121- stream . closeRead ( options ) ,
122- stream . closeWrite ( options )
123- ] )
124-
125- cleanupStreamFromActiveStreams ( )
126-
127- this . status = 'closed'
128- this . timeline . close = Date . now ( )
129- } ,
130-
131- async closeRead ( options ?: AbortOptions ) {
132- if ( ! readerClosed ) {
133- this . readStatus = 'closing'
134-
135- try {
136- await reader . cancel ( )
137- } catch ( err : any ) {
138- if ( err . toString ( ) . includes ( 'RESET_STREAM' ) === true ) {
139- writerClosed = true
140- }
141- }
142-
143- this . timeline . closeRead = Date . now ( )
144- this . readStatus = 'closed'
145-
146- readerClosed = true
147- }
148-
149- if ( writerClosed ) {
150- cleanupStreamFromActiveStreams ( )
151- }
152- } ,
153-
154- async closeWrite ( options ?: AbortOptions ) {
155- if ( ! writerClosed ) {
156- writerClosed = true
157-
158- this . writeStatus = 'closing'
159-
160- try {
161- await writer . close ( )
162- } catch ( err : any ) {
163- if ( err . toString ( ) . includes ( 'RESET_STREAM' ) === true ) {
164- readerClosed = true
165- }
166- }
167-
168- this . timeline . closeWrite = Date . now ( )
169- this . writeStatus = 'closed'
170- }
171-
172- if ( readerClosed ) {
173- cleanupStreamFromActiveStreams ( )
174- }
175- } ,
176- direction,
177- timeline : { open : Date . now ( ) } ,
178- metadata : { } ,
179- source : ( async function * ( ) {
180- while ( true ) {
181- const val = await reader . read ( )
182- if ( val . done === true ) {
183- readerClosed = true
184- if ( writerClosed ) {
185- cleanupStreamFromActiveStreams ( )
186- }
187- return
188- }
189-
190- yield new Uint8ArrayList ( val . value )
191- }
192- } ) ( ) ,
193- sink : async function ( source : Source < Uint8Array | Uint8ArrayList > ) {
194- if ( sinkSunk ) {
195- throw new Error ( 'sink already called on stream' )
196- }
197- sinkSunk = true
198- try {
199- this . writeStatus = 'writing'
200-
201- for await ( const chunks of source ) {
202- if ( chunks instanceof Uint8Array ) {
203- await writer . write ( chunks )
204- } else {
205- for ( const buf of chunks ) {
206- await writer . write ( buf )
207- }
208- }
209- }
210-
211- this . writeStatus = 'done'
212- } finally {
213- this . timeline . closeWrite = Date . now ( )
214- this . writeStatus = 'closed'
215-
216- await stream . closeWrite ( )
217- }
218- }
219- }
220-
221- return stream
222- }
223-
224- function parseMultiaddr ( ma : Multiaddr ) : { url : string , certhashes : MultihashDigest [ ] , remotePeer ?: PeerId } {
225- const parts = ma . stringTuples ( )
226-
227- // This is simpler to have inline than extract into a separate function
228- // eslint-disable-next-line complexity
229- const { url, certhashes, remotePeer } = parts . reduce ( ( state : { url : string , certhashes : MultihashDigest [ ] , seenHost : boolean , seenPort : boolean , remotePeer ?: PeerId } , [ proto , value ] ) => {
230- switch ( proto ) {
231- case protocols ( 'ip6' ) . code :
232- // @ts -expect-error - ts error on switch fallthrough
233- case protocols ( 'dns6' ) . code :
234- if ( value ?. includes ( ':' ) === true ) {
235- /**
236- * This resolves cases where `new globalThis.WebTransport` fails to construct because of an invalid URL being passed.
237- *
238- * `new URL('https://::1:4001/blah')` will throw a `TypeError: Failed to construct 'URL': Invalid URL`
239- * `new URL('https://[::1]:4001/blah')` is valid and will not.
240- *
241- * @see https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
242- */
243- value = `[${ value } ]`
244- }
245- // eslint-disable-next-line no-fallthrough
246- case protocols ( 'ip4' ) . code :
247- case protocols ( 'dns4' ) . code :
248- if ( state . seenHost || state . seenPort ) {
249- throw new Error ( 'Invalid multiaddr, saw host and already saw the host or port' )
250- }
251- return {
252- ...state ,
253- url : `${ state . url } ${ value ?? '' } ` ,
254- seenHost : true
255- }
256- case protocols ( 'quic' ) . code :
257- case protocols ( 'quic-v1' ) . code :
258- case protocols ( 'webtransport' ) . code :
259- if ( ! state . seenHost || ! state . seenPort ) {
260- throw new Error ( "Invalid multiaddr, Didn't see host and port, but saw quic/webtransport" )
261- }
262- return state
263- case protocols ( 'udp' ) . code :
264- if ( state . seenPort ) {
265- throw new Error ( 'Invalid multiaddr, saw port but already saw the port' )
266- }
267- return {
268- ...state ,
269- url : `${ state . url } :${ value ?? '' } ` ,
270- seenPort : true
271- }
272- case protocols ( 'certhash' ) . code :
273- if ( ! state . seenHost || ! state . seenPort ) {
274- throw new Error ( 'Invalid multiaddr, saw the certhash before seeing the host and port' )
275- }
276- return {
277- ...state ,
278- certhashes : state . certhashes . concat ( [ decodeCerthashStr ( value ?? '' ) ] )
279- }
280- case protocols ( 'p2p' ) . code :
281- return {
282- ...state ,
283- remotePeer : peerIdFromString ( value ?? '' )
284- }
285- default :
286- throw new Error ( `unexpected component in multiaddr: ${ proto } ${ protocols ( proto ) . name } ${ value ?? '' } ` )
287- }
288- } ,
289- // All webtransport urls are https
290- { url : 'https://' , seenHost : false , seenPort : false , certhashes : [ ] } )
291-
292- return { url, certhashes, remotePeer }
293- }
294-
295- // Determines if `maybeSubset` is a subset of `set`. This means that all byte arrays in `maybeSubset` are present in `set`.
296- export function isSubset ( set : Uint8Array [ ] , maybeSubset : Uint8Array [ ] ) : boolean {
297- const intersection = maybeSubset . filter ( byteArray => {
298- return Boolean ( set . find ( ( otherByteArray : Uint8Array ) => {
299- if ( byteArray . length !== otherByteArray . length ) {
300- return false
301- }
302-
303- for ( let index = 0 ; index < byteArray . length ; index ++ ) {
304- if ( otherByteArray [ index ] !== byteArray [ index ] ) {
305- return false
306- }
307- }
308- return true
309- } ) )
310- } )
311- return ( intersection . length === maybeSubset . length )
312- }
313-
31421export interface WebTransportInit {
31522 maxInboundStreams ?: number
31623}
0 commit comments