-
Notifications
You must be signed in to change notification settings - Fork 4.6k
xdsclient: fix new watcher to get both old good update and nack error (if exist) from the cache #7851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xdsclient: fix new watcher to get both old good update and nack error (if exist) from the cache #7851
Changes from all commits
e337e2b
9d6bd0e
9a51cf3
757053e
3570449
777e12b
9ca1ed3
65d5760
0e1bc31
742da1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,22 +71,47 @@ func newListenerWatcher() *listenerWatcher { | |
| return &listenerWatcher{updateCh: testutils.NewChannel()} | ||
| } | ||
|
|
||
| func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { | ||
| cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) | ||
| func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { | ||
| lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) | ||
| onDone() | ||
| } | ||
|
|
||
| func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { | ||
| func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { | ||
| // When used with a go-control-plane management server that continuously | ||
| // resends resources which are NACKed by the xDS client, using a `Replace()` | ||
| // here and in OnResourceDoesNotExist() simplifies tests which will have | ||
| // access to the most recently received error. | ||
| cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) | ||
| lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) | ||
| onDone() | ||
| } | ||
|
|
||
| func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { | ||
| cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) | ||
| func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { | ||
| lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) | ||
| onDone() | ||
| } | ||
|
|
||
| type listenerWatcherMultiple struct { | ||
| updateCh *testutils.Channel | ||
| } | ||
|
|
||
| // TODO: delete this once `newListenerWatcher` is modified to handle multiple | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please link the issue here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| // updates (https://github.com/grpc/grpc-go/issues/7864). | ||
| func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { | ||
| return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} | ||
| } | ||
|
|
||
| func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { | ||
| lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) | ||
| onDone() | ||
| } | ||
|
|
||
| func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { | ||
| lw.updateCh.Send(listenerUpdateErrTuple{err: err}) | ||
| onDone() | ||
| } | ||
|
|
||
| func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { | ||
| lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) | ||
| onDone() | ||
| } | ||
|
|
||
|
|
@@ -155,6 +180,18 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want | |
| return nil | ||
| } | ||
|
|
||
| func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error { | ||
| u, err := updateCh.Receive(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err) | ||
| } | ||
| gotErr := u.(listenerUpdateErrTuple).err | ||
| if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { | ||
| return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // TestLDSWatch covers the case where a single watcher exists for a single | ||
| // listener resource. The test verifies the following scenarios: | ||
| // 1. An update from the management server containing the resource being | ||
|
|
@@ -953,8 +990,9 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { | |
| } | ||
|
|
||
| // TestLDSWatch_NACKError covers the case where an update from the management | ||
| // server is NACK'ed by the xdsclient. The test verifies that the error is | ||
| // propagated to the watcher. | ||
| // server is NACKed by the xdsclient. The test verifies that the error is | ||
| // propagated to the existing watcher. After NACK, if a new watcher registers | ||
| // for the resource, error is propagated to the new watcher as well. | ||
| func (s) TestLDSWatch_NACKError(t *testing.T) { | ||
| mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) | ||
|
|
||
|
|
@@ -992,19 +1030,141 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { | |
| } | ||
|
|
||
| // Verify that the expected error is propagated to the watcher. | ||
| u, err := lw.updateCh.Receive(ctx) | ||
| // Verify that the expected error is propagated to the existing watcher. | ||
| if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // Verify that the expected error is propagated to the new watcher as well. | ||
| lw2 := newListenerWatcher() | ||
| ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) | ||
| defer ldsCancel2() | ||
| // Verify that the expected error is propagated to the new watcher. | ||
| if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
|
|
||
| // TestLDSWatch_ResourceCaching_NACKError covers the case where a watch is | ||
| // registered for a resource which is already present in the cache with an old | ||
| // good update as well as latest NACK error. The test verifies that new watcher | ||
| // receives both good update and error without a new resource request being | ||
| // sent to the management server. | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { | ||
| firstRequestReceived := false | ||
| firstAckReceived := grpcsync.NewEvent() | ||
| secondAckReceived := grpcsync.NewEvent() | ||
| secondRequestReceived := grpcsync.NewEvent() | ||
|
|
||
| mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ | ||
| OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { | ||
| // The first request has an empty version string. | ||
| if !firstRequestReceived && req.GetVersionInfo() == "" { | ||
| firstRequestReceived = true | ||
| return nil | ||
| } | ||
| // The first ack has a non-empty version string. | ||
| if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" { | ||
| firstAckReceived.Fire() | ||
| return nil | ||
| } | ||
| // The second ack has a non-empty version string. | ||
| if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" { | ||
| secondAckReceived.Fire() | ||
| return nil | ||
| } | ||
| // Any requests after the first request and two acks are not expected. | ||
| secondRequestReceived.Fire() | ||
| return nil | ||
| }, | ||
| }) | ||
|
|
||
| nodeID := uuid.New().String() | ||
| bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) | ||
|
|
||
| // Create an xDS client with the above bootstrap contents. | ||
| client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ | ||
| Name: t.Name(), | ||
| Contents: bc, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) | ||
| t.Fatalf("Failed to create xDS client: %v", err) | ||
| } | ||
| gotErr := u.(listenerUpdateErrTuple).err | ||
| if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { | ||
| t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) | ||
| defer close() | ||
|
|
||
| // Register a watch for a listener resource and have the watch | ||
| // callback push the received update on to a channel. | ||
| lw1 := newListenerWatcher() | ||
| ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) | ||
| defer ldsCancel1() | ||
|
|
||
| // Configure the management server to return a single listener | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // resource, corresponding to the one we registered a watch for. | ||
| resources := e2e.UpdateOptions{ | ||
| NodeID: nodeID, | ||
| Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, | ||
| SkipValidation: true, | ||
| } | ||
| ctx, cancel := context.WithTimeout(context.Background(), 1000*defaultTestTimeout) | ||
| defer cancel() | ||
| if err := mgmtServer.Update(ctx, resources); err != nil { | ||
| t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) | ||
| } | ||
|
|
||
| // Verify the contents of the received update. | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| wantUpdate := listenerUpdateErrTuple{ | ||
| update: xdsresource.ListenerUpdate{ | ||
| RouteConfigName: rdsName, | ||
| HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, | ||
| }, | ||
| } | ||
| if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // Configure the management server to return a single listener resource | ||
| // which is expected to be NACKed by the client. | ||
| resources = e2e.UpdateOptions{ | ||
| NodeID: nodeID, | ||
| Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)}, | ||
| SkipValidation: true, | ||
| } | ||
| if err := mgmtServer.Update(ctx, resources); err != nil { | ||
| t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) | ||
| } | ||
|
|
||
| // Verify that the expected error is propagated to the existing watcher. | ||
| if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // Register another watch for the same resource. This should get the update | ||
| // and error from the cache. | ||
| lw2 := newListenerWatcherMultiple(2) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this new listener watcher type? Why can't we handle this case with the existing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah the way i used existing listenerWatcher struct, it was missing resource update during race test. I have added the separate struct back for handling variable size update channel and the race went away. Didn't get a chance to fully debug why it was happening. But may be its fine to have separate struct to hold multiple updates?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the correct way would be to change the I wanted to do this change when I was working on some of the refactors recently, but never got around to doing that. I would recommend making that change in a separate PR though. What do you think?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh yeah i think i can send a separate PR for that. The idea of having 3 channels for each callback is a good idea. Should this fix be blocked for that though?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be Ok if we create an issue for the same and add a TODO in here to remove this new listener watcher type once that issue is taken care of.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed an issue #7864. It should be simple as well. Added TODO for the new watcher in this PR. |
||
| ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) | ||
| defer ldsCancel2() | ||
| if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| // Verify that the expected error is propagated to the new watcher. | ||
| if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // No request should get sent out as part of this watch. | ||
| sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) | ||
| defer sCancel() | ||
| select { | ||
| case <-sCtx.Done(): | ||
| case <-secondRequestReceived.Done(): | ||
| t.Fatal("xdsClient sent out request instead of using update from cache") | ||
| default: | ||
| } | ||
| } | ||
|
|
||
| // TestLDSWatch_PartialValid covers the case where a response from the | ||
| // management server contains both valid and invalid resources and is expected | ||
| // to be NACK'ed by the xdsclient. The test verifies that watchers corresponding | ||
| // to be NACKed by the xdsclient. The test verifies that watchers corresponding | ||
| // to the valid resource receive the update, while watchers corresponding to the | ||
| // invalid resource receive an error. | ||
| func (s) TestLDSWatch_PartialValid(t *testing.T) { | ||
|
|
@@ -1071,13 +1231,9 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { | |
|
|
||
| // Verify that the expected error is propagated to the watcher which | ||
| // requested for the bad resource. | ||
| u, err := lw1.updateCh.Receive(ctx) | ||
| if err != nil { | ||
| t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) | ||
| } | ||
| gotErr := u.(listenerUpdateErrTuple).err | ||
| if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { | ||
| t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) | ||
| // Verify that the expected error is propagated to the existing watcher. | ||
| if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // Verify that the watcher watching the good resource receives a good | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.