Commit 978cfad

http2: avoid blocking while holding ClientConn.mu
Operations which examine the state of a ClientConn--notably, the connection pool's check to see if a conn is available to take a new request--need to acquire mu. Blocking while holding mu, such as when writing to the network, blocks these operations.

Remove blocking operations from the mutex. Perform network writes with only ClientConn.wmu held. Clarify that wmu guards the per-conn HPACK encoder and buffer.

Add a new mutex guarding request creation, covering the critical section starting with allocating a new stream ID and continuing until the stream is created.

Fix a locking issue where trailers were written from the HPACK buffer with only wmu held, but headers were encoded into the buffer with only mu held. (Now both encoding and writes occur with wmu held.)

Fixes golang/go#32388.
Fixes golang/go#48340.

Change-Id: Ibb313424ed2f32c1aeac4645b76aedf227b597a3
Reviewed-on: https://go-review.googlesource.com/c/net/+/349594
Trust: Damien Neil <[email protected]>
Run-TryBot: Damien Neil <[email protected]>
TryBot-Result: Go Bot <[email protected]>
Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 95888ee commit 978cfad
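
The pattern this commit adopts, in miniature: a hedged sketch (the conn type and names below are illustrative, not the real ClientConn) of splitting connection state out from under the write path. State is examined under mu and the lock released before any network I/O; the blocking write holds only wmu, so state readers are never stalled behind a slow peer.

package sketch

import (
	"net"
	"sync"
)

type conn struct {
	mu     sync.Mutex // guards state; never held across network I/O
	closed bool

	wmu sync.Mutex // guards the writer; held across network I/O
	nc  net.Conn
}

func (c *conn) write(p []byte) error {
	// Snapshot state under mu, then release it before blocking.
	c.mu.Lock()
	closed := c.closed
	c.mu.Unlock()
	if closed {
		return net.ErrClosed
	}

	// The slow part holds only wmu.
	c.wmu.Lock()
	defer c.wmu.Unlock()
	_, err := c.nc.Write(p)
	return err
}

// canTakeNewRequest acquires mu promptly even while another goroutine
// is blocked inside write above.
func (c *conn) canTakeNewRequest() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return !c.closed
}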

File tree

2 files changed: +358 -56 lines

http2/transport.go: +116 -55 lines
@@ -270,22 +270,29 @@ type ClientConn struct {
 	nextStreamID    uint32
 	pendingRequests int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
 	pings           map[[8]byte]chan struct{} // in flight ping data to notification channel
-	bw              *bufio.Writer
 	br              *bufio.Reader
-	fr              *Framer
 	lastActive      time.Time
 	lastIdle        time.Time // time last idle
-	// Settings from peer: (also guarded by mu)
+	// Settings from peer: (also guarded by wmu)
 	maxFrameSize          uint32
 	maxConcurrentStreams  uint32
 	peerMaxHeaderListSize uint64
 	initialWindowSize     uint32
 
+	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
+	// Write to reqHeaderMu to lock it, read from it to unlock.
+	// Lock reqHeaderMu BEFORE mu or wmu.
+	reqHeaderMu chan struct{}
+
+	// wmu is held while writing.
+	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
+	// Only acquire both at the same time when changing peer settings.
+	wmu  sync.Mutex
+	bw   *bufio.Writer
+	fr   *Framer
+	werr error // first write error that has occurred
 	hbuf bytes.Buffer // HPACK encoder writes into this
 	henc *hpack.Encoder
-
-	wmu  sync.Mutex // held while writing; acquire AFTER mu if holding both
-	werr error      // first write error that has occurred
 }
 
 // clientStream is the state for a single HTTP/2 stream. One of these
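
reqHeaderMu above is a buffered channel rather than a sync.Mutex. A minimal sketch of the idiom, with illustrative names: a channel of capacity 1 acts as a mutex whose Lock is a send and whose Unlock is a receive, and unlike sync.Mutex the send can participate in a select, so a waiter can abandon the acquisition (used for cancellation later in this diff).

var sem = make(chan struct{}, 1) // capacity 1: at most one holder at a time

func withLock(f func()) {
	sem <- struct{}{}        // lock: blocks while another goroutine holds the slot
	defer func() { <-sem }() // unlock: frees the slot for the next waiter
	f()
}
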
@@ -404,10 +411,11 @@ func (cs *clientStream) abortRequestBodyWrite(err error) {
 	cc.mu.Lock()
 	if cs.stopReqBody == nil {
 		cs.stopReqBody = err
-		if cs.req.Body != nil {
-			cs.req.Body.Close()
-		}
 		cc.cond.Broadcast()
+		// Close the body after releasing the mutex, in case it blocks.
+		if body := cs.req.Body; body != nil {
+			defer body.Close()
+		}
 	}
 	cc.mu.Unlock()
 }
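
The rewritten abortRequestBodyWrite leans on Go's defer semantics: a defer registered while the mutex is held still runs only at function return, after the explicit Unlock below it. A minimal sketch of the trick, assuming only an io.Closer whose Close may block:

func closeOutsideLock(mu *sync.Mutex, body io.Closer) {
	mu.Lock()
	if body != nil {
		defer body.Close() // runs at return, i.e. after the Unlock below
	}
	mu.Unlock()
	// body.Close() executes here, with mu already released.
}
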
@@ -672,6 +680,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
 		singleUse:             singleUse,
 		wantSettingsAck:       true,
 		pings:                 make(map[[8]byte]chan struct{}),
+		reqHeaderMu:           make(chan struct{}, 1),
 	}
 	if d := t.idleConnTimeout(); d != 0 {
 		cc.idleTimeout = d
@@ -900,41 +909,48 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
 
 func (cc *ClientConn) sendGoAway() error {
 	cc.mu.Lock()
-	defer cc.mu.Unlock()
-	cc.wmu.Lock()
-	defer cc.wmu.Unlock()
-	if cc.closing {
+	closing := cc.closing
+	cc.closing = true
+	maxStreamID := cc.nextStreamID
+	cc.mu.Unlock()
+	if closing {
 		// GOAWAY sent already
 		return nil
 	}
+
+	cc.wmu.Lock()
+	defer cc.wmu.Unlock()
 	// Send a graceful shutdown frame to server
-	maxStreamID := cc.nextStreamID
 	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
 		return err
 	}
 	if err := cc.bw.Flush(); err != nil {
 		return err
	}
 	// Prevent new requests
-	cc.closing = true
 	return nil
 }
 
 // closes the client connection immediately. In-flight requests are interrupted.
 // err is sent to streams.
 func (cc *ClientConn) closeForError(err error) error {
 	cc.mu.Lock()
-	defer cc.cond.Broadcast()
-	defer cc.mu.Unlock()
-	for id, cs := range cc.streams {
+	streams := cc.streams
+	cc.streams = nil
+	cc.closed = true
+	cc.mu.Unlock()
+
+	for _, cs := range streams {
 		select {
 		case cs.resc <- resAndError{err: err}:
 		default:
 		}
 		cs.bufPipe.CloseWithError(err)
-		delete(cc.streams, id)
 	}
-	cc.closed = true
+
+	cc.mu.Lock()
+	defer cc.cond.Broadcast()
+	defer cc.mu.Unlock()
 	return cc.tconn.Close()
 }
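
closeForError now detaches the stream map while holding mu and delivers the error with the lock released, so a slow consumer cannot stall other goroutines waiting on mu. A hedged sketch of the detach-then-notify pattern (illustrative names):

// Under the lock: take ownership of the shared map and mark the conn closed.
c.mu.Lock()
streams := c.streams
c.streams = nil
c.closed = true
c.mu.Unlock()

// Outside the lock: notifications are free to block.
for _, s := range streams {
	s.notify(err)
}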

@@ -1022,6 +1038,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 }
 
 func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) {
+	ctx := req.Context()
 	if err := checkConnHeaders(req); err != nil {
 		return nil, false, err
 	}
@@ -1035,6 +1052,26 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
 	}
 	hasTrailers := trailers != ""
 
+	// Acquire the new-request lock by writing to reqHeaderMu.
+	// This lock guards the critical section covering allocating a new stream ID
+	// (requires mu) and creating the stream (requires wmu).
+	if cc.reqHeaderMu == nil {
+		panic("RoundTrip on uninitialized ClientConn") // for tests
+	}
+	select {
+	case cc.reqHeaderMu <- struct{}{}:
+	case <-req.Cancel:
+		return nil, false, errRequestCanceled
+	case <-ctx.Done():
+		return nil, false, ctx.Err()
+	}
+	reqHeaderMuNeedsUnlock := true
+	defer func() {
+		if reqHeaderMuNeedsUnlock {
+			<-cc.reqHeaderMu
+		}
+	}()
+
 	cc.mu.Lock()
 	if err := cc.awaitOpenSlotForRequest(req); err != nil {
 		cc.mu.Unlock()
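
Because the new-request lock is a channel, acquiring it composes with cancellation, which a plain sync.Mutex does not support directly. A hedged sketch of the same protocol as a helper (lockWithContext is illustrative, not part of the package):

func lockWithContext(ctx context.Context, sem chan struct{}) error {
	select {
	case sem <- struct{}{}:
		return nil // acquired
	case <-ctx.Done():
		return ctx.Err() // gave up; the slot was never taken
	}
}

The caller must receive from sem exactly once if and only if the send succeeded; that is what the reqHeaderMuNeedsUnlock flag above tracks, so the deferred unlock is skipped once the lock is released early on the success path.
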
@@ -1066,22 +1103,24 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
 		requestedGzip = true
 	}
 
+	cs := cc.newStream()
+	cs.req = req
+	cs.trace = httptrace.ContextClientTrace(req.Context())
+	cs.requestedGzip = requestedGzip
+	bodyWriter := cc.t.getBodyWriterState(cs, body)
+	cs.on100 = bodyWriter.on100
+	cc.mu.Unlock()
+
 	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
 	// sent by writeRequestBody below, along with any Trailers,
 	// again in form HEADERS{1}, CONTINUATION{0,})
+	cc.wmu.Lock()
 	hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
 	if err != nil {
-		cc.mu.Unlock()
+		cc.wmu.Unlock()
 		return nil, false, err
 	}
 
-	cs := cc.newStream()
-	cs.req = req
-	cs.trace = httptrace.ContextClientTrace(req.Context())
-	cs.requestedGzip = requestedGzip
-	bodyWriter := cc.t.getBodyWriterState(cs, body)
-	cs.on100 = bodyWriter.on100
-
 	defer func() {
 		cc.wmu.Lock()
 		werr := cc.werr
@@ -1091,24 +1130,24 @@
 		}
 	}()
 
-	cc.wmu.Lock()
 	endStream := !hasBody && !hasTrailers
-	werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
+	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
 	cc.wmu.Unlock()
+	<-cc.reqHeaderMu // release the new-request lock
+	reqHeaderMuNeedsUnlock = false
 	traceWroteHeaders(cs.trace)
-	cc.mu.Unlock()
 
-	if werr != nil {
+	if err != nil {
 		if hasBody {
 			bodyWriter.cancel()
 		}
 		cc.forgetStreamID(cs.ID)
 		// Don't bother sending a RST_STREAM (our write already failed;
 		// no need to keep writing)
-		traceWroteRequest(cs.trace, werr)
+		traceWroteRequest(cs.trace, err)
 		// TODO(dneil): An error occurred while writing the headers.
 		// Should we return an error indicating that this request can be retried?
-		return nil, false, werr
+		return nil, false, err
 	}
 
 	var respHeaderTimer <-chan time.Time
@@ -1125,7 +1164,6 @@
 
 	readLoopResCh := cs.resc
 	bodyWritten := false
-	ctx := req.Context()
 
 	handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) {
 		res := re.res
@@ -1427,19 +1465,17 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
 		return nil
 	}
 
+	cc.wmu.Lock()
 	var trls []byte
 	if hasTrailers {
-		cc.mu.Lock()
 		trls, err = cc.encodeTrailers(req)
-		cc.mu.Unlock()
 		if err != nil {
+			cc.wmu.Unlock()
 			cc.writeStreamReset(cs.ID, ErrCodeInternal, err)
 			cc.forgetStreamID(cs.ID)
 			return err
 		}
 	}
-
-	cc.wmu.Lock()
 	defer cc.wmu.Unlock()
 
 	// Two ways to send END_STREAM: either with trailers, or
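
This hunk is the trailer fix named in the commit message: encodeTrailers writes into the shared cc.hbuf, so the encoding and the frame writes that drain it must happen under one lock, or a concurrent request could Reset the buffer in between. A minimal sketch of the invariant (illustrative framedWriter type, with a plain copy standing in for HPACK encoding):

package sketch

import (
	"bytes"
	"io"
	"sync"
)

type framedWriter struct {
	wmu  sync.Mutex
	hbuf bytes.Buffer // shared encode buffer, guarded by wmu
	w    io.Writer
}

func (fw *framedWriter) encodeAndWrite(payload []byte) error {
	fw.wmu.Lock()
	defer fw.wmu.Unlock()
	fw.hbuf.Reset()        // safe: wmu serializes every user of hbuf
	fw.hbuf.Write(payload) // stand-in for HPACK encoding into hbuf
	_, err := fw.w.Write(fw.hbuf.Bytes())
	return err
}
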
@@ -1489,7 +1525,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
 	}
 }
 
-// requires cc.mu be held.
+// requires cc.wmu be held.
 func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
 	cc.hbuf.Reset()
 
@@ -1677,7 +1713,7 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
 	}
 }
 
-// requires cc.mu be held.
+// requires cc.wmu be held.
 func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
 	cc.hbuf.Reset()
 
@@ -1826,15 +1862,19 @@ func (rl *clientConnReadLoop) cleanup() {
 	} else if err == io.EOF {
 		err = io.ErrUnexpectedEOF
 	}
-	for _, cs := range cc.streams {
+	cc.closed = true
+	streams := cc.streams
+	cc.streams = nil
+	cc.mu.Unlock()
+	for _, cs := range streams {
 		cs.bufPipe.CloseWithError(err) // no-op if already closed
 		select {
 		case cs.resc <- resAndError{err: err}:
 		default:
 		}
 		close(cs.done)
 	}
-	cc.closed = true
+	cc.mu.Lock()
 	cc.cond.Broadcast()
 	cc.mu.Unlock()
 }
@@ -2192,8 +2232,6 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
 	}
 
 	cc.mu.Lock()
-	defer cc.mu.Unlock()
-
 	var connAdd, streamAdd int32
 	// Check the conn-level first, before the stream-level.
 	if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
@@ -2210,6 +2248,8 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
 			cs.inflow.add(streamAdd)
 		}
 	}
+	cc.mu.Unlock()
+
 	if connAdd != 0 || streamAdd != 0 {
 		cc.wmu.Lock()
 		defer cc.wmu.Unlock()
@@ -2235,19 +2275,25 @@ func (b transportResponseBody) Close() error {
 
 	if unread > 0 || !serverSentStreamEnd {
 		cc.mu.Lock()
-		cc.wmu.Lock()
 		if !serverSentStreamEnd {
-			cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
 			cs.didReset = true
 		}
 		// Return connection-level flow control.
 		if unread > 0 {
 			cc.inflow.add(int32(unread))
+		}
+		cc.mu.Unlock()
+
+		cc.wmu.Lock()
+		if !serverSentStreamEnd {
+			cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
+		}
+		// Return connection-level flow control.
+		if unread > 0 {
 			cc.fr.WriteWindowUpdate(0, uint32(unread))
 		}
 		cc.bw.Flush()
 		cc.wmu.Unlock()
-		cc.mu.Unlock()
 	}
 
 	cs.bufPipe.BreakWithError(errClosedResponseBody)
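
Close previously nested wmu inside mu; the combined critical section is now split into two sequential ones. The split is safe because both halves branch on unread and serverSentStreamEnd, plain local variables captured before either lock is taken, so the two halves cannot disagree about what to send. A condensed sketch of the shape, reusing the illustrative conn type from the earlier sketch:

func closeStream(c *conn, unread int, serverSentStreamEnd bool) {
	needReset := !serverSentStreamEnd // locals: stable, no lock required

	c.mu.Lock() // first critical section: state accounting only
	if needReset {
		// mark the stream reset in conn state
	}
	if unread > 0 {
		// return flow-control credit in conn state
	}
	c.mu.Unlock()

	c.wmu.Lock() // second critical section: network writes only
	if needReset {
		// write RST_STREAM
	}
	if unread > 0 {
		// write WINDOW_UPDATE
	}
	c.wmu.Unlock()
}
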
@@ -2325,6 +2371,10 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
 		}
 		if refund > 0 {
 			cc.inflow.add(int32(refund))
+		}
+		cc.mu.Unlock()
+
+		if refund > 0 {
 			cc.wmu.Lock()
 			cc.fr.WriteWindowUpdate(0, uint32(refund))
 			if !didReset {
@@ -2334,7 +2384,6 @@
 			cc.bw.Flush()
 			cc.wmu.Unlock()
 		}
-		cc.mu.Unlock()
 
 		if len(data) > 0 && !didReset {
 			if _, err := cs.bufPipe.Write(data); err != nil {
@@ -2399,6 +2448,23 @@ func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
 }
 
 func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
+	cc := rl.cc
+	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
+	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
+	cc.wmu.Lock()
+	defer cc.wmu.Unlock()
+
+	if err := rl.processSettingsNoWrite(f); err != nil {
+		return err
+	}
+	if !f.IsAck() {
+		cc.fr.WriteSettingsAck()
+		cc.bw.Flush()
+	}
+	return nil
+}
+
+func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
 	cc := rl.cc
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
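
The split into processSettings (takes wmu) and processSettingsNoWrite (takes mu) follows the struct's documented order: wmu before mu when both are needed. A fixed acquisition order is what rules out deadlock. A hedged sketch, again using the illustrative conn type from earlier:

// With an agreed order, this cycle cannot form:
//   goroutine A: mu.Lock(); wmu.Lock()   (mu -> wmu)
//   goroutine B: wmu.Lock(); mu.Lock()   (wmu -> mu)
// where each holds one lock and waits forever on the other.
func (c *conn) updateSettings(apply func()) {
	c.wmu.Lock() // documented order: wmu first...
	defer c.wmu.Unlock()
	c.mu.Lock() // ...then mu
	defer c.mu.Unlock()
	apply() // settings stay consistent for state readers and the frame writer
}
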
@@ -2461,12 +2527,7 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
 		cc.seenSettings = true
 	}
 
-	cc.wmu.Lock()
-	defer cc.wmu.Unlock()
-
-	cc.fr.WriteSettingsAck()
-	cc.bw.Flush()
-	return cc.werr
+	return nil
 }
 
 func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
