From e337e2bc51339f3825837e3cc45cc1a4df1ae586 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Sun, 17 Nov 2024 17:52:14 +0530 Subject: [PATCH 01/10] xds/internal/xdsclient/test: new watcher resource caching behavior --- .../xdsclient/tests/lds_watchers_test.go | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 38e1f1760383..d3501ffb243f 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -1002,6 +1002,128 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { } } +// TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is +// registered for a resource which is already present in the cache with an old +// good update and latest NACK error. The test verifies that new watcher +// receives both good update and error without request being sent to the +// management server. +func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { + firstRequestReceived := false + firstAckReceived := grpcsync.NewEvent() + secondRequestReceived := grpcsync.NewEvent() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + // The first request has an empty version string. + if !firstRequestReceived && req.GetVersionInfo() == "" { + firstRequestReceived = true + return nil + } + // The first ack has a non-empty version string. + if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" { + firstAckReceived.Fire() + return nil + } + // Any requests after the first request and ack, are not expected. + secondRequestReceived.Fire() + return nil + }, + }) + + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + + // Create an xDS client with the above bootstrap contents. + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Register a watch for a listener resource and have the watch + // callback push the received update on to a channel. + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) + defer ldsCancel1() + // Configure the management server to return a single listener + // resource, corresponding to the one we registered a watch for. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + // Verify the contents of the received update. + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: rdsName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("timeout when waiting for receipt of ACK at the management server") + case <-firstAckReceived.Done(): + } + + // Configure the management server to return a single listener resource + // which is expected to be NACKed by the client. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + // Verify that the expected error is propagated to the existing watcher. + u, err := lw1.updateCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + } + gotErr := u.(listenerUpdateErrTuple).err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { + t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + } + + // Register another watch for the same resource. This should get the update + // and error from the cache. + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) + defer ldsCancel2() + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + u, err = lw2.updateCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + } + gotErr = u.(listenerUpdateErrTuple).err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { + t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + } + + // No request should get sent out as part of this watch. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case <-secondRequestReceived.Done(): + t.Fatal("xdsClient sent out request instead of using update from cache") + } +} + // TestLDSWatch_PartialValid covers the case where a response from the // management server contains both valid and invalid resources and is expected // to be NACK'ed by the xdsclient. The test verifies that watchers corresponding From 9d6bd0e7f13b6c8dab1ed60664d7d02c2e46d44f Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Sun, 17 Nov 2024 23:15:34 +0530 Subject: [PATCH 02/10] test to verify both update and error are sent --- xds/internal/xdsclient/authority.go | 8 ++- .../xdsclient/tests/lds_watchers_test.go | 61 ++++++++++++++----- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 04bd278d2c47..b08737f73fa4 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -633,13 +633,19 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // Always add the new watcher to the set of watchers. state.watchers[watcher] = true - // If we have a cached copy of the resource, notify the new watcher. + // If we have a cached copy of the resource, notify the new watcher + // immediately. if state.cache != nil { if a.logger.V(2) { a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON()) } resource := state.cache a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) + // If last update was NACK'd, notify the new watcher of error + // immediately as well. + if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil { + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) }) + } } // If the metadata field is updated to indicate that the management // server does not have this resource, notify the new watcher. diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index d3501ffb243f..f0cd8f6e0c2b 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -77,16 +77,39 @@ func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, on } func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + onDone() +} + +func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + onDone() +} + +type listenerWatcherMultiple struct { + updateCh *testutils.Channel +} + +func newListenerWatcherMultiple() *listenerWatcherMultiple { + return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(2)} +} + +func (cw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + onDone() +} + +func (cw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. - cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + cw.updateCh.Send(listenerUpdateErrTuple{err: err}) onDone() } -func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) +func (cw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + cw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) onDone() } @@ -1004,9 +1027,9 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is // registered for a resource which is already present in the cache with an old -// good update and latest NACK error. The test verifies that new watcher -// receives both good update and error without request being sent to the -// management server. +// good update as well latest NACK error. The test verifies that new watcher +// receives both good update and error without a new resource request being +// sent to the management server. func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { firstRequestReceived := false firstAckReceived := grpcsync.NewEvent() @@ -1024,6 +1047,13 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { firstAckReceived.Fire() return nil } + // If the request version remains "1" while the nonce keeps + // increasing, it indicates the client is repeatedly NACKing + // updates from the server but not sending any new resource + // request. + if req.GetVersionInfo() == "1" { + return nil + } // Any requests after the first request and ack, are not expected. secondRequestReceived.Fire() return nil @@ -1099,21 +1129,12 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { // Register another watch for the same resource. This should get the update // and error from the cache. - lw2 := newListenerWatcher() + lw2 := newListenerWatcherMultiple() ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } - u, err = lw2.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr = u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) - } - // No request should get sent out as part of this watch. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() @@ -1122,6 +1143,14 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { case <-secondRequestReceived.Done(): t.Fatal("xdsClient sent out request instead of using update from cache") } + u, err = lw2.updateCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + } + gotErr = u.(listenerUpdateErrTuple).err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { + t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + } } // TestLDSWatch_PartialValid covers the case where a response from the From 9a51cf34694d9a2a53a107ff2449f00030d1925d Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Mon, 18 Nov 2024 15:30:14 +0530 Subject: [PATCH 03/10] test without cache verification --- .../xdsclient/tests/lds_watchers_test.go | 48 ++----------------- 1 file changed, 4 insertions(+), 44 deletions(-) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index f0cd8f6e0c2b..75b80e3b7bb9 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -90,8 +90,8 @@ type listenerWatcherMultiple struct { updateCh *testutils.Channel } -func newListenerWatcherMultiple() *listenerWatcherMultiple { - return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(2)} +func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { + return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } func (cw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { @@ -1031,34 +1031,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // receives both good update and error without a new resource request being // sent to the management server. func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { - firstRequestReceived := false - firstAckReceived := grpcsync.NewEvent() - secondRequestReceived := grpcsync.NewEvent() - - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - // The first request has an empty version string. - if !firstRequestReceived && req.GetVersionInfo() == "" { - firstRequestReceived = true - return nil - } - // The first ack has a non-empty version string. - if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" { - firstAckReceived.Fire() - return nil - } - // If the request version remains "1" while the nonce keeps - // increasing, it indicates the client is repeatedly NACKing - // updates from the server but not sending any new resource - // request. - if req.GetVersionInfo() == "1" { - return nil - } - // Any requests after the first request and ack, are not expected. - secondRequestReceived.Fire() - return nil - }, - }) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) @@ -1101,11 +1074,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - select { - case <-ctx.Done(): - t.Fatal("timeout when waiting for receipt of ACK at the management server") - case <-firstAckReceived.Done(): - } // Configure the management server to return a single listener resource // which is expected to be NACKed by the client. @@ -1129,20 +1097,12 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { // Register another watch for the same resource. This should get the update // and error from the cache. - lw2 := newListenerWatcherMultiple() + lw2 := newListenerWatcherMultiple(2) ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } - // No request should get sent out as part of this watch. - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - select { - case <-sCtx.Done(): - case <-secondRequestReceived.Done(): - t.Fatal("xdsClient sent out request instead of using update from cache") - } u, err = lw2.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) From 757053e8efa4b892fe3332ad334a906fe4efec5c Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 19 Nov 2024 19:54:58 +0530 Subject: [PATCH 04/10] move nack error out of cache condition, use same test listener watcher, add cache check --- xds/internal/xdsclient/authority.go | 10 +-- .../xdsclient/tests/lds_watchers_test.go | 79 +++++++++++-------- 2 files changed, 50 insertions(+), 39 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index b08737f73fa4..b4a13802e6d6 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -641,11 +641,11 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w } resource := state.cache a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) - // If last update was NACK'd, notify the new watcher of error - // immediately as well. - if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil { - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) }) - } + } + // If last update was NACK'd, notify the new watcher of error + // immediately as well. + if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil { + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) }) } // If the metadata field is updated to indicate that the management // server does not have this resource, notify the new watcher. diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 75b80e3b7bb9..9f5d2898df37 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -71,45 +71,34 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { - cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) - onDone() +func newListenerWatcherWithSize(size int) *listenerWatcher { + return &listenerWatcher{updateCh: testutils.NewChannelWithSize(size)} } -func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) +func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } -func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - onDone() -} - -type listenerWatcherMultiple struct { - updateCh *testutils.Channel -} - -func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { - return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} -} - -func (cw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { - cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) - onDone() -} - -func (cw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { - // When used with a go-control-plane management server that continuously - // resends resources which are NACKed by the xDS client, using a `Replace()` - // here and in OnResourceDoesNotExist() simplifies tests which will have - // access to the most recently received error. - cw.updateCh.Send(listenerUpdateErrTuple{err: err}) +func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { + if len(lw.updateCh.C) == 1 { + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + } else { + lw.updateCh.Send(listenerUpdateErrTuple{err: err}) + } onDone() } -func (cw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) +func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + if len(lw.updateCh.C) == 1 { + lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + } else { + lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + } onDone() } @@ -1031,7 +1020,20 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // receives both good update and error without a new resource request being // sent to the management server. func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + secondRequestReceived := grpcsync.NewEvent() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + // If the version is "2", it means that a second request has been + // received (after an initial request and ack). The client should + // not send a second request if the resource is already cached. + if req.GetVersionInfo() == "2" { + secondRequestReceived.Fire() + return nil + } + return nil + }, + }) nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) @@ -1059,7 +1061,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, SkipValidation: true, } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), 1000*defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) @@ -1097,7 +1099,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { // Register another watch for the same resource. This should get the update // and error from the cache. - lw2 := newListenerWatcherMultiple(2) + lw2 := newListenerWatcherWithSize(2) ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { @@ -1111,6 +1113,15 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) } + // No request should get sent out as part of this watch. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case <-secondRequestReceived.Done(): + t.Fatal("xdsClient sent out request instead of using update from cache") + default: + } } // TestLDSWatch_PartialValid covers the case where a response from the From 357044923635af7102caaafbdf0995f873fa3bb6 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 19 Nov 2024 20:23:17 +0530 Subject: [PATCH 05/10] Add test case for new watcher getting nack while registering --- .../xdsclient/tests/lds_watchers_test.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 9f5d2898df37..4c5a9efe892d 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -966,7 +966,8 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { // TestLDSWatch_NACKError covers the case where an update from the management // server is NACK'ed by the xdsclient. The test verifies that the error is -// propagated to the watcher. +// propagated to the existing watcher. After NACK, If a new watcher registers +// for the resource, error is propagated to the new watcher as well. func (s) TestLDSWatch_NACKError(t *testing.T) { mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -1012,6 +1013,19 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) } + + // Verify that the expected error is propagated to the new watcher as well. + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) + defer ldsCancel2() + u, err = lw2.updateCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + } + gotErr = u.(listenerUpdateErrTuple).err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { + t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + } } // TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is From 777e12b7f0c76d7f12676224f91cb36edc51d892 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 19 Nov 2024 23:10:49 +0530 Subject: [PATCH 06/10] remove second request verification check --- xds/internal/xdsclient/tests/lds_watchers_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 4c5a9efe892d..859a14d2e370 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -1127,15 +1127,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) } - // No request should get sent out as part of this watch. - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - select { - case <-sCtx.Done(): - case <-secondRequestReceived.Done(): - t.Fatal("xdsClient sent out request instead of using update from cache") - default: - } } // TestLDSWatch_PartialValid covers the case where a response from the From 9ca1ed39c6ee7132dfd1e6c412ecf05440f4b37f Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 19 Nov 2024 23:47:23 +0530 Subject: [PATCH 07/10] separate watcher struct --- .../xdsclient/tests/lds_watchers_test.go | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 859a14d2e370..040985e7fd7c 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -71,34 +71,45 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func newListenerWatcherWithSize(size int) *listenerWatcher { - return &listenerWatcher{updateCh: testutils.NewChannelWithSize(size)} -} - func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - if len(lw.updateCh.C) == 1 { - // When used with a go-control-plane management server that continuously - // resends resources which are NACKed by the xDS client, using a `Replace()` - // here and in OnResourceDoesNotExist() simplifies tests which will have - // access to the most recently received error. - lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) - } else { - lw.updateCh.Send(listenerUpdateErrTuple{err: err}) - } + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) onDone() } func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - if len(lw.updateCh.C) == 1 { - lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - } else { - lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - } + lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + onDone() +} + +type listenerWatcherMultiple struct { + updateCh *testutils.Channel +} + +func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { + return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} +} + +func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + onDone() +} + +func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{err: err}) + onDone() +} + +func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) onDone() } @@ -1030,7 +1041,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is // registered for a resource which is already present in the cache with an old -// good update as well latest NACK error. The test verifies that new watcher +// good update as well as latest NACK error. The test verifies that new watcher // receives both good update and error without a new resource request being // sent to the management server. func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { @@ -1051,7 +1062,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ @@ -1113,7 +1123,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { // Register another watch for the same resource. This should get the update // and error from the cache. - lw2 := newListenerWatcherWithSize(2) + lw2 := newListenerWatcherMultiple(2) ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { @@ -1127,6 +1137,15 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) } + // No request should get sent out as part of this watch. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case <-secondRequestReceived.Done(): + t.Fatal("xdsClient sent out request instead of using update from cache") + default: + } } // TestLDSWatch_PartialValid covers the case where a response from the From 65d5760d58c778ac38a57102ac424d386706d3f8 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Wed, 20 Nov 2024 14:27:28 +0530 Subject: [PATCH 08/10] address nits, verify caching without version number --- xds/internal/xdsclient/authority.go | 5 +- .../xdsclient/tests/lds_watchers_test.go | 48 ++++++++++++------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index b4a13802e6d6..24673a8d9077 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -644,7 +644,10 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w } // If last update was NACK'd, notify the new watcher of error // immediately as well. - if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil { + if state.md.Status == xdsresource.ServiceStatusNACKed { + if a.logger.V(2) { + a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON()) + } a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) }) } // If the metadata field is updated to indicate that the management diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 040985e7fd7c..99772dd1bafb 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -976,8 +976,8 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { } // TestLDSWatch_NACKError covers the case where an update from the management -// server is NACK'ed by the xdsclient. The test verifies that the error is -// propagated to the existing watcher. After NACK, If a new watcher registers +// server is NACKed by the xdsclient. The test verifies that the error is +// propagated to the existing watcher. After NACK, if a new watcher registers // for the resource, error is propagated to the new watcher as well. func (s) TestLDSWatch_NACKError(t *testing.T) { mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -1018,11 +1018,11 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // Verify that the expected error is propagated to the watcher. u, err := lw.updateCh.Receive(ctx) if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err) } gotErr := u.(listenerUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) } // Verify that the expected error is propagated to the new watcher as well. @@ -1031,11 +1031,11 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { defer ldsCancel2() u, err = lw2.updateCh.Receive(ctx) if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err) } gotErr = u.(listenerUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) } } @@ -1044,18 +1044,31 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // good update as well as latest NACK error. The test verifies that new watcher // receives both good update and error without a new resource request being // sent to the management server. -func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { +func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { + firstRequestReceived := false + firstAckReceived := grpcsync.NewEvent() + secondAckReceived := grpcsync.NewEvent() secondRequestReceived := grpcsync.NewEvent() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - // If the version is "2", it means that a second request has been - // received (after an initial request and ack). The client should - // not send a second request if the resource is already cached. - if req.GetVersionInfo() == "2" { - secondRequestReceived.Fire() + // The first request has an empty version string. + if !firstRequestReceived && req.GetVersionInfo() == "" { + firstRequestReceived = true return nil } + // The first ack has a non-empty version string. + if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" { + firstAckReceived.Fire() + return nil + } + // The second ack has a non-empty version string. + if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" { + secondAckReceived.Fire() + return nil + } + // Any requests after the first request and two acks, are not expected. + secondRequestReceived.Fire() return nil }, }) @@ -1078,6 +1091,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { lw1 := newListenerWatcher() ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) defer ldsCancel1() + // Configure the management server to return a single listener // resource, corresponding to the one we registered a watch for. resources := e2e.UpdateOptions{ @@ -1090,6 +1104,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } + // Verify the contents of the received update. wantUpdate := listenerUpdateErrTuple{ update: xdsresource.ListenerUpdate{ @@ -1111,6 +1126,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } + // Verify that the expected error is propagated to the existing watcher. u, err := lw1.updateCh.Receive(ctx) if err != nil { @@ -1118,7 +1134,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { } gotErr := u.(listenerUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) } // Register another watch for the same resource. This should get the update @@ -1131,11 +1147,11 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { } u, err = lw2.updateCh.Receive(ctx) if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err) } gotErr = u.(listenerUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) } // No request should get sent out as part of this watch. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) @@ -1150,7 +1166,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { // TestLDSWatch_PartialValid covers the case where a response from the // management server contains both valid and invalid resources and is expected -// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding +// to be NACKed by the xdsclient. The test verifies that watchers corresponding // to the valid resource receive the update, while watchers corresponding to the // invalid resource receive an error. func (s) TestLDSWatch_PartialValid(t *testing.T) { From 0e1bc31eb20e04c966839635e3394635e2e07062 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Thu, 21 Nov 2024 17:14:44 +0530 Subject: [PATCH 09/10] helper function for unknown errors --- .../xdsclient/tests/lds_watchers_test.go | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 99772dd1bafb..f4c012c4b344 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -94,6 +94,8 @@ type listenerWatcherMultiple struct { updateCh *testutils.Channel } +// TODO: delete this once `newListenerWatcher` is modified to handle multiple +// updates. func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } @@ -178,6 +180,18 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want return nil } +func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error { + u, err := updateCh.Receive(ctx) + if err != nil { + return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err) + } + gotErr := u.(listenerUpdateErrTuple).err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { + return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr) + } + return nil +} + // TestLDSWatch covers the case where a single watcher exists for a single // listener resource. The test verifies the following scenarios: // 1. An update from the management server containing the resource being @@ -1016,26 +1030,18 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { } // Verify that the expected error is propagated to the watcher. - u, err := lw.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr := u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) + // Verify that the expected error is propagated to the existing watcher. + if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) } // Verify that the expected error is propagated to the new watcher as well. lw2 := newListenerWatcher() ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() - u, err = lw2.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr = u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) + // Verify that the expected error is propagated to the existing watcher. + if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) } } @@ -1128,13 +1134,8 @@ func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { } // Verify that the expected error is propagated to the existing watcher. - u, err := lw1.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr := u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) + if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) } // Register another watch for the same resource. This should get the update @@ -1145,14 +1146,11 @@ func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } - u, err = lw2.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr = u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr) + // Verify that the expected error is propagated to the existing watcher. + if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) } + // No request should get sent out as part of this watch. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() @@ -1233,13 +1231,9 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { // Verify that the expected error is propagated to the watcher which // requested for the bad resource. - u, err := lw1.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr := u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + // Verify that the expected error is propagated to the existing watcher. + if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) } // Verify that the watcher watching the good resource receives a good From 742da1bfcd4743dd41d0cee3ae83de073975138f Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Fri, 22 Nov 2024 00:51:43 +0530 Subject: [PATCH 10/10] link issue for todo --- xds/internal/xdsclient/tests/lds_watchers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index f4c012c4b344..7b49b9b17b74 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -95,7 +95,7 @@ type listenerWatcherMultiple struct { } // TODO: delete this once `newListenerWatcher` is modified to handle multiple -// updates. +// updates (https://github.com/grpc/grpc-go/issues/7864). func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} }