Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ var (
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
CanonicalString any // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports any // func(*grpc.Server, string)
// IsRegisteredMethod returns whether the passed in method is registered as
// a method on the server.
IsRegisteredMethod any // func(*grpc.Server, string) bool
Expand Down Expand Up @@ -188,6 +183,15 @@ var (
ExitIdleModeForTesting any // func(*grpc.ClientConn) error

ChannelzTurnOffForTesting func()

// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
// error for a given resource type and name. This is usually triggered when
// the associated watch timer fires. For testing purposes, having this
// function makes events more predictable than relying on timer events.
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error
// TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton
// to invoke resource not found for a resource type name and resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
121 changes: 113 additions & 8 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,72 @@ func marshalAny(m proto.Message) *anypb.Any {
return a
}

// filterChainWontMatch returns a filter chain that won't match if running the
// test locally.
func filterChainWontMatch(routeName string, addressPrefix string, srcPorts []uint32) *v3listenerpb.FilterChain {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: routeName,
},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}
return &v3listenerpb.FilterChain{
Name: routeName + "-wont-match",
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: addressPrefix,
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(0),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePorts: srcPorts,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: addressPrefix,
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(0),
},
},
},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: marshalAny(hcm)},
},
},
}
}

// ListenerResourceThreeRouteResources returns a listener resource that points
// to three route configurations. Only the filter chain that points to the first
// route config can be matched to.
func ListenerResourceThreeRouteResources(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
lis := defaultServerListenerCommon(host, port, secLevel, routeName, false)
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName2", "1.1.1.1", []uint32{1}))
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName3", "2.2.2.2", []uint32{2}))
return lis
}

// ListenerResourceFallbackToDefault returns a listener resource that contains a
// filter chain that will never get chosen to process traffic and a default
// filter chain. The default filter chain points to routeName2.
func ListenerResourceFallbackToDefault(host string, port uint32, secLevel SecurityLevel) *v3listenerpb.Listener {
lis := defaultServerListenerCommon(host, port, secLevel, "", false)
lis.FilterChains = nil
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName", "1.1.1.1", []uint32{1}))
lis.DefaultFilterChain = filterChainWontMatch("routeName2", "2.2.2.2", []uint32{2})
return lis
}

// DefaultServerListener returns a basic xds Listener resource to be used on the
// server side. The returned Listener resource contains an inline route
// configuration with the name of routeName.
Expand Down Expand Up @@ -290,13 +356,6 @@ func defaultServerListenerCommon(host string, port uint32, secLevel SecurityLeve
}
}

// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteCongiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}

// HTTPFilter constructs an xds HttpFilter with the provided name and config.
func HTTPFilter(name string, config proto.Message) *v3httppb.HttpFilter {
return &v3httppb.HttpFilter{
Expand Down Expand Up @@ -356,7 +415,6 @@ type RouteConfigOptions struct {
ListenerName string
// ClusterSpecifierType determines the cluster specifier type.
ClusterSpecifierType RouteConfigClusterSpecifierType

// ClusterName is name of the cluster resource used when the cluster
// specifier type is set to RouteConfigClusterSpecifierTypeCluster.
//
Expand Down Expand Up @@ -722,3 +780,50 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
}
return cla
}

// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteCongiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing function DefaultServerListener calls defaultServerListenerCommon whose last parameter indicates whether the route configuration is to be inlined or not? Can the implementation of this function be replaced with a one-liner return defaultServerListenerCommon(host, port, secLevel, routeName, false)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch. This was left over from when I wrote the e2e test to trigger failure.

return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}

// RouteConfigNonForwardingAction returns an xDS RouteConfig resource which
// specifies to route to a route specfying non forwarding action. This is
// intended to be used on the server side for RDS requests, and corresponds to
// the inline route configuration in DefaultServerListener.
func RouteConfigNonForwardingAction(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}

// RouteConfigFilterAction returns an xDS RouteConfig resource which specifies
// to route to a route specifying route filter action. Since this is not type
// non forwarding action, this should fail requests that match to this server
// side.
func RouteConfigFilterAction(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to
// ensure any incoming RPC matches to Route_Route and will fail with
// UNAVAILABLE.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_FilterAction{},
}}}}}
}
5 changes: 5 additions & 0 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,11 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
})
}

// CallbackConn is a conn with a callback function.
type CallbackConn interface {
Callback(ServerTransport)
}
Comment on lines +1291 to +1294
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See: go/go-style/decisions#interfaces

Go interfaces generally belong in the package that consumes values of the interface type, 
not a package that implements the interface type.

Why does this interface have to be defined in the internal/transport package? I don't see this package consuming this interface or implementing it.

Also, this interface is utterly underspecified. It does not explain any of the following:

  • when is this method called?
  • is it called only ever once or can it called multiple times?
  • who calls this method?
  • who implements the interface?
  • what are implementations supposed to do in this callback?
  • do implementations have to be non-blocking in this callback?

From the PR description:

server: Call a callback with the server transport once it’s created on the Conn.
This gives access to the server transport to xDS layer, which will be gracefully
closed on transitions into not serving and transitions to a new LDS configuration.
It also guarantees at some point the server transport will be gracefully drained.
This replaces the Drain() operation previously present.

I don't see the problem with the existing approach of draining server transports that this new method overcomes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method guarantees synchornization between the Conns closing, and a new Conn being accepted. Previously, there was no way in the case you yielded thread after an Accept and synchronized with the possibility of a new Conn being added to the map, so it races. This guarantees synchronization between those components. Will document this further. (You don't have the server transport object until it wraps the accepted conn with the http2_server, so we were doing it wrong). Previously, the server would drain all the conns.


func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
16 changes: 4 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ func init() {
return srv.isRegisteredMethod(method)
}
internal.ServerFromContext = serverFromContext
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
globalServerOptions = append(globalServerOptions, opt...)
}
Expand Down Expand Up @@ -932,6 +929,10 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}

if cc, ok := rawConn.(transport.CallbackConn); ok {
cc.Callback(st)
}

if !s.addConn(lisAddr, st) {
return
}
Expand All @@ -941,15 +942,6 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}()
}

func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain("")
}
s.mu.Unlock()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Test(t *testing.T) {

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
defaultTestShortTimeout = 10 * time.Millisecond // For events expdefaultTected to *not* happen.
)

func (s) TestClientSideXDS(t *testing.T) {
Expand Down
16 changes: 2 additions & 14 deletions test/xds/xds_server_certificate_providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ import (

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
"google.golang.org/protobuf/types/known/wrapperspb"

Expand Down Expand Up @@ -233,12 +231,7 @@ func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() failed: %v, wantCode: %s", err, codes.DeadlineExceeded)
}
waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
}

// Tests the case where the bootstrap configuration contains one certificate
Expand Down Expand Up @@ -484,10 +477,5 @@ func (s) TestServerSideXDS_WithValidAndInvalidSecurityConfiguration(t *testing.T
}
defer cc2.Close()

client2 := testgrpc.NewTestServiceClient(cc2)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client2.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() failed: %v, wantCode: %s", err, codes.DeadlineExceeded)
}
waitForFailedRPCWithStatusCode(ctx, t, cc2, errAcceptAndClose...)
}
10 changes: 10 additions & 0 deletions test/xds/xds_server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package xds_test
import (
"context"
"fmt"
"io"
"net"
"strconv"
"testing"
Expand Down Expand Up @@ -50,6 +51,15 @@ func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.S
return &testpb.SimpleResponse{}, nil
}

func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This testService type is a complete no-op test service implementation. This could be replaced with a call to stubserver.StartTestService in most cases where the streaming RPC is not required. And in the odd case where the streaming RPC is required, the test could instantiate a stubserver and fill in this implementation for FullDuplexCall.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't get what you're saying here.

for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
}

func testModeChangeServerOption(t *testing.T) grpc.ServerOption {
// Create a server option to get notified about serving mode changes. We don't
// do anything other than throwing a log entry here. But this is required,
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_server_serving_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
return
}

ticker := time.NewTimer(1 * time.Second)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
Expand Down
Loading