Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [FEATURE] Experimental: Added a new object storage client for OpenStack Swift. #2440
* [FEATURE] Update in dependency `weaveworks/common`. TLS config options added to the Server. #2535
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.limit-query-concurrency` flag. #2562
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,10 @@ cassandra:
# CLI flag: -cassandra.retry-min-backoff
[retry_min_backoff: <duration> | default = 100ms]

# Limit number of concurrent queries to Cassandra. (Default is 0: no limit)
# CLI flag: -cassandra.limit-query-concurrency
[limit_query_concurrency: <int> | default = 0]

boltdb:
# Location of BoltDB index files.
# CLI flag: -boltdb.dir
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
go.uber.org/atomic v1.5.1
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/api v0.14.0
google.golang.org/grpc v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
36 changes: 30 additions & 6 deletions pkg/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
Expand Down Expand Up @@ -40,6 +41,7 @@ type Config struct {
Retries int `yaml:"max_retries"`
MaxBackoff time.Duration `yaml:"retry_max_backoff"`
MinBackoff time.Duration `yaml:"retry_min_backoff"`
LimitQueryConcurrency int `yaml:"limit_query_concurrency"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -63,6 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. (Default is 0: no retries)")
f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request. (Default = 100ms)")
f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request. (Default = 10s)")
f.IntVar(&cfg.LimitQueryConcurrency, "cassandra.limit-query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -192,9 +195,10 @@ func (cfg *Config) createKeyspace() error {

// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra.
type StorageClient struct {
cfg Config
schemaCfg chunk.SchemaConfig
session *gocql.Session
cfg Config
schemaCfg chunk.SchemaConfig
session *gocql.Session
querySemaphore *semaphore.Weighted
}

// NewStorageClient returns a new StorageClient.
Expand All @@ -206,10 +210,16 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient,
return nil, errors.WithStack(err)
}

var querySemaphore *semaphore.Weighted
if cfg.LimitQueryConcurrency > 0 {
querySemaphore = semaphore.NewWeighted(int64(cfg.LimitQueryConcurrency))
}

client := &StorageClient{
cfg: cfg,
schemaCfg: schemaCfg,
session: session,
cfg: cfg,
schemaCfg: schemaCfg,
session: session,
querySemaphore: querySemaphore,
}
return client, nil
}
Expand Down Expand Up @@ -277,6 +287,13 @@ func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQue
}

func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error {
if s.querySemaphore != nil {
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer s.querySemaphore.Release(1)
}

var q *gocql.Query

switch {
Expand Down Expand Up @@ -383,6 +400,13 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c
}

func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
if s.querySemaphore != nil {
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
return input, err
}
defer s.querySemaphore.Release(1)
}

tableName, err := s.schemaCfg.ChunkTableFor(input.From)
if err != nil {
return input, err
Expand Down
136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/semaphore/semaphore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.