Skip to content

Commit 50d0a60

Browse files
committed
Revert "Revert #222 (#226)"
This reverts commit 1b29414.
1 parent dff6b4c commit 50d0a60

File tree

8 files changed

+110
-23
lines changed

8 files changed

+110
-23
lines changed

src/bootstrap/cloud/terraform/service-account.tf

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ resource "google_project_iam_member" "robot-service-roles" {
6060
project = data.google_project.project.project_id
6161
member = "serviceAccount:${google_service_account.robot-service.email}"
6262
for_each = toset([
63-
"roles/cloudtrace.agent", # Upload cloud traces
63+
"roles/cloudtrace.agent", # Upload cloud traces
6464
"roles/logging.logWriter", # Upload text logs to Cloud logging
6565
# Required to use robot-service@ for GKE clusters that simulate robots
6666
"roles/monitoring.viewer",

src/go/cmd/http-relay-client/client/client.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ type ClientConfig struct {
9191
ForceHttp2 bool
9292
}
9393

94+
type RelayServerError struct {
95+
msg string
96+
}
97+
98+
func NewRelayServerError(msg string) error {
99+
return &RelayServerError{msg}
100+
}
101+
102+
func (e *RelayServerError) Error() string {
103+
return e.msg
104+
}
105+
94106
func DefaultClientConfig() ClientConfig {
95107
return ClientConfig{
96108
RemoteRequestTimeout: 60 * time.Second,
@@ -384,9 +396,9 @@ func (c *Client) postResponse(remote *http.Client, br *pb.HttpResponse) error {
384396
return fmt.Errorf("couldn't read relay server's response body: %v", err)
385397
}
386398
if resp.StatusCode != http.StatusOK {
387-
err := fmt.Errorf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body)
399+
err := NewRelayServerError(fmt.Sprintf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body))
388400
if resp.StatusCode == http.StatusBadRequest {
389-
// http-relay-server may have restarted during the request.
401+
// http-relay-server may have restarted or the client cancelled the request.
390402
return backoff.Permanent(err)
391403
}
392404
return err
@@ -564,6 +576,7 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
564576
defer span.End()
565577

566578
resp, hresp, err := makeBackendRequest(ctx, local, req, id)
579+
defer hresp.Body.Close()
567580
if err != nil {
568581
// Even if we couldn't handle the backend request, send an
569582
// answer to the relay that signals the error.
@@ -643,8 +656,11 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
643656
log.Printf("[%s] Failed to post response to relay: %v", *resp.Id, err)
644657
},
645658
)
646-
if _, ok := err.(*backoff.PermanentError); ok {
647-
// A permanent error suggests the request should be aborted.
659+
// Any error suggests the request should be aborted.
660+
// A missing chunk will cause clients to receive corrupted data, in most cases it is better
661+
// to close the connection to avoid that.
662+
if err != nil {
663+
log.Printf("[%s] Closing backend connection: %v", *resp.Id, err)
648664
break
649665
}
650666
}

src/go/cmd/http-relay-client/client/client_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func TestLocalProxy(t *testing.T) {
106106
client := NewClient(config)
107107
err := client.localProxy(&http.Client{}, &http.Client{})
108108
if err != nil {
109-
t.Errorf("Unexpected error: %s", err)
109+
t.Errorf("Unexpected error: %v", err)
110110
}
111111
assertMocksDoneWithin(t, 10*time.Second)
112112
}
@@ -175,7 +175,7 @@ func TestBackendError(t *testing.T) {
175175
// 3. retrieves the response from the backend and sends it to the relay-server
176176
err := client.localProxy(&http.Client{}, &http.Client{})
177177
if err != nil {
178-
t.Errorf("Unexpected error: %s", err)
178+
t.Errorf("Unexpected error: %v", err)
179179
}
180180
assertMocksDoneWithin(t, 10*time.Second)
181181
}
@@ -206,7 +206,7 @@ func TestServerTimeout(t *testing.T) {
206206
client := NewClient(config)
207207
err := client.localProxy(&http.Client{}, &http.Client{})
208208
if err != ErrTimeout {
209-
t.Errorf("Unexpected error: %s", err)
209+
t.Errorf("Unexpected error: %v", err)
210210
}
211211
assertMocksDoneWithin(t, 10*time.Second)
212212
}

src/go/cmd/http-relay-server/server/broker.go

+8
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,14 @@ func (r *broker) RelayRequest(server string, request *pb.HttpRequest) (<-chan *p
171171
}
172172
}
173173

174+
// StopRelayRequest forgets a relaying request, this causes the next chunk from the backend
175+
// with the relay id to not be recognized, resulting in the relay server returning an error.
176+
func (r *broker) StopRelayRequest(requestId string) {
177+
r.m.Lock()
178+
defer r.m.Unlock()
179+
delete(r.resp, requestId)
180+
}
181+
174182
// GetRequest obtains a client's request for the server identifier. It blocks
175183
// until a client makes a request.
176184
func (r *broker) GetRequest(ctx context.Context, server, path string) (*pb.HttpRequest, error) {

src/go/cmd/http-relay-server/server/broker_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ func runSender(t *testing.T, b *broker, s string, m string, wg *sync.WaitGroup)
5454
func runReceiver(t *testing.T, b *broker, s string, wg *sync.WaitGroup) {
5555
req, err := b.GetRequest(context.Background(), s, "/")
5656
if err != nil {
57-
t.Errorf("Error when getting request: %s", err)
57+
t.Errorf("Error when getting request: %v", err)
5858
}
5959
err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(true)})
6060
if err != nil {
61-
t.Errorf("Error when sending response: %s", err)
61+
t.Errorf("Error when sending response: %v", err)
6262
}
6363
wg.Done()
6464
}
@@ -89,17 +89,17 @@ func runSenderStream(t *testing.T, b *broker, s string, m string, wg *sync.WaitG
8989
func runReceiverStream(t *testing.T, b *broker, s string, wg *sync.WaitGroup, done <-chan bool) {
9090
req, err := b.GetRequest(context.Background(), s, "/")
9191
if err != nil {
92-
t.Errorf("Error when getting request: %s", err)
92+
t.Errorf("Error when getting request: %v", err)
9393
}
9494
err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(false)})
9595
if err != nil {
96-
t.Errorf("Error when sending response: %s", err)
96+
t.Errorf("Error when sending response: %v", err)
9797
}
9898
go func() {
9999
<-done
100100
err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(true)})
101101
if err != nil {
102-
t.Errorf("Error when sending response: %s", err)
102+
t.Errorf("Error when sending response: %v", err)
103103
}
104104
wg.Done()
105105
}()

src/go/cmd/http-relay-server/server/server.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,10 @@ func (s *Server) bidirectionalStream(backendCtx backendContext, w http.ResponseW
257257

258258
numBytes := 0
259259
for responseChunk := range responseChunks {
260-
// TODO(b/130706300): detect dropped connection and end request in broker
261-
_, _ = bufrw.Write(responseChunk.Body)
260+
if _, err = w.Write(responseChunk.Body); err != nil {
261+
log.Printf("[%s] Error writing response to bidi-stream: %v", backendCtx.Id, err)
262+
return
263+
}
262264
bufrw.Flush()
263265
numBytes += len(responseChunk.Body)
264266
}
@@ -378,6 +380,7 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) {
378380
http.Error(w, err.Error(), http.StatusInternalServerError)
379381
return
380382
}
383+
defer s.b.StopRelayRequest(backendCtx.Id)
381384

382385
header, responseChunksChan, done := s.waitForFirstResponseAndHandleSwitching(ctx, *backendCtx, w, backendRespChan)
383386
if done {
@@ -392,8 +395,10 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) {
392395
// i.e. this will block until
393396
numBytes := 0
394397
for responseChunk := range responseChunksChan {
395-
// TODO(b/130706300): detect dropped connection and end request in broker
396-
_, _ = w.Write(responseChunk.Body)
398+
if _, err = w.Write(responseChunk.Body); err != nil {
399+
log.Printf("[%s] Error writing response to user-client: %v", backendCtx.Id, err)
400+
return
401+
}
397402
if flush, ok := w.(http.Flusher); ok {
398403
flush.Flush()
399404
}
@@ -553,6 +558,6 @@ func (s *Server) Start(port int, blockSize int) {
553558
// update) or a failed liveness check (eg broker deadlock), we can't
554559
// easily tell. We panic to help debugging: if the environment sets
555560
// GOTRACEBACK=all they will see stacktraces after the panic.
556-
log.Panicf("Server terminated abnormally: %s", err)
561+
log.Panicf("Server terminated abnormally: %v", err)
557562
}
558563
}

src/go/cmd/http-relay-server/server/server_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func TestClientBadRequest(t *testing.T) {
223223
if tc.wantMsg != "" {
224224
body, err := ioutil.ReadAll(resp.Body)
225225
if err != nil {
226-
t.Errorf("Failed to read body stream: %s", err)
226+
t.Errorf("Failed to read body stream: %v", err)
227227
}
228228
if !strings.Contains(string(body), tc.wantMsg) {
229229
t.Errorf("Wrong response body; want %q; got %q", tc.wantMsg, body)
@@ -326,7 +326,7 @@ func TestServerRequestResponseHandler(t *testing.T) {
326326
}
327327
backendRespBody, err := proto.Marshal(backendResp)
328328
if err != nil {
329-
t.Errorf("Failed to marshal test response: %s", err)
329+
t.Errorf("Failed to marshal test response: %v", err)
330330
}
331331

332332
req := httptest.NewRequest("GET", "/server/request?server=b", strings.NewReader(""))
@@ -354,7 +354,7 @@ func TestServerRequestResponseHandler(t *testing.T) {
354354
}
355355
body, err := ioutil.ReadAll(reqRecorder.Result().Body)
356356
if err != nil {
357-
t.Errorf("Failed to read body stream: %s", err)
357+
t.Errorf("Failed to read body stream: %v", err)
358358
}
359359
if !strings.Contains(string(body), "/my/url") {
360360
t.Errorf("Serialize request didn't contain URL: %s", string(body))
@@ -385,7 +385,7 @@ func TestServerResponseHandlerWithInvalidRequestID(t *testing.T) {
385385
}
386386
backendRespBody, err := proto.Marshal(backendResp)
387387
if err != nil {
388-
t.Errorf("Failed to marshal test response: %s", err)
388+
t.Errorf("Failed to marshal test response: %v", err)
389389
}
390390

391391
resp := httptest.NewRequest("POST", "/server/response", bytes.NewReader(backendRespBody))

src/go/tests/relay/nok8s_relay_test.go

+59-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package main
1616

1717
import (
18+
"bufio"
1819
"bytes"
1920
"context"
2021
"fmt"
@@ -128,7 +129,7 @@ func (r *relay) stop() error {
128129
}
129130

130131
// TestHttpRelay launches a local http relay (client + server) and connects a
131-
// test-hhtp-server as a backend. The test is then interacting with the backend
132+
// test-http-server as a backend. The test is then interacting with the backend
132133
// through the local relay.
133134
func TestHttpRelay(t *testing.T) {
134135
tests := []struct {
@@ -201,6 +202,63 @@ func TestHttpRelay(t *testing.T) {
201202
}
202203
}
203204

205+
// TestDroppedUserClientFreesRelayChannel checks that when the user client closes a connection,
206+
// it is propagated to the relay server and client, closing the backend connection as well.
207+
func TestDroppedUserClientFreesRelayChannel(t *testing.T) {
208+
// setup http test server
209+
connClosed := make(chan error)
210+
defer close(connClosed)
211+
finishServer := make(chan bool)
212+
defer close(finishServer)
213+
214+
// mock a long running backend that uses chunking to send periodic updates
215+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
216+
for {
217+
select {
218+
case <-finishServer:
219+
return
220+
default:
221+
if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil {
222+
connClosed <- err
223+
return
224+
}
225+
if flusher, ok := w.(http.Flusher); ok {
226+
flusher.Flush()
227+
} else {
228+
t.Fatal("cannot flush")
229+
}
230+
time.Sleep(time.Second)
231+
}
232+
}
233+
}))
234+
defer ts.Close()
235+
236+
backendAddress := strings.TrimPrefix(ts.URL, "http://")
237+
r := &relay{}
238+
if err := r.start(backendAddress); err != nil {
239+
t.Fatal("failed to start relay: ", err)
240+
}
241+
defer r.stop()
242+
relayAddress := "http://127.0.0.1:" + r.rsPort
243+
244+
res, err := http.Get(relayAddress + "/client/remote1/")
245+
if err != nil {
246+
t.Fatal(err)
247+
}
248+
// receive the first chunk then terminates the connection
249+
if _, err := bufio.NewReader(res.Body).ReadString('\n'); err != nil {
250+
t.Fatal(err)
251+
}
252+
res.Body.Close()
253+
254+
// wait for up to 30s for backend connection to be closed
255+
select {
256+
case <-connClosed:
257+
case <-time.After(30 * time.Second):
258+
t.Error("Server did not close connection")
259+
}
260+
}
261+
204262
type testServer struct {
205263
testpb.UnimplementedTestServiceServer
206264
responsePayload []byte

0 commit comments

Comments
 (0)