Skip to content

Commit e5febf9

Browse files
committed
net/http: flush request body chunks in Transport
The Transport's writer to the remote server is wrapped in a bufio.Writer to suppress many small writes while writing headers and trailers. However, when writing the request body, the buffering may get in the way if the request body is arriving slowly. Because the io.Copy from the Request.Body to the writer is already buffered, the outer bufio.Writer is unnecessary and prevents small Request.Body.Reads from going to the server right away. (and the io.Reader contract does say to return when you've got something, instead of blocking waiting for more). After the body is finished, the Transport's bufio.Writer is still used for any trailers following. A previous attempted fix for this made the chunk writer always flush if the underlying type was a bufio.Writer, but that is not quite correct. This CL instead makes it opt-in by using a private sentinel type (wrapping a *bufio.Writer) to the chunk writer that requests Flushes after each chunk body (the chunk header & chunk body are still buffered together into one write). Fixes #6574 Change-Id: Icefcdf17130c9e285c80b69af295bfd3e72c3a70 Reviewed-on: https://go-review.googlesource.com/10021 Reviewed-by: Andrew Gerrand <[email protected]> Run-TryBot: Brad Fitzpatrick <[email protected]> TryBot-Result: Gobot Gobot <[email protected]>
1 parent 5c7f944 commit e5febf9

File tree

3 files changed

+119
-2
lines changed

3 files changed

+119
-2
lines changed

src/net/http/internal/chunked.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,12 @@ func (cw *chunkedWriter) Write(data []byte) (n int, err error) {
173173
err = io.ErrShortWrite
174174
return
175175
}
176-
_, err = io.WriteString(cw.Wire, "\r\n")
177-
176+
if _, err = io.WriteString(cw.Wire, "\r\n"); err != nil {
177+
return
178+
}
179+
if bw, ok := cw.Wire.(*FlushAfterChunkWriter); ok {
180+
err = bw.Flush()
181+
}
178182
return
179183
}
180184

@@ -183,6 +187,15 @@ func (cw *chunkedWriter) Close() error {
183187
return err
184188
}
185189

190+
// FlushAfterChunkWriter signals from the caller of NewChunkedWriter
191+
// that each chunk should be followed by a flush. It is used by the
192+
// http.Transport code to keep the buffering behavior for headers and
193+
// trailers, but flush out chunks aggressively in the middle for
194+
// request bodies which may be generated slowly. See Issue 6574.
195+
type FlushAfterChunkWriter struct {
196+
*bufio.Writer
197+
}
198+
186199
func parseHexUint(v []byte) (n uint64, err error) {
187200
for _, b := range v {
188201
n <<= 4

src/net/http/transfer.go

+5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type transferWriter struct {
4343
Close bool
4444
TransferEncoding []string
4545
Trailer Header
46+
IsResponse bool
4647
}
4748

4849
func newTransferWriter(r interface{}) (t *transferWriter, err error) {
@@ -89,6 +90,7 @@ func newTransferWriter(r interface{}) (t *transferWriter, err error) {
8990
}
9091
}
9192
case *Response:
93+
t.IsResponse = true
9294
if rr.Request != nil {
9395
t.Method = rr.Request.Method
9496
}
@@ -206,6 +208,9 @@ func (t *transferWriter) WriteBody(w io.Writer) error {
206208
// Write body
207209
if t.Body != nil {
208210
if chunked(t.TransferEncoding) {
211+
if bw, ok := w.(*bufio.Writer); ok && !t.IsResponse {
212+
w = &internal.FlushAfterChunkWriter{bw}
213+
}
209214
cw := internal.NewChunkedWriter(w)
210215
_, err = io.Copy(cw, t.Body)
211216
if err == nil {

src/net/http/transport_test.go

+99
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/http/httptest"
2424
"net/url"
2525
"os"
26+
"reflect"
2627
"runtime"
2728
"strconv"
2829
"strings"
@@ -2447,6 +2448,104 @@ func TestTransportDialCancelRace(t *testing.T) {
24472448
}
24482449
}
24492450

2451+
// logWritesConn is a net.Conn that logs each Write call to writes
2452+
// and then proxies to w.
2453+
// It proxies Read calls to a reader it receives from rch.
2454+
type logWritesConn struct {
2455+
net.Conn // nil. crash on use.
2456+
2457+
w io.Writer
2458+
2459+
rch <-chan io.Reader
2460+
r io.Reader // nil until received by rch
2461+
2462+
mu sync.Mutex
2463+
writes []string
2464+
}
2465+
2466+
func (c *logWritesConn) Write(p []byte) (n int, err error) {
2467+
c.mu.Lock()
2468+
defer c.mu.Unlock()
2469+
c.writes = append(c.writes, string(p))
2470+
return c.w.Write(p)
2471+
}
2472+
2473+
func (c *logWritesConn) Read(p []byte) (n int, err error) {
2474+
if c.r == nil {
2475+
c.r = <-c.rch
2476+
}
2477+
return c.r.Read(p)
2478+
}
2479+
2480+
func (c *logWritesConn) Close() error { return nil }
2481+
2482+
// Issue 6574
2483+
func TestTransportFlushesBodyChunks(t *testing.T) {
2484+
defer afterTest(t)
2485+
resBody := make(chan io.Reader, 1)
2486+
connr, connw := io.Pipe() // connection pipe pair
2487+
lw := &logWritesConn{
2488+
rch: resBody,
2489+
w: connw,
2490+
}
2491+
tr := &Transport{
2492+
Dial: func(network, addr string) (net.Conn, error) {
2493+
return lw, nil
2494+
},
2495+
}
2496+
bodyr, bodyw := io.Pipe() // body pipe pair
2497+
go func() {
2498+
defer bodyw.Close()
2499+
for i := 0; i < 3; i++ {
2500+
fmt.Fprintf(bodyw, "num%d\n", i)
2501+
}
2502+
}()
2503+
resc := make(chan *Response)
2504+
go func() {
2505+
req, _ := NewRequest("POST", "http://localhost:8080", bodyr)
2506+
req.Header.Set("User-Agent", "x") // known value for test
2507+
res, err := tr.RoundTrip(req)
2508+
if err != nil {
2509+
t.Error("RoundTrip: %v", err)
2510+
close(resc)
2511+
return
2512+
}
2513+
resc <- res
2514+
2515+
}()
2516+
// Fully consume the request before checking the Write log vs. want.
2517+
req, err := ReadRequest(bufio.NewReader(connr))
2518+
if err != nil {
2519+
t.Fatal(err)
2520+
}
2521+
io.Copy(ioutil.Discard, req.Body)
2522+
2523+
// Unblock the transport's roundTrip goroutine.
2524+
resBody <- strings.NewReader("HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n")
2525+
res, ok := <-resc
2526+
if !ok {
2527+
return
2528+
}
2529+
defer res.Body.Close()
2530+
2531+
want := []string{
2532+
// Because Request.ContentLength = 0, the body is sniffed for 1 byte to determine whether there's content.
2533+
// That explains the initial "num0" being split into "n" and "um0".
2534+
// The first byte is included with the request headers Write. Perhaps in the future
2535+
// we will want to flush the headers out early if the first byte of the request body is
2536+
// taking a long time to arrive. But not yet.
2537+
"POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n" +
2538+
"1\r\nn\r\n",
2539+
"4\r\num0\n\r\n",
2540+
"5\r\nnum1\n\r\n",
2541+
"5\r\nnum2\n\r\n",
2542+
"0\r\n\r\n",
2543+
}
2544+
if !reflect.DeepEqual(lw.writes, want) {
2545+
t.Errorf("Writes differed.\n Got: %q\nWant: %q\n", lw.writes, want)
2546+
}
2547+
}
2548+
24502549
func wantBody(res *http.Response, err error, want string) error {
24512550
if err != nil {
24522551
return err

0 commit comments

Comments
 (0)