Skip to content

Add ddb ring pull time config #5357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [ENHANCEMENT] Update Go version to 1.20.4. #5299
* [ENHANCEMENT] Log: Avoid expensive log.Valuer evaluation for disallowed levels. #5297
* [ENHANCEMENT] Improving Performance on the API Gzip Handler. #5347
* [ENHANCEMENT] Dynamodb: Add `puller-sync-time` to allow different pull time for ring. #5357
* [ENHANCEMENT] Emit querier `max_concurrent` as a metric. #5362
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ compactor:
# CLI flag: -compactor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -compactor.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is:
# store-gateway.sharding-ring
Expand Down
28 changes: 28 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -alertmanager.sharding-ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[consul: <consul_config>]
Expand Down Expand Up @@ -1878,6 +1882,10 @@ sharding_ring:
# CLI flag: -compactor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -compactor.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2117,6 +2125,10 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -distributor.ha-tracker.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ha-tracker
[consul: <consul_config>]
Expand Down Expand Up @@ -2194,6 +2206,10 @@ ring:
# CLI flag: -distributor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -distributor.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2482,6 +2498,10 @@ lifecycler:
# CLI flag: -dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
[consul: <consul_config>]

Expand Down Expand Up @@ -3853,6 +3873,10 @@ ring:
# CLI flag: -ruler.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -ruler.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: ruler.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -4712,6 +4736,10 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
[consul: <consul_config>]
Expand Down
59 changes: 30 additions & 29 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import (

// Config to create a ConsulClient
type Config struct {
Region string `yaml:"region"`
TableName string `yaml:"table_name"`
TTL time.Duration `yaml:"ttl"`
Region string `yaml:"region"`
TableName string `yaml:"table_name"`
TTL time.Duration `yaml:"ttl"`
PullerSyncTime time.Duration `yaml:"puller_sync_time"`
}

type Client struct {
kv dynamoDbClient
codec codec.Codec
ddbMetrics *dynamodbMetrics
logger log.Logger
kv dynamoDbClient
codec codec.Codec
ddbMetrics *dynamodbMetrics
logger log.Logger
pullerSyncTime time.Duration
backoffConfig backoff.Config

staleDataLock sync.RWMutex
staleData map[string]staleData
Expand All @@ -37,22 +40,13 @@ type staleData struct {
timestamp time.Time
}

var (
backoffConfig = backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 1 * time.Minute,
MaxRetries: 0,
}

defaultLoopDelay = 1 * time.Minute
)

// RegisterFlags adds the flags required to config this to the given FlagSet
// If prefix is not an empty string it should end with a period.
func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Region, prefix+"dynamodb.region", "", "Region to access dynamodb.")
f.StringVar(&cfg.TableName, prefix+"dynamodb.table-name", "", "Table name to use on dynamodb.")
f.DurationVar(&cfg.TTL, prefix+"dynamodb.ttl-time", 0, "Time to expire items on dynamodb.")
f.DurationVar(&cfg.PullerSyncTime, prefix+"dynamodb.puller-sync-time", 60*time.Second, "Time to refresh local ring with information on dynamodb.")
}

func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) {
Expand All @@ -63,12 +57,20 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh

ddbMetrics := newDynamoDbMetrics(registerer)

backoffConfig := backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: cfg.PullerSyncTime,
MaxRetries: 0,
}

c := &Client{
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
staleData: make(map[string]staleData),
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
pullerSyncTime: cfg.PullerSyncTime,
staleData: make(map[string]staleData),
backoffConfig: backoffConfig,
}
level.Info(c.logger).Log("dynamodb kv initialized")
return c, nil
Expand Down Expand Up @@ -121,7 +123,7 @@ func (c *Client) Delete(ctx context.Context, key string) error {
}

func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
bo := backoff.New(ctx, backoffConfig)
bo := backoff.New(ctx, c.backoffConfig)
for bo.Ongoing() {
resp, _, err := c.kv.Query(ctx, dynamodbKey{primaryKey: key}, false)
if err != nil {
Expand Down Expand Up @@ -190,7 +192,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
}

func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
bo := backoff.New(ctx, backoffConfig)
bo := backoff.New(ctx, c.backoffConfig)

for bo.Ongoing() {
out, _, err := c.kv.Query(ctx, dynamodbKey{
Expand All @@ -199,12 +201,11 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
if err != nil {
level.Error(c.logger).Log("msg", "error WatchKey", "key", key, "err", err)

if bo.NumRetries() > 10 {
if bo.NumRetries() >= 10 {
if staleData := c.getStaleData(key); staleData != nil {
if !f(staleData) {
return
}
bo.Reset()
}
}
bo.Wait()
Expand All @@ -226,13 +227,13 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
select {
case <-ctx.Done():
return
case <-time.After(defaultLoopDelay):
case <-time.After(c.pullerSyncTime):
}
}
}

func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
bo := backoff.New(ctx, backoffConfig)
bo := backoff.New(ctx, c.backoffConfig)

for bo.Ongoing() {
out, _, err := c.kv.Query(ctx, dynamodbKey{
Expand All @@ -259,7 +260,7 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
select {
case <-ctx.Done():
return
case <-time.After(defaultLoopDelay):
case <-time.After(c.pullerSyncTime):
}
}
}
Expand Down
Loading