Skip to content

Commit 5f5ffc2

Browse files
committed
Add RLS Metrics
1 parent c8716e5 commit 5f5ffc2

File tree

3 files changed

+90
-18
lines changed

3 files changed

+90
-18
lines changed

balancer/rls/balancer.go

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ import (
2828
"time"
2929
"unsafe"
3030

31+
"github.com/google/uuid"
3132
"google.golang.org/grpc/balancer"
3233
"google.golang.org/grpc/connectivity"
34+
estats "google.golang.org/grpc/experimental/stats"
3335
"google.golang.org/grpc/grpclog"
3436
"google.golang.org/grpc/internal"
3537
"google.golang.org/grpc/internal/backoff"
@@ -77,6 +79,42 @@ var (
7779
clientConnUpdateHook = func() {}
7880
dataCachePurgeHook = func() {}
7981
resetBackoffHook = func() {}
82+
83+
cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
84+
Name: "grpc.lb.rls.cache_entries",
85+
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
86+
Unit: "entry",
87+
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
88+
Default: false,
89+
})
90+
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
91+
Name: "grpc.lb.rls.cache_size",
92+
Description: "EXPERIMENTAL. The current size of the RLS cache.",
93+
Unit: "By",
94+
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
95+
Default: false,
96+
})
97+
defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
98+
Name: "grpc.lb.rls.default_target_picks",
99+
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
100+
Unit: "pick",
101+
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid", "grpc.lb.pick_result"},
102+
Default: false,
103+
})
104+
targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
105+
Name: "grpc.lb.rls.target_picks",
106+
Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.rls.default_target_picks.",
107+
Unit: "pick",
108+
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid", "grpc.lb.pick_result"},
109+
Default: false,
110+
})
111+
failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
112+
Name: "grpc.lb.rls.failed_picks",
113+
Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.",
114+
Unit: "pick",
115+
Labels: []string{"grpc.target", "grpc.lb.rls.server_target"},
116+
Default: false,
117+
})
80118
)
81119

82120
func init() {
@@ -95,6 +133,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
95133
done: grpcsync.NewEvent(),
96134
cc: cc,
97135
bopts: opts,
136+
uuid: uuid.New().String(),
98137
purgeTicker: dataCachePurgeTicker(),
99138
dataCachePurgeHook: dataCachePurgeHook,
100139
lbCfg: &lbConfig{},
@@ -122,6 +161,7 @@ type rlsBalancer struct {
122161
done *grpcsync.Event // Fires when Close() is done.
123162
cc balancer.ClientConn
124163
bopts balancer.BuildOptions
164+
uuid string
125165
purgeTicker *time.Ticker
126166
dataCachePurgeHook func()
127167
logger *internalgrpclog.PrefixLogger
@@ -240,7 +280,16 @@ func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
240280
case <-b.purgeTicker.C:
241281
b.cacheMu.Lock()
242282
updatePicker := b.dataCache.evictExpiredEntries()
283+
284+
b.stateMu.Lock()
285+
rlsLookupService := b.lbCfg.lookupService
286+
b.stateMu.Unlock()
287+
cacheSize := b.dataCache.currentSize
288+
cacheEntries := int64(len(b.dataCache.entries))
243289
b.cacheMu.Unlock()
290+
grpcTarget := b.bopts.Target.String()
291+
cacheSizeMetric.Record(b.bopts.MetricsRecorder, cacheSize, grpcTarget, rlsLookupService, b.uuid)
292+
cacheEntriesMetric.Record(b.bopts.MetricsRecorder, cacheEntries, grpcTarget, rlsLookupService, b.uuid)
244293
if updatePicker {
245294
b.sendNewPicker()
246295
}
@@ -304,7 +353,12 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
304353
// `stateMu` if we are to hold both locks at the same time.
305354
b.cacheMu.Lock()
306355
b.dataCache.resize(newCfg.cacheSizeBytes)
356+
dataCacheSize := b.dataCache.currentSize
357+
dataCacheEntries := int64(len(b.dataCache.entries))
307358
b.cacheMu.Unlock()
359+
grpcTarget := b.bopts.Target.String()
360+
cacheSizeMetric.Record(b.bopts.MetricsRecorder, dataCacheSize, grpcTarget, newCfg.lookupService, b.uuid)
361+
cacheEntriesMetric.Record(b.bopts.MetricsRecorder, dataCacheEntries, grpcTarget, newCfg.lookupService, b.uuid)
308362
}
309363
return nil
310364
}
@@ -490,15 +544,17 @@ func (b *rlsBalancer) sendNewPickerLocked() {
490544
if b.defaultPolicy != nil {
491545
b.defaultPolicy.acquireRef()
492546
}
547+
493548
picker := &rlsPicker{
494-
kbm: b.lbCfg.kbMap,
495-
origEndpoint: b.bopts.Target.Endpoint(),
496-
lb: b,
497-
defaultPolicy: b.defaultPolicy,
498-
ctrlCh: b.ctrlCh,
499-
maxAge: b.lbCfg.maxAge,
500-
staleAge: b.lbCfg.staleAge,
501-
bg: b.bg,
549+
kbm: b.lbCfg.kbMap,
550+
origEndpoint: b.bopts.Target.Endpoint(),
551+
lb: b,
552+
defaultPolicy: b.defaultPolicy,
553+
ctrlCh: b.ctrlCh,
554+
maxAge: b.lbCfg.maxAge,
555+
staleAge: b.lbCfg.staleAge,
556+
bg: b.bg,
557+
rlsServerTarget: b.lbCfg.lookupService,
502558
}
503559
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
504560
state := balancer.State{

balancer/rls/cache.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
163163
//
164164
// It is not safe for concurrent access.
165165
type dataCache struct {
166-
maxSize int64 // Maximum allowed size.
166+
maxSize int64 // Maximum allowed size.
167+
167168
currentSize int64 // Current size.
168169
keys *lru // Cache keys maintained in lru order.
169170
entries map[cacheKey]*cacheEntry

balancer/rls/picker.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@ type rlsPicker struct {
6161

6262
// The picker is given its own copy of the below fields from the RLS LB policy
6363
// to avoid having to grab the mutex on the latter.
64-
defaultPolicy *childPolicyWrapper // Child policy for the default target.
65-
ctrlCh *controlChannel // Control channel to the RLS server.
66-
maxAge time.Duration // Cache max age from LB config.
67-
staleAge time.Duration // Cache stale age from LB config.
68-
bg exitIdler
69-
logger *internalgrpclog.PrefixLogger
64+
rlsServerTarget string
65+
defaultPolicy *childPolicyWrapper // Child policy for the default target.
66+
ctrlCh *controlChannel // Control channel to the RLS server.
67+
maxAge time.Duration // Cache max age from LB config.
68+
staleAge time.Duration // Cache stale age from LB config.
69+
bg exitIdler
70+
logger *internalgrpclog.PrefixLogger
7071
}
7172

7273
// isFullMethodNameValid return true if name is of the form `/service/method`.
@@ -164,8 +165,11 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
164165
// X-Google-RLS-Data header.
165166
res, err := state.Picker.Pick(info)
166167
if err != nil {
168+
targetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, cpw.target, "fail")
167169
return res, err
168170
}
171+
targetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, cpw.target, "complete")
172+
169173
if res.Metadata == nil {
170174
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
171175
} else {
@@ -174,6 +178,8 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
174178
return res, nil
175179
}
176180
}
181+
182+
failedPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget)
177183
// In the unlikely event that we have a cache entry with no targets, we end up
178184
// queueing the RPC.
179185
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
@@ -184,8 +190,16 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
184190
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
185191
if p.defaultPolicy != nil {
186192
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
187-
return state.Picker.Pick(info)
193+
res, err := state.Picker.Pick(info)
194+
pr := "complete"
195+
if err != nil {
196+
pr = "fail"
197+
}
198+
defaultTargetPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget, p.defaultPolicy.target, pr)
199+
return res, err
188200
}
201+
202+
failedPicksMetric.Record(p.lb.bopts.MetricsRecorder, 1, p.lb.bopts.Target.String(), p.rlsServerTarget)
189203
return balancer.PickResult{}, errOnNoDefault
190204
}
191205

@@ -218,8 +232,9 @@ func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []strin
218232

219233
p.lb.cacheMu.Lock()
220234
defer func() {
221-
// Pending request map entry is unconditionally deleted since the request is
222-
// no longer pending.
235+
grpcTarget := p.lb.bopts.Target.String()
236+
cacheSizeMetric.Record(p.lb.bopts.MetricsRecorder, p.lb.dataCache.currentSize, grpcTarget, p.rlsServerTarget, p.lb.uuid)
237+
cacheEntriesMetric.Record(p.lb.bopts.MetricsRecorder, int64(len(p.lb.dataCache.entries)), grpcTarget, p.rlsServerTarget, p.lb.uuid)
223238
p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
224239
delete(p.lb.pendingMap, cacheKey)
225240
p.lb.sendNewPicker()

0 commit comments

Comments
 (0)