From 2ce39ec7610d44a6674ba8a68c832739adc4ddda Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Thu, 7 Sep 2023 10:34:57 +0800 Subject: [PATCH 1/3] wip Signed-off-by: Teo Koon Peng --- src/go/cmd/http-relay-client/client/client.go | 16 ++++- src/go/cmd/http-relay-server/server/server.go | 5 +- src/go/tests/relay/nok8s_relay_test.go | 61 ++++++++++++++++++- 3 files changed, 78 insertions(+), 4 deletions(-) diff --git a/src/go/cmd/http-relay-client/client/client.go b/src/go/cmd/http-relay-client/client/client.go index f29bef878..fb335aab0 100644 --- a/src/go/cmd/http-relay-client/client/client.go +++ b/src/go/cmd/http-relay-client/client/client.go @@ -91,6 +91,18 @@ type ClientConfig struct { ForceHttp2 bool } +type RelayServerError struct { + msg string +} + +func NewRelayServerError(msg string) error { + return &RelayServerError{msg} +} + +func (e *RelayServerError) Error() string { + return e.msg +} + func DefaultClientConfig() ClientConfig { return ClientConfig{ RemoteRequestTimeout: 60 * time.Second, @@ -384,7 +396,7 @@ func (c *Client) postResponse(remote *http.Client, br *pb.HttpResponse) error { return fmt.Errorf("couldn't read relay server's response body: %v", err) } if resp.StatusCode != http.StatusOK { - err := fmt.Errorf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body) + err := NewRelayServerError(fmt.Sprintf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body)) if resp.StatusCode == http.StatusBadRequest { // http-relay-server may have restarted during the request. return backoff.Permanent(err) @@ -643,7 +655,7 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p log.Printf("[%s] Failed to post response to relay: %v", *resp.Id, err) }, ) - if _, ok := err.(*backoff.PermanentError); ok { + if _, ok := err.(*RelayServerError); ok { // A permanent error suggests the request should be aborted. break } diff --git a/src/go/cmd/http-relay-server/server/server.go b/src/go/cmd/http-relay-server/server/server.go index a5f5c3f7c..d7828a763 100644 --- a/src/go/cmd/http-relay-server/server/server.go +++ b/src/go/cmd/http-relay-server/server/server.go @@ -379,7 +379,10 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) { numBytes := 0 for bytes := range backendRespBodyChannel { // TODO(b/130706300): detect dropped connection and end request in broker - _, _ = w.Write(bytes) + if _, err = w.Write(bytes); err != nil { + delete(s.b.resp, backendReq.GetId()) + return + } if flush, ok := w.(http.Flusher); ok { flush.Flush() } diff --git a/src/go/tests/relay/nok8s_relay_test.go b/src/go/tests/relay/nok8s_relay_test.go index 4a216b18f..b19827fc9 100644 --- a/src/go/tests/relay/nok8s_relay_test.go +++ b/src/go/tests/relay/nok8s_relay_test.go @@ -15,6 +15,7 @@ package main import ( + "bufio" "bytes" "context" "fmt" @@ -127,7 +128,7 @@ func (r *relay) stop() error { } // TestHttpRelay launches a local http relay (client + server) and connects a -// test-hhtp-server as a backend. The test is then interacting with the backend +// test-http-server as a backend. The test is then interacting with the backend // through the local relay. func TestHttpRelay(t *testing.T) { tests := []struct { @@ -200,6 +201,64 @@ func TestHttpRelay(t *testing.T) { } } +func TestDroppedUserClientFreesRelayChannel(t *testing.T) { + // setup http test server + connClosed := make(chan error) + finishServer := make(chan bool) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for { + select { + case <-finishServer: + return + default: + if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil { + connClosed <- err + println("server closed") + return + } + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } else { + t.Fatal("cannot flush") + } + time.Sleep(time.Second) + } + } + })) + defer ts.Close() + defer func() { finishServer <- true }() + + backendAddress := strings.TrimPrefix(ts.URL, "http://") + r := &relay{} + if err := r.start(backendAddress); err != nil { + t.Fatal("failed to start relay: ", err) + } + defer func() { + if err := r.stop(); err != nil { + t.Fatal("failed to stop relay: ", err) + } + }() + relayAddress := "http://127.0.0.1:" + r.rsPort + + res, err := http.Get(relayAddress + "/client/remote1/") + if err != nil { + t.Fatal(err) + } + // receive the first chunk then terminates the connection + if _, err := bufio.NewReader(res.Body).ReadString('\n'); err != nil { + t.Fatal(err) + } + res.Body.Close() + println("client closed") + + // wait for up to 10s for server to close connection + select { + case <-connClosed: + case <-time.After(10 * time.Second): + t.Error("Server did not close connection") + } +} + type testServer struct { testpb.UnimplementedTestServiceServer } From 6dbe16b10c471dccedfd6ae8309ac90285f47b05 Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Thu, 7 Sep 2023 12:29:16 +0800 Subject: [PATCH 2/3] working for chunked http response Signed-off-by: Teo Koon Peng --- src/go/cmd/http-relay-client/client/client.go | 3 +++ src/go/tests/relay/nok8s_relay_test.go | 15 +++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/go/cmd/http-relay-client/client/client.go b/src/go/cmd/http-relay-client/client/client.go index fb335aab0..24b3c344a 100644 --- a/src/go/cmd/http-relay-client/client/client.go +++ b/src/go/cmd/http-relay-client/client/client.go @@ -657,6 +657,9 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p ) if _, ok := err.(*RelayServerError); ok { // A permanent error suggests the request should be aborted. + log.Printf("[%s] %s", *resp.Id, err) + log.Printf("[%s] Closing backend connection", *resp.Id) + hresp.Body.Close() break } } diff --git a/src/go/tests/relay/nok8s_relay_test.go b/src/go/tests/relay/nok8s_relay_test.go index 5b94289f2..eb071aa6c 100644 --- a/src/go/tests/relay/nok8s_relay_test.go +++ b/src/go/tests/relay/nok8s_relay_test.go @@ -202,10 +202,16 @@ func TestHttpRelay(t *testing.T) { } } +// TestDroppedUserClientFreesRelayChannel checks that when the user client closes a connection, +// it is propagated to the relay server and client, closing the backend connection as well. func TestDroppedUserClientFreesRelayChannel(t *testing.T) { // setup http test server connClosed := make(chan error) + defer close(connClosed) finishServer := make(chan bool) + defer close(finishServer) + + // mock a long running backend that uses chunking to send periodic updates ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { for { select { @@ -214,7 +220,6 @@ func TestDroppedUserClientFreesRelayChannel(t *testing.T) { default: if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil { connClosed <- err - println("server closed") return } if flusher, ok := w.(http.Flusher); ok { @@ -226,8 +231,7 @@ func TestDroppedUserClientFreesRelayChannel(t *testing.T) { } } })) - defer ts.Close() - defer func() { finishServer <- true }() + defer func() { ts.Close() }() backendAddress := strings.TrimPrefix(ts.URL, "http://") r := &relay{} @@ -250,12 +254,11 @@ func TestDroppedUserClientFreesRelayChannel(t *testing.T) { t.Fatal(err) } res.Body.Close() - println("client closed") - // wait for up to 10s for server to close connection + // wait for up to 30s for backend connection to be closed select { case <-connClosed: - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): t.Error("Server did not close connection") } } From b753c3c7ba99f36cebd5632009f9a3ccc5c2089c Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Thu, 7 Sep 2023 12:53:30 +0800 Subject: [PATCH 3/3] use defer to ensure connection is closed Signed-off-by: Teo Koon Peng --- src/go/cmd/http-relay-server/server/broker.go | 6 ++++++ src/go/cmd/http-relay-server/server/server.go | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/go/cmd/http-relay-server/server/broker.go b/src/go/cmd/http-relay-server/server/broker.go index 94c20f0dd..b1d5b8dd3 100644 --- a/src/go/cmd/http-relay-server/server/broker.go +++ b/src/go/cmd/http-relay-server/server/broker.go @@ -171,6 +171,12 @@ func (r *broker) RelayRequest(server string, request *pb.HttpRequest) (<-chan *p } } +// StopRelayRequest forgets a relaying request, this causes the next chunk from the backend +// with the relay id to not be recognized, resulting in the relay server returning an error. +func (r *broker) StopRelayRequest(requestId string) { + delete(r.resp, requestId) +} + // GetRequest obtains a client's request for the server identifier. It blocks // until a client makes a request. func (r *broker) GetRequest(ctx context.Context, server, path string) (*pb.HttpRequest, error) { diff --git a/src/go/cmd/http-relay-server/server/server.go b/src/go/cmd/http-relay-server/server/server.go index 150f3b273..cce7f1076 100644 --- a/src/go/cmd/http-relay-server/server/server.go +++ b/src/go/cmd/http-relay-server/server/server.go @@ -257,8 +257,10 @@ func (s *Server) bidirectionalStream(backendCtx backendContext, w http.ResponseW numBytes := 0 for responseChunk := range responseChunks { - // TODO(b/130706300): detect dropped connection and end request in broker - _, _ = bufrw.Write(responseChunk.Body) + if _, err = w.Write(responseChunk.Body); err != nil { + log.Printf("[%s] %s", backendCtx.Id, err) + return + } bufrw.Flush() numBytes += len(responseChunk.Body) } @@ -378,6 +380,7 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } + defer s.b.StopRelayRequest(backendCtx.Id) header, responseChunksChan, done := s.waitForFirstResponseAndHandleSwitching(ctx, *backendCtx, w, backendRespChan) if done { @@ -392,9 +395,8 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) { // i.e. this will block until numBytes := 0 for responseChunk := range responseChunksChan { - // TODO(b/130706300): detect dropped connection and end request in broker if _, err = w.Write(responseChunk.Body); err != nil { - delete(s.b.resp, backendReq.GetId()) + log.Printf("[%s] %s", backendCtx.Id, err) return } if flush, ok := w.(http.Flusher); ok {