Skip to content

Commit 524fcad

Browse files
bradfitzdmitshur
authored andcommitted
[internal-branch.go1.16-vendor] http2: make Transport not reuse conns after a stream protocol error
If a server sends a stream error of type "protocol error" to a client, that's the server saying "you're speaking http2 wrong". At that point, regardless of whether we're in the right or not (that is, regardless of whether the Transport is bug-free), clearly there's some confusion and one of the two parties is either wrong or confused. There's no point pushing on and trying to use the connection and potentially exacerbating the confusion (as we saw in golang/go#47635). Instead, make the client "poison" the connection by setting a new "do not reuse" bit on it. Existing streams can finish up but new requests won't pick that connection. Also, make those requests as retryable without the caller getting an error. Given that golang/go#42777 existed, there are HTTP/2 servers in the wild that incorrectly set RST_STREAM PROTOCOL_ERROR codes. But even once those go away, this is still a reasonable fix for preventing a broken connection from being stuck in the connection pool that fails all future requests if a similar bug happens in another HTTP/2 server. Updates golang/go#49076 Change-Id: I3f89ecd1d3710e49f7219ccb846e016eb269515b Reviewed-on: https://go-review.googlesource.com/c/net/+/347033 Trust: Brad Fitzpatrick <[email protected]> Run-TryBot: Brad Fitzpatrick <[email protected]> TryBot-Result: Go Bot <[email protected]> Reviewed-by: Damien Neil <[email protected]> Reviewed-on: https://go-review.googlesource.com/c/net/+/356978 Trust: Damien Neil <[email protected]> Run-TryBot: Damien Neil <[email protected]> Reviewed-by: Dmitri Shuralyov <[email protected]>
1 parent 0fe5f8a commit 524fcad

File tree

3 files changed

+129
-3
lines changed

3 files changed

+129
-3
lines changed

http2/errors.go

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ type StreamError struct {
6767
Cause error // optional additional detail
6868
}
6969

70+
// errFromPeer is a sentinel error value for StreamError.Cause to
71+
// indicate that the StreamError was sent from the peer over the wire
72+
// and wasn't locally generated in the Transport.
73+
var errFromPeer = errors.New("received from peer")
74+
7075
func streamError(id uint32, code ErrCode) StreamError {
7176
return StreamError{StreamID: id, Code: code}
7277
}

http2/transport.go

+23-3
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ type ClientConn struct {
244244
cond *sync.Cond // hold mu; broadcast on flow/closed changes
245245
flow flow // our conn-level flow control quota (cs.flow is per stream)
246246
inflow flow // peer's conn-level flow control
247+
doNotReuse bool // whether conn is marked to not be reused for any future requests
247248
closing bool
248249
closed bool
249250
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
@@ -563,6 +564,10 @@ func canRetryError(err error) bool {
563564
return true
564565
}
565566
if se, ok := err.(StreamError); ok {
567+
if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
568+
// See golang/go#47635, golang/go#42777
569+
return true
570+
}
566571
return se.Code == ErrCodeRefusedStream
567572
}
568573
return false
@@ -718,6 +723,13 @@ func (cc *ClientConn) healthCheck() {
718723
}
719724
}
720725

726+
// SetDoNotReuse marks cc as not reusable for future HTTP requests.
727+
func (cc *ClientConn) SetDoNotReuse() {
728+
cc.mu.Lock()
729+
defer cc.mu.Unlock()
730+
cc.doNotReuse = true
731+
}
732+
721733
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
722734
cc.mu.Lock()
723735
defer cc.mu.Unlock()
@@ -780,6 +792,7 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
780792
}
781793

782794
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
795+
!cc.doNotReuse &&
783796
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
784797
!cc.tooIdleLocked()
785798
st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
@@ -2423,10 +2436,17 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
24232436
// which closes this, so there
24242437
// isn't a race.
24252438
default:
2426-
err := streamError(cs.ID, f.ErrCode)
2427-
cs.resetErr = err
2439+
serr := streamError(cs.ID, f.ErrCode)
2440+
if f.ErrCode == ErrCodeProtocol {
2441+
rl.cc.SetDoNotReuse()
2442+
serr.Cause = errFromPeer
2443+
// TODO(bradfitz): increment a varz here, once Transport
2444+
// takes an optional interface-typed field that expvar.Map.Add
2445+
// implements.
2446+
}
2447+
cs.resetErr = serr
24282448
close(cs.peerReset)
2429-
cs.bufPipe.CloseWithError(err)
2449+
cs.bufPipe.CloseWithError(serr)
24302450
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
24312451
}
24322452
return nil

http2/transport_test.go

+101
Original file line numberDiff line numberDiff line change
@@ -4944,3 +4944,104 @@ func TestTransportCloseRequestBody(t *testing.T) {
49444944
})
49454945
}
49464946
}
4947+
4948+
// collectClientsConnPool is a ClientConnPool that wraps lower and
4949+
// collects what calls were made on it.
4950+
type collectClientsConnPool struct {
4951+
lower ClientConnPool
4952+
4953+
mu sync.Mutex
4954+
getErrs int
4955+
got []*ClientConn
4956+
}
4957+
4958+
func (p *collectClientsConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
4959+
cc, err := p.lower.GetClientConn(req, addr)
4960+
p.mu.Lock()
4961+
defer p.mu.Unlock()
4962+
if err != nil {
4963+
p.getErrs++
4964+
return nil, err
4965+
}
4966+
p.got = append(p.got, cc)
4967+
return cc, nil
4968+
}
4969+
4970+
func (p *collectClientsConnPool) MarkDead(cc *ClientConn) {
4971+
p.lower.MarkDead(cc)
4972+
}
4973+
4974+
func TestTransportRetriesOnStreamProtocolError(t *testing.T) {
4975+
ct := newClientTester(t)
4976+
pool := &collectClientsConnPool{
4977+
lower: &clientConnPool{t: ct.tr},
4978+
}
4979+
ct.tr.ConnPool = pool
4980+
done := make(chan struct{})
4981+
ct.client = func() error {
4982+
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
4983+
res, err := ct.tr.RoundTrip(req)
4984+
const want = "only one dial allowed in test mode"
4985+
if got := fmt.Sprint(err); got != want {
4986+
t.Errorf("didn't dial again: got %#q; want %#q", got, want)
4987+
}
4988+
close(done)
4989+
ct.sc.Close()
4990+
if res != nil {
4991+
res.Body.Close()
4992+
}
4993+
4994+
pool.mu.Lock()
4995+
defer pool.mu.Unlock()
4996+
if pool.getErrs != 1 {
4997+
t.Errorf("pool get errors = %v; want 1", pool.getErrs)
4998+
}
4999+
if len(pool.got) == 1 {
5000+
cc := pool.got[0]
5001+
cc.mu.Lock()
5002+
if !cc.doNotReuse {
5003+
t.Error("ClientConn not marked doNotReuse")
5004+
}
5005+
cc.mu.Unlock()
5006+
} else {
5007+
t.Errorf("pool get success = %v; want 1", len(pool.got))
5008+
}
5009+
return nil
5010+
}
5011+
ct.server = func() error {
5012+
ct.greet()
5013+
var sentErr bool
5014+
for {
5015+
f, err := ct.fr.ReadFrame()
5016+
if err != nil {
5017+
select {
5018+
case <-done:
5019+
return nil
5020+
default:
5021+
return err
5022+
}
5023+
}
5024+
switch f := f.(type) {
5025+
case *WindowUpdateFrame, *SettingsFrame:
5026+
case *HeadersFrame:
5027+
if !sentErr {
5028+
sentErr = true
5029+
ct.fr.WriteRSTStream(f.StreamID, ErrCodeProtocol)
5030+
continue
5031+
}
5032+
var buf bytes.Buffer
5033+
enc := hpack.NewEncoder(&buf)
5034+
// send headers without Trailer header
5035+
enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
5036+
ct.fr.WriteHeaders(HeadersFrameParam{
5037+
StreamID: f.StreamID,
5038+
EndHeaders: true,
5039+
EndStream: true,
5040+
BlockFragment: buf.Bytes(),
5041+
})
5042+
}
5043+
}
5044+
return nil
5045+
}
5046+
ct.run()
5047+
}

0 commit comments

Comments
 (0)