Skip to content

Commit 5440bfc

Browse files
committed
net/http/httputil: rewrite flushing code, disable on Server-Sent Events
* Rewrite the flushing code to not use a persistent goroutine, which also simplifies testing. * Define the meaning of a negative flush interval. Its meaning doesn't change, but now it's locked in, and then we can use it to optimize the performance of the non-buffered case to avoid use of an AfterFunc. * Support (internal-only) special casing of FlushInterval values per request/response. * For now, treat Server-Sent Event responses as unbuffered. (or rather, immediately flushed from the buffer per-write) Fixes #27816 Change-Id: Ie0f975c997daa3db539504137c741a96d7022665 Reviewed-on: https://go-review.googlesource.com/c/137335 Run-TryBot: Brad Fitzpatrick <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: Dmitri Shuralyov <[email protected]>
1 parent ffc7bc5 commit 5440bfc

File tree

2 files changed

+102
-43
lines changed

2 files changed

+102
-43
lines changed

src/net/http/httputil/reverseproxy.go

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@ import (
1818
"time"
1919
)
2020

21-
// onExitFlushLoop is a callback set by tests to detect the state of the
22-
// flushLoop() goroutine.
23-
var onExitFlushLoop func()
24-
2521
// ReverseProxy is an HTTP Handler that takes an incoming request and
2622
// sends it to another server, proxying the response back to the
2723
// client.
@@ -42,6 +38,12 @@ type ReverseProxy struct {
4238
// to flush to the client while copying the
4339
// response body.
4440
// If zero, no periodic flushing is done.
41+
// A negative value means to flush immediately
42+
// after each write to the client.
43+
// The FlushInterval is ignored when ReverseProxy
44+
// recognizes a response as a streaming response;
45+
// for such reponses, writes are flushed to the client
46+
// immediately.
4547
FlushInterval time.Duration
4648

4749
// ErrorLog specifies an optional logger for errors
@@ -271,7 +273,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
271273
fl.Flush()
272274
}
273275
}
274-
err = p.copyResponse(rw, res.Body)
276+
err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
275277
if err != nil {
276278
defer res.Body.Close()
277279
// Since we're streaming the response, if we run into an error all we can do
@@ -332,15 +334,28 @@ func removeConnectionHeaders(h http.Header) {
332334
}
333335
}
334336

335-
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) error {
336-
if p.FlushInterval != 0 {
337+
// flushInterval returns the p.FlushInterval value, conditionally
338+
// overriding its value for a specific request/response.
339+
func (p *ReverseProxy) flushInterval(req *http.Request, res *http.Response) time.Duration {
340+
resCT := res.Header.Get("Content-Type")
341+
342+
// For Server-Sent Events responses, flush immediately.
343+
// The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
344+
if resCT == "text/event-stream" {
345+
return -1 // negative means immediately
346+
}
347+
348+
// TODO: more specific cases? e.g. res.ContentLength == -1?
349+
return p.FlushInterval
350+
}
351+
352+
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error {
353+
if flushInterval != 0 {
337354
if wf, ok := dst.(writeFlusher); ok {
338355
mlw := &maxLatencyWriter{
339356
dst: wf,
340-
latency: p.FlushInterval,
341-
done: make(chan bool),
357+
latency: flushInterval,
342358
}
343-
go mlw.flushLoop()
344359
defer mlw.stop()
345360
dst = mlw
346361
}
@@ -403,34 +418,44 @@ type writeFlusher interface {
403418

404419
type maxLatencyWriter struct {
405420
dst writeFlusher
406-
latency time.Duration
421+
latency time.Duration // non-zero; negative means to flush immediately
407422

408-
mu sync.Mutex // protects Write + Flush
409-
done chan bool
423+
mu sync.Mutex // protects t, flushPending, and dst.Flush
424+
t *time.Timer
425+
flushPending bool
410426
}
411427

412-
func (m *maxLatencyWriter) Write(p []byte) (int, error) {
428+
func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
413429
m.mu.Lock()
414430
defer m.mu.Unlock()
415-
return m.dst.Write(p)
431+
n, err = m.dst.Write(p)
432+
if m.latency < 0 {
433+
m.dst.Flush()
434+
return
435+
}
436+
if m.flushPending {
437+
return
438+
}
439+
if m.t == nil {
440+
m.t = time.AfterFunc(m.latency, m.delayedFlush)
441+
} else {
442+
m.t.Reset(m.latency)
443+
}
444+
m.flushPending = true
445+
return
416446
}
417447

418-
func (m *maxLatencyWriter) flushLoop() {
419-
t := time.NewTicker(m.latency)
420-
defer t.Stop()
421-
for {
422-
select {
423-
case <-m.done:
424-
if onExitFlushLoop != nil {
425-
onExitFlushLoop()
426-
}
427-
return
428-
case <-t.C:
429-
m.mu.Lock()
430-
m.dst.Flush()
431-
m.mu.Unlock()
432-
}
433-
}
448+
func (m *maxLatencyWriter) delayedFlush() {
449+
m.mu.Lock()
450+
defer m.mu.Unlock()
451+
m.dst.Flush()
452+
m.flushPending = false
434453
}
435454

436-
func (m *maxLatencyWriter) stop() { m.done <- true }
455+
func (m *maxLatencyWriter) stop() {
456+
m.mu.Lock()
457+
defer m.mu.Unlock()
458+
if m.t != nil {
459+
m.t.Stop()
460+
}
461+
}

src/net/http/httputil/reverseproxy_test.go

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,6 @@ func TestReverseProxyFlushInterval(t *testing.T) {
297297
proxyHandler := NewSingleHostReverseProxy(backendURL)
298298
proxyHandler.FlushInterval = time.Microsecond
299299

300-
done := make(chan bool)
301-
onExitFlushLoop = func() { done <- true }
302-
defer func() { onExitFlushLoop = nil }()
303-
304300
frontend := httptest.NewServer(proxyHandler)
305301
defer frontend.Close()
306302

@@ -314,13 +310,6 @@ func TestReverseProxyFlushInterval(t *testing.T) {
314310
if bodyBytes, _ := ioutil.ReadAll(res.Body); string(bodyBytes) != expected {
315311
t.Errorf("got body %q; expected %q", bodyBytes, expected)
316312
}
317-
318-
select {
319-
case <-done:
320-
// OK
321-
case <-time.After(5 * time.Second):
322-
t.Error("maxLatencyWriter flushLoop() never exited")
323-
}
324313
}
325314

326315
func TestReverseProxyCancelation(t *testing.T) {
@@ -946,3 +935,48 @@ func TestReverseProxy_PanicBodyError(t *testing.T) {
946935
req, _ := http.NewRequest("GET", "http://foo.tld/", nil)
947936
rproxy.ServeHTTP(httptest.NewRecorder(), req)
948937
}
938+
939+
func TestSelectFlushInterval(t *testing.T) {
940+
tests := []struct {
941+
name string
942+
p *ReverseProxy
943+
req *http.Request
944+
res *http.Response
945+
want time.Duration
946+
}{
947+
{
948+
name: "default",
949+
res: &http.Response{},
950+
p: &ReverseProxy{FlushInterval: 123},
951+
want: 123,
952+
},
953+
{
954+
name: "server-sent events overrides non-zero",
955+
res: &http.Response{
956+
Header: http.Header{
957+
"Content-Type": {"text/event-stream"},
958+
},
959+
},
960+
p: &ReverseProxy{FlushInterval: 123},
961+
want: -1,
962+
},
963+
{
964+
name: "server-sent events overrides zero",
965+
res: &http.Response{
966+
Header: http.Header{
967+
"Content-Type": {"text/event-stream"},
968+
},
969+
},
970+
p: &ReverseProxy{FlushInterval: 0},
971+
want: -1,
972+
},
973+
}
974+
for _, tt := range tests {
975+
t.Run(tt.name, func(t *testing.T) {
976+
got := tt.p.flushInterval(tt.req, tt.res)
977+
if got != tt.want {
978+
t.Errorf("flushLatency = %v; want %v", got, tt.want)
979+
}
980+
})
981+
}
982+
}

0 commit comments

Comments
 (0)