Skip to content

Add dynamodb multikey kv #5026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [FEATURE] Query-frontend/Querier: Create spans to measure time to merge promql responses. #5041
* [FEATURE] Querier/Ruler: Support the new thanos promql engine. This is an experimental feature and might change in the future. #5093
* [FEATURE] Added zstd as an option for grpc compression #5092
* [FEATURE] Ring: Add new kv store option `dynamodb`. #5026
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055
Expand Down
13 changes: 13 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,19 @@ compactor:
# CLI flag: -compactor.ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -compactor.ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -compactor.ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -compactor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down
13 changes: 13 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is:
# store-gateway.sharding-ring
Expand Down
91 changes: 91 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,19 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.prefix
[prefix: <string> | default = "ha-tracker/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -distributor.ha-tracker.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -distributor.ha-tracker.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -distributor.ha-tracker.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ha-tracker
[consul: <consul_config>]
Expand Down Expand Up @@ -557,6 +570,19 @@ ring:
# CLI flag: -distributor.ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -distributor.ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -distributor.ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -distributor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -627,6 +653,19 @@ lifecycler:
# CLI flag: -ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
[consul: <consul_config>]

Expand Down Expand Up @@ -1299,6 +1338,19 @@ ring:
# CLI flag: -ruler.ring.prefix
[prefix: <string> | default = "rulers/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -ruler.ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -ruler.ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -ruler.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: ruler.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -1681,6 +1733,19 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.prefix
[prefix: <string> | default = "alertmanagers/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -alertmanager.sharding-ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -alertmanager.sharding-ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -alertmanager.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[consul: <consul_config>]
Expand Down Expand Up @@ -3868,6 +3933,19 @@ sharding_ring:
# CLI flag: -compactor.ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -compactor.ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -compactor.ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -compactor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -3955,6 +4033,19 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
[consul: <consul_config>]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.0
github.com/armon/go-metrics v0.4.1
github.com/aws/aws-sdk-go v1.44.156
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/dustin/go-humanize v1.0.0
Expand Down Expand Up @@ -89,7 +90,6 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go v1.44.156 // indirect
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.15.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.11.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ func (c stringCodec) Decode(bb []byte) (interface{}, error) {
func (c stringCodec) Encode(v interface{}) ([]byte, error) { return []byte(v.(string)), nil }
func (c stringCodec) CodecID() string { return "stringCodec" }

func (stringCodec) EncodeMultiKey(msg interface{}) (map[string][]byte, error) {
return nil, errors.New("String codec does not support EncodeMultiKey")
}

func (stringCodec) DecodeMultiKey(map[string][]byte) (interface{}, error) {
return nil, errors.New("String codec does not support DecodeMultiKey")
}

type watcher struct {
values map[string][]interface{}
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/ring/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const pageContent = `
<body>
<h1>Ring Status</h1>
<p>Current time: {{ .Now }}</p>
<p>Storage updated: {{ .StorageLastUpdated }}</p>
<form action="" method="POST">
<input type="hidden" name="csrf_token" value="$__CSRF_TOKEN_PLACEHOLDER__">
<table width="100%" border="1">
Expand Down Expand Up @@ -116,9 +117,10 @@ type ingesterDesc struct {
}

type httpResponse struct {
Ingesters []ingesterDesc `json:"shards"`
Now time.Time `json:"now"`
ShowTokens bool `json:"-"`
Ingesters []ingesterDesc `json:"shards"`
Now time.Time `json:"now"`
StorageLastUpdated time.Time `json:"storageLastUpdated"`
ShowTokens bool `json:"-"`
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -149,14 +151,14 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
sort.Strings(ingesterIDs)

now := time.Now()
storageLastUpdate := r.KVClient.LastUpdateTime(r.key)
var ingesters []ingesterDesc
_, owned := r.countTokens()
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(&ing, Reporting, now) {
if !r.IsHealthy(&ing, Reporting, storageLastUpdate) {
state = unhealthy
}

Expand All @@ -182,9 +184,10 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
tokensParam := req.URL.Query().Get("tokens")

renderHTTPResponse(w, httpResponse{
Ingesters: ingesters,
Now: now,
ShowTokens: tokensParam == "true",
Ingesters: ingesters,
Now: time.Now(),
StorageLastUpdated: storageLastUpdate,
ShowTokens: tokensParam == "true",
}, pageTemplate, req)
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/ring/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"flag"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/ring/kv/dynamodb"
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
)
Expand Down Expand Up @@ -40,9 +42,10 @@ var inmemoryStore Client
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
// single-client config separate from final client-config (with all the wrappers)
type StoreConfig struct {
Consul consul.Config `yaml:"consul"`
Etcd etcd.Config `yaml:"etcd"`
Multi MultiConfig `yaml:"multi"`
DynamoDB dynamodb.Config `yaml:"dynamodb"`
Consul consul.Config `yaml:"consul"`
Etcd etcd.Config `yaml:"etcd"`
Multi MultiConfig `yaml:"multi"`

// Function that returns memberlist.KV store to use. By using a function, we can delay
// initialization of memberlist.KV until it is actually required.
Expand All @@ -69,6 +72,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f
// This needs to be fixed in the future (1.0 release maybe?) when we normalize flags.
// At the moment we have consul.<flag-name>, and ring.store, going forward it would
// be easier to have everything under ring, so ring.consul.<flag-name>
cfg.DynamoDB.RegisterFlags(f, flagsPrefix)
cfg.Consul.RegisterFlags(f, flagsPrefix)
cfg.Etcd.RegisterFlagsWithPrefix(f, flagsPrefix)
cfg.Multi.RegisterFlagsWithPrefix(f, flagsPrefix)
Expand Down Expand Up @@ -111,6 +115,9 @@ type Client interface {

// WatchPrefix calls f whenever any value stored under prefix changes.
WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)

// LastUpdateTime returns the time a key was last sync by the kv store
LastUpdateTime(key string) time.Time
}

// NewClient creates a new Client (consul, etcd or inmemory) based on the config,
Expand All @@ -128,6 +135,9 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
var err error

switch backend {
case "dynamodb":
client, err = dynamodb.NewClient(cfg.DynamoDB, codec, logger, reg)

case "consul":
client, err = consul.NewClient(cfg.Consul, codec, logger, reg)

Expand Down
6 changes: 6 additions & 0 deletions pkg/ring/kv/codec/clonable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package codec

type Clonable interface {
// Clone should return a deep copy of the state.
Clone() interface{}
}
Loading