Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [BUGFIX] Storage: Bucket index updater should ignore meta not found for partial blocks. #5343
* [BUGFIX] Ring: Add JOINING state to read operation. #5346
* [BUGFIX] Compactor: Partial block with only visit marker should be deleted even there is no deletion marker. #5342
* [BUGFIX] KV: Etcd calls will no longer block indefinitely and will now time out after the DialTimeout period. #5392

## 1.15.1 2023-04-26

Expand Down
40 changes: 33 additions & 7 deletions pkg/ring/kv/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
var revision int64
var lastErr error

opsCtx, cancel := c.opsContext(ctx)
defer cancel()

for i := 0; i < c.cfg.MaxRetries; i++ {
resp, err := c.cli.Get(ctx, key)
resp, err := c.cli.Get(opsCtx, key)
if err != nil {
level.Error(c.logger).Log("msg", "error getting key", "key", key, "err", err)
lastErr = err
Expand Down Expand Up @@ -165,7 +168,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
continue
}

result, err := c.cli.Txn(ctx).
result, err := c.cli.Txn(opsCtx).
If(clientv3.Compare(clientv3.Version(key), "=", revision)).
Then(clientv3.OpPut(key, string(buf))).
Commit()
Expand Down Expand Up @@ -198,7 +201,12 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b

// Ensure the context used by the Watch is always cancelled.
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
level.Debug(c.logger).Log("msg", "Finished watching key", "key", key)
}()

level.Debug(c.logger).Log("msg", "Watching key", "key", key)

outer:
for backoff.Ongoing() {
Expand Down Expand Up @@ -234,7 +242,12 @@ func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, int

// Ensure the context used by the Watch is always cancelled.
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
level.Debug(c.logger).Log("msg", "Finished watching prefix", "key", key)
}()

level.Debug(c.logger).Log("msg", "Watching prefix", "key", key)

outer:
for backoff.Ongoing() {
Expand Down Expand Up @@ -268,7 +281,10 @@ outer:

// List implements kv.Client.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
resp, err := c.cli.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
opsCtx, cancel := c.opsContext(ctx)
defer cancel()

resp, err := c.cli.Get(opsCtx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, err
}
Expand All @@ -281,7 +297,10 @@ func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {

// Get implements kv.Client.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
resp, err := c.cli.Get(ctx, key)
opsCtx, cancel := c.opsContext(ctx)
defer cancel()

resp, err := c.cli.Get(opsCtx, key)
if err != nil {
return nil, err
}
Expand All @@ -295,10 +314,17 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {

// Delete implements kv.Client.
func (c *Client) Delete(ctx context.Context, key string) error {
_, err := c.cli.Delete(ctx, key)
opsCtx, cancel := c.opsContext(ctx)
defer cancel()

_, err := c.cli.Delete(opsCtx, key)
return err
}

func (c *Client) LastUpdateTime(_ string) time.Time {
return time.Now().UTC()
}

func (c *Client) opsContext(parent context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(parent, time.Duration(float64(c.cfg.DialTimeout)*float64(c.cfg.MaxRetries)))
}