Skip to content

Commit fa5fee5

Browse files
authored
feat(shard-distributor): Add canary pinger for periodic shard ownership verification (#7487)
Depends on #7475 and #7478 being merged **What changed?** Added canary pinger component that periodically sends ping requests to shard owners to verify executor-to-executor communication and shard ownership. **Why?** The canary pinger provides active monitoring of the shard distributor's routing and ownership mechanisms by: - Periodically selecting random shards and pinging their owners - Verifying that the pinged executor owns the shard - Detecting communication failures between executors This is part of the canary ping/pong implementation that validates end-to-end executor-to-executor gRPC communication. **How did you test it?** Unit tests **Potential risks** **Release notes** **Documentation Changes** --------- Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 956a742 commit fa5fee5

File tree

3 files changed

+279
-0
lines changed

3 files changed

+279
-0
lines changed

service/sharddistributor/canary/pinger/canary_client_mock.go

Lines changed: 64 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package pinger
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"sync"
8+
"time"
9+
10+
"go.uber.org/fx"
11+
"go.uber.org/yarpc"
12+
"go.uber.org/zap"
13+
14+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
15+
"github.com/uber/cadence/common/backoff"
16+
"github.com/uber/cadence/common/clock"
17+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
18+
)
19+
20+
//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
21+
22+
const (
23+
pingInterval = 1 * time.Second
24+
pingJitterCoeff = 0.1 // 10% jitter
25+
pingTimeout = 5 * time.Second
26+
)
27+
28+
// Pinger periodically pings shard owners in the fixed namespace
29+
type Pinger struct {
30+
logger *zap.Logger
31+
timeSource clock.TimeSource
32+
canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
33+
namespace string
34+
numShards int
35+
ctx context.Context
36+
cancel context.CancelFunc
37+
wg sync.WaitGroup
38+
}
39+
40+
// Params are the parameters for creating a Pinger
41+
type Params struct {
42+
fx.In
43+
44+
Logger *zap.Logger
45+
TimeSource clock.TimeSource
46+
CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
47+
}
48+
49+
// NewPinger creates a new Pinger for the fixed namespace
50+
func NewPinger(params Params, namespace string, numShards int) *Pinger {
51+
return &Pinger{
52+
logger: params.Logger,
53+
timeSource: params.TimeSource,
54+
canaryClient: params.CanaryClient,
55+
namespace: namespace,
56+
numShards: numShards,
57+
}
58+
}
59+
60+
// Start begins the periodic ping loop
61+
func (p *Pinger) Start(ctx context.Context) {
62+
p.logger.Info("Starting canary pinger", zap.String("namespace", p.namespace), zap.Int("num_shards", p.numShards))
63+
p.ctx, p.cancel = context.WithCancel(context.WithoutCancel(ctx))
64+
p.wg.Add(1)
65+
go p.pingLoop()
66+
}
67+
68+
// Stop stops the ping loop
69+
func (p *Pinger) Stop() {
70+
if p.cancel != nil {
71+
p.cancel()
72+
}
73+
p.wg.Wait()
74+
}
75+
76+
func (p *Pinger) pingLoop() {
77+
defer p.wg.Done()
78+
79+
ticker := p.timeSource.NewTicker(backoff.JitDuration(pingInterval, pingJitterCoeff))
80+
defer ticker.Stop()
81+
82+
for {
83+
select {
84+
case <-p.ctx.Done():
85+
p.logger.Info("Pinger context done, stopping")
86+
return
87+
case <-ticker.Chan():
88+
p.pingRandomShard()
89+
ticker.Reset(backoff.JitDuration(pingInterval, pingJitterCoeff))
90+
}
91+
}
92+
}
93+
94+
// Pings a random shard in the namespace and logs the results
95+
func (p *Pinger) pingRandomShard() {
96+
shardNum := rand.Intn(p.numShards)
97+
shardKey := fmt.Sprintf("%d", shardNum)
98+
99+
request := &sharddistributorv1.PingRequest{
100+
ShardKey: shardKey,
101+
Namespace: p.namespace,
102+
}
103+
104+
ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
105+
defer cancel()
106+
107+
response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
108+
if err != nil {
109+
p.logger.Error("Failed to ping shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.Error(err))
110+
}
111+
112+
// Verify response
113+
if !response.GetOwnsShard() {
114+
p.logger.Warn("Executor does not own shard", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
115+
}
116+
117+
p.logger.Info("Successfully pinged shard owner", zap.String("namespace", p.namespace), zap.String("shard_key", shardKey), zap.String("executor_id", response.GetExecutorId()))
118+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package pinger
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"go.uber.org/goleak"
10+
"go.uber.org/mock/gomock"
11+
"go.uber.org/zap"
12+
"go.uber.org/zap/zaptest/observer"
13+
14+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
15+
"github.com/uber/cadence/common/clock"
16+
)
17+
18+
func TestPingerStartStop(t *testing.T) {
19+
defer goleak.VerifyNone(t)
20+
21+
ctrl := gomock.NewController(t)
22+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
23+
24+
pinger := NewPinger(Params{
25+
Logger: zap.NewNop(),
26+
TimeSource: clock.NewRealTimeSource(),
27+
CanaryClient: mockClient,
28+
}, "test-ns", 10)
29+
30+
pinger.Start(context.Background())
31+
pinger.Stop()
32+
}
33+
34+
func TestPingerPingRandomShard(t *testing.T) {
35+
defer goleak.VerifyNone(t)
36+
37+
cases := []struct {
38+
name string
39+
setupClientMock func(*MockShardDistributorExecutorCanaryAPIYARPCClient)
40+
expectedLog string
41+
}{
42+
{
43+
name: "owns shard",
44+
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
45+
mockClient.EXPECT().Ping(gomock.Any(), gomock.Any(), gomock.Any()).
46+
Return(&sharddistributorv1.PingResponse{
47+
OwnsShard: true,
48+
ExecutorId: "127.0.0.1:7953",
49+
}, nil)
50+
},
51+
expectedLog: "Successfully pinged shard owner",
52+
},
53+
{
54+
name: "does not own shard",
55+
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
56+
mockClient.EXPECT().
57+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
58+
Return(&sharddistributorv1.PingResponse{
59+
OwnsShard: false,
60+
ExecutorId: "127.0.0.1:7953",
61+
}, nil)
62+
},
63+
expectedLog: "Executor does not own shard",
64+
},
65+
{
66+
name: "RPC error",
67+
setupClientMock: func(mockClient *MockShardDistributorExecutorCanaryAPIYARPCClient) {
68+
mockClient.EXPECT().
69+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
70+
Return(nil, errors.New("network error"))
71+
},
72+
expectedLog: "Failed to ping shard",
73+
},
74+
}
75+
76+
for _, tt := range cases {
77+
t.Run(tt.name, func(t *testing.T) {
78+
ctrl := gomock.NewController(t)
79+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
80+
zapCore, logs := observer.New(zap.InfoLevel)
81+
logger := zap.New(zapCore)
82+
83+
pinger := NewPinger(Params{
84+
Logger: logger,
85+
TimeSource: clock.NewRealTimeSource(),
86+
CanaryClient: mockClient,
87+
}, "test-ns", 10)
88+
pinger.ctx = context.Background()
89+
90+
tt.setupClientMock(mockClient)
91+
92+
pinger.pingRandomShard()
93+
94+
assert.Equal(t, 1, logs.FilterMessage(tt.expectedLog).Len())
95+
})
96+
}
97+
}

0 commit comments

Comments
 (0)