@@ -126,7 +126,7 @@ use std::future::Future;
126
126
use std:: pin:: Pin ;
127
127
use std:: task:: { Context , Poll } ;
128
128
use std:: time:: Duration ;
129
- use std:: { convert , fmt, io, mem } ;
129
+ use std:: { fmt, io} ;
130
130
use tokio:: io:: { AsyncRead , AsyncWrite , ReadBuf } ;
131
131
use tracing:: instrument:: { Instrument , Instrumented } ;
132
132
@@ -298,8 +298,8 @@ enum Handshaking<T, B: Buf> {
298
298
Flushing ( Instrumented < Flush < T , Prioritized < B > > > ) ,
299
299
/// State 2. Connection is waiting for the client preface.
300
300
ReadingPreface ( Instrumented < ReadPreface < T , Prioritized < B > > > ) ,
301
- /// Dummy state for `mem::replace` .
302
- Empty ,
301
+ /// State 3. Handshake is done, polling again would panic .
302
+ Done ,
303
303
}
304
304
305
305
/// Flush a Sink
@@ -384,7 +384,8 @@ where
384
384
. expect ( "invalid SETTINGS frame" ) ;
385
385
386
386
// Create the handshake future.
387
- let state = Handshaking :: from ( codec) ;
387
+ let state =
388
+ Handshaking :: Flushing ( Flush :: new ( codec) . instrument ( tracing:: trace_span!( "flush" ) ) ) ;
388
389
389
390
drop ( entered) ;
390
391
@@ -1226,62 +1227,57 @@ where
1226
1227
let span = self . span . clone ( ) ; // XXX(eliza): T_T
1227
1228
let _e = span. enter ( ) ;
1228
1229
tracing:: trace!( state = ?self . state) ;
1229
- use crate :: server:: Handshaking :: * ;
1230
-
1231
- self . state = if let Flushing ( ref mut flush) = self . state {
1232
- // We're currently flushing a pending SETTINGS frame. Poll the
1233
- // flush future, and, if it's completed, advance our state to wait
1234
- // for the client preface.
1235
- let codec = match Pin :: new ( flush) . poll ( cx) ? {
1236
- Poll :: Pending => {
1237
- tracing:: trace!( flush. poll = %"Pending" ) ;
1238
- return Poll :: Pending ;
1230
+
1231
+ loop {
1232
+ match & mut self . state {
1233
+ Handshaking :: Flushing ( flush) => {
1234
+ // We're currently flushing a pending SETTINGS frame. Poll the
1235
+ // flush future, and, if it's completed, advance our state to wait
1236
+ // for the client preface.
1237
+ let codec = match Pin :: new ( flush) . poll ( cx) ? {
1238
+ Poll :: Pending => {
1239
+ tracing:: trace!( flush. poll = %"Pending" ) ;
1240
+ return Poll :: Pending ;
1241
+ }
1242
+ Poll :: Ready ( flushed) => {
1243
+ tracing:: trace!( flush. poll = %"Ready" ) ;
1244
+ flushed
1245
+ }
1246
+ } ;
1247
+ self . state = Handshaking :: ReadingPreface (
1248
+ ReadPreface :: new ( codec) . instrument ( tracing:: trace_span!( "read_preface" ) ) ,
1249
+ ) ;
1239
1250
}
1240
- Poll :: Ready ( flushed) => {
1241
- tracing:: trace!( flush. poll = %"Ready" ) ;
1242
- flushed
1251
+ Handshaking :: ReadingPreface ( read) => {
1252
+ let codec = ready ! ( Pin :: new( read) . poll( cx) ?) ;
1253
+
1254
+ self . state = Handshaking :: Done ;
1255
+
1256
+ let connection = proto:: Connection :: new (
1257
+ codec,
1258
+ Config {
1259
+ next_stream_id : 2 . into ( ) ,
1260
+ // Server does not need to locally initiate any streams
1261
+ initial_max_send_streams : 0 ,
1262
+ reset_stream_duration : self . builder . reset_stream_duration ,
1263
+ reset_stream_max : self . builder . reset_stream_max ,
1264
+ settings : self . builder . settings . clone ( ) ,
1265
+ } ,
1266
+ ) ;
1267
+
1268
+ tracing:: trace!( "connection established!" ) ;
1269
+ let mut c = Connection { connection } ;
1270
+ if let Some ( sz) = self . builder . initial_target_connection_window_size {
1271
+ c. set_target_window_size ( sz) ;
1272
+ }
1273
+
1274
+ return Poll :: Ready ( Ok ( c) ) ;
1275
+ }
1276
+ Handshaking :: Done => {
1277
+ panic ! ( "Handshaking::poll() state was not advanced completely!" )
1243
1278
}
1244
- } ;
1245
- Handshaking :: from ( ReadPreface :: new ( codec) )
1246
- } else {
1247
- // Otherwise, we haven't actually advanced the state, but we have
1248
- // to replace it with itself, because we have to return a value.
1249
- // (note that the assignment to `self.state` has to be outside of
1250
- // the `if let` block above in order to placate the borrow checker).
1251
- mem:: replace ( & mut self . state , Handshaking :: Empty )
1252
- } ;
1253
- let poll = if let ReadingPreface ( ref mut read) = self . state {
1254
- // We're now waiting for the client preface. Poll the `ReadPreface`
1255
- // future. If it has completed, we will create a `Connection` handle
1256
- // for the connection.
1257
- Pin :: new ( read) . poll ( cx)
1258
- // Actually creating the `Connection` has to occur outside of this
1259
- // `if let` block, because we've borrowed `self` mutably in order
1260
- // to poll the state and won't be able to borrow the SETTINGS frame
1261
- // as well until we release the borrow for `poll()`.
1262
- } else {
1263
- unreachable ! ( "Handshake::poll() state was not advanced completely!" )
1264
- } ;
1265
- poll?. map ( |codec| {
1266
- let connection = proto:: Connection :: new (
1267
- codec,
1268
- Config {
1269
- next_stream_id : 2 . into ( ) ,
1270
- // Server does not need to locally initiate any streams
1271
- initial_max_send_streams : 0 ,
1272
- reset_stream_duration : self . builder . reset_stream_duration ,
1273
- reset_stream_max : self . builder . reset_stream_max ,
1274
- settings : self . builder . settings . clone ( ) ,
1275
- } ,
1276
- ) ;
1277
-
1278
- tracing:: trace!( "connection established!" ) ;
1279
- let mut c = Connection { connection } ;
1280
- if let Some ( sz) = self . builder . initial_target_connection_window_size {
1281
- c. set_target_window_size ( sz) ;
1282
1279
}
1283
- Ok ( c)
1284
- } )
1280
+ }
1285
1281
}
1286
1282
}
1287
1283
@@ -1497,42 +1493,9 @@ where
1497
1493
#[ inline]
1498
1494
fn fmt ( & self , f : & mut fmt:: Formatter ) -> Result < ( ) , fmt:: Error > {
1499
1495
match * self {
1500
- Handshaking :: Flushing ( _) => write ! ( f, "Handshaking:: Flushing(_)" ) ,
1501
- Handshaking :: ReadingPreface ( _) => write ! ( f, "Handshaking:: ReadingPreface(_)" ) ,
1502
- Handshaking :: Empty => write ! ( f, "Handshaking::Empty " ) ,
1496
+ Handshaking :: Flushing ( _) => write ! ( f, "Flushing(_)" ) ,
1497
+ Handshaking :: ReadingPreface ( _) => write ! ( f, "ReadingPreface(_)" ) ,
1498
+ Handshaking :: Done => write ! ( f, "Done " ) ,
1503
1499
}
1504
1500
}
1505
1501
}
1506
-
1507
- impl < T , B > convert:: From < Flush < T , Prioritized < B > > > for Handshaking < T , B >
1508
- where
1509
- T : AsyncRead + AsyncWrite ,
1510
- B : Buf ,
1511
- {
1512
- #[ inline]
1513
- fn from ( flush : Flush < T , Prioritized < B > > ) -> Self {
1514
- Handshaking :: Flushing ( flush. instrument ( tracing:: trace_span!( "flush" ) ) )
1515
- }
1516
- }
1517
-
1518
- impl < T , B > convert:: From < ReadPreface < T , Prioritized < B > > > for Handshaking < T , B >
1519
- where
1520
- T : AsyncRead + AsyncWrite ,
1521
- B : Buf ,
1522
- {
1523
- #[ inline]
1524
- fn from ( read : ReadPreface < T , Prioritized < B > > ) -> Self {
1525
- Handshaking :: ReadingPreface ( read. instrument ( tracing:: trace_span!( "read_preface" ) ) )
1526
- }
1527
- }
1528
-
1529
- impl < T , B > convert:: From < Codec < T , Prioritized < B > > > for Handshaking < T , B >
1530
- where
1531
- T : AsyncRead + AsyncWrite ,
1532
- B : Buf ,
1533
- {
1534
- #[ inline]
1535
- fn from ( codec : Codec < T , Prioritized < B > > ) -> Self {
1536
- Handshaking :: from ( Flush :: new ( codec) )
1537
- }
1538
- }
0 commit comments