Skip to content

Commit c78cbe7

Browse files
nurzhan-saktaganovKaymeKaydex
authored andcommitted
make NewRouter tolerant to the cluster's degraded state (resolve issue #13)
* DiscoveryAllBuckets: don't cancel requests to other replicasets if request to some replicaset failed. * add test for degraded cluster (TestDegradedCluster) * AddReplicaset: don't fail if there are no available RW instances * NewRouter: don't fail if DiscoveryAllBuckets has returned an error
1 parent be369fe commit c78cbe7

File tree

5 files changed

+60
-10
lines changed

5 files changed

+60
-10
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@
33
BUG FIXES:
44
- Fixed go.mod and go.sum files with go mod tidy.
55

6+
CHANGES:
7+
* DiscoveryAllBuckets: don't cancel requests to other replicasets if request to some replicaset failed.
8+
* AddReplicaset: don't fail if there are no available RW instances.
9+
* NewRouter: don't fail if DiscoveryAllBuckets has returned an error.
10+
611
TESTS:
712

813
- Added etcd v2 provider tests.
14+
- Add test for degraded cluster (TestDegradedCluster).
915

1016
## v2.0.1
1117

discovery.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
222222

223223
r.log().Infof(ctx, "Start discovery all buckets")
224224

225-
errGr, ctx := errgroup.WithContext(ctx)
225+
var errGr errgroup.Group
226226

227227
view := r.getConsistentView()
228228
nameToReplicasetRef := r.getNameToReplicaset()
@@ -236,6 +236,7 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
236236
for {
237237
resp, err := rs.bucketsDiscovery(ctx, bucketsDiscoveryPaginationFrom)
238238
if err != nil {
239+
r.log().Errorf(ctx, "can't bucketsDiscovery for rs %s: %v", rs.info, err)
239240
return err
240241
}
241242

tarantool_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,3 +639,48 @@ func TestReplicaset_Pooler(t *testing.T) {
639639
}
640640
})
641641
}
642+
643+
func TestDegradedCluster(t *testing.T) {
644+
ctx := context.Background()
645+
646+
// create a topology to imitate cluster with several replicasets:
647+
// 1 fake replicaset (imitates an unavailable replicaset) + all replicasets of topology
648+
topologyDegraded := map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{
649+
{
650+
// add fake replicaset
651+
Name: "storage_0",
652+
UUID: uuid.New(),
653+
Weight: 1,
654+
}: {
655+
{
656+
Name: "storage_0_a",
657+
UUID: uuid.New(),
658+
Addr: "127.0.0.1:2998",
659+
},
660+
{
661+
Name: "storage_0_b",
662+
UUID: uuid.New(),
663+
Addr: "127.0.0.1:2999",
664+
},
665+
},
666+
}
667+
668+
// add all replicasets of topology
669+
for k, v := range topology {
670+
topologyDegraded[k] = v
671+
}
672+
673+
router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
674+
TopologyProvider: static.NewProvider(topologyDegraded),
675+
DiscoveryTimeout: 5 * time.Second,
676+
DiscoveryMode: vshardrouter.DiscoveryModeOn,
677+
TotalBucketCount: totalBucketCount,
678+
User: username,
679+
})
680+
require.NoError(t, err, "NewRouter created successfully")
681+
682+
for bucketID := uint64(1); bucketID <= totalBucketCount; bucketID++ {
683+
_, err := router.Route(ctx, bucketID)
684+
require.NoErrorf(t, err, "bucket %d resolved successfully")
685+
}
686+
}

topology.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,11 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
130130
r.log().Infof(ctx, "[replicaset %s ] instance %s %s in role %s", rsInfo, instName, connectStatus, instConnInfo.ConnRole)
131131
}
132132

133-
isConnected, err := conn.ConnectedNow(pool.RW)
134-
if err != nil {
135-
return fmt.Errorf("cant check rs pool conntected rw now with error: %s", err)
136-
}
137-
138-
if !isConnected {
139-
return fmt.Errorf("got connected now as false, storage must be configured first")
133+
switch isConnected, err := conn.ConnectedNow(pool.RW); {
134+
case err != nil:
135+
r.log().Errorf(ctx, "cant check rs pool conntected rw now with error: %v", err)
136+
case !isConnected:
137+
r.log().Errorf(ctx, "got connected now as false to pool.RW")
140138
}
141139

142140
replicaset.conn = conn

vshard.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
241241

242242
err = router.DiscoveryAllBuckets(ctx)
243243
if err != nil {
244-
return nil, err
244+
router.log().Errorf(ctx, "router.DiscoveryAllBuckets failed: %v", err)
245245
}
246246

247247
if cfg.DiscoveryMode == DiscoveryModeOn {
@@ -255,7 +255,7 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
255255
router.cancelDiscovery = cancelFunc
256256
}
257257

258-
return router, err
258+
return router, nil
259259
}
260260

261261
// BucketSet Set a bucket to a replicaset.

0 commit comments

Comments
 (0)