diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 3ac28271618b..dd81c962bab2 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/backoff" @@ -77,6 +78,42 @@ var ( clientConnUpdateHook = func() {} dataCachePurgeHook = func() {} resetBackoffHook = func() {} + + cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{ + Name: "grpc.lb.rls.cache_entries", + Description: "EXPERIMENTAL. Number of entries in the RLS cache.", + Unit: "entry", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"}, + Default: false, + }) + cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{ + Name: "grpc.lb.rls.cache_size", + Description: "EXPERIMENTAL. The current size of the RLS cache.", + Unit: "By", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"}, + Default: false, + }) + defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.rls.default_target_picks", + Description: "EXPERIMENTAL. Number of LB picks sent to the default target.", + Unit: "pick", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"}, + Default: false, + }) + targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.rls.target_picks", + Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. 
Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.lb.rls.default_target_picks.", + Unit: "pick", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"}, + Default: false, + }) + failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.rls.failed_picks", + Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.", + Unit: "pick", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target"}, + Default: false, + }) ) func init() { @@ -103,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer. updateCh: buffer.NewUnbounded(), } lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb)) - lb.dataCache = newDataCache(maxCacheSize, lb.logger) + lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String()) lb.bg = balancergroup.New(balancergroup.Options{ CC: cc, BuildOpts: opts, @@ -303,6 +340,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // `resizeCache` boolean) because `cacheMu` needs to be grabbed before // `stateMu` if we are to hold both locks at the same time. 
b.cacheMu.Lock() + b.dataCache.updateRLSServerTarget(newCfg.lookupService) b.dataCache.resize(newCfg.cacheSizeBytes) b.cacheMu.Unlock() } @@ -490,15 +528,19 @@ func (b *rlsBalancer) sendNewPickerLocked() { if b.defaultPolicy != nil { b.defaultPolicy.acquireRef() } + picker := &rlsPicker{ - kbm: b.lbCfg.kbMap, - origEndpoint: b.bopts.Target.Endpoint(), - lb: b, - defaultPolicy: b.defaultPolicy, - ctrlCh: b.ctrlCh, - maxAge: b.lbCfg.maxAge, - staleAge: b.lbCfg.staleAge, - bg: b.bg, + kbm: b.lbCfg.kbMap, + origEndpoint: b.bopts.Target.Endpoint(), + lb: b, + defaultPolicy: b.defaultPolicy, + ctrlCh: b.ctrlCh, + maxAge: b.lbCfg.maxAge, + staleAge: b.lbCfg.staleAge, + bg: b.bg, + rlsServerTarget: b.lbCfg.lookupService, + grpcTarget: b.bopts.Target.String(), + metricsRecorder: b.bopts.MetricsRecorder, } picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker)) state := balancer.State{ diff --git a/balancer/rls/cache.go b/balancer/rls/cache.go index d7a6a1a436c6..4c267289caa9 100644 --- a/balancer/rls/cache.go +++ b/balancer/rls/cache.go @@ -22,6 +22,8 @@ import ( "container/list" "time" + "github.com/google/uuid" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal/backoff" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" @@ -163,22 +165,40 @@ func (l *lru) getLeastRecentlyUsed() cacheKey { // // It is not safe for concurrent access. type dataCache struct { - maxSize int64 // Maximum allowed size. - currentSize int64 // Current size. - keys *lru // Cache keys maintained in lru order. - entries map[cacheKey]*cacheEntry - logger *internalgrpclog.PrefixLogger - shutdown *grpcsync.Event + maxSize int64 // Maximum allowed size. + currentSize int64 // Current size. + keys *lru // Cache keys maintained in lru order. 
+ entries map[cacheKey]*cacheEntry + logger *internalgrpclog.PrefixLogger + shutdown *grpcsync.Event + rlsServerTarget string + + // Read only after initialization. + grpcTarget string + uuid string + metricsRecorder estats.MetricsRecorder } -func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache { - return &dataCache{ - maxSize: size, - keys: newLRU(), - entries: make(map[cacheKey]*cacheEntry), - logger: logger, - shutdown: grpcsync.NewEvent(), +func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache { + dc := &dataCache{ + maxSize: size, + keys: newLRU(), + entries: make(map[cacheKey]*cacheEntry), + logger: logger, + shutdown: grpcsync.NewEvent(), + grpcTarget: grpcTarget, + uuid: uuid.New().String(), + metricsRecorder: metricsRecorder, } + cacheSizeMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid) + cacheEntriesMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid) + return dc +} + +// updateRLSServerTarget updates the RLS Server Target the RLS Balancer is +// configured with. +func (dc *dataCache) updateRLSServerTarget(rlsServerTarget string) { + dc.rlsServerTarget = rlsServerTarget } // resize changes the maximum allowed size of the data cache. 
@@ -310,6 +330,9 @@ func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled if dc.currentSize > dc.maxSize { backoffCancelled = dc.resize(dc.maxSize) } + + cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid) + cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid) return backoffCancelled, true } @@ -319,6 +342,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) { dc.currentSize -= entry.size entry.size = newSize dc.currentSize += entry.size + cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid) } func (dc *dataCache) getEntry(key cacheKey) *cacheEntry { @@ -351,6 +375,8 @@ func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) { delete(dc.entries, key) dc.currentSize -= entry.size dc.keys.removeEntry(key) + cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid) + cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid) } func (dc *dataCache) stop() { diff --git a/balancer/rls/cache_test.go b/balancer/rls/cache_test.go index 80185f39c929..9e6bdd50c35e 100644 --- a/balancer/rls/cache_test.go +++ b/balancer/rls/cache_test.go @@ -19,6 +19,7 @@ package rls import ( + "google.golang.org/grpc/internal/testutils/stats" "testing" "time" @@ -119,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) { func (s) TestDataCache_BasicOperations(t *testing.T) { initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } @@ -133,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) { func (s) TestDataCache_AddForcesResize(t *testing.T) { initCacheEntries() - dc := newDataCache(1, nil) + dc := newDataCache(1, nil, 
&stats.NoopMetricsRecorder{}, "") // The first entry in cacheEntries has a minimum expiry time in the future. // This entry would stop the resize operation since we do not evict entries @@ -162,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) { func (s) TestDataCache_Resize(t *testing.T) { initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } @@ -193,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) { func (s) TestDataCache_EvictExpiredEntries(t *testing.T) { initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } @@ -220,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) { } initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } diff --git a/balancer/rls/picker.go b/balancer/rls/picker.go index 8f617a4e42e0..0fcadbfc0e2e 100644 --- a/balancer/rls/picker.go +++ b/balancer/rls/picker.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer/rls/internal/keys" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" internalgrpclog "google.golang.org/grpc/internal/grpclog" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" "google.golang.org/grpc/metadata" @@ -61,12 +62,15 @@ type rlsPicker struct { // The picker is given its own copy of the below fields from the RLS LB policy // to avoid having to grab the mutex on the latter. - defaultPolicy *childPolicyWrapper // Child policy for the default target. - ctrlCh *controlChannel // Control channel to the RLS server. - maxAge time.Duration // Cache max age from LB config. - staleAge time.Duration // Cache stale age from LB config. 
- bg exitIdler - logger *internalgrpclog.PrefixLogger + rlsServerTarget string + grpcTarget string + metricsRecorder estats.MetricsRecorder + defaultPolicy *childPolicyWrapper // Child policy for the default target. + ctrlCh *controlChannel // Control channel to the RLS server. + maxAge time.Duration // Cache max age from LB config. + staleAge time.Duration // Cache stale age from LB config. + bg exitIdler + logger *internalgrpclog.PrefixLogger } // isFullMethodNameValid return true if name is of the form `/service/method`. @@ -85,7 +89,17 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName) p.lb.cacheMu.Lock() - defer p.lb.cacheMu.Unlock() + var pr balancer.PickResult + var err error + + // Record metrics without the cache mutex held, to prevent lock contention + // between concurrent RPCs and their Pick calls. Metrics recording can + // potentially be expensive. + metricsCallback := func() {} + defer func() { + p.lb.cacheMu.Unlock() + metricsCallback() + }() // Lookup data cache and pending request map using request path and keys. 
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str} @@ -98,7 +112,8 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { case dcEntry == nil && pendingEntry == nil: throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "") if throttled { - return p.useDefaultPickIfPossible(info, errRLSThrottled) + pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled) + return pr, err } return balancer.PickResult{}, balancer.ErrNoSubConnAvailable @@ -113,8 +128,8 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData) } // Delegate to child policies. - res, err := p.delegateToChildPoliciesLocked(dcEntry, info) - return res, err + pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info) + return pr, err } // We get here only if the data cache entry has expired. If entry is in @@ -126,32 +141,50 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // message received from the control plane is still fine, as it could be // useful for debugging purposes. st := dcEntry.status - return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error()))) + pr, metricsCallback, err = p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error()))) + return pr, err } // We get here only if the entry has expired and is not in backoff. 
throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "") if throttled { - return p.useDefaultPickIfPossible(info, errRLSThrottled) + pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled) + return pr, err } return balancer.PickResult{}, balancer.ErrNoSubConnAvailable // Data cache hit. Pending request exists. default: if dcEntry.expiryTime.After(now) { - res, err := p.delegateToChildPoliciesLocked(dcEntry, info) - return res, err + pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info) + return pr, err } // Data cache entry has expired and pending request exists. Queue pick. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } } +// errToPickResult is a helper function which converts the error value returned +// by Pick() to a string that represents the pick result. +func errToPickResult(err error) string { + if err == nil { + return "complete" + } + if errors.Is(err, balancer.ErrNoSubConnAvailable) { + return "queue" + } + if _, ok := status.FromError(err); ok { + return "drop" + } + return "fail" +} + // delegateToChildPoliciesLocked is a helper function which iterates through the // list of child policy wrappers in a cache entry and attempts to find a child // policy to which this RPC can be routed to. If all child policies are in -// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. -func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) { +// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. Returns +// a function to be invoked to record metrics. 
+func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, func(), error) { const rlsDataHeaderName = "x-google-rls-data" for i, cpw := range dcEntry.childPolicyWrappers { state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) @@ -164,29 +197,54 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala // X-Google-RLS-Data header. res, err := state.Picker.Pick(info) if err != nil { - return res, err + pr := errToPickResult(err) + rf := func() { + targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, pr) + } + if pr == "queue" { + // Don't record metrics for queued Picks. + rf = func() {} + } + return res, rf, err } + if res.Metadata == nil { res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData) } else { res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData) } - return res, nil + return res, func() { + targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, "complete") + }, nil } } + // In the unlikely event that we have a cache entry with no targets, we end up // queueing the RPC. - return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, func() {}, balancer.ErrNoSubConnAvailable } // useDefaultPickIfPossible is a helper method which delegates to the default -// target if one is configured, or fails the pick with the given error. -func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) { +// target if one is configured, or fails the pick with the given error. Returns +// a function to be invoked to record metrics. 
+func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, func(), error) { if p.defaultPolicy != nil { state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state)) - return state.Picker.Pick(info) + res, err := state.Picker.Pick(info) + pr := errToPickResult(err) + rf := func() { + defaultTargetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, p.defaultPolicy.target, pr) + } + if pr == "queue" { + // Don't record metrics for queued Picks. + rf = func() {} + } + return res, rf, err } - return balancer.PickResult{}, errOnNoDefault + + return balancer.PickResult{}, func() { + failedPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget) + }, errOnNoDefault } // sendRouteLookupRequestLocked adds an entry to the pending request map and diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index 25817be50b37..6ad225f10527 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -180,3 +180,17 @@ func (r *TestMetricsRecorder) TagConn(ctx context.Context, _ *stats.ConnTagInfo) } func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {} + +// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent +// nil panics. +type NoopMetricsRecorder struct {} + +func (r *NoopMetricsRecorder) RecordInt64Count(_ *estats.Int64CountHandle, _ int64, _ ...string) {} + +func (r *NoopMetricsRecorder) RecordFloat64Count(_ *estats.Float64CountHandle, _ float64, _ ...string) {} + +func (r *NoopMetricsRecorder) RecordInt64Histo(_ *estats.Int64HistoHandle, _ int64, _ ...string) {} + +func (r *NoopMetricsRecorder) RecordFloat64Histo(_ *estats.Float64HistoHandle, _ float64, _ ...string) {} + +func (r *NoopMetricsRecorder) RecordInt64Gauge(_ *estats.Int64GaugeHandle, _ int64, _ ...string) {}