@@ -581,9 +581,11 @@ type serverConn struct {
581
581
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
582
582
curClientStreams uint32 // number of open streams initiated by the client
583
583
curPushedStreams uint32 // number of open streams initiated by server push
584
+ curHandlers uint32 // number of running handler goroutines
584
585
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
585
586
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
586
587
streams map [uint32 ]* stream
588
+ unstartedHandlers []unstartedHandler
587
589
initialStreamSendWindowSize int32
588
590
maxFrameSize int32
589
591
peerMaxHeaderListSize uint32 // zero means unknown (default)
@@ -976,6 +978,8 @@ func (sc *serverConn) serve() {
976
978
return
977
979
case gracefulShutdownMsg :
978
980
sc .startGracefulShutdownInternal ()
981
+ case handlerDoneMsg :
982
+ sc .handlerDone ()
979
983
default :
980
984
panic ("unknown timer" )
981
985
}
@@ -1023,6 +1027,7 @@ var (
1023
1027
idleTimerMsg = new (serverMessage )
1024
1028
shutdownTimerMsg = new (serverMessage )
1025
1029
gracefulShutdownMsg = new (serverMessage )
1030
+ handlerDoneMsg = new (serverMessage )
1026
1031
)
1027
1032
1028
1033
func (sc * serverConn ) onSettingsTimer () { sc .sendServeMsg (settingsTimerMsg ) }
@@ -2015,8 +2020,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
2015
2020
}
2016
2021
}
2017
2022
2018
- go sc .runHandler (rw , req , handler )
2019
- return nil
2023
+ return sc .scheduleHandler (id , rw , req , handler )
2020
2024
}
2021
2025
2022
2026
func (sc * serverConn ) upgradeRequest (req * http.Request ) {
@@ -2036,6 +2040,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
2036
2040
sc .conn .SetReadDeadline (time.Time {})
2037
2041
}
2038
2042
2043
+ // This is the first request on the connection,
2044
+ // so start the handler directly rather than going
2045
+ // through scheduleHandler.
2046
+ sc .curHandlers ++
2039
2047
go sc .runHandler (rw , req , sc .handler .ServeHTTP )
2040
2048
}
2041
2049
@@ -2277,8 +2285,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
2277
2285
return & responseWriter {rws : rws }
2278
2286
}
2279
2287
2288
+ type unstartedHandler struct {
2289
+ streamID uint32
2290
+ rw * responseWriter
2291
+ req * http.Request
2292
+ handler func (http.ResponseWriter , * http.Request )
2293
+ }
2294
+
2295
+ // scheduleHandler starts a handler goroutine,
2296
+ // or schedules one to start as soon as an existing handler finishes.
2297
+ func (sc * serverConn ) scheduleHandler (streamID uint32 , rw * responseWriter , req * http.Request , handler func (http.ResponseWriter , * http.Request )) error {
2298
+ sc .serveG .check ()
2299
+ maxHandlers := sc .advMaxStreams
2300
+ if sc .curHandlers < maxHandlers {
2301
+ sc .curHandlers ++
2302
+ go sc .runHandler (rw , req , handler )
2303
+ return nil
2304
+ }
2305
+ if len (sc .unstartedHandlers ) > int (4 * sc .advMaxStreams ) {
2306
+ return sc .countError ("too_many_early_resets" , ConnectionError (ErrCodeEnhanceYourCalm ))
2307
+ }
2308
+ sc .unstartedHandlers = append (sc .unstartedHandlers , unstartedHandler {
2309
+ streamID : streamID ,
2310
+ rw : rw ,
2311
+ req : req ,
2312
+ handler : handler ,
2313
+ })
2314
+ return nil
2315
+ }
2316
+
2317
+ func (sc * serverConn ) handlerDone () {
2318
+ sc .serveG .check ()
2319
+ sc .curHandlers --
2320
+ i := 0
2321
+ maxHandlers := sc .advMaxStreams
2322
+ for ; i < len (sc .unstartedHandlers ); i ++ {
2323
+ u := sc .unstartedHandlers [i ]
2324
+ if sc .streams [u .streamID ] == nil {
2325
+ // This stream was reset before its goroutine had a chance to start.
2326
+ continue
2327
+ }
2328
+ if sc .curHandlers >= maxHandlers {
2329
+ break
2330
+ }
2331
+ sc .curHandlers ++
2332
+ go sc .runHandler (u .rw , u .req , u .handler )
2333
+ sc .unstartedHandlers [i ] = unstartedHandler {} // don't retain references
2334
+ }
2335
+ sc .unstartedHandlers = sc .unstartedHandlers [i :]
2336
+ if len (sc .unstartedHandlers ) == 0 {
2337
+ sc .unstartedHandlers = nil
2338
+ }
2339
+ }
2340
+
2280
2341
// Run on its own goroutine.
2281
2342
func (sc * serverConn ) runHandler (rw * responseWriter , req * http.Request , handler func (http.ResponseWriter , * http.Request )) {
2343
+ defer sc .sendServeMsg (handlerDoneMsg )
2282
2344
didPanic := true
2283
2345
defer func () {
2284
2346
rw .rws .stream .cancelCtx ()
0 commit comments