From 439150f1f48ce8960e701a107fd9b7855b5f5906 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 12 Aug 2024 15:43:06 +0000 Subject: [PATCH 1/7] csds: unskip e2e test --- xds/csds/csds_e2e_test.go | 164 ++++++++++++++++++++++++++++---------- 1 file changed, 120 insertions(+), 44 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index d5611ca97d5e..edd0fa478ba9 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "io" - "runtime" "slices" "strings" "testing" @@ -65,56 +64,113 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// The following watcher implementations are no-ops since we don't really care -// about the callback received by these watchers in the test. We only care -// whether CSDS reports the expected state. +// The following watcher implementations are no-ops in the sense that they don't +// validate or use the received updates from the xDS client. In this test, we +// only care whether CSDS reports the expected state. +// +// Note: These watchers do use the ADS stream level flow control supported by +// the xDS client. The watch callbacks block on a wait channel that is written +// to by the test, before invoking the onDone callback. This is particularly +// useful when a resource is NACKed, because the go-control-plane management +// server continuously resends the same resource in this case, and applying flow +// control from these watchers ensures that xDS client does not spend all of its +// time receiving and NACKing updates from the maangement server. This was +// indeed the case on arm64 (before we had support for ADS stream level flow +// control), and was causing CSDS to not receive any updates in this case. + +func blockAndDone(testDoneCh <-chan struct{}, waitCh chan struct{}, onDone xdsresource.DoneNotifier) { + select { + case <-testDoneCh: + case <-waitCh: + onDone.OnDone() + } +} -type unimplementedListenerWatcher struct{} +type blockingListenerWatcher struct { + testDoneCh <-chan struct{} // Closed when the test is done. + waitCh chan struct{} // Written to by the test to unblock the watch callback. +} + +func newBlockingListenerWatcher(doneCh <-chan struct{}) *blockingListenerWatcher { + return &blockingListenerWatcher{ + testDoneCh: doneCh, + waitCh: make(chan struct{}, 1), + } +} -func (unimplementedListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -type unimplementedRouteConfigWatcher struct{} +type blockingRouteConfigWatcher struct { + testDoneCh <-chan struct{} // Closed when the test is done. + waitCh chan struct{} // Written to by the test to unblock the watch callback. 
+} + +func newBlockingRouteConfigWatcher(doneCh <-chan struct{}) *blockingRouteConfigWatcher { + return &blockingRouteConfigWatcher{ + testDoneCh: doneCh, + waitCh: make(chan struct{}, 1), + } +} -func (unimplementedRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -type unimplementedClusterWatcher struct{} +type blockingClusterWatcher struct { + testDoneCh <-chan struct{} // Closed when the test is done. + waitCh chan struct{} // Written to by the test to unblock the watch callback. +} -func (unimplementedClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func newBlockingClusterWatcher(doneCh <-chan struct{}) *blockingClusterWatcher { + return &blockingClusterWatcher{ + testDoneCh: doneCh, + waitCh: make(chan struct{}, 1), + } +} +func (w *blockingClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -type unimplementedEndpointsWatcher struct{} +type blockingEndpointsWatcher struct { + testDoneCh <-chan struct{} // Closed when the test is done. + waitCh chan struct{} // Written to by the test to unblock the watch callback. +} + +func newBlockingEndpointsWatcher(doneCh <-chan struct{}) *blockingEndpointsWatcher { + return &blockingEndpointsWatcher{ + testDoneCh: doneCh, + waitCh: make(chan struct{}, 1), + } +} -func (unimplementedEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } -func (unimplementedEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - onDone.OnDone() +func (w *blockingEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + blockAndDone(w.testDoneCh, w.waitCh, onDone) } // Creates a gRPC server and starts serving a CSDS service implementation on it. 
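
The flow-control handshake these blocking watchers implement can be reduced to a few channels. Below is a minimal, self-contained sketch of that handshake, not code from this patch: DoneNotifier and doneFunc are local stand-ins mirroring the xdsresource.DoneNotifier contract, and blockingWatcher plays the role of the watchers added above.

package main

import (
	"fmt"
	"time"
)

// DoneNotifier mirrors the xdsresource.DoneNotifier interface: the xDS
// client hands one to every watcher callback and does not read the next
// ADS response until OnDone is invoked.
type DoneNotifier interface{ OnDone() }

type doneFunc func()

func (f doneFunc) OnDone() { f() }

// blockingWatcher models the watchers above: the callback parks on waitCh
// and only releases the ADS stream once the test writes to it.
type blockingWatcher struct {
	testDoneCh <-chan struct{}
	waitCh     chan struct{}
}

func (w *blockingWatcher) onUpdate(onDone DoneNotifier) {
	select {
	case <-w.testDoneCh: // test is over; never release
	case <-w.waitCh:
		onDone.OnDone()
	}
}

func main() {
	testDone := make(chan struct{})
	defer close(testDone)
	w := &blockingWatcher{testDoneCh: testDone, waitCh: make(chan struct{}, 1)}

	released := make(chan struct{})
	// Simulated watcher callback invoked by the xDS client.
	go w.onUpdate(doneFunc(func() { close(released) }))

	w.waitCh <- struct{}{} // the "test" unblocks the watcher
	select {
	case <-released:
		fmt.Println("onDone invoked; the client may read the next response")
	case <-time.After(time.Second):
		fmt.Println("watcher still blocked")
	}
}

Running the sketch prints the release message, which is the whole contract: nothing further is read from the ADS stream until some party invokes OnDone.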
@@ -163,6 +219,22 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) return stream } +// Unblock the resource watchers. Since we use the same resource watchers +// for both xDS clients and each of them watches two resources of each type, +// we need to write to their wait channel four times. Also, we have to spawn +// goroutines to write to these channels because we have no control over the +// order of invocation of resource's watch callbacks. +func unblockResourceWatchers(t *testing.T, listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh chan struct{}) { + t.Helper() + for _, w := range []chan struct{}{listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh} { + go func(w chan struct{}) { + for i := 0; i < 4; i++ { + w <- struct{}{} + } + }(w) + } +} + // Tests CSDS functionality. The test performs the following: // - Spins up a management server and creates two xDS clients talking to it. // - Registers a set of watches on the xDS clients, and verifies that the CSDS @@ -177,14 +249,6 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) // For all of the above operations, the test also verifies that the client_scope // field in the CSDS response is populated appropriately. func (s) TestCSDS(t *testing.T) { - // TODO(easwars): Once https://github.com/grpc/grpc/issues/34099 is fixed - // for grpc-go, use resource watchers which are able to control how much we - // read from the management server, and thereby allowing this test to not - // starve in the presence of constant updates from the management server. - if runtime.GOARCH == "arm64" { - t.Skip("Skipping test on arm64 due to https://github.com/envoyproxy/go-control-plane/issues/962") - } - // Spin up a xDS management server on a local port. mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -274,19 +338,25 @@ func (s) TestCSDS(t *testing.T) { endpointAnys[i] = testutils.MarshalAny(t, endpoints[i]) } + // Create the blocking resource watchers. + listenerWatcher := newBlockingListenerWatcher(ctx.Done()) + routeConfigWatcher := newBlockingRouteConfigWatcher(ctx.Done()) + clusterWatcher := newBlockingClusterWatcher(ctx.Done()) + endpointsWatcher := newBlockingEndpointsWatcher(ctx.Done()) + // Register watches on the xDS clients for two resources of each type. for _, xdsC := range []xdsclient.XDSClient{xdsClient1, xdsClient2} { for _, target := range ldsTargets { - xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{}) + xdsresource.WatchListener(xdsC, target, listenerWatcher) } for _, target := range rdsTargets { - xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{}) + xdsresource.WatchRouteConfig(xdsC, target, routeConfigWatcher) } for _, target := range cdsTargets { - xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{}) + xdsresource.WatchCluster(xdsC, target, clusterWatcher) } for _, target := range edsTargets { - xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{}) + xdsresource.WatchEndpoints(xdsC, target, endpointsWatcher) } } @@ -332,6 +402,9 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } + // Unblock the resource watchers. + unblockResourceWatchers(t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) + // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". 
wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ @@ -379,6 +452,9 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } + // Unblock the resource watchers once more. + unblockResourceWatchers(t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) + // Verify that the xDS client reports the first resource of each type as // being in "NACKed" state, and the second resource of each type to be in // "ACKed" state. The version for the ACKed resource would be "2", while From a08e0237b64fbc9287447e60df73db2449be908a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 12 Aug 2024 16:02:08 +0000 Subject: [PATCH 2/7] pass ctx to unblockResourceWatchers --- xds/csds/csds_e2e_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index edd0fa478ba9..5ab5e2e0e7d9 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -224,12 +224,16 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) // we need to write to their wait channel four times. Also, we have to spawn // goroutines to write to these channels because we have no control over the // order of invocation of resource's watch callbacks. -func unblockResourceWatchers(t *testing.T, listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh chan struct{}) { +func unblockResourceWatchers(ctx context.Context, t *testing.T, listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh chan struct{}) { t.Helper() for _, w := range []chan struct{}{listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh} { go func(w chan struct{}) { for i := 0; i < 4; i++ { - w <- struct{}{} + select { + case <-ctx.Done(): + return + case w <- struct{}{}: + } } }(w) } @@ -403,7 +407,7 @@ func (s) TestCSDS(t *testing.T) { } // Unblock the resource watchers. - unblockResourceWatchers(t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) + unblockResourceWatchers(ctx, t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". @@ -453,7 +457,7 @@ func (s) TestCSDS(t *testing.T) { } // Unblock the resource watchers once more. - unblockResourceWatchers(t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) + unblockResourceWatchers(ctx, t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) // Verify that the xDS client reports the first resource of each type as // being in "NACKed" state, and the second resource of each type to be in From 1df7d6abe2cc9ed4c10423a83b745f48df6292a7 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 12 Aug 2024 19:35:56 +0000 Subject: [PATCH 3/7] review comments pass #1 --- xds/csds/csds_e2e_test.go | 98 ++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 5ab5e2e0e7d9..77f4a6bbf06c 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -68,109 +68,113 @@ func Test(t *testing.T) { // validate or use the received updates from the xDS client. In this test, we // only care whether CSDS reports the expected state. 
// -// Note: These watchers do use the ADS stream level flow control supported by -// the xDS client. The watch callbacks block on a wait channel that is written -// to by the test, before invoking the onDone callback. This is particularly -// useful when a resource is NACKed, because the go-control-plane management -// server continuously resends the same resource in this case, and applying flow -// control from these watchers ensures that xDS client does not spend all of its -// time receiving and NACKing updates from the maangement server. This was -// indeed the case on arm64 (before we had support for ADS stream level flow -// control), and was causing CSDS to not receive any updates in this case. - -func blockAndDone(testDoneCh <-chan struct{}, waitCh chan struct{}, onDone xdsresource.DoneNotifier) { +// Note: These watchers do use the ADS stream level flow control mechanism +// supported by the xDS client. The watch callbacks block on a channel that is +// written to by the test, before invoking the onDone callback. This is +// particularly useful when a resource is NACKed, because the go-control-plane +// management server continuously resends the same resource in this case, and +// applying flow control from these watchers ensures that xDS client does not +// spend all of its time receiving and NACKing updates from the management +// server. This was indeed the case on arm64 (before we had support for ADS +// stream level flow control), and was causing CSDS to not receive any updates +// from the xDS client. + +// blockAndDone blocks until the test is done or the onDone channel is written +// to. If the latter happens, it invokes the onDone callback which is part of +// the ADS stream level flow control mechanism within the xDS client. +func blockAndDone(testDoneCh <-chan struct{}, triggerOnDoneCh chan struct{}, onDone xdsresource.DoneNotifier) { select { case <-testDoneCh: - case <-waitCh: + case <-triggerOnDoneCh: onDone.OnDone() } } type blockingListenerWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - waitCh chan struct{} // Written to by the test to unblock the watch callback. + testDoneCh <-chan struct{} // Closed when the test is done. + triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. } func newBlockingListenerWatcher(doneCh <-chan struct{}) *blockingListenerWatcher { return &blockingListenerWatcher{ - testDoneCh: doneCh, - waitCh: make(chan struct{}, 1), + testDoneCh: doneCh, + triggerOnDoneCh: make(chan struct{}, 1), } } func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } type blockingRouteConfigWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - waitCh chan struct{} // Written to by the test to unblock the watch callback. + testDoneCh <-chan struct{} // Closed when the test is done. + triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. 
} func newBlockingRouteConfigWatcher(doneCh <-chan struct{}) *blockingRouteConfigWatcher { return &blockingRouteConfigWatcher{ - testDoneCh: doneCh, - waitCh: make(chan struct{}, 1), + testDoneCh: doneCh, + triggerOnDoneCh: make(chan struct{}, 1), } } func (w *blockingRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } type blockingClusterWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - waitCh chan struct{} // Written to by the test to unblock the watch callback. + testDoneCh <-chan struct{} // Closed when the test is done. + triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. } func newBlockingClusterWatcher(doneCh <-chan struct{}) *blockingClusterWatcher { return &blockingClusterWatcher{ - testDoneCh: doneCh, - waitCh: make(chan struct{}, 1), + testDoneCh: doneCh, + triggerOnDoneCh: make(chan struct{}, 1), } } func (w *blockingClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } type blockingEndpointsWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - waitCh chan struct{} // Written to by the test to unblock the watch callback. + testDoneCh <-chan struct{} // Closed when the test is done. + triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. } func newBlockingEndpointsWatcher(doneCh <-chan struct{}) *blockingEndpointsWatcher { return &blockingEndpointsWatcher{ - testDoneCh: doneCh, - waitCh: make(chan struct{}, 1), + testDoneCh: doneCh, + triggerOnDoneCh: make(chan struct{}, 1), } } func (w *blockingEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } func (w *blockingEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.waitCh, onDone) + blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) } // Creates a gRPC server and starts serving a CSDS service implementation on it. @@ -219,14 +223,14 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) return stream } -// Unblock the resource watchers. 
Since we use the same resource watchers -// for both xDS clients and each of them watches two resources of each type, -// we need to write to their wait channel four times. Also, we have to spawn +// Unblock the resource watchers. Since we use the same resource watchers for +// both xDS clients and each of them watches two resources of each type, we need +// to write to their onDone channels four times. Also, we have to spawn // goroutines to write to these channels because we have no control over the // order of invocation of resource's watch callbacks. -func unblockResourceWatchers(ctx context.Context, t *testing.T, listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh chan struct{}) { +func unblockResourceWatchers(ctx context.Context, t *testing.T, listenerOnDoneCh, routeConfigOnDoneCh, clusterOnDoneCh, endpointsOnDoneCh chan struct{}) { t.Helper() - for _, w := range []chan struct{}{listenerWatcherWaitCh, routeConfigWatcherWaitCh, clusterWatcherWaitCh, endpointsWatcherWaitCh} { + for _, w := range []chan struct{}{listenerOnDoneCh, routeConfigOnDoneCh, clusterOnDoneCh, endpointsOnDoneCh} { go func(w chan struct{}) { for i := 0; i < 4; i++ { select { @@ -407,7 +411,7 @@ func (s) TestCSDS(t *testing.T) { } // Unblock the resource watchers. - unblockResourceWatchers(ctx, t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) + unblockResourceWatchers(ctx, t, listenerWatcher.triggerOnDoneCh, routeConfigWatcher.triggerOnDoneCh, clusterWatcher.triggerOnDoneCh, endpointsWatcher.triggerOnDoneCh) // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". @@ -457,7 +461,7 @@ func (s) TestCSDS(t *testing.T) { } // Unblock the resource watchers once more. - unblockResourceWatchers(ctx, t, listenerWatcher.waitCh, routeConfigWatcher.waitCh, clusterWatcher.waitCh, endpointsWatcher.waitCh) + unblockResourceWatchers(ctx, t, listenerWatcher.triggerOnDoneCh, routeConfigWatcher.triggerOnDoneCh, clusterWatcher.triggerOnDoneCh, endpointsWatcher.triggerOnDoneCh) // Verify that the xDS client reports the first resource of each type as // being in "NACKed" state, and the second resource of each type to be in From c8ceee34a163e4273515ab1230674a82019f2cc2 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 12 Aug 2024 19:38:14 +0000 Subject: [PATCH 4/7] delete comment --- xds/csds/csds_e2e_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 77f4a6bbf06c..379342e2b258 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -410,7 +410,6 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } - // Unblock the resource watchers. unblockResourceWatchers(ctx, t, listenerWatcher.triggerOnDoneCh, routeConfigWatcher.triggerOnDoneCh, clusterWatcher.triggerOnDoneCh, endpointsWatcher.triggerOnDoneCh) // Verify that the xDS client reports the resources as being in "ACKed" @@ -460,7 +459,6 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } - // Unblock the resource watchers once more. 
unblockResourceWatchers(ctx, t, listenerWatcher.triggerOnDoneCh, routeConfigWatcher.triggerOnDoneCh, clusterWatcher.triggerOnDoneCh, endpointsWatcher.triggerOnDoneCh) // Verify that the xDS client reports the first resource of each type as From fd7cc5a363f1db9362f6b38800ae3013d3d3bc66 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 14 Aug 2024 14:39:24 +0000 Subject: [PATCH 5/7] WIP: fix flakyness in test --- xds/csds/csds_e2e_test.go | 262 ++++++++++++++++++++++++-------------- 1 file changed, 168 insertions(+), 94 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 379342e2b258..344386dd2061 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -24,6 +24,7 @@ import ( "io" "slices" "strings" + "sync" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" @@ -68,113 +70,119 @@ func Test(t *testing.T) { // validate or use the received updates from the xDS client. In this test, we // only care whether CSDS reports the expected state. // -// Note: These watchers do use the ADS stream level flow control mechanism -// supported by the xDS client. The watch callbacks block on a channel that is -// written to by the test, before invoking the onDone callback. This is -// particularly useful when a resource is NACKed, because the go-control-plane -// management server continuously resends the same resource in this case, and -// applying flow control from these watchers ensures that xDS client does not -// spend all of its time receiving and NACKing updates from the management -// server. This was indeed the case on arm64 (before we had support for ADS -// stream level flow control), and was causing CSDS to not receive any updates -// from the xDS client. - -// blockAndDone blocks until the test is done or the onDone channel is written -// to. If the latter happens, it invokes the onDone callback which is part of -// the ADS stream level flow control mechanism within the xDS client. -func blockAndDone(testDoneCh <-chan struct{}, triggerOnDoneCh chan struct{}, onDone xdsresource.DoneNotifier) { +// Note: These watchers write the onDone callback on to a channel for the test +// to invoke them when it wants to unblock the next read on the ADS stream in +// the xDS client. This is particularly useful when a resource is NACKed, +// because the go-control-plane management server continuously resends the same +// resource in this case, and applying flow control from these watchers ensures +// that xDS client does not spend all of its time receiving and NACKing updates +// from the management server. This was indeed the case on arm64 (before we had +// support for ADS stream level flow control), and was causing CSDS to not +// receive any updates from the xDS client. + +// timeBoundedWrite attempts to writes the onDone callback on the onDone +// channel. It returns when it can successfully write to the channel or when the +// test is done, which is signalled by testDoneCh being closed. 
+func timeBoundedWrite(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) { select { - case <-testDoneCh: - case <-triggerOnDoneCh: - onDone.OnDone() + case <-testCtxDone: + case onDoneCh <- onDone: } } type blockingListenerWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. + testCtxDone <-chan struct{} + onDoneCh chan xdsresource.DoneNotifier } -func newBlockingListenerWatcher(doneCh <-chan struct{}) *blockingListenerWatcher { +func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher { return &blockingListenerWatcher{ - testDoneCh: doneCh, - triggerOnDoneCh: make(chan struct{}, 1), + testCtxDone: testCtxDone, + onDoneCh: make(chan xdsresource.DoneNotifier, 1), } } func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received listener update") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received listener error") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } type blockingRouteConfigWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. + testCtxDone <-chan struct{} + onDoneCh chan xdsresource.DoneNotifier } -func newBlockingRouteConfigWatcher(doneCh <-chan struct{}) *blockingRouteConfigWatcher { +func newBlockingRouteConfigWatcher(testCtxDone <-chan struct{}) *blockingRouteConfigWatcher { return &blockingRouteConfigWatcher{ - testDoneCh: doneCh, - triggerOnDoneCh: make(chan struct{}, 1), + testCtxDone: testCtxDone, + onDoneCh: make(chan xdsresource.DoneNotifier, 1), } } func (w *blockingRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received route config update") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received route config error") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } type blockingClusterWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. 
+ testCtxDone <-chan struct{} + onDoneCh chan xdsresource.DoneNotifier } -func newBlockingClusterWatcher(doneCh <-chan struct{}) *blockingClusterWatcher { +func newBlockingClusterWatcher(testCtxDone <-chan struct{}) *blockingClusterWatcher { return &blockingClusterWatcher{ - testDoneCh: doneCh, - triggerOnDoneCh: make(chan struct{}, 1), + testCtxDone: testCtxDone, + onDoneCh: make(chan xdsresource.DoneNotifier, 1), } } func (w *blockingClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received cluster update") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received cluster error") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } type blockingEndpointsWatcher struct { - testDoneCh <-chan struct{} // Closed when the test is done. - triggerOnDoneCh chan struct{} // Written to by the test to get the watcher to invoke the onDone callback. + testCtxDone <-chan struct{} + onDoneCh chan xdsresource.DoneNotifier } -func newBlockingEndpointsWatcher(doneCh <-chan struct{}) *blockingEndpointsWatcher { +func newBlockingEndpointsWatcher(testCtxDone <-chan struct{}) *blockingEndpointsWatcher { return &blockingEndpointsWatcher{ - testDoneCh: doneCh, - triggerOnDoneCh: make(chan struct{}, 1), + testCtxDone: testCtxDone, + onDoneCh: make(chan xdsresource.DoneNotifier, 1), } } func (w *blockingEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received endpoints update") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + grpclog.Infof("easwars: received endpoints error") + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - blockAndDone(w.testDoneCh, w.triggerOnDoneCh, onDone) + timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) } // Creates a gRPC server and starts serving a CSDS service implementation on it. @@ -223,26 +231,6 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) return stream } -// Unblock the resource watchers. Since we use the same resource watchers for -// both xDS clients and each of them watches two resources of each type, we need -// to write to their onDone channels four times. Also, we have to spawn -// goroutines to write to these channels because we have no control over the -// order of invocation of resource's watch callbacks. -func unblockResourceWatchers(ctx context.Context, t *testing.T, listenerOnDoneCh, routeConfigOnDoneCh, clusterOnDoneCh, endpointsOnDoneCh chan struct{}) { - t.Helper() - for _, w := range []chan struct{}{listenerOnDoneCh, routeConfigOnDoneCh, clusterOnDoneCh, endpointsOnDoneCh} { - go func(w chan struct{}) { - for i := 0; i < 4; i++ { - select { - case <-ctx.Done(): - return - case w <- struct{}{}: - } - } - }(w) - } -} - // Tests CSDS functionality. 
The test performs the following: // - Spins up a management server and creates two xDS clients talking to it. // - Registers a set of watches on the xDS clients, and verifies that the CSDS @@ -346,27 +334,38 @@ func (s) TestCSDS(t *testing.T) { endpointAnys[i] = testutils.MarshalAny(t, endpoints[i]) } - // Create the blocking resource watchers. - listenerWatcher := newBlockingListenerWatcher(ctx.Done()) - routeConfigWatcher := newBlockingRouteConfigWatcher(ctx.Done()) - clusterWatcher := newBlockingClusterWatcher(ctx.Done()) - endpointsWatcher := newBlockingEndpointsWatcher(ctx.Done()) - - // Register watches on the xDS clients for two resources of each type. - for _, xdsC := range []xdsclient.XDSClient{xdsClient1, xdsClient2} { - for _, target := range ldsTargets { - xdsresource.WatchListener(xdsC, target, listenerWatcher) - } - for _, target := range rdsTargets { - xdsresource.WatchRouteConfig(xdsC, target, routeConfigWatcher) - } - for _, target := range cdsTargets { - xdsresource.WatchCluster(xdsC, target, clusterWatcher) - } - for _, target := range edsTargets { - xdsresource.WatchEndpoints(xdsC, target, endpointsWatcher) - } - } + // Create four watchers of each type. We have two resources of each type and + // two xDS clients to watch those resources. Hence four watchers altogether. + var ( + listenerWatchers []*blockingListenerWatcher + routeConfigWatchers []*blockingRouteConfigWatcher + clusterWatchers []*blockingClusterWatcher + endpointsWatchers []*blockingEndpointsWatcher + ) + for i := 0; i < 4; i++ { + listenerWatchers = append(listenerWatchers, newBlockingListenerWatcher(ctx.Done())) + routeConfigWatchers = append(routeConfigWatchers, newBlockingRouteConfigWatcher(ctx.Done())) + clusterWatchers = append(clusterWatchers, newBlockingClusterWatcher(ctx.Done())) + endpointsWatchers = append(endpointsWatchers, newBlockingEndpointsWatcher(ctx.Done())) + } + + // Register watches on the two xDS clients for two resources of each type. + xdsresource.WatchListener(xdsClient1, ldsTargets[0], listenerWatchers[0]) + xdsresource.WatchListener(xdsClient1, ldsTargets[1], listenerWatchers[1]) + xdsresource.WatchListener(xdsClient2, ldsTargets[0], listenerWatchers[2]) + xdsresource.WatchListener(xdsClient2, ldsTargets[1], listenerWatchers[3]) + xdsresource.WatchRouteConfig(xdsClient1, rdsTargets[0], routeConfigWatchers[0]) + xdsresource.WatchRouteConfig(xdsClient1, rdsTargets[1], routeConfigWatchers[1]) + xdsresource.WatchRouteConfig(xdsClient2, rdsTargets[0], routeConfigWatchers[2]) + xdsresource.WatchRouteConfig(xdsClient2, rdsTargets[1], routeConfigWatchers[3]) + xdsresource.WatchCluster(xdsClient1, cdsTargets[0], clusterWatchers[0]) + xdsresource.WatchCluster(xdsClient1, cdsTargets[1], clusterWatchers[1]) + xdsresource.WatchCluster(xdsClient2, cdsTargets[0], clusterWatchers[2]) + xdsresource.WatchCluster(xdsClient2, cdsTargets[1], clusterWatchers[3]) + xdsresource.WatchEndpoints(xdsClient1, edsTargets[0], endpointsWatchers[0]) + xdsresource.WatchEndpoints(xdsClient1, edsTargets[1], endpointsWatchers[1]) + xdsresource.WatchEndpoints(xdsClient2, edsTargets[0], endpointsWatchers[2]) + xdsresource.WatchEndpoints(xdsClient2, edsTargets[1], endpointsWatchers[3]) // Verify that the xDS client reports the resources as being in "Requested" // state, and in version "0". 
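
A plausible reason for creating a distinct watcher per registered watch here, rather than sharing one watcher instance across all four watches as the earlier revisions did, is channel capacity: each watcher owns a buffer-1 channel, so four concurrent callback hand-offs need four channels to all succeed without blocking. The toy program below demonstrates that difference; it is illustrative only and uses no identifiers from the patch.

package main

import "fmt"

func main() {
	const watches = 4

	// One shared buffer-1 channel: only one of four hand-offs can
	// complete without a reader draining the channel in between.
	shared := make(chan string, 1)
	ready := 0
	for i := 0; i < watches; i++ {
		select {
		case shared <- fmt.Sprintf("watch-%d", i):
			ready++
		default: // a real callback would block here
		}
	}
	fmt.Printf("one shared buffer-1 channel: %d/%d hand-offs succeed\n", ready, watches)

	// One buffer-1 channel per watch: every hand-off completes
	// immediately, so no callback blocks another.
	ready = 0
	for i := 0; i < watches; i++ {
		perWatch := make(chan string, 1)
		select {
		case perWatch <- fmt.Sprintf("watch-%d", i):
			ready++
		default:
		}
	}
	fmt.Printf("one buffer-1 channel per watch: %d/%d hand-offs succeed\n", ready, watches)
}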
@@ -410,7 +409,82 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } - unblockResourceWatchers(ctx, t, listenerWatcher.triggerOnDoneCh, routeConfigWatcher.triggerOnDoneCh, clusterWatcher.triggerOnDoneCh, endpointsWatcher.triggerOnDoneCh) + // Unblock the resource watchers. Watchers of one type are invoked once a + // response for that resource type is received, and until all those watchers + // invoke their onDone callbacks, the next resource type cannot be read off + // the wire. So, we need to spawn a goroutine for each resource type and + // once the update is received for that type, we need to invoke the watchers + // to unblock the next read. + unblockResourceWatchers := func() { + var wg sync.WaitGroup + wg.Add(4) + go func() { + defer wg.Done() + var onDones []xdsresource.DoneNotifier + for i := 0; i < 4; i++ { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for listener watch callback to be invoked") + case onDone := <-listenerWatchers[i].onDoneCh: + onDones = append(onDones, onDone) + } + } + for _, onDone := range onDones { + grpclog.Infof("easwars: invoking onDone for a listener") + onDone.OnDone() + } + }() + go func() { + defer wg.Done() + var onDones []xdsresource.DoneNotifier + for i := 0; i < 4; i++ { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for route configuration watch callback to be invoked") + case onDone := <-routeConfigWatchers[i].onDoneCh: + onDones = append(onDones, onDone) + } + } + for _, onDone := range onDones { + grpclog.Infof("easwars: invoking onDone for a route configuration") + onDone.OnDone() + } + }() + go func() { + defer wg.Done() + var onDones []xdsresource.DoneNotifier + for i := 0; i < 4; i++ { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for cluster watch callback to be invoked") + case onDone := <-clusterWatchers[i].onDoneCh: + onDones = append(onDones, onDone) + } + } + for _, onDone := range onDones { + grpclog.Infof("easwars: invoking onDone for a cluster") + onDone.OnDone() + } + }() + go func() { + defer wg.Done() + var onDones []xdsresource.DoneNotifier + for i := 0; i < 4; i++ { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for endpoints watch callback to be invoked") + case onDone := <-endpointsWatchers[i].onDoneCh: + onDones = append(onDones, onDone) + } + } + for _, onDone := range onDones { + grpclog.Infof("easwars: invoking onDone for a endpoints") + onDone.OnDone() + } + }() + wg.Wait() + } + unblockResourceWatchers() // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". 
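
The four goroutines above share one shape: receive one onDone callback from each watcher of a given type, and only then invoke them all. Assuming the same DoneNotifier contract, that shape could be factored into a single helper; the sketch below uses invented names and is not part of the patch.

package main

import (
	"context"
	"fmt"
	"time"
)

type DoneNotifier interface{ OnDone() }

type namedDone string

func (n namedDone) OnDone() { fmt.Println("released:", string(n)) }

// drainThenRelease receives one notifier from each channel, and only then
// invokes them all: the same collect-first, release-later shape as each of
// the four goroutines above.
func drainThenRelease(ctx context.Context, chs []chan DoneNotifier) error {
	collected := make([]DoneNotifier, 0, len(chs))
	for _, ch := range chs {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out after %d/%d callbacks", len(collected), len(chs))
		case d := <-ch:
			collected = append(collected, d)
		}
	}
	for _, d := range collected {
		d.OnDone()
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	chs := make([]chan DoneNotifier, 4)
	for i := range chs {
		chs[i] = make(chan DoneNotifier, 1)
		chs[i] <- namedDone(fmt.Sprintf("watcher-%d", i))
	}
	if err := drainThenRelease(ctx, chs); err != nil {
		fmt.Println(err)
	}
}

Returning an error, as sketched here, also sidesteps a pitfall in the goroutines above: the testing package does not permit t.Fatal to be called from any goroutine other than the one running the test.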
@@ -459,7 +533,7 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } - unblockResourceWatchers(ctx, t, listenerWatcher.triggerOnDoneCh, routeConfigWatcher.triggerOnDoneCh, clusterWatcher.triggerOnDoneCh, endpointsWatcher.triggerOnDoneCh) + unblockResourceWatchers() // Verify that the xDS client reports the first resource of each type as // being in "NACKed" state, and the second resource of each type to be in @@ -557,7 +631,7 @@ func checkClientStatusResponse(ctx context.Context, stream v3statuspbgrpc.Client } lastErr = fmt.Errorf("received unexpected resource dump, diff (-got, +want):\n%s, got: %s\n want:%s", diff, pretty.ToJSON(got), pretty.ToJSON(want)) } - return fmt.Errorf("timeout when waiting for resource dump to reach expected state: %v", lastErr) + return fmt.Errorf("timeout when waiting for resource dump to reach expected state: ctxErr: %v, otherErr: %v", ctx.Err(), lastErr) } func (s) TestCSDSNoXDSClient(t *testing.T) { From 9530e696c7566ca799485c57d6ad1d3db1310e87 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 14 Aug 2024 23:49:22 +0000 Subject: [PATCH 6/7] split and rewrite the test for NACK case --- xds/csds/csds_e2e_test.go | 498 ++++++++++++++++++++------------------ 1 file changed, 261 insertions(+), 237 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 344386dd2061..7bcc9115e97b 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -24,7 +24,6 @@ import ( "io" "slices" "strings" - "sync" "testing" "time" @@ -32,7 +31,6 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" @@ -66,123 +64,97 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// The following watcher implementations are no-ops in the sense that they don't -// validate or use the received updates from the xDS client. In this test, we -// only care whether CSDS reports the expected state. -// -// Note: These watchers write the onDone callback on to a channel for the test -// to invoke them when it wants to unblock the next read on the ADS stream in -// the xDS client. This is particularly useful when a resource is NACKed, -// because the go-control-plane management server continuously resends the same -// resource in this case, and applying flow control from these watchers ensures -// that xDS client does not spend all of its time receiving and NACKing updates -// from the management server. This was indeed the case on arm64 (before we had -// support for ADS stream level flow control), and was causing CSDS to not -// receive any updates from the xDS client. - -// timeBoundedWrite attempts to writes the onDone callback on the onDone -// channel. It returns when it can successfully write to the channel or when the -// test is done, which is signalled by testDoneCh being closed. -func timeBoundedWrite(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) { - select { - case <-testCtxDone: - case onDoneCh <- onDone: - } -} +// The following watcher implementations are no-ops since we don't really care +// about the callback received by these watchers in the test. We only care +// whether CSDS reports the expected state. 
-type blockingListenerWatcher struct { - testCtxDone <-chan struct{} - onDoneCh chan xdsresource.DoneNotifier -} +type nopListenerWatcher struct{} -func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher { - return &blockingListenerWatcher{ - testCtxDone: testCtxDone, - onDoneCh: make(chan xdsresource.DoneNotifier, 1), - } +func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { + onDone.OnDone() } - -func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { - grpclog.Infof("easwars: received listener update") - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) +func (nopListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.OnDone() } -func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - grpclog.Infof("easwars: received listener error") - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) -} -func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) +func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.OnDone() } -type blockingRouteConfigWatcher struct { - testCtxDone <-chan struct{} - onDoneCh chan xdsresource.DoneNotifier -} +type nopRouteConfigWatcher struct{} -func newBlockingRouteConfigWatcher(testCtxDone <-chan struct{}) *blockingRouteConfigWatcher { - return &blockingRouteConfigWatcher{ - testCtxDone: testCtxDone, - onDoneCh: make(chan xdsresource.DoneNotifier, 1), - } +func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { + onDone.OnDone() } +func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.OnDone() +} +func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.OnDone() +} + +type nopClusterWatcher struct{} -func (w *blockingRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { - grpclog.Infof("easwars: received route config update") - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) +func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { + onDone.OnDone() } -func (w *blockingRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { - grpclog.Infof("easwars: received route config error") - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) +func (nopClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.OnDone() } -func (w *blockingRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) +func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.OnDone() } -type blockingClusterWatcher struct { - testCtxDone <-chan struct{} - onDoneCh chan xdsresource.DoneNotifier -} +type nopEndpointsWatcher struct{} -func newBlockingClusterWatcher(testCtxDone <-chan struct{}) *blockingClusterWatcher { - return &blockingClusterWatcher{ - testCtxDone: testCtxDone, - onDoneCh: make(chan xdsresource.DoneNotifier, 1), - } -} -func (w *blockingClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { - grpclog.Infof("easwars: received cluster update") - timeBoundedWrite(w.testCtxDone, w.onDoneCh, onDone) +func (nopEndpointsWatcher) OnUpdate(_ 
*xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
+	onDone.OnDone()
+}
+func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
+	onDone.OnDone()
+}
+func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
+	onDone.OnDone()
+}
+
+// This watcher writes the onDone callback onto a channel for the test to
+// invoke it when it wants to unblock the next read on the ADS stream in the xDS
+// client. This is particularly useful when a resource is NACKed, because the
+// go-control-plane management server continuously resends the same resource in
+// this case, and applying flow control from these watchers ensures that xDS
+// client does not spend all of its time receiving and NACKing updates from the
+// management server. This was indeed the case on arm64 (before we had support
+// for ADS stream level flow control), and was causing CSDS to not receive any
+// updates from the xDS client.
+type blockingListenerWatcher struct {
+	testCtxDone <-chan struct{}               // Closed when the test is done.
+	onDoneCh    chan xdsresource.DoneNotifier // Channel to write the onDone callback to.
+}
+
+func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher {
+	return &blockingListenerWatcher{
+		testCtxDone: testCtxDone,
+		onDoneCh:    make(chan xdsresource.DoneNotifier, 1),
+	}
+}
+
+func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
+	writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
+}
+func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
+	writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
+}
+func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
+	writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
+}
+
+// writeOnDone attempts to write the onDone callback on the onDone
+// channel. It returns when it can successfully write to the channel or when
+// the test is done, which is signalled by testCtxDone being closed.
+func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) {
+	select {
+	case <-testCtxDone:
+	case onDoneCh <- onDone:
+	}
+}
 
 // Creates a gRPC server and starts serving a CSDS service implementation on it.
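
The hand-off that writeOnDone performs can be seen end to end in the following self-contained sketch; DoneNotifier and doneFunc are local stand-ins for the xdsresource types, and nothing here comes from the patch. The watcher side returns as soon as it parks the callback, and the stream stays paused until the test side invokes it.

package main

import (
	"fmt"
	"time"
)

type DoneNotifier interface{ OnDone() }

type doneFunc func()

func (f doneFunc) OnDone() { f() }

func main() {
	onDoneCh := make(chan DoneNotifier, 1)
	released := make(chan struct{})

	// Watcher side: hand the notifier to the test and return immediately,
	// as writeOnDone does. The ADS stream remains paused.
	onDoneCh <- doneFunc(func() { close(released) })
	fmt.Println("watcher callback returned; stream still paused")

	// Test side: release the stream whenever convenient, for example
	// only after the CSDS dump has been checked.
	time.Sleep(10 * time.Millisecond)
	(<-onDoneCh).OnDone()

	<-released
	fmt.Println("OnDone invoked; client resumes reading ADS responses")
}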
@@ -238,12 +210,9 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) // - Configures resources on the management server corresponding to the ones // being watched by the clients, and verifies that the CSDS response reports // resources in ACKED state. -// - Modifies resources on the management server such that some of them are -// expected to be NACKed by the client. Verifies that the CSDS response -// contains some resources in ACKED state and some in NACKED state. // -// For all of the above operations, the test also verifies that the client_scope -// field in the CSDS response is populated appropriately. +// For the above operations, the test also verifies that the client_scope field +// in the CSDS response is populated appropriately. func (s) TestCSDS(t *testing.T) { // Spin up a xDS management server on a local port. mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -334,38 +303,21 @@ func (s) TestCSDS(t *testing.T) { endpointAnys[i] = testutils.MarshalAny(t, endpoints[i]) } - // Create four watchers of each type. We have two resources of each type and - // two xDS clients to watch those resources. Hence four watchers altogether. - var ( - listenerWatchers []*blockingListenerWatcher - routeConfigWatchers []*blockingRouteConfigWatcher - clusterWatchers []*blockingClusterWatcher - endpointsWatchers []*blockingEndpointsWatcher - ) - for i := 0; i < 4; i++ { - listenerWatchers = append(listenerWatchers, newBlockingListenerWatcher(ctx.Done())) - routeConfigWatchers = append(routeConfigWatchers, newBlockingRouteConfigWatcher(ctx.Done())) - clusterWatchers = append(clusterWatchers, newBlockingClusterWatcher(ctx.Done())) - endpointsWatchers = append(endpointsWatchers, newBlockingEndpointsWatcher(ctx.Done())) - } - - // Register watches on the two xDS clients for two resources of each type. - xdsresource.WatchListener(xdsClient1, ldsTargets[0], listenerWatchers[0]) - xdsresource.WatchListener(xdsClient1, ldsTargets[1], listenerWatchers[1]) - xdsresource.WatchListener(xdsClient2, ldsTargets[0], listenerWatchers[2]) - xdsresource.WatchListener(xdsClient2, ldsTargets[1], listenerWatchers[3]) - xdsresource.WatchRouteConfig(xdsClient1, rdsTargets[0], routeConfigWatchers[0]) - xdsresource.WatchRouteConfig(xdsClient1, rdsTargets[1], routeConfigWatchers[1]) - xdsresource.WatchRouteConfig(xdsClient2, rdsTargets[0], routeConfigWatchers[2]) - xdsresource.WatchRouteConfig(xdsClient2, rdsTargets[1], routeConfigWatchers[3]) - xdsresource.WatchCluster(xdsClient1, cdsTargets[0], clusterWatchers[0]) - xdsresource.WatchCluster(xdsClient1, cdsTargets[1], clusterWatchers[1]) - xdsresource.WatchCluster(xdsClient2, cdsTargets[0], clusterWatchers[2]) - xdsresource.WatchCluster(xdsClient2, cdsTargets[1], clusterWatchers[3]) - xdsresource.WatchEndpoints(xdsClient1, edsTargets[0], endpointsWatchers[0]) - xdsresource.WatchEndpoints(xdsClient1, edsTargets[1], endpointsWatchers[1]) - xdsresource.WatchEndpoints(xdsClient2, edsTargets[0], endpointsWatchers[2]) - xdsresource.WatchEndpoints(xdsClient2, edsTargets[1], endpointsWatchers[3]) + // Register watches on the xDS clients for two resources of each type. 
+ for _, xdsC := range []xdsclient.XDSClient{xdsClient1, xdsClient2} { + for _, target := range ldsTargets { + xdsresource.WatchListener(xdsC, target, nopListenerWatcher{}) + } + for _, target := range rdsTargets { + xdsresource.WatchRouteConfig(xdsC, target, nopRouteConfigWatcher{}) + } + for _, target := range cdsTargets { + xdsresource.WatchCluster(xdsC, target, nopClusterWatcher{}) + } + for _, target := range edsTargets { + xdsresource.WatchEndpoints(xdsC, target, nopEndpointsWatcher{}) + } + } // Verify that the xDS client reports the resources as being in "Requested" // state, and in version "0". @@ -409,83 +361,6 @@ func (s) TestCSDS(t *testing.T) { t.Fatal(err) } - // Unblock the resource watchers. Watchers of one type are invoked once a - // response for that resource type is received, and until all those watchers - // invoke their onDone callbacks, the next resource type cannot be read off - // the wire. So, we need to spawn a goroutine for each resource type and - // once the update is received for that type, we need to invoke the watchers - // to unblock the next read. - unblockResourceWatchers := func() { - var wg sync.WaitGroup - wg.Add(4) - go func() { - defer wg.Done() - var onDones []xdsresource.DoneNotifier - for i := 0; i < 4; i++ { - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for listener watch callback to be invoked") - case onDone := <-listenerWatchers[i].onDoneCh: - onDones = append(onDones, onDone) - } - } - for _, onDone := range onDones { - grpclog.Infof("easwars: invoking onDone for a listener") - onDone.OnDone() - } - }() - go func() { - defer wg.Done() - var onDones []xdsresource.DoneNotifier - for i := 0; i < 4; i++ { - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for route configuration watch callback to be invoked") - case onDone := <-routeConfigWatchers[i].onDoneCh: - onDones = append(onDones, onDone) - } - } - for _, onDone := range onDones { - grpclog.Infof("easwars: invoking onDone for a route configuration") - onDone.OnDone() - } - }() - go func() { - defer wg.Done() - var onDones []xdsresource.DoneNotifier - for i := 0; i < 4; i++ { - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for cluster watch callback to be invoked") - case onDone := <-clusterWatchers[i].onDoneCh: - onDones = append(onDones, onDone) - } - } - for _, onDone := range onDones { - grpclog.Infof("easwars: invoking onDone for a cluster") - onDone.OnDone() - } - }() - go func() { - defer wg.Done() - var onDones []xdsresource.DoneNotifier - for i := 0; i < 4; i++ { - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for endpoints watch callback to be invoked") - case onDone := <-endpointsWatchers[i].onDoneCh: - onDones = append(onDones, onDone) - } - } - for _, onDone := range onDones { - grpclog.Infof("easwars: invoking onDone for a endpoints") - onDone.OnDone() - } - }() - wg.Wait() - } - unblockResourceWatchers() - // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ @@ -515,52 +390,201 @@ func (s) TestCSDS(t *testing.T) { if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { t.Fatal(err) } +} - // Update the first resource of each type in the management server to a - // value which is expected to be NACK'ed by the xDS client. 
-	listeners[0].ApiListener = &v3listenerpb.ApiListener{}
-	routes[0].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}}
-	clusters[0].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}
-	endpoints[0].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}}
+// Tests CSDS functionality. The test performs the following:
+// - Spins up a management server and creates two xDS clients talking to it.
+// - Registers one watch on each xDS client, and verifies that the CSDS
+//   response reports resources in REQUESTED state.
+// - Configures two resources on the management server such that one of them
+//   is expected to be NACKed by the client. Verifies that the CSDS response
+//   contains one resource in ACKED state and one in NACKED state.
+//
+// For the above operations, the test also verifies that the client_scope field
+// in the CSDS response is populated appropriately.
+//
+// This test is similar to the previous test, but is less complex because it
+// deals with a single resource type. This makes it possible to test NACKing
+// of a resource (which results in continuous resending of the resource by the
+// go-control-plane management server) in an easier and less flaky way.
+func (s) TestCSDS_NACK(t *testing.T) {
+	// Spin up a xDS management server on a local port.
+	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+
+	// Create a bootstrap contents pointing to the above management server.
+	nodeID := uuid.New().String()
+	bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
+
+	// Create two xDS clients with different names. These should end up being
+	// two separate xDS client instances.
+	const xdsClient1Name = "xds-csds-client-1"
+	xdsClient1, xdsClose1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
+		Name:     xdsClient1Name,
+		Contents: bootstrapContents,
+	})
+	if err != nil {
+		t.Fatalf("Failed to create xDS client: %v", err)
+	}
+	defer xdsClose1()
+	const xdsClient2Name = "xds-csds-client-2"
+	xdsClient2, xdsClose2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
+		Name:     xdsClient2Name,
+		Contents: bootstrapContents,
+	})
+	if err != nil {
+		t.Fatalf("Failed to create xDS client: %v", err)
+	}
+	defer xdsClose2()
+
+	// Start a CSDS server and create a client stream to it.
+	addr := startCSDSServer(t)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	stream := startCSDSClientStream(ctx, t, addr)
+
+	// Verify that the xDS client reports an empty config.
+	wantNode := &v3corepb.Node{
+		Id:                   nodeID,
+		UserAgentName:        "gRPC Go",
+		UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
+		ClientFeatures:       []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
+	}
+	wantResp := &v3statuspb.ClientStatusResponse{
+		Config: []*v3statuspb.ClientConfig{
+			{
+				Node:        wantNode,
+				ClientScope: xdsClient1Name,
+			},
+			{
+				Node:        wantNode,
+				ClientScope: xdsClient2Name,
+			},
+		},
+	}
+	if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
+		t.Fatal(err)
+	}
+
+	// Initialize the xDS resources to be used in this test.
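+	// Both listeners point to the same route configuration name. Since this
+	// test registers only listener watches, the route configuration resource
+	// is never requested from the management server.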
+	const ldsTarget0, ldsTarget1 = "lds.target.good:0000", "lds.target.good:1111"
+	listener0 := e2e.DefaultClientListener(ldsTarget0, "rds-name")
+	listener1 := e2e.DefaultClientListener(ldsTarget1, "rds-name")
+	listenerAny0 := testutils.MarshalAny(t, listener0)
+	listenerAny1 := testutils.MarshalAny(t, listener1)
+
+	// Register the watchers, one for each xDS client.
+	watcher1 := nopListenerWatcher{}
+	watcher2 := newBlockingListenerWatcher(ctx.Done())
+	xdsresource.WatchListener(xdsClient1, ldsTarget0, watcher1)
+	xdsresource.WatchListener(xdsClient2, ldsTarget1, watcher2)
+
+	// Verify that the xDS client reports the resources as being in "Requested"
+	// state, and in version "0".
+	wantResp = &v3statuspb.ClientStatusResponse{
+		Config: []*v3statuspb.ClientConfig{
+			{
+				Node: wantNode,
+				GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
+					makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
+				},
+				ClientScope: xdsClient1Name,
+			},
+			{
+				Node: wantNode,
+				GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
+					makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
+				},
+				ClientScope: xdsClient2Name,
+			},
+		},
+	}
+	if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
+		t.Fatal(err)
+	}
+
+	// Configure the management server with two listener resources corresponding
+	// to the watches registered above.
 	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
 		NodeID:         nodeID,
-		Listeners:      listeners,
-		Routes:         routes,
-		Clusters:       clusters,
-		Endpoints:      endpoints,
+		Listeners:      []*v3listenerpb.Listener{listener0, listener1},
 		SkipValidation: true,
 	}); err != nil {
 		t.Fatal(err)
 	}
 
-	unblockResourceWatchers()
+	// Verify that the xDS client reports the resources as being in "ACKed"
+	// state, and in version "1".
+	wantResp = &v3statuspb.ClientStatusResponse{
+		Config: []*v3statuspb.ClientConfig{
+			{
+				Node: wantNode,
+				GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
+					makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "1", v3adminpb.ClientResourceStatus_ACKED, listenerAny0, nil),
+				},
+				ClientScope: xdsClient1Name,
+			},
+			{
+				Node: wantNode,
+				GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
+					makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "1", v3adminpb.ClientResourceStatus_ACKED, listenerAny1, nil),
+				},
+				ClientScope: xdsClient2Name,
			},
		},
	}
+	if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
+		t.Fatal(err)
+	}
+
+	// Unblock reads on the ADS stream by calling the onDone callback sent to
+	// the watcher.
+	select {
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for watch callback")
+	case onDone := <-watcher2.onDoneCh:
+		onDone.OnDone()
+	}
 
-	// Verify that the xDS client reports the first resource of each type as
-	// being in "NACKed" state, and the second resource of each type to be in
-	// "ACKed" state. The version for the ACKed resource would be "2", while
-	// that for the NACKed resource would be "1". In the NACKed resource, the
-	// version which is NACKed is stored in the ErrorState field.
-	wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, clusterAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, endpointAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, listenerAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, routeAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}),
-		makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil),
+	// Update the second listener resource by setting its ApiListener field
+	// to nil, which is expected to be NACKed by the xDS client.
+	listener1.ApiListener = nil
+	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{listener0, listener1},
+		SkipValidation: true,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for the update to reach the watcher and unblock reads on the ADS
+	// stream by invoking the onDone callback.
+	select {
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for watch callback")
+	case onDone := <-watcher2.onDoneCh:
+		onDone.OnDone()
	}
+
+	// Verify that the xDS client reports the first listener resource as being
+	// ACKed and the second listener resource as being NACKed. The version for
+	// the ACKed resource would be "2", while that for the NACKed resource would
+	// be "1". In the NACKed resource, the version which is NACKed is stored in
+	// the ErrorState field.
 	wantResp = &v3statuspb.ClientStatusResponse{
 		Config: []*v3statuspb.ClientConfig{
 			{
-				Node:              wantNode,
-				GenericXdsConfigs: wantConfigs,
-				ClientScope:       xdsClient1Name,
+				Node: wantNode,
+				GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
+					makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "2", v3adminpb.ClientResourceStatus_ACKED, listenerAny0, nil),
+				},
+				ClientScope: xdsClient1Name,
 			},
 			{
-				Node:              wantNode,
-				GenericXdsConfigs: wantConfigs,
-				ClientScope:       xdsClient2Name,
+				Node: wantNode,
+				GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
+					makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "1", v3adminpb.ClientResourceStatus_NACKED, listenerAny1, &v3adminpb.UpdateFailureState{VersionInfo: "2"}),
+				},
+				ClientScope: xdsClient2Name,
 			},
 		},
 	}

From 6685729e1c51a0235b13d338b957bff6fb021bb4 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Thu, 15 Aug 2024 18:49:44 +0000
Subject: [PATCH 7/7] final review comments

---
 xds/csds/csds_e2e_test.go | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go
index 7bcc9115e97b..90dd265a6ce1 100644
--- a/xds/csds/csds_e2e_test.go
+++ b/xds/csds/csds_e2e_test.go
@@ -147,9 +147,9 @@ func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.Done
 	writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
 }
 
-// writeOnDone attempts to writes the onDone callback on the onDone
-// channel. It returns when it can successfully write to the channel or when the
-// test is done, which is signalled by testCtxDone being closed.
+// writeOnDone attempts to write the onDone callback on the onDone channel. It
+// returns when it can successfully write to the channel or when the test is
+// done, which is signalled by testCtxDone being closed.
 func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) {
 	select {
 	case <-testCtxDone:
@@ -396,8 +396,10 @@ func (s) TestCSDS(t *testing.T) {
 // - Spins up a management server and creates two xDS clients talking to it.
 // - Registers one watch on each xDS client, and verifies that the CSDS
 //   response reports resources in REQUESTED state.
-// - Configures two resources on the management server such that one of them
-//   is expected to be NACKed by the client. Verifies that the CSDS response
+// - Configures two resources on the management server and verifies that the
+//   CSDS response reports the resources as being in ACKED state.
+// - Updates one of the two resources on the management server such that it is
+//   expected to be NACKed by the client. Verifies that the CSDS response
 //   contains one resource in ACKED state and one in NACKED state.
 //
 // For the above operations, the test also verifies that the client_scope field