@@ -264,22 +264,29 @@ type ClientConn struct {
264
264
nextStreamID uint32
265
265
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
266
266
pings map [[8 ]byte ]chan struct {} // in flight ping data to notification channel
267
- bw * bufio.Writer
268
267
br * bufio.Reader
269
- fr * Framer
270
268
lastActive time.Time
271
269
lastIdle time.Time // time last idle
272
- // Settings from peer: (also guarded by mu )
270
+ // Settings from peer: (also guarded by wmu )
273
271
maxFrameSize uint32
274
272
maxConcurrentStreams uint32
275
273
peerMaxHeaderListSize uint64
276
274
initialWindowSize uint32
277
275
276
+ // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
277
+ // Write to reqHeaderMu to lock it, read from it to unlock.
278
+ // Lock reqmu BEFORE mu or wmu.
279
+ reqHeaderMu chan struct {}
280
+
281
+ // wmu is held while writing.
282
+ // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
283
+ // Only acquire both at the same time when changing peer settings.
284
+ wmu sync.Mutex
285
+ bw * bufio.Writer
286
+ fr * Framer
287
+ werr error // first write error that has occurred
278
288
hbuf bytes.Buffer // HPACK encoder writes into this
279
289
henc * hpack.Encoder
280
-
281
- wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
282
- werr error // first write error that has occurred
283
290
}
284
291
285
292
// clientStream is the state for a single HTTP/2 stream. One of these
@@ -398,10 +405,11 @@ func (cs *clientStream) abortRequestBodyWrite(err error) {
398
405
cc .mu .Lock ()
399
406
if cs .stopReqBody == nil {
400
407
cs .stopReqBody = err
401
- if cs .req .Body != nil {
402
- cs .req .Body .Close ()
403
- }
404
408
cc .cond .Broadcast ()
409
+ // Close the body after releasing the mutex, in case it blocks.
410
+ if body := cs .req .Body ; body != nil {
411
+ defer body .Close ()
412
+ }
405
413
}
406
414
cc .mu .Unlock ()
407
415
}
@@ -670,6 +678,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
670
678
singleUse : singleUse ,
671
679
wantSettingsAck : true ,
672
680
pings : make (map [[8 ]byte ]chan struct {}),
681
+ reqHeaderMu : make (chan struct {}, 1 ),
673
682
}
674
683
if d := t .idleConnTimeout (); d != 0 {
675
684
cc .idleTimeout = d
@@ -898,41 +907,48 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
898
907
899
908
func (cc * ClientConn ) sendGoAway () error {
900
909
cc .mu .Lock ()
901
- defer cc .mu .Unlock ()
902
- cc .wmu .Lock ()
903
- defer cc .wmu .Unlock ()
904
- if cc .closing {
910
+ closing := cc .closing
911
+ cc .closing = true
912
+ maxStreamID := cc .nextStreamID
913
+ cc .mu .Unlock ()
914
+ if closing {
905
915
// GOAWAY sent already
906
916
return nil
907
917
}
918
+
919
+ cc .wmu .Lock ()
920
+ defer cc .wmu .Unlock ()
908
921
// Send a graceful shutdown frame to server
909
- maxStreamID := cc .nextStreamID
910
922
if err := cc .fr .WriteGoAway (maxStreamID , ErrCodeNo , nil ); err != nil {
911
923
return err
912
924
}
913
925
if err := cc .bw .Flush (); err != nil {
914
926
return err
915
927
}
916
928
// Prevent new requests
917
- cc .closing = true
918
929
return nil
919
930
}
920
931
921
932
// closes the client connection immediately. In-flight requests are interrupted.
922
933
// err is sent to streams.
923
934
func (cc * ClientConn ) closeForError (err error ) error {
924
935
cc .mu .Lock ()
925
- defer cc .cond .Broadcast ()
926
- defer cc .mu .Unlock ()
927
- for id , cs := range cc .streams {
936
+ streams := cc .streams
937
+ cc .streams = nil
938
+ cc .closed = true
939
+ cc .mu .Unlock ()
940
+
941
+ for _ , cs := range streams {
928
942
select {
929
943
case cs .resc <- resAndError {err : err }:
930
944
default :
931
945
}
932
946
cs .bufPipe .CloseWithError (err )
933
- delete (cc .streams , id )
934
947
}
935
- cc .closed = true
948
+
949
+ cc .mu .Lock ()
950
+ defer cc .cond .Broadcast ()
951
+ defer cc .mu .Unlock ()
936
952
return cc .tconn .Close ()
937
953
}
938
954
@@ -1017,6 +1033,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
1017
1033
}
1018
1034
1019
1035
func (cc * ClientConn ) roundTrip (req * http.Request ) (res * http.Response , gotErrAfterReqBodyWrite bool , err error ) {
1036
+ ctx := req .Context ()
1020
1037
if err := checkConnHeaders (req ); err != nil {
1021
1038
return nil , false , err
1022
1039
}
@@ -1030,6 +1047,26 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
1030
1047
}
1031
1048
hasTrailers := trailers != ""
1032
1049
1050
+ // Acquire the new-request lock by writing to reqHeaderMu.
1051
+ // This lock guards the critical section covering allocating a new stream ID
1052
+ // (requires mu) and creating the stream (requires wmu).
1053
+ if cc .reqHeaderMu == nil {
1054
+ panic ("RoundTrip on initialized ClientConn" ) // for tests
1055
+ }
1056
+ select {
1057
+ case cc .reqHeaderMu <- struct {}{}:
1058
+ case <- req .Cancel :
1059
+ return nil , false , errRequestCanceled
1060
+ case <- ctx .Done ():
1061
+ return nil , false , ctx .Err ()
1062
+ }
1063
+ reqHeaderMuNeedsUnlock := true
1064
+ defer func () {
1065
+ if reqHeaderMuNeedsUnlock {
1066
+ <- cc .reqHeaderMu
1067
+ }
1068
+ }()
1069
+
1033
1070
cc .mu .Lock ()
1034
1071
if err := cc .awaitOpenSlotForRequest (req ); err != nil {
1035
1072
cc .mu .Unlock ()
@@ -1061,22 +1098,24 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
1061
1098
requestedGzip = true
1062
1099
}
1063
1100
1101
+ cs := cc .newStream ()
1102
+ cs .req = req
1103
+ cs .trace = httptrace .ContextClientTrace (req .Context ())
1104
+ cs .requestedGzip = requestedGzip
1105
+ bodyWriter := cc .t .getBodyWriterState (cs , body )
1106
+ cs .on100 = bodyWriter .on100
1107
+ cc .mu .Unlock ()
1108
+
1064
1109
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
1065
1110
// sent by writeRequestBody below, along with any Trailers,
1066
1111
// again in form HEADERS{1}, CONTINUATION{0,})
1112
+ cc .wmu .Lock ()
1067
1113
hdrs , err := cc .encodeHeaders (req , requestedGzip , trailers , contentLen )
1068
1114
if err != nil {
1069
- cc .mu .Unlock ()
1115
+ cc .wmu .Unlock ()
1070
1116
return nil , false , err
1071
1117
}
1072
1118
1073
- cs := cc .newStream ()
1074
- cs .req = req
1075
- cs .trace = httptrace .ContextClientTrace (req .Context ())
1076
- cs .requestedGzip = requestedGzip
1077
- bodyWriter := cc .t .getBodyWriterState (cs , body )
1078
- cs .on100 = bodyWriter .on100
1079
-
1080
1119
defer func () {
1081
1120
cc .wmu .Lock ()
1082
1121
werr := cc .werr
@@ -1086,24 +1125,24 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
1086
1125
}
1087
1126
}()
1088
1127
1089
- cc .wmu .Lock ()
1090
1128
endStream := ! hasBody && ! hasTrailers
1091
- werr : = cc .writeHeaders (cs .ID , endStream , int (cc .maxFrameSize ), hdrs )
1129
+ err = cc .writeHeaders (cs .ID , endStream , int (cc .maxFrameSize ), hdrs )
1092
1130
cc .wmu .Unlock ()
1131
+ <- cc .reqHeaderMu // release the new-request lock
1132
+ reqHeaderMuNeedsUnlock = false
1093
1133
traceWroteHeaders (cs .trace )
1094
- cc .mu .Unlock ()
1095
1134
1096
- if werr != nil {
1135
+ if err != nil {
1097
1136
if hasBody {
1098
1137
bodyWriter .cancel ()
1099
1138
}
1100
1139
cc .forgetStreamID (cs .ID )
1101
1140
// Don't bother sending a RST_STREAM (our write already failed;
1102
1141
// no need to keep writing)
1103
- traceWroteRequest (cs .trace , werr )
1142
+ traceWroteRequest (cs .trace , err )
1104
1143
// TODO(dneil): An error occurred while writing the headers.
1105
1144
// Should we return an error indicating that this request can be retried?
1106
- return nil , false , werr
1145
+ return nil , false , err
1107
1146
}
1108
1147
1109
1148
var respHeaderTimer <- chan time.Time
@@ -1120,7 +1159,6 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
1120
1159
1121
1160
readLoopResCh := cs .resc
1122
1161
bodyWritten := false
1123
- ctx := req .Context ()
1124
1162
1125
1163
handleReadLoopResponse := func (re resAndError ) (* http.Response , bool , error ) {
1126
1164
res := re .res
@@ -1422,19 +1460,17 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
1422
1460
return nil
1423
1461
}
1424
1462
1463
+ cc .wmu .Lock ()
1425
1464
var trls []byte
1426
1465
if hasTrailers {
1427
- cc .mu .Lock ()
1428
1466
trls , err = cc .encodeTrailers (req )
1429
- cc .mu .Unlock ()
1430
1467
if err != nil {
1468
+ cc .wmu .Unlock ()
1431
1469
cc .writeStreamReset (cs .ID , ErrCodeInternal , err )
1432
1470
cc .forgetStreamID (cs .ID )
1433
1471
return err
1434
1472
}
1435
1473
}
1436
-
1437
- cc .wmu .Lock ()
1438
1474
defer cc .wmu .Unlock ()
1439
1475
1440
1476
// Two ways to send END_STREAM: either with trailers, or
@@ -1484,7 +1520,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
1484
1520
}
1485
1521
}
1486
1522
1487
- // requires cc.mu be held.
1523
+ // requires cc.wmu be held.
1488
1524
func (cc * ClientConn ) encodeHeaders (req * http.Request , addGzipHeader bool , trailers string , contentLength int64 ) ([]byte , error ) {
1489
1525
cc .hbuf .Reset ()
1490
1526
@@ -1672,7 +1708,7 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
1672
1708
}
1673
1709
}
1674
1710
1675
- // requires cc.mu be held.
1711
+ // requires cc.wmu be held.
1676
1712
func (cc * ClientConn ) encodeTrailers (req * http.Request ) ([]byte , error ) {
1677
1713
cc .hbuf .Reset ()
1678
1714
@@ -1821,15 +1857,19 @@ func (rl *clientConnReadLoop) cleanup() {
1821
1857
} else if err == io .EOF {
1822
1858
err = io .ErrUnexpectedEOF
1823
1859
}
1824
- for _ , cs := range cc .streams {
1860
+ cc .closed = true
1861
+ streams := cc .streams
1862
+ cc .streams = nil
1863
+ cc .mu .Unlock ()
1864
+ for _ , cs := range streams {
1825
1865
cs .bufPipe .CloseWithError (err ) // no-op if already closed
1826
1866
select {
1827
1867
case cs .resc <- resAndError {err : err }:
1828
1868
default :
1829
1869
}
1830
1870
close (cs .done )
1831
1871
}
1832
- cc .closed = true
1872
+ cc .mu . Lock ()
1833
1873
cc .cond .Broadcast ()
1834
1874
cc .mu .Unlock ()
1835
1875
}
@@ -2159,8 +2199,6 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
2159
2199
}
2160
2200
2161
2201
cc .mu .Lock ()
2162
- defer cc .mu .Unlock ()
2163
-
2164
2202
var connAdd , streamAdd int32
2165
2203
// Check the conn-level first, before the stream-level.
2166
2204
if v := cc .inflow .available (); v < transportDefaultConnFlow / 2 {
@@ -2177,6 +2215,8 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
2177
2215
cs .inflow .add (streamAdd )
2178
2216
}
2179
2217
}
2218
+ cc .mu .Unlock ()
2219
+
2180
2220
if connAdd != 0 || streamAdd != 0 {
2181
2221
cc .wmu .Lock ()
2182
2222
defer cc .wmu .Unlock ()
@@ -2202,19 +2242,25 @@ func (b transportResponseBody) Close() error {
2202
2242
2203
2243
if unread > 0 || ! serverSentStreamEnd {
2204
2244
cc .mu .Lock ()
2205
- cc .wmu .Lock ()
2206
2245
if ! serverSentStreamEnd {
2207
- cc .fr .WriteRSTStream (cs .ID , ErrCodeCancel )
2208
2246
cs .didReset = true
2209
2247
}
2210
2248
// Return connection-level flow control.
2211
2249
if unread > 0 {
2212
2250
cc .inflow .add (int32 (unread ))
2251
+ }
2252
+ cc .mu .Unlock ()
2253
+
2254
+ cc .wmu .Lock ()
2255
+ if ! serverSentStreamEnd {
2256
+ cc .fr .WriteRSTStream (cs .ID , ErrCodeCancel )
2257
+ }
2258
+ // Return connection-level flow control.
2259
+ if unread > 0 {
2213
2260
cc .fr .WriteWindowUpdate (0 , uint32 (unread ))
2214
2261
}
2215
2262
cc .bw .Flush ()
2216
2263
cc .wmu .Unlock ()
2217
- cc .mu .Unlock ()
2218
2264
}
2219
2265
2220
2266
cs .bufPipe .BreakWithError (errClosedResponseBody )
@@ -2292,6 +2338,10 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2292
2338
}
2293
2339
if refund > 0 {
2294
2340
cc .inflow .add (int32 (refund ))
2341
+ }
2342
+ cc .mu .Unlock ()
2343
+
2344
+ if refund > 0 {
2295
2345
cc .wmu .Lock ()
2296
2346
cc .fr .WriteWindowUpdate (0 , uint32 (refund ))
2297
2347
if ! didReset {
@@ -2301,7 +2351,6 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2301
2351
cc .bw .Flush ()
2302
2352
cc .wmu .Unlock ()
2303
2353
}
2304
- cc .mu .Unlock ()
2305
2354
2306
2355
if len (data ) > 0 && ! didReset {
2307
2356
if _ , err := cs .bufPipe .Write (data ); err != nil {
@@ -2362,6 +2411,23 @@ func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2362
2411
}
2363
2412
2364
2413
func (rl * clientConnReadLoop ) processSettings (f * SettingsFrame ) error {
2414
+ cc := rl .cc
2415
+ // Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
2416
+ // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
2417
+ cc .wmu .Lock ()
2418
+ defer cc .wmu .Unlock ()
2419
+
2420
+ if err := rl .processSettingsNoWrite (f ); err != nil {
2421
+ return err
2422
+ }
2423
+ if ! f .IsAck () {
2424
+ cc .fr .WriteSettingsAck ()
2425
+ cc .bw .Flush ()
2426
+ }
2427
+ return nil
2428
+ }
2429
+
2430
+ func (rl * clientConnReadLoop ) processSettingsNoWrite (f * SettingsFrame ) error {
2365
2431
cc := rl .cc
2366
2432
cc .mu .Lock ()
2367
2433
defer cc .mu .Unlock ()
@@ -2424,12 +2490,7 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2424
2490
cc .seenSettings = true
2425
2491
}
2426
2492
2427
- cc .wmu .Lock ()
2428
- defer cc .wmu .Unlock ()
2429
-
2430
- cc .fr .WriteSettingsAck ()
2431
- cc .bw .Flush ()
2432
- return cc .werr
2493
+ return nil
2433
2494
}
2434
2495
2435
2496
func (rl * clientConnReadLoop ) processWindowUpdate (f * WindowUpdateFrame ) error {
0 commit comments