fix leaking connections when user client closes connection #220


Closed
2 changes: 1 addition & 1 deletion src/bootstrap/cloud/terraform/service-account.tf
@@ -60,7 +60,7 @@ resource "google_project_iam_member" "robot-service-roles" {
project = data.google_project.project.project_id
member = "serviceAccount:${google_service_account.robot-service.email}"
for_each = toset([
"roles/cloudtrace.agent", # Upload cloud traces
"roles/cloudtrace.agent", # Upload cloud traces
"roles/logging.logWriter", # Upload text logs to Cloud logging
# Required to use robot-service@ for GKE clusters that simulate robots
"roles/monitoring.viewer",
19 changes: 17 additions & 2 deletions src/go/cmd/http-relay-client/client/client.go
@@ -91,6 +91,18 @@ type ClientConfig struct {
ForceHttp2 bool
}

type RelayServerError struct {
msg string
}

func NewRelayServerError(msg string) error {
return &RelayServerError{msg}
}

func (e *RelayServerError) Error() string {
return e.msg
}
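
Later in this diff the relay client detects this error with a direct type assertion, which only matches an unwrapped *RelayServerError. If wrapping ever becomes a concern, a small errors.As helper would tolerate it; this is a hypothetical sketch, not part of the PR:

```go
// Hypothetical helper, not part of this PR (requires the "errors" import):
// unlike a direct type assertion, errors.As also matches a *RelayServerError
// that has been wrapped by another error.
func isRelayServerError(err error) bool {
	var rse *RelayServerError
	return errors.As(err, &rse)
}
```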

func DefaultClientConfig() ClientConfig {
return ClientConfig{
RemoteRequestTimeout: 60 * time.Second,
@@ -384,7 +396,7 @@ func (c *Client) postResponse(remote *http.Client, br *pb.HttpResponse) error {
return fmt.Errorf("couldn't read relay server's response body: %v", err)
}
if resp.StatusCode != http.StatusOK {
- err := fmt.Errorf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body)
+ err := NewRelayServerError(fmt.Sprintf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body))
if resp.StatusCode == http.StatusBadRequest {
// http-relay-server may have restarted during the request.

Contributor: Please extend this comment to say "or the client cancelled the request."

return backoff.Permanent(err)
@@ -643,8 +655,11 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
log.Printf("[%s] Failed to post response to relay: %v", *resp.Id, err)
},
)
- if _, ok := err.(*backoff.PermanentError); ok {
+ if _, ok := err.(*RelayServerError); ok {

Contributor:

This is a change in behavior: before, only HTTP 400 would terminate the loop; now anything other than HTTP 200 terminates it. For example, if the nginx ingress is overloaded and returns 500s for a while, this will now terminate the request. I think this is a good thing: there is no reason for us to drop a chunk from the middle of the response body, and it's better to terminate the request.

I think you could go even further: any error returned from RetryNotify indicates that we aren't sure we've posted the response, and that we should stop this loop rather than incorrectly continuing with the next response chunk. WDYT?

Please also delete the next comment, which is no longer true (this could be a "transient" 5xx error that lasted too long). Maybe: // The relay server was unreachable for too long, so we dropped the chunk and should abort the request.


Contributor (Author):

That sounds good; silently dropping chunks will cause clients to receive corrupted data, which breaks the normal assumptions of HTTP/TCP connections.
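
A minimal sketch of the stricter handling suggested above, treating any error returned from backoff.RetryNotify as fatal for the request; names such as resp and hresp follow the surrounding diff, and this illustrates the suggestion rather than the code the PR currently contains:

```go
// Sketch: err is the value returned by backoff.RetryNotify. Whether the
// error was permanent or retries were exhausted, we can no longer be sure
// the chunk reached the relay server, so abort the request instead of
// silently dropping data from the middle of the response body.
if err != nil {
	log.Printf("[%s] %v", *resp.Id, err)
	log.Printf("[%s] Closing backend connection", *resp.Id)
	hresp.Body.Close()
	break
}
```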

// A permanent error suggests the request should be aborted.
log.Printf("[%s] %s", *resp.Id, err)
log.Printf("[%s] Closing backend connection", *resp.Id)
hresp.Body.Close()
break
}
}
6 changes: 6 additions & 0 deletions src/go/cmd/http-relay-server/server/broker.go
@@ -171,6 +171,12 @@ func (r *broker) RelayRequest(server string, request *pb.HttpRequest) (<-chan *p
}
}

// StopRelayRequest forgets a relaying request. This causes the next chunk from the backend
// with the relay id to not be recognized, so the relay server returns an error.
func (r *broker) StopRelayRequest(requestId string) {
delete(r.resp, requestId)

Contributor:

Please hold the mutex while accessing the map; Go maps are not thread-safe.

}
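
A sketch of the mutex-guarded version the reviewer asks for, assuming the broker already owns a sync.Mutex (called m here) that protects the resp map:

```go
// Sketch only: the field name m is an assumption; use whichever mutex
// already guards concurrent access to r.resp in broker.
func (r *broker) StopRelayRequest(requestId string) {
	r.m.Lock()
	defer r.m.Unlock()
	delete(r.resp, requestId)
}
```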

// GetRequest obtains a client's request for the server identifier. It blocks
// until a client makes a request.
func (r *broker) GetRequest(ctx context.Context, server, path string) (*pb.HttpRequest, error) {
13 changes: 9 additions & 4 deletions src/go/cmd/http-relay-server/server/server.go
@@ -257,8 +257,10 @@ func (s *Server) bidirectionalStream(backendCtx backendContext, w http.ResponseW

numBytes := 0
for responseChunk := range responseChunks {
- // TODO(b/130706300): detect dropped connection and end request in broker
- _, _ = bufrw.Write(responseChunk.Body)
+ if _, err = w.Write(responseChunk.Body); err != nil {
+ 	log.Printf("[%s] %s", backendCtx.Id, err)
+ 	return
+ }
bufrw.Flush()
numBytes += len(responseChunk.Body)
}
@@ -378,6 +380,7 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer s.b.StopRelayRequest(backendCtx.Id)

header, responseChunksChan, done := s.waitForFirstResponseAndHandleSwitching(ctx, *backendCtx, w, backendRespChan)
if done {
@@ -392,8 +395,10 @@
// i.e. this will block until
numBytes := 0
for responseChunk := range responseChunksChan {
- // TODO(b/130706300): detect dropped connection and end request in broker
- _, _ = w.Write(responseChunk.Body)
+ if _, err = w.Write(responseChunk.Body); err != nil {
+ 	log.Printf("[%s] %s", backendCtx.Id, err)
+ 	return
+ }
if flush, ok := w.(http.Flusher); ok {
flush.Flush()
}
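
Taken together, the server-side fix has two steps: a failed Write tells the handler that the user client is gone, and the deferred StopRelayRequest makes the broker forget the request, so the backend's next chunk is rejected and the relay client closes the connection. A condensed sketch of the userClientRequest flow (paraphrasing the diff, not verbatim):

```go
// Condensed paraphrase of the new userClientRequest flow in this PR.
defer s.b.StopRelayRequest(backendCtx.Id) // forget the request on any exit path

for responseChunk := range responseChunksChan {
	if _, err := w.Write(responseChunk.Body); err != nil {
		// The user client dropped the connection: stop streaming. The
		// deferred StopRelayRequest then causes the backend's next chunk
		// to be rejected, letting the relay client close the backend.
		log.Printf("[%s] %s", backendCtx.Id, err)
		return
	}
	if flush, ok := w.(http.Flusher); ok {
		flush.Flush()
	}
}
```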
64 changes: 63 additions & 1 deletion src/go/tests/relay/nok8s_relay_test.go
@@ -15,6 +15,7 @@
package main

import (
"bufio"
"bytes"
"context"
"fmt"
@@ -128,7 +129,7 @@ func (r *relay) stop() error {
}

// TestHttpRelay launches a local http relay (client + server) and connects a
- // test-hhtp-server as a backend. The test is then interacting with the backend
+ // test-http-server as a backend. The test is then interacting with the backend
// through the local relay.
func TestHttpRelay(t *testing.T) {
tests := []struct {
@@ -201,6 +202,67 @@ func TestHttpRelay(t *testing.T) {
}
}

// TestDroppedUserClientFreesRelayChannel checks that when the user client closes a connection,
// it is propagated to the relay server and client, closing the backend connection as well.
func TestDroppedUserClientFreesRelayChannel(t *testing.T) {
// setup http test server
connClosed := make(chan error)
defer close(connClosed)
finishServer := make(chan bool)
defer close(finishServer)

// mock a long-running backend that uses chunking to send periodic updates
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for {
select {
case <-finishServer:
return
default:
if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil {
connClosed <- err
return
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
} else {
t.Fatal("cannot flush")
}
time.Sleep(time.Second)
}
}
}))
defer func() { ts.Close() }()

Contributor: I think this can be defer ts.Close()


backendAddress := strings.TrimPrefix(ts.URL, "http://")
r := &relay{}
if err := r.start(backendAddress); err != nil {
t.Fatal("failed to start relay: ", err)
}
defer func() {

Contributor:

I would just do defer r.stop() here. As far as I can tell, t.Fatal() inside a deferred function is not handled well: golang/go#29207

if err := r.stop(); err != nil {
t.Fatal("failed to stop relay: ", err)
}
}()
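
If the cleanup should still surface failures, one alternative to t.Fatal in a deferred function is t.Errorf, which marks the test failed without calling runtime.Goexit; a sketch:

```go
// Sketch: report a cleanup failure without t.Fatal, which is unsafe in
// deferred functions (see golang/go#29207).
defer func() {
	if err := r.stop(); err != nil {
		t.Errorf("failed to stop relay: %v", err)
	}
}()
```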
relayAddress := "http://127.0.0.1:" + r.rsPort

res, err := http.Get(relayAddress + "/client/remote1/")
if err != nil {
t.Fatal(err)
}
// receive the first chunk, then terminate the connection
if _, err := bufio.NewReader(res.Body).ReadString('\n'); err != nil {
t.Fatal(err)
}
res.Body.Close()

// wait up to 30s for the backend connection to be closed
select {
case <-connClosed:
case <-time.After(30 * time.Second):
t.Error("Server did not close connection")
}
}

type testServer struct {
testpb.UnimplementedTestServiceServer
responsePayload []byte