Skip to content

Commit c007e8a

Browse files
committed
xds/internal/xdsclient/test: new watcher resource caching behavior
1 parent 66385b2 commit c007e8a

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed

xds/internal/xdsclient/tests/lds_watchers_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,128 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
921921
}
922922
}
923923

924+
// TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is
925+
// registered for a resource which is already present in the cache with an old
926+
// good update and latest NACK error. The test verifies that new watcher
927+
// receives both good update and error without request being sent to the
928+
// management server.
929+
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
930+
firstRequestReceived := false
931+
firstAckReceived := grpcsync.NewEvent()
932+
secondRequestReceived := grpcsync.NewEvent()
933+
934+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
935+
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
936+
// The first request has an empty version string.
937+
if !firstRequestReceived && req.GetVersionInfo() == "" {
938+
firstRequestReceived = true
939+
return nil
940+
}
941+
// The first ack has a non-empty version string.
942+
if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
943+
firstAckReceived.Fire()
944+
return nil
945+
}
946+
// Any requests after the first request and ack, are not expected.
947+
secondRequestReceived.Fire()
948+
return nil
949+
},
950+
})
951+
952+
nodeID := uuid.New().String()
953+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
954+
testutils.CreateBootstrapFileForTesting(t, bc)
955+
956+
// Create an xDS client with the above bootstrap contents.
957+
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
958+
Name: t.Name(),
959+
Contents: bc,
960+
})
961+
if err != nil {
962+
t.Fatalf("Failed to create xDS client: %v", err)
963+
}
964+
defer close()
965+
966+
// Register a watch for a listener resource and have the watch
967+
// callback push the received update on to a channel.
968+
lw1 := newListenerWatcher()
969+
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
970+
defer ldsCancel1()
971+
// Configure the management server to return a single listener
972+
// resource, corresponding to the one we registered a watch for.
973+
resources := e2e.UpdateOptions{
974+
NodeID: nodeID,
975+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
976+
SkipValidation: true,
977+
}
978+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
979+
defer cancel()
980+
if err := mgmtServer.Update(ctx, resources); err != nil {
981+
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
982+
}
983+
// Verify the contents of the received update.
984+
wantUpdate := listenerUpdateErrTuple{
985+
update: xdsresource.ListenerUpdate{
986+
RouteConfigName: rdsName,
987+
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
988+
},
989+
}
990+
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
991+
t.Fatal(err)
992+
}
993+
select {
994+
case <-ctx.Done():
995+
t.Fatal("timeout when waiting for receipt of ACK at the management server")
996+
case <-firstAckReceived.Done():
997+
}
998+
999+
// Configure the management server to return a single listener resource
1000+
// which is expected to be NACKed by the client.
1001+
resources = e2e.UpdateOptions{
1002+
NodeID: nodeID,
1003+
Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)},
1004+
SkipValidation: true,
1005+
}
1006+
if err := mgmtServer.Update(ctx, resources); err != nil {
1007+
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
1008+
}
1009+
// Verify that the expected error is propagated to the existing watcher.
1010+
u, err := lw1.updateCh.Receive(ctx)
1011+
if err != nil {
1012+
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
1013+
}
1014+
gotErr := u.(listenerUpdateErrTuple).err
1015+
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
1016+
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
1017+
}
1018+
1019+
// Register another watch for the same resource. This should get the update
1020+
// and error from the cache.
1021+
lw2 := newListenerWatcher()
1022+
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
1023+
defer ldsCancel2()
1024+
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
1025+
t.Fatal(err)
1026+
}
1027+
u, err = lw2.updateCh.Receive(ctx)
1028+
if err != nil {
1029+
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
1030+
}
1031+
gotErr = u.(listenerUpdateErrTuple).err
1032+
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
1033+
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
1034+
}
1035+
1036+
// No request should get sent out as part of this watch.
1037+
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
1038+
defer sCancel()
1039+
select {
1040+
case <-sCtx.Done():
1041+
case <-secondRequestReceived.Done():
1042+
t.Fatal("xdsClient sent out request instead of using update from cache")
1043+
}
1044+
}
1045+
9241046
// TestLDSWatch_PartialValid covers the case where a response from the
9251047
// management server contains both valid and invalid resources and is expected
9261048
// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding

0 commit comments

Comments
 (0)