@@ -16,6 +16,16 @@ declare global {
1616 var WebTransport : any
1717}
1818
19+ // https://www.w3.org/TR/webtransport/#web-transport-close-info
20+ interface WebTransportCloseInfo {
21+ closeCode : number
22+ reason : string
23+ }
24+
25+ interface WebTransportSessionCleanup {
26+ ( closeInfo ?: WebTransportCloseInfo ) : void
27+ }
28+
1929const log = logger ( 'libp2p:webtransport' )
2030
2131export interface WebTransportInit {
@@ -42,6 +52,8 @@ class WebTransportTransport implements Transport {
4252 readonly [ symbol ] = true
4353
4454 async dial ( ma : Multiaddr , options : DialOptions ) : Promise < Connection > {
55+ options ?. signal ?. throwIfAborted ( )
56+
4557 log ( 'dialing %s' , ma )
4658 const localPeer = this . components . peerId
4759 if ( localPeer === undefined ) {
@@ -52,60 +64,122 @@ class WebTransportTransport implements Transport {
5264
5365 const { url, certhashes, remotePeer } = parseMultiaddr ( ma )
5466
55- if ( certhashes . length === 0 ) {
56- throw new Error ( 'Expected multiaddr to contain certhashes' )
57- }
58-
59- const wt = new WebTransport ( `${ url } /.well-known/libp2p-webtransport?type=noise` , {
60- serverCertificateHashes : certhashes . map ( certhash => ( {
61- algorithm : 'sha-256' ,
62- value : certhash . digest
63- } ) )
64- } )
65- wt . closed . catch ( ( error : Error ) => {
66- log . error ( 'WebTransport transport closed due to:' , error )
67- } )
68- await wt . ready
69-
7067 if ( remotePeer == null ) {
7168 throw new Error ( 'Need a target peerid' )
7269 }
7370
74- if ( ! await this . authenticateWebTransport ( wt , localPeer , remotePeer , certhashes ) ) {
75- throw new Error ( 'Failed to authenticate webtransport ' )
71+ if ( certhashes . length === 0 ) {
72+ throw new Error ( 'Expected multiaddr to contain certhashes ' )
7673 }
7774
78- const maConn : MultiaddrConnection = {
79- close : async ( options ?: AbortOptions ) => {
80- log ( 'Closing webtransport' )
81- await wt . close ( )
82- } ,
83- abort : ( err : Error ) => {
84- log ( 'Aborting webtransport with err:' , err )
85- wt . close ( )
86- } ,
87- remoteAddr : ma ,
88- timeline : {
89- open : Date . now ( )
90- } ,
91- // This connection is never used directly since webtransport supports native streams.
92- ...inertDuplex ( )
93- }
75+ let abortListener : ( ( ) => void ) | undefined
76+ let maConn : MultiaddrConnection | undefined
9477
95- wt . closed . catch ( ( err : Error ) => {
96- log . error ( 'WebTransport connection closed:' , err )
97- // This is how we specify the connection is closed and shouldn't be used.
98- maConn . timeline . close = Date . now ( )
99- } )
78+ let cleanUpWTSession : ( closeInfo ?: WebTransportCloseInfo ) => void = ( ) => { }
10079
10180 try {
81+ let closed = false
82+ const wt = new WebTransport ( `${ url } /.well-known/libp2p-webtransport?type=noise` , {
83+ serverCertificateHashes : certhashes . map ( certhash => ( {
84+ algorithm : 'sha-256' ,
85+ value : certhash . digest
86+ } ) )
87+ } )
88+
89+ cleanUpWTSession = ( closeInfo ?: WebTransportCloseInfo ) => {
90+ try {
91+ if ( maConn != null ) {
92+ if ( maConn . timeline . close != null ) {
93+ // already closed session
94+ return
95+ }
96+
97+ // This is how we specify the connection is closed and shouldn't be used.
98+ maConn . timeline . close = Date . now ( )
99+ }
100+
101+ if ( closed ) {
102+ // already closed session
103+ return
104+ }
105+
106+ wt . close ( closeInfo )
107+ } catch ( err ) {
108+ log . error ( 'error closing wt session' , err )
109+ } finally {
110+ closed = true
111+ }
112+ }
113+
114+ // this promise resolves/throws when the session is closed or failed to init
115+ wt . closed
116+ . then ( async ( ) => {
117+ await maConn ?. close ( )
118+ } )
119+ . catch ( ( err : Error ) => {
120+ log . error ( 'error on remote wt session close' , err )
121+ maConn ?. abort ( err )
122+ } )
123+ . finally ( ( ) => {
124+ // if we never got as far as creating the maConn, just clean up the session
125+ if ( maConn == null ) {
126+ cleanUpWTSession ( )
127+ }
128+ } )
129+
130+ // if the dial is aborted before we are ready, close the WebTransport session
131+ abortListener = ( ) => {
132+ if ( abortListener != null ) {
133+ options . signal ?. removeEventListener ( 'abort' , abortListener )
134+ }
135+
136+ cleanUpWTSession ( )
137+ }
138+ options . signal ?. addEventListener ( 'abort' , abortListener )
139+
140+ await wt . ready
141+
142+ if ( ! await this . authenticateWebTransport ( wt , localPeer , remotePeer , certhashes ) ) {
143+ throw new Error ( 'Failed to authenticate webtransport' )
144+ }
145+
146+ maConn = {
147+ close : async ( options ?: AbortOptions ) => {
148+ log ( 'Closing webtransport' )
149+ cleanUpWTSession ( )
150+ } ,
151+ abort : ( err : Error ) => {
152+ log ( 'aborting webtransport due to passed err' , err )
153+ cleanUpWTSession ( {
154+ closeCode : 0 ,
155+ reason : err . message
156+ } )
157+ } ,
158+ remoteAddr : ma ,
159+ timeline : {
160+ open : Date . now ( )
161+ } ,
162+ // This connection is never used directly since webtransport supports native streams.
163+ ...inertDuplex ( )
164+ }
165+
102166 options ?. signal ?. throwIfAborted ( )
103- } catch ( e ) {
104- wt . close ( )
105- throw e
106- }
107167
108- return options . upgrader . upgradeOutbound ( maConn , { skipEncryption : true , muxerFactory : this . webtransportMuxer ( wt ) , skipProtection : true } )
168+ return await options . upgrader . upgradeOutbound ( maConn , { skipEncryption : true , muxerFactory : this . webtransportMuxer ( wt , cleanUpWTSession ) , skipProtection : true } )
169+ } catch ( err : any ) {
170+ log . error ( 'caught wt session err' , err )
171+
172+ cleanUpWTSession ( {
173+ closeCode : 0 ,
174+ reason : err . message
175+ } )
176+
177+ throw err
178+ } finally {
179+ if ( abortListener != null ) {
180+ options . signal ?. removeEventListener ( 'abort' , abortListener )
181+ }
182+ }
109183 }
110184
111185 async authenticateWebTransport ( wt : InstanceType < typeof WebTransport > , localPeer : PeerId , remotePeer : PeerId , certhashes : Array < MultihashDigest < number > > ) : Promise < boolean > {
@@ -156,7 +230,7 @@ class WebTransportTransport implements Transport {
156230 return true
157231 }
158232
159- webtransportMuxer ( wt : InstanceType < typeof WebTransport > ) : StreamMuxerFactory {
233+ webtransportMuxer ( wt : InstanceType < typeof WebTransport > , cleanUpWTSession : WebTransportSessionCleanup ) : StreamMuxerFactory {
160234 let streamIDCounter = 0
161235 const config = this . config
162236 return {
@@ -217,11 +291,14 @@ class WebTransportTransport implements Transport {
217291 */
218292 close : async ( options ?: AbortOptions ) => {
219293 log ( 'Closing webtransport muxer' )
220- await wt . close ( )
294+ cleanUpWTSession ( )
221295 } ,
222296 abort : ( err : Error ) => {
223297 log ( 'Aborting webtransport muxer with err:' , err )
224- wt . close ( )
298+ cleanUpWTSession ( {
299+ closeCode : 0 ,
300+ reason : err . message
301+ } )
225302 } ,
226303 // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
227304 ...inertDuplex ( )
0 commit comments