diff --git a/.circleci/config.yml b/.circleci/config.yml
index 54dfc1bd4b1..8e369d65108 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -149,6 +149,7 @@ jobs:
       docker pull quay.io/cortexproject/cortex:v0.6.0
       docker pull shopify/bigtable-emulator:0.1.0
       docker pull rinscy/cassandra:3.11.0
+      docker pull memcached:1.6.1
   - run:
       name: Integration Tests
       command: |
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05a570a1a99..3d4724e6c5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 * `-flusher.flush-op-timeout` is duration after which a flush should timeout.
 * [ENHANCEMENT] Better re-use of connections to DynamoDB and S3. #2268
 * [ENHANCEMENT] Experimental TSDB: Add support for local `filesystem` backend. #2245
+* [ENHANCEMENT] Experimental TSDB: Added memcached support for the TSDB index cache. #2290
 * [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting table period and retention. #2252
 * [ENHANCEMENT] Added FIFO cache metrics for current number of entries and memory usage. #2270
 * [ENHANCEMENT] Output all config fields to /config API, including those with empty value. #2209
diff --git a/development/tsdb-blocks-storage-s3/config/cortex.yaml b/development/tsdb-blocks-storage-s3/config/cortex.yaml
index c1d479afb39..d943c4e749b 100644
--- a/development/tsdb-blocks-storage-s3/config/cortex.yaml
+++ b/development/tsdb-blocks-storage-s3/config/cortex.yaml
@@ -39,6 +39,11 @@ tsdb:
   bucket_store:
     sync_dir: /tmp/cortex-tsdb-querier

+    index_cache:
+      backend: memcached
+      memcached:
+        addresses: dns+memcached:11211
+
   s3:
     endpoint: minio:9000
     bucket_name: cortex-tsdb
diff --git a/development/tsdb-blocks-storage-s3/docker-compose.yml b/development/tsdb-blocks-storage-s3/docker-compose.yml
index eee8a02c098..7ac35b60495 100644
--- a/development/tsdb-blocks-storage-s3/docker-compose.yml
+++ b/development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -18,6 +18,9 @@ services:
     volumes:
       - .data-minio:/data:delegated

+  memcached:
+    image: memcached:1.6
+
   configstore:
     image: nginx
     volumes:
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 8dbf43fcda1..2d8bb7ef043 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -2227,11 +2227,6 @@ bucket_store:
   # CLI flag: -experimental.tsdb.bucket-store.sync-interval
   [sync_interval: <duration> | default = 5m0s]

-  # Size in bytes of in-memory index cache used to speed up blocks index lookups
-  # (shared between all tenants).
-  # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
-  [index_cache_size_bytes: <int> | default = 1073741824]
-
   # Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
   # allocations.
   # CLI flag: -experimental.tsdb.bucket-store.max-chunk-pool-bytes
@@ -2271,6 +2266,54 @@ bucket_store:
   # CLI flag: -experimental.tsdb.bucket-store.consistency-delay
   [consistency_delay: <duration> | default = 0s]

+  index_cache:
+    # The index cache backend type. Supported values: inmemory, memcached.
+    # CLI flag: -experimental.tsdb.bucket-store.index-cache.backend
+    [backend: <string> | default = "inmemory"]
+
+    inmemory:
+      # Maximum size in bytes of in-memory index cache used to speed up blocks
+      # index lookups (shared between all tenants).
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.inmemory.max-size-bytes
+      [max_size_bytes: <int> | default = 1073741824]
+
+    memcached:
+      # Comma separated list of memcached addresses. Supported prefixes are:
+      # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query),
+      # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after
+      # that).
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.addresses
+      [addresses: <string> | default = ""]
+
+      # The socket read/write timeout.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.timeout
+      [timeout: <duration> | default = 100ms]
+
+      # The maximum number of idle connections that will be maintained per
+      # address.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-idle-connections
+      [max_idle_connections: <int> | default = 16]
+
+      # The maximum number of concurrent asynchronous operations that can occur.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-concurrency
+      [max_async_concurrency: <int> | default = 50]
+
+      # The maximum number of enqueued asynchronous operations allowed.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # The maximum number of concurrent connections running get operations. If
+      # set to 0, concurrency is unlimited.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-concurrency
+      [max_get_multi_concurrency: <int> | default = 100]
+
+      # The maximum number of keys a single underlying get operation should fetch.
+      # If more keys are specified, internally keys are split into multiple
+      # batches and fetched concurrently, honoring the max concurrency. If set
+      # to 0, the max batch size is unlimited.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-batch-size
+      [max_get_multi_batch_size: <int> | default = 0]
+
 # How frequently does Cortex try to compact TSDB head. Block is only created if
 # data covers smallest block range. Must be greater than 0 and max 5 minutes.
 # CLI flag: -experimental.tsdb.head-compaction-interval
diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md
index 2b60932a4fa..8e61676def3 100644
--- a/docs/operations/blocks-storage.md
+++ b/docs/operations/blocks-storage.md
@@ -72,6 +72,37 @@ Whenever the pool of compactors increase or decrease (ie. following up a scale u
 - `GET /compactor_ring`
     Displays the status of the compactors ring, including the tokens owned by each compactor and an option to remove (forget) instances from the ring.

+## Index cache
+
+The querier supports a cache to speed up postings and series lookups from TSDB blocks indexes. Two backends are supported:
+
+- `inmemory`
+- `memcached`
+
+### In-memory index cache
+
+The `inmemory` index cache is **enabled by default** and its max size can be configured through the flag `-experimental.tsdb.bucket-store.index-cache.inmemory.max-size-bytes` (or config file). The trade-offs of using the in-memory index cache are:
+
+- Pros: zero latency
+- Cons: increased querier memory usage, not shared across multiple querier replicas
+
+### Memcached index cache
+
+The `memcached` index cache allows you to use [Memcached](https://memcached.org/) as the cache backend. This cache backend is configured using `-experimental.tsdb.bucket-store.index-cache.backend=memcached` and requires the Memcached server(s) addresses to be set via `-experimental.tsdb.bucket-store.index-cache.memcached.addresses` (or config file). The addresses are resolved using [DNS service discovery](dns-service-discovery.md).
+
+The trade-offs of using the Memcached index cache are:
+
+- Pros: can scale beyond a single node's memory (Memcached cluster), shared across multiple querier instances
+- Cons: higher latency in the cache round trip compared to the in-memory one
+
+The Memcached client uses a jump hash algorithm to shard cached entries across a cluster of Memcached servers. For this reason, you should make sure Memcached servers are **not** behind any kind of load balancer, and that their addresses are configured so that servers are added to (or removed from) the end of the list whenever a scale up (or scale down) occurs.
+
+For example, if you're running Memcached in Kubernetes, you may:
+
+1. Deploy your Memcached cluster using a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/)
+2. Create a [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) for the Memcached StatefulSet
+3. Configure Cortex's Memcached client address using the `dnssrvnoa+` [service discovery](dns-service-discovery.md)
+
 ## Configuration

 The general [configuration documentation](../configuration/_index.md) also applies to a Cortex cluster running the blocks storage, with a few differences:
@@ -134,11 +165,6 @@ tsdb:
   # CLI flag: -experimental.tsdb.bucket-store.sync-interval
   [sync_interval: <duration> | default = 5m0s]

-  # Size in bytes of in-memory index cache used to speed up blocks index
-  # lookups (shared between all tenants).
-  # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
-  [index_cache_size_bytes: <int> | default = 1073741824]
-
   # Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
   # allocations.
   # CLI flag: -experimental.tsdb.bucket-store.max-chunk-pool-bytes
@@ -178,6 +204,54 @@ tsdb:
   # CLI flag: -experimental.tsdb.bucket-store.consistency-delay
   [consistency_delay: <duration> | default = 0s]

+  index_cache:
+    # The index cache backend type. Supported values: inmemory, memcached.
+    # CLI flag: -experimental.tsdb.bucket-store.index-cache.backend
+    [backend: <string> | default = "inmemory"]
+
+    inmemory:
+      # Maximum size in bytes of in-memory index cache used to speed up blocks
+      # index lookups (shared between all tenants).
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.inmemory.max-size-bytes
+      [max_size_bytes: <int> | default = 1073741824]
+
+    memcached:
+      # Comma separated list of memcached addresses. Supported prefixes are:
+      # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV
+      # query), dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup
+      # made after that).
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.addresses
+      [addresses: <string> | default = ""]
+
+      # The socket read/write timeout.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.timeout
+      [timeout: <duration> | default = 100ms]
+
+      # The maximum number of idle connections that will be maintained per
+      # address.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-idle-connections
+      [max_idle_connections: <int> | default = 16]
+
+      # The maximum number of concurrent asynchronous operations that can occur.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-concurrency
+      [max_async_concurrency: <int> | default = 50]
+
+      # The maximum number of enqueued asynchronous operations allowed.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # The maximum number of concurrent connections running get operations.
+      # If set to 0, concurrency is unlimited.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-concurrency
+      [max_get_multi_concurrency: <int> | default = 100]
+
+      # The maximum number of keys a single underlying get operation should
+      # fetch. If more keys are specified, internally keys are split into
+      # multiple batches and fetched concurrently, honoring the max
+      # concurrency. If set to 0, the max batch size is unlimited.
+      # CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-batch-size
+      [max_get_multi_batch_size: <int> | default = 0]
+
 # How frequently does Cortex try to compact TSDB head. Block is only created
 # if data covers smallest block range. Must be greater than 0 and max 5
 # minutes.
diff --git a/docs/operations/blocks-storage.template b/docs/operations/blocks-storage.template
index 52c2882bbf3..7eec681bf35 100644
--- a/docs/operations/blocks-storage.template
+++ b/docs/operations/blocks-storage.template
@@ -72,6 +72,37 @@ Whenever the pool of compactors increase or decrease (ie. following up a scale u
 - `GET /compactor_ring`
     Displays the status of the compactors ring, including the tokens owned by each compactor and an option to remove (forget) instances from the ring.

+## Index cache
+
+The querier supports a cache to speed up postings and series lookups from TSDB blocks indexes. Two backends are supported:
+
+- `inmemory`
+- `memcached`
+
+### In-memory index cache
+
+The `inmemory` index cache is **enabled by default** and its max size can be configured through the flag `-experimental.tsdb.bucket-store.index-cache.inmemory.max-size-bytes` (or config file). The trade-offs of using the in-memory index cache are:
+
+- Pros: zero latency
+- Cons: increased querier memory usage, not shared across multiple querier replicas
+
+### Memcached index cache
+
+The `memcached` index cache allows you to use [Memcached](https://memcached.org/) as the cache backend. This cache backend is configured using `-experimental.tsdb.bucket-store.index-cache.backend=memcached` and requires the Memcached server(s) addresses to be set via `-experimental.tsdb.bucket-store.index-cache.memcached.addresses` (or config file). The addresses are resolved using [DNS service discovery](dns-service-discovery.md).
+
+The trade-offs of using the Memcached index cache are:
+
+- Pros: can scale beyond a single node's memory (Memcached cluster), shared across multiple querier instances
+- Cons: higher latency in the cache round trip compared to the in-memory one
+
+The Memcached client uses a jump hash algorithm to shard cached entries across a cluster of Memcached servers. For this reason, you should make sure Memcached servers are **not** behind any kind of load balancer, and that their addresses are configured so that servers are added to (or removed from) the end of the list whenever a scale up (or scale down) occurs.
+
+For example, if you're running Memcached in Kubernetes, you may:
+
+1. Deploy your Memcached cluster using a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/)
+2. Create a [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) for the Memcached StatefulSet
+3. Configure Cortex's Memcached client address using the `dnssrvnoa+` [service discovery](dns-service-discovery.md)
+
 ## Configuration

 The general [configuration documentation](../configuration/_index.md) also applies to a Cortex cluster running the blocks storage, with a few differences:
diff --git a/docs/operations/dns-service-discovery.md b/docs/operations/dns-service-discovery.md
new file mode 100644
index 00000000000..864247f9653
--- /dev/null
+++ b/docs/operations/dns-service-discovery.md
@@ -0,0 +1,21 @@
+---
+title: "DNS service discovery"
+linkTitle: "DNS service discovery"
+weight: 2
+slug: dns-service-discovery
+---
+
+Some clients in Cortex support service discovery via DNS to find the addresses of backend servers to connect to (e.g. caching servers). The clients supporting it are:
+
+- [Blocks storage's memcached index cache](blocks-storage.md#memcached-index-cache)
+
+## Supported discovery modes
+
+The DNS service discovery supports different discovery modes. A discovery mode is selected by adding a specific prefix to the address. The supported prefixes are:
+
+- **`dns+`**
+ The domain name after the prefix is looked up as an A/AAAA query. For example: `dns+memcached.local:11211` +- **`dnssrv+`**
+ The domain name after the prefix is looked up as a SRV query, and then each SRV record is resolved as an A/AAAA record. For example: `dnssrv+memcached.namespace.svc.cluster.local` +- **`dnssrvnoa+`**
+  The domain name after the prefix is looked up as a SRV query, with no A/AAAA lookup made after that. For example: `dnssrvnoa+memcached.namespace.svc.cluster.local`
diff --git a/docs/operations/query-auditor.md b/docs/operations/query-auditor.md
index 32828cef159..17f6bc28b9c 100644
--- a/docs/operations/query-auditor.md
+++ b/docs/operations/query-auditor.md
@@ -1,7 +1,7 @@
 ---
 title: "Query Auditor (tool)"
 linkTitle: "Query Auditor (tool)"
-weight: 2
+weight: 3
 slug: query-auditor
 ---

diff --git a/docs/operations/query-tee.md b/docs/operations/query-tee.md
index 73d58299b2b..ca21f393cf8 100644
--- a/docs/operations/query-tee.md
+++ b/docs/operations/query-tee.md
@@ -1,7 +1,7 @@
 ---
 title: "Query Tee (service)"
 linkTitle: "Query Tee (service)"
-weight: 3
+weight: 4
 slug: query-tee
 ---

diff --git a/integration/e2e/cache/cache.go b/integration/e2e/cache/cache.go
new file mode 100644
index 00000000000..42f2d080ca4
--- /dev/null
+++ b/integration/e2e/cache/cache.go
@@ -0,0 +1,21 @@
+package e2ecache
+
+import (
+	"github.com/cortexproject/cortex/integration/e2e"
+)
+
+const (
+	MemcachedPort = 11211
+)
+
+func NewMemcached() *e2e.ConcreteService {
+	return e2e.NewConcreteService(
+		"memcached",
+		// If you change the image tag, remember to update it in the preloading done
+		// by CircleCI too (see .circleci/config.yml).
+		"memcached:1.6.1",
+		nil,
+		e2e.NewTCPReadinessProbe(MemcachedPort),
+		MemcachedPort,
+	)
+}
diff --git a/integration/e2e/service.go b/integration/e2e/service.go
index 395ea533908..a2ef5627e30 100644
--- a/integration/e2e/service.go
+++ b/integration/e2e/service.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"math"
+	"net"
 	"os/exec"
 	"regexp"
 	"strconv"
@@ -249,7 +250,12 @@ func (s *ConcreteService) WaitStarted() (err error) {
 	}

 	for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); {
-		err = exec.Command("docker", "inspect", s.containerName()).Run()
+		// Enforce a timeout on the command execution because we've seen some flaky tests
+		// stuck here.
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+
+		err = exec.CommandContext(ctx, "docker", "inspect", s.containerName()).Run()
 		if err == nil {
 			return nil
 		}
@@ -390,6 +396,33 @@ func (p *HTTPReadinessProbe) Ready(service *ConcreteService) (err error) {
 	return fmt.Errorf("got no expected status code: %v, expected: %v", res.StatusCode, p.expectedStatus)
 }

+// TCPReadinessProbe checks readiness by ensuring a TCP connection can be established.
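+// The probe dials the service's endpoint for the configured port and reports
+// ready as soon as the connection can be opened (the connection is then closed
+// again right away).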
+type TCPReadinessProbe struct {
+	port int
+}
+
+func NewTCPReadinessProbe(port int) *TCPReadinessProbe {
+	return &TCPReadinessProbe{
+		port: port,
+	}
+}
+
+func (p *TCPReadinessProbe) Ready(service *ConcreteService) (err error) {
+	endpoint := service.Endpoint(p.port)
+	if endpoint == "" {
+		return fmt.Errorf("cannot get service endpoint for port %d", p.port)
+	} else if endpoint == "stopped" {
+		return errors.New("service has stopped")
+	}
+
+	conn, err := net.DialTimeout("tcp", endpoint, time.Second)
+	if err != nil {
+		return err
+	}
+
+	return conn.Close()
+}
+
 // CmdReadinessProbe checks readiness by `Exec`ing a command (within container) which returns 0 to consider status being ready
 type CmdReadinessProbe struct {
 	cmd *Command
 }
diff --git a/integration/querier_test.go b/integration/querier_test.go
index fc576057f42..a88371300d6 100644
--- a/integration/querier_test.go
+++ b/integration/querier_test.go
@@ -11,22 +11,32 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/cortexproject/cortex/integration/e2e"
+	e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
 	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
 	"github.com/cortexproject/cortex/integration/e2ecortex"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 )

 func TestQuerierWithBlocksStorage(t *testing.T) {
 	tests := map[string]struct {
 		flags map[string]string
 	}{
-		"querier running with ingester gRPC streaming disabled": {
+		"querier running with ingester gRPC streaming disabled and inmemory index cache": {
 			flags: mergeFlags(BlocksStorageFlags, map[string]string{
-				"-querier.ingester-streaming": "false",
+				"-querier.ingester-streaming":                         "false",
+				"-experimental.tsdb.bucket-store.index-cache.backend": "inmemory",
 			}),
 		},
-		"querier running with ingester gRPC streaming enabled": {
+		"querier running with ingester gRPC streaming enabled and inmemory index cache": {
 			flags: mergeFlags(BlocksStorageFlags, map[string]string{
-				"-querier.ingester-streaming": "true",
+				"-querier.ingester-streaming":                         "true",
+				"-experimental.tsdb.bucket-store.index-cache.backend": "inmemory",
+			}),
+		},
+		"querier running with memcached index cache": {
+			flags: mergeFlags(BlocksStorageFlags, map[string]string{
+				// The address will be injected during the test execution because it's dynamic.
+				"-experimental.tsdb.bucket-store.index-cache.backend": "memcached",
 			}),
 		},
 	}
@@ -48,10 +58,20 @@ func TestQuerierWithBlocksStorage(t *testing.T) {
 				"-experimental.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
 			})

+			// Detect the index cache backend from flags.
+			indexCacheBackend := tsdb.IndexCacheBackendDefault
+			if flags["-experimental.tsdb.bucket-store.index-cache.backend"] != "" {
+				indexCacheBackend = flags["-experimental.tsdb.bucket-store.index-cache.backend"]
+			}
+
 			// Start dependencies.
 			consul := e2edb.NewConsul()
 			minio := e2edb.NewMinio(9000, flags["-experimental.tsdb.s3.bucket-name"])
-			require.NoError(t, s.StartAndWaitReady(consul, minio))
+			memcached := e2ecache.NewMemcached()
+			require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))
+
+			// Add the memcached address to the flags.
+			flags["-experimental.tsdb.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)

 			// Start Cortex components.
 			distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
@@ -121,9 +141,15 @@
 			assert.Equal(t, expectedVector3, result.(model.Vector))

 			// Check the in-memory index cache metrics (in the querier).
-			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items"))             // 2 series both for postings and series cache
-			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series both for postings and series cache
-			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total"))          // no cache hit cause the cache was empty
+			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(7), "cortex_querier_blocks_index_cache_requests_total"))
+			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total")) // no cache hit because the cache was empty
+
+			if indexCacheBackend == tsdb.IndexCacheBackendInMemory {
+				require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items"))             // 2 series both for postings and series cache
+				require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series both for postings and series cache
+			} else if indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+				require.NoError(t, querier.WaitSumMetrics(e2e.Equals(11), "cortex_querier_blocks_index_cache_memcached_operations_total")) // 7 gets + 4 sets
+			}

 			// Query back again the 1st series from storage. This time it should use the index cache.
 			result, err = c.Query("series_1", series1Timestamp)
@@ -131,9 +157,15 @@
 			require.Equal(t, model.ValVector, result.Type())
 			assert.Equal(t, expectedVector1, result.(model.Vector))

-			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items"))             // as before
-			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before
-			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total"))          // this time has used the index cache
+			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(7+2), "cortex_querier_blocks_index_cache_requests_total"))
+			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total")) // this time it used the index cache
+
+			if indexCacheBackend == tsdb.IndexCacheBackendInMemory {
+				require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items"))             // as before
+				require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before
+			} else if indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+				require.NoError(t, querier.WaitSumMetrics(e2e.Equals(11+2), "cortex_querier_blocks_index_cache_memcached_operations_total")) // as before + 2 gets
+			}
 		})
 	}
 }
diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go
index d7e38825893..997876ea207 100644
--- a/pkg/cortex/cortex_test.go
+++ b/pkg/cortex/cortex_test.go
@@ -41,6 +41,11 @@ func TestCortex(t *testing.T) {
 			S3: s3.Config{
 				Endpoint: "localhost",
 			},
+			BucketStore: tsdb.BucketStoreConfig{
+				IndexCache: tsdb.IndexCacheConfig{
+					Backend: tsdb.IndexCacheBackendInMemory,
+				},
+			},
 		},
 		Target: All,
 	}
diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go
index 8a19d95c6e8..76499f6be56 100644
--- a/pkg/querier/block_store.go
+++ b/pkg/querier/block_store.go
@@ -41,7 +41,7 @@ type UserStore struct {
 	client             storepb.StoreClient
 	logLevel           logging.Level
 	bucketStoreMetrics *tsdbBucketStoreMetrics
-	indexCacheMetrics  *tsdbIndexCacheMetrics
+	indexCacheMetrics  prometheus.Collector

 	// Index cache shared across all tenants.
 	indexCache storecache.IndexCache
@@ -67,7 +67,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
 		stores:             map[string]*store.BucketStore{},
 		logLevel:           logLevel,
 		bucketStoreMetrics: newTSDBBucketStoreMetrics(),
-		indexCacheMetrics:  newTSDBIndexCacheMetrics(indexCacheRegistry),
+		indexCacheMetrics:  tsdb.MustNewIndexCacheMetrics(cfg.BucketStore.IndexCache.Backend, indexCacheRegistry),
 		syncTimes: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
 			Name: "cortex_querier_blocks_sync_seconds",
 			Help: "The total time it takes to perform a sync stores",
@@ -77,7 +77,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin

 	// Init the index cache.
 	var err error
-	if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore, logger, indexCacheRegistry); err != nil {
+	if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore.IndexCache, logger, indexCacheRegistry); err != nil {
 		return nil, errors.Wrap(err, "create index cache")
 	}

diff --git a/pkg/querier/block_store_metrics.go b/pkg/querier/block_store_metrics.go
index bdaae2958de..d47bb2ef03a 100644
--- a/pkg/querier/block_store_metrics.go
+++ b/pkg/querier/block_store_metrics.go
@@ -185,90 +185,3 @@ func (m *tsdbBucketStoreMetrics) Collect(out chan<- prometheus.Metric) {
 	data.SendSumOfHistograms(out, m.metaSyncDuration, "blocks_meta_sync_duration_seconds")
 	data.SendMaxOfGauges(out, m.metaSyncConsistencyDelay, "consistency_delay_seconds")
 }
-
-// This struct aggregates metrics exported by Thanos Index Cache
-// and re-exports as Cortex metrics.
-type tsdbIndexCacheMetrics struct { - reg *prometheus.Registry - - // Metrics gathered from Thanos storecache.InMemoryIndexCache - cacheItemsEvicted *prometheus.Desc - cacheItemsAdded *prometheus.Desc - cacheRequests *prometheus.Desc - cacheItemsOverflow *prometheus.Desc - cacheHits *prometheus.Desc - cacheItemsCurrentCount *prometheus.Desc - cacheItemsCurrentSize *prometheus.Desc - cacheItemsTotalCurrentSize *prometheus.Desc - - // Ignored: - // thanos_store_index_cache_max_size_bytes - // thanos_store_index_cache_max_item_size_bytes -} - -func newTSDBIndexCacheMetrics(reg *prometheus.Registry) *tsdbIndexCacheMetrics { - return &tsdbIndexCacheMetrics{ - reg: reg, - - // Cache - cacheItemsEvicted: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_evicted_total", - "TSDB: Total number of items that were evicted from the index cache.", - []string{"item_type"}, nil), - cacheItemsAdded: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_added_total", - "TSDB: Total number of items that were added to the index cache.", - []string{"item_type"}, nil), - cacheRequests: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_requests_total", - "TSDB: Total number of requests to the cache.", - []string{"item_type"}, nil), - cacheItemsOverflow: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_overflowed_total", - "TSDB: Total number of items that could not be added to the cache due to being too big.", - []string{"item_type"}, nil), - cacheHits: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_hits_total", - "TSDB: Total number of requests to the cache that were a hit.", - []string{"item_type"}, nil), - cacheItemsCurrentCount: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items", - "TSDB: Current number of items in the index cache.", - []string{"item_type"}, nil), - cacheItemsCurrentSize: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_size_bytes", - "TSDB: Current byte size of items in the index cache.", - []string{"item_type"}, nil), - cacheItemsTotalCurrentSize: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_total_size_bytes", - "TSDB: Current byte size of items (both value and key) in the index cache.", - []string{"item_type"}, nil), - } -} - -func (m *tsdbIndexCacheMetrics) Describe(out chan<- *prometheus.Desc) { - out <- m.cacheItemsEvicted - out <- m.cacheItemsAdded - out <- m.cacheRequests - out <- m.cacheItemsOverflow - out <- m.cacheHits - out <- m.cacheItemsCurrentCount - out <- m.cacheItemsCurrentSize - out <- m.cacheItemsTotalCurrentSize -} - -func (m *tsdbIndexCacheMetrics) Collect(out chan<- prometheus.Metric) { - data := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "": m.reg, - }) - - data.SendSumOfCountersWithLabels(out, m.cacheItemsEvicted, "thanos_store_index_cache_items_evicted_total", "item_type") - data.SendSumOfCountersWithLabels(out, m.cacheItemsAdded, "thanos_store_index_cache_items_added_total", "item_type") - data.SendSumOfCountersWithLabels(out, m.cacheRequests, "thanos_store_index_cache_requests_total", "item_type") - data.SendSumOfCountersWithLabels(out, m.cacheItemsOverflow, "thanos_store_index_cache_items_overflowed_total", "item_type") - data.SendSumOfCountersWithLabels(out, m.cacheHits, "thanos_store_index_cache_hits_total", "item_type") - - data.SendSumOfGaugesWithLabels(out, m.cacheItemsCurrentCount, "thanos_store_index_cache_items", "item_type") - data.SendSumOfGaugesWithLabels(out, m.cacheItemsCurrentSize, 
"thanos_store_index_cache_items_size_bytes", "item_type") - data.SendSumOfGaugesWithLabels(out, m.cacheItemsTotalCurrentSize, "thanos_store_index_cache_total_size_bytes", "item_type") -} diff --git a/pkg/querier/bucket_store_metrics_test.go b/pkg/querier/bucket_store_metrics_test.go index adc3d78a1e5..4e1718845f2 100644 --- a/pkg/querier/bucket_store_metrics_test.go +++ b/pkg/querier/bucket_store_metrics_test.go @@ -153,56 +153,6 @@ func TestTSDBBucketStoreMetrics(t *testing.T) { require.NoError(t, err) } -func TestTSDBIndexCacheMetrics(t *testing.T) { - mainReg := prometheus.NewPedanticRegistry() - cacheMetrics := newTSDBIndexCacheMetrics(populateTSDBIndexCacheMetrics(5328)) - mainReg.MustRegister(cacheMetrics) - - //noinspection ALL - err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` - # HELP cortex_querier_blocks_index_cache_items_evicted_total TSDB: Total number of items that were evicted from the index cache. - # TYPE cortex_querier_blocks_index_cache_items_evicted_total counter - cortex_querier_blocks_index_cache_items_evicted_total{item_type="Postings"} 5328 - cortex_querier_blocks_index_cache_items_evicted_total{item_type="Series"} 10656 - - # HELP cortex_querier_blocks_index_cache_requests_total TSDB: Total number of requests to the cache. - # TYPE cortex_querier_blocks_index_cache_requests_total counter - cortex_querier_blocks_index_cache_requests_total{item_type="Postings"} 15984 - cortex_querier_blocks_index_cache_requests_total{item_type="Series"} 21312 - - # HELP cortex_querier_blocks_index_cache_hits_total TSDB: Total number of requests to the cache that were a hit. - # TYPE cortex_querier_blocks_index_cache_hits_total counter - cortex_querier_blocks_index_cache_hits_total{item_type="Postings"} 26640 - cortex_querier_blocks_index_cache_hits_total{item_type="Series"} 31968 - - # HELP cortex_querier_blocks_index_cache_items_added_total TSDB: Total number of items that were added to the index cache. - # TYPE cortex_querier_blocks_index_cache_items_added_total counter - cortex_querier_blocks_index_cache_items_added_total{item_type="Postings"} 37296 - cortex_querier_blocks_index_cache_items_added_total{item_type="Series"} 42624 - - # HELP cortex_querier_blocks_index_cache_items TSDB: Current number of items in the index cache. - # TYPE cortex_querier_blocks_index_cache_items gauge - cortex_querier_blocks_index_cache_items{item_type="Postings"} 47952 - cortex_querier_blocks_index_cache_items{item_type="Series"} 53280 - - # HELP cortex_querier_blocks_index_cache_items_size_bytes TSDB: Current byte size of items in the index cache. - # TYPE cortex_querier_blocks_index_cache_items_size_bytes gauge - cortex_querier_blocks_index_cache_items_size_bytes{item_type="Postings"} 58608 - cortex_querier_blocks_index_cache_items_size_bytes{item_type="Series"} 63936 - - # HELP cortex_querier_blocks_index_cache_total_size_bytes TSDB: Current byte size of items (both value and key) in the index cache. - # TYPE cortex_querier_blocks_index_cache_total_size_bytes gauge - cortex_querier_blocks_index_cache_total_size_bytes{item_type="Postings"} 69264 - cortex_querier_blocks_index_cache_total_size_bytes{item_type="Series"} 74592 - - # HELP cortex_querier_blocks_index_cache_items_overflowed_total TSDB: Total number of items that could not be added to the cache due to being too big. 
- # TYPE cortex_querier_blocks_index_cache_items_overflowed_total counter - cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Postings"} 79920 - cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Series"} 85248 -`)) - require.NoError(t, err) -} - func BenchmarkMetricsCollections10(b *testing.B) { benchmarkMetricsCollection(b, 10) } @@ -289,30 +239,6 @@ func populateTSDBBucketStoreMetrics(base float64) *prometheus.Registry { return reg } -func populateTSDBIndexCacheMetrics(base float64) *prometheus.Registry { - reg := prometheus.NewRegistry() - c := newIndexStoreCacheMetrics(reg) - - c.evicted.WithLabelValues(cacheTypePostings).Add(base * 1) - c.evicted.WithLabelValues(cacheTypeSeries).Add(base * 2) - c.requests.WithLabelValues(cacheTypePostings).Add(base * 3) - c.requests.WithLabelValues(cacheTypeSeries).Add(base * 4) - c.hits.WithLabelValues(cacheTypePostings).Add(base * 5) - c.hits.WithLabelValues(cacheTypeSeries).Add(base * 6) - c.added.WithLabelValues(cacheTypePostings).Add(base * 7) - c.added.WithLabelValues(cacheTypeSeries).Add(base * 8) - c.current.WithLabelValues(cacheTypePostings).Set(base * 9) - c.current.WithLabelValues(cacheTypeSeries).Set(base * 10) - c.currentSize.WithLabelValues(cacheTypePostings).Set(base * 11) - c.currentSize.WithLabelValues(cacheTypeSeries).Set(base * 12) - c.totalCurrentSize.WithLabelValues(cacheTypePostings).Set(base * 13) - c.totalCurrentSize.WithLabelValues(cacheTypeSeries).Set(base * 14) - c.overflow.WithLabelValues(cacheTypePostings).Add(base * 15) - c.overflow.WithLabelValues(cacheTypeSeries).Add(base * 16) - - return reg -} - // copied from Thanos, pkg/store/bucket.go type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge @@ -335,23 +261,6 @@ type bucketStoreMetrics struct { metaSyncConsistencyDelay prometheus.Gauge } -// Copied from Thanos, pkg/store/cache/inmemory.go, InMemoryIndexCache struct -type indexStoreCacheMetrics struct { - evicted *prometheus.CounterVec - requests *prometheus.CounterVec - hits *prometheus.CounterVec - added *prometheus.CounterVec - current *prometheus.GaugeVec - currentSize *prometheus.GaugeVec - totalCurrentSize *prometheus.GaugeVec - overflow *prometheus.CounterVec -} - -const ( - cacheTypePostings string = "Postings" - cacheTypeSeries string = "Series" -) - func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { var m bucketStoreMetrics @@ -463,68 +372,3 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { } return &m } - -func newIndexStoreCacheMetrics(reg prometheus.Registerer) *indexStoreCacheMetrics { - c := indexStoreCacheMetrics{} - c.evicted = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_items_evicted_total", - Help: "Total number of items that were evicted from the index cache.", - }, []string{"item_type"}) - c.evicted.WithLabelValues(cacheTypePostings) - c.evicted.WithLabelValues(cacheTypeSeries) - - c.added = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_items_added_total", - Help: "Total number of items that were added to the index cache.", - }, []string{"item_type"}) - c.added.WithLabelValues(cacheTypePostings) - c.added.WithLabelValues(cacheTypeSeries) - - c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_requests_total", - Help: "Total number of requests to the cache.", - }, []string{"item_type"}) - c.requests.WithLabelValues(cacheTypePostings) - c.requests.WithLabelValues(cacheTypeSeries) - - 
c.overflow = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_items_overflowed_total", - Help: "Total number of items that could not be added to the cache due to being too big.", - }, []string{"item_type"}) - c.overflow.WithLabelValues(cacheTypePostings) - c.overflow.WithLabelValues(cacheTypeSeries) - - c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_hits_total", - Help: "Total number of requests to the cache that were a hit.", - }, []string{"item_type"}) - c.hits.WithLabelValues(cacheTypePostings) - c.hits.WithLabelValues(cacheTypeSeries) - - c.current = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "thanos_store_index_cache_items", - Help: "Current number of items in the index cache.", - }, []string{"item_type"}) - c.current.WithLabelValues(cacheTypePostings) - c.current.WithLabelValues(cacheTypeSeries) - - c.currentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "thanos_store_index_cache_items_size_bytes", - Help: "Current byte size of items in the index cache.", - }, []string{"item_type"}) - c.currentSize.WithLabelValues(cacheTypePostings) - c.currentSize.WithLabelValues(cacheTypeSeries) - - c.totalCurrentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "thanos_store_index_cache_total_size_bytes", - Help: "Current byte size of items (both value and key) in the index cache.", - }, []string{"item_type"}) - c.totalCurrentSize.WithLabelValues(cacheTypePostings) - c.totalCurrentSize.WithLabelValues(cacheTypeSeries) - - if reg != nil { - reg.MustRegister(c.requests, c.hits, c.added, c.evicted, c.current, c.currentSize, c.totalCurrentSize, c.overflow) - } - - return &c -} diff --git a/pkg/storage/tsdb/bucket_client.go b/pkg/storage/tsdb/bucket_client.go index 69573d86e19..3e98a58b3e4 100644 --- a/pkg/storage/tsdb/bucket_client.go +++ b/pkg/storage/tsdb/bucket_client.go @@ -24,6 +24,6 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo case BackendFilesystem: return filesystem.NewBucketClient(cfg.Filesystem) default: - return nil, errUnsupportedBackend + return nil, errUnsupportedStorageBackend } } diff --git a/pkg/storage/tsdb/bucket_client_test.go b/pkg/storage/tsdb/bucket_client_test.go index e03dfae0fb6..1720a74b6d5 100644 --- a/pkg/storage/tsdb/bucket_client_test.go +++ b/pkg/storage/tsdb/bucket_client_test.go @@ -69,7 +69,7 @@ func TestNewBucketClient(t *testing.T) { }, "should return error on unknown backend": { config: configWithUnknownBackend, - expectedErr: errUnsupportedBackend, + expectedErr: errUnsupportedStorageBackend, expectedType: nil, }, } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 577645d8647..f5f27f63d01 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -14,6 +14,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/gcs" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/s3" + "github.com/cortexproject/cortex/pkg/util" ) const ( @@ -37,7 +38,7 @@ const ( var ( supportedBackends = []string{BackendS3, BackendGCS, BackendAzure, BackendFilesystem} - errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errUnsupportedStorageBackend = errors.New("unsupported TSDB storage backend") errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") 
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") @@ -128,10 +129,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.StripeSize, "experimental.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") } -// Validate the config +// Validate the config. func (cfg *Config) Validate() error { - if cfg.Backend != BackendS3 && cfg.Backend != BackendGCS && cfg.Backend != BackendAzure && cfg.Backend != BackendFilesystem { - return errUnsupportedBackend + if !util.StringsContain(supportedBackends, cfg.Backend) { + return errUnsupportedStorageBackend } if cfg.ShipInterval > 0 && cfg.ShipConcurrency <= 0 { @@ -150,29 +151,30 @@ func (cfg *Config) Validate() error { return errInvalidStripeSize } - return nil + return cfg.BucketStore.Validate() } // BucketStoreConfig holds the config information for Bucket Stores used by the querier type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval"` - IndexCacheSizeBytes uint64 `yaml:"index_cache_size_bytes"` - MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` - MaxSampleCount uint64 `yaml:"max_sample_count"` - MaxConcurrent int `yaml:"max_concurrent"` - TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - BinaryIndexHeader bool `yaml:"binary_index_header_enabled"` - ConsistencyDelay time.Duration `yaml:"consistency_delay"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval"` + MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` + MaxSampleCount uint64 `yaml:"max_sample_count"` + MaxConcurrent int `yaml:"max_concurrent"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + BinaryIndexHeader bool `yaml:"binary_index_header_enabled"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + IndexCache IndexCacheConfig `yaml:"index_cache"` } // RegisterFlags registers the BucketStore flags func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.IndexCache.RegisterFlagsWithPrefix(f, "experimental.tsdb.bucket-store.index-cache.") + f.StringVar(&cfg.SyncDir, "experimental.tsdb.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.") f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.") - f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(1*units.Gibibyte), "Size in bytes of in-memory index cache used to speed up blocks index lookups (shared between all tenants).") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size - in bytes - of a per-tenant chunk pool, used to reduce memory allocations.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples per query when loading series from the long-term storage. 
0 disables the limit.") f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 20, "Max number of concurrent queries to execute against the long-term storage on a per-tenant basis.") @@ -183,6 +185,11 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ConsistencyDelay, "experimental.tsdb.bucket-store.consistency-delay", 0, "Minimum age of a block before it's being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.") } +// Validate the config. +func (cfg *BucketStoreConfig) Validate() error { + return cfg.IndexCache.Validate() +} + // BlocksDir returns the directory path where TSDB blocks and wal should be // stored by the ingester func (cfg *Config) BlocksDir(userID string) string { diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 9101e245462..d2eee345250 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -21,6 +21,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: nil, }, @@ -30,15 +35,25 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: nil, }, - "should fail on unknown backend": { + "should fail on unknown storage backend": { config: Config{ Backend: "unknown", StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, - expectedErr: errUnsupportedBackend, + expectedErr: errUnsupportedStorageBackend, }, "should fail on invalid ship concurrency": { config: Config{ @@ -46,6 +61,11 @@ func TestConfig_Validate(t *testing.T) { ShipInterval: time.Minute, ShipConcurrency: 0, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidShipConcurrency, }, @@ -57,6 +77,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: nil, }, @@ -65,6 +90,11 @@ func TestConfig_Validate(t *testing.T) { Backend: "s3", HeadCompactionInterval: 0 * time.Minute, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidCompactionInterval, }, @@ -73,6 +103,11 @@ func TestConfig_Validate(t *testing.T) { Backend: "s3", HeadCompactionInterval: 10 * time.Minute, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidCompactionInterval, }, @@ -82,6 +117,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: time.Minute, HeadCompactionConcurrency: 0, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidCompactionConcurrency, }, @@ -91,6 +131,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: time.Minute, HeadCompactionConcurrency: 10, StripeSize: 2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + 
Backend: "inmemory", + }, + }, }, expectedErr: nil, }, @@ -100,6 +145,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: -2, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidStripeSize, }, @@ -109,6 +159,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: 0, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidStripeSize, }, @@ -118,6 +173,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: 1, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: errInvalidStripeSize, }, @@ -127,6 +187,11 @@ func TestConfig_Validate(t *testing.T) { HeadCompactionInterval: 1 * time.Minute, HeadCompactionConcurrency: 5, StripeSize: 1 << 14, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, }, expectedErr: nil, }, diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go index d6909706f31..950cddb81f1 100644 --- a/pkg/storage/tsdb/index_cache.go +++ b/pkg/storage/tsdb/index_cache.go @@ -1,19 +1,132 @@ package tsdb import ( + "flag" + "fmt" + "strings" + "time" + "github.com/alecthomas/units" "github.com/go-kit/kit/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/cacheutil" storecache "github.com/thanos-io/thanos/pkg/store/cache" + + "github.com/cortexproject/cortex/pkg/util" ) const ( + // IndexCacheBackendInMemory is the value for the in-memory index cache backend. + IndexCacheBackendInMemory = "inmemory" + + // IndexCacheBackendMemcached is the value for the memcached index cache backend. + IndexCacheBackendMemcached = "memcached" + + // IndexCacheBackendDefault is the value for the default index cache backend. + IndexCacheBackendDefault = IndexCacheBackendInMemory + defaultMaxItemSize = storecache.Bytes(128 * units.MiB) ) +var ( + supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached} + + errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend") + errNoIndexCacheAddresses = errors.New("no index cache backend addresses") +) + +type IndexCacheConfig struct { + Backend string `yaml:"backend"` + InMemory InMemoryIndexCacheConfig `yaml:"inmemory"` + Memcached MemcachedIndexCacheConfig `yaml:"memcached"` +} + +func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(f, "experimental.tsdb.bucket-store.index-cache.") +} + +func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.StringVar(&cfg.Backend, prefix+"backend", IndexCacheBackendDefault, fmt.Sprintf("The index cache backend type. Supported values: %s.", strings.Join(supportedIndexCacheBackends, ", "))) + + cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.") + cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") +} + +// Validate the config. 
+func (cfg *IndexCacheConfig) Validate() error {
+	if !util.StringsContain(supportedIndexCacheBackends, cfg.Backend) {
+		return errUnsupportedIndexCacheBackend
+	}
+
+	if cfg.Backend == IndexCacheBackendMemcached {
+		if err := cfg.Memcached.Validate(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+type InMemoryIndexCacheConfig struct {
+	MaxSizeBytes uint64 `yaml:"max_size_bytes"`
+}
+
+func (cfg *InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+	f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", uint64(1*units.Gibibyte), "Maximum size in bytes of in-memory index cache used to speed up blocks index lookups (shared between all tenants).")
+}
+
+type MemcachedIndexCacheConfig struct {
+	Addresses              string        `yaml:"addresses"`
+	Timeout                time.Duration `yaml:"timeout"`
+	MaxIdleConnections     int           `yaml:"max_idle_connections"`
+	MaxAsyncConcurrency    int           `yaml:"max_async_concurrency"`
+	MaxAsyncBufferSize     int           `yaml:"max_async_buffer_size"`
+	MaxGetMultiConcurrency int           `yaml:"max_get_multi_concurrency"`
+	MaxGetMultiBatchSize   int           `yaml:"max_get_multi_batch_size"`
+}
+
+func (cfg *MemcachedIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+	f.StringVar(&cfg.Addresses, prefix+"addresses", "", "Comma separated list of memcached addresses. Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query), dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).")
+	f.DurationVar(&cfg.Timeout, prefix+"timeout", 100*time.Millisecond, "The socket read/write timeout.")
+	f.IntVar(&cfg.MaxIdleConnections, prefix+"max-idle-connections", 16, "The maximum number of idle connections that will be maintained per address.")
+	f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations that can occur.")
+	f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed.")
+	f.IntVar(&cfg.MaxGetMultiConcurrency, prefix+"max-get-multi-concurrency", 100, "The maximum number of concurrent connections running get operations. If set to 0, concurrency is unlimited.")
+	f.IntVar(&cfg.MaxGetMultiBatchSize, prefix+"max-get-multi-batch-size", 0, "The maximum number of keys a single underlying get operation should fetch. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring the max concurrency. If set to 0, the max batch size is unlimited.")
+}
+
+func (cfg *MemcachedIndexCacheConfig) GetAddresses() []string {
+	if cfg.Addresses == "" {
+		return []string{}
+	}
+
+	return strings.Split(cfg.Addresses, ",")
+}
+
+// Validate the config.
+func (cfg *MemcachedIndexCacheConfig) Validate() error {
+	if len(cfg.GetAddresses()) == 0 {
+		return errNoIndexCacheAddresses
+	}
+
+	return nil
+}
+
 // NewIndexCache creates a new index cache based on the input configuration.
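+// Depending on the configured backend, it builds either a Thanos in-memory
+// index cache or a memcached-backed one; any other backend value results in
+// errUnsupportedIndexCacheBackend.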
-func NewIndexCache(cfg BucketStoreConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { - maxCacheSize := storecache.Bytes(cfg.IndexCacheSizeBytes) +func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { + switch cfg.Backend { + case IndexCacheBackendInMemory: + return newInMemoryIndexCache(cfg.InMemory, logger, registerer) + case IndexCacheBackendMemcached: + return newMemcachedIndexCache(cfg.Memcached, logger, registerer) + default: + return nil, errUnsupportedIndexCacheBackend + } +} + +func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { + maxCacheSize := storecache.Bytes(cfg.MaxSizeBytes) // Calculate the max item size. maxItemSize := defaultMaxItemSize @@ -26,3 +139,23 @@ func NewIndexCache(cfg BucketStoreConfig, logger log.Logger, registerer promethe MaxItemSize: maxItemSize, }) } + +func newMemcachedIndexCache(cfg MemcachedIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { + config := cacheutil.MemcachedClientConfig{ + Addresses: cfg.GetAddresses(), + Timeout: cfg.Timeout, + MaxIdleConnections: cfg.MaxIdleConnections, + MaxAsyncConcurrency: cfg.MaxAsyncConcurrency, + MaxAsyncBufferSize: cfg.MaxAsyncBufferSize, + MaxGetMultiConcurrency: cfg.MaxGetMultiConcurrency, + MaxGetMultiBatchSize: cfg.MaxGetMultiBatchSize, + DNSProviderUpdateInterval: 30 * time.Second, + } + + client, err := cacheutil.NewMemcachedClientWithConfig(logger, "index-cache", config, registerer) + if err != nil { + return nil, errors.Wrapf(err, "create index cache memcached client") + } + + return storecache.NewMemcachedIndexCache(logger, client, registerer) +} diff --git a/pkg/storage/tsdb/index_cache_metrics.go b/pkg/storage/tsdb/index_cache_metrics.go new file mode 100644 index 00000000000..a8120db656f --- /dev/null +++ b/pkg/storage/tsdb/index_cache_metrics.go @@ -0,0 +1,168 @@ +package tsdb + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/util" +) + +func MustNewIndexCacheMetrics(backend string, reg *prometheus.Registry) prometheus.Collector { + switch backend { + case IndexCacheBackendInMemory: + return NewInMemoryIndexCacheMetrics(reg) + case IndexCacheBackendMemcached: + return NewMemcachedIndexCacheMetrics(reg) + default: + panic(errUnsupportedIndexCacheBackend.Error()) + } +} + +// InMemoryIndexCacheMetrics aggregates metrics exported by Thanos in-memory index cache +// and re-exports them as Cortex metrics. +type InMemoryIndexCacheMetrics struct { + reg *prometheus.Registry + + // Metrics gathered from Thanos InMemoryIndexCache + cacheItemsEvicted *prometheus.Desc + cacheItemsAdded *prometheus.Desc + cacheRequests *prometheus.Desc + cacheItemsOverflow *prometheus.Desc + cacheHits *prometheus.Desc + cacheItemsCurrentCount *prometheus.Desc + cacheItemsCurrentSize *prometheus.Desc + cacheItemsTotalCurrentSize *prometheus.Desc + + // Ignored: + // thanos_store_index_cache_max_size_bytes + // thanos_store_index_cache_max_item_size_bytes +} + +// NewInMemoryIndexCacheMetrics makes InMemoryIndexCacheMetrics. 
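+// The returned collector reads the thanos_store_index_cache_* metrics from the
+// given registry and re-exports them under the cortex_querier_blocks_index_cache_*
+// names defined below.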
+func NewInMemoryIndexCacheMetrics(reg *prometheus.Registry) *InMemoryIndexCacheMetrics { + return &InMemoryIndexCacheMetrics{ + reg: reg, + + // Cache + cacheItemsEvicted: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_evicted_total", + "Total number of items that were evicted from the index cache.", + []string{"item_type"}, nil), + cacheItemsAdded: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_added_total", + "Total number of items that were added to the index cache.", + []string{"item_type"}, nil), + cacheRequests: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_requests_total", + "Total number of requests to the cache.", + []string{"item_type"}, nil), + cacheItemsOverflow: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_overflowed_total", + "Total number of items that could not be added to the cache due to being too big.", + []string{"item_type"}, nil), + cacheHits: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_hits_total", + "Total number of requests to the cache that were a hit.", + []string{"item_type"}, nil), + cacheItemsCurrentCount: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items", + "Current number of items in the index cache.", + []string{"item_type"}, nil), + cacheItemsCurrentSize: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_size_bytes", + "Current byte size of items in the index cache.", + []string{"item_type"}, nil), + cacheItemsTotalCurrentSize: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_total_size_bytes", + "Current byte size of items (both value and key) in the index cache.", + []string{"item_type"}, nil), + } +} + +func (m *InMemoryIndexCacheMetrics) Describe(out chan<- *prometheus.Desc) { + out <- m.cacheItemsEvicted + out <- m.cacheItemsAdded + out <- m.cacheRequests + out <- m.cacheItemsOverflow + out <- m.cacheHits + out <- m.cacheItemsCurrentCount + out <- m.cacheItemsCurrentSize + out <- m.cacheItemsTotalCurrentSize +} + +func (m *InMemoryIndexCacheMetrics) Collect(out chan<- prometheus.Metric) { + data := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ + "": m.reg, + }) + + data.SendSumOfCountersWithLabels(out, m.cacheItemsEvicted, "thanos_store_index_cache_items_evicted_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheItemsAdded, "thanos_store_index_cache_items_added_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheRequests, "thanos_store_index_cache_requests_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheItemsOverflow, "thanos_store_index_cache_items_overflowed_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheHits, "thanos_store_index_cache_hits_total", "item_type") + + data.SendSumOfGaugesWithLabels(out, m.cacheItemsCurrentCount, "thanos_store_index_cache_items", "item_type") + data.SendSumOfGaugesWithLabels(out, m.cacheItemsCurrentSize, "thanos_store_index_cache_items_size_bytes", "item_type") + data.SendSumOfGaugesWithLabels(out, m.cacheItemsTotalCurrentSize, "thanos_store_index_cache_total_size_bytes", "item_type") +} + +// MemcachedIndexCacheMetrics aggregates metrics exported by Thanos memcached index cache +// and re-exports them as Cortex metrics. +type MemcachedIndexCacheMetrics struct { + reg *prometheus.Registry + + // Metrics gathered from Thanos MemcachedIndexCache (and client). 
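+	// The cache-level series come from thanos_store_index_cache_{requests,hits}_total,
+	// while the thanos_memcached_operation* series come from the memcached client.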
+	cacheRequests       *prometheus.Desc
+	cacheHits           *prometheus.Desc
+	memcachedOperations *prometheus.Desc
+	memcachedFailures   *prometheus.Desc
+	memcachedDuration   *prometheus.Desc
+}
+
+// NewMemcachedIndexCacheMetrics makes MemcachedIndexCacheMetrics.
+func NewMemcachedIndexCacheMetrics(reg *prometheus.Registry) *MemcachedIndexCacheMetrics {
+	return &MemcachedIndexCacheMetrics{
+		reg: reg,
+
+		cacheRequests: prometheus.NewDesc(
+			"cortex_querier_blocks_index_cache_requests_total",
+			"Total number of requests to the cache.",
+			[]string{"item_type"}, nil),
+		cacheHits: prometheus.NewDesc(
+			"cortex_querier_blocks_index_cache_hits_total",
+			"Total number of requests to the cache that were a hit.",
+			[]string{"item_type"}, nil),
+		memcachedOperations: prometheus.NewDesc(
+			"cortex_querier_blocks_index_cache_memcached_operations_total",
+			"Total number of operations against memcached.",
+			[]string{"operation"}, nil),
+		memcachedFailures: prometheus.NewDesc(
+			"cortex_querier_blocks_index_cache_memcached_operation_failures_total",
+			"Total number of operations against memcached that failed.",
+			[]string{"operation"}, nil),
+		memcachedDuration: prometheus.NewDesc(
+			"cortex_querier_blocks_index_cache_memcached_operation_duration_seconds",
+			"Duration of operations against memcached.",
+			[]string{"operation"}, nil),
+	}
+}
+
+func (m *MemcachedIndexCacheMetrics) Describe(out chan<- *prometheus.Desc) {
+	out <- m.cacheRequests
+	out <- m.cacheHits
+	out <- m.memcachedOperations
+	out <- m.memcachedFailures
+	out <- m.memcachedDuration
+}
+
+func (m *MemcachedIndexCacheMetrics) Collect(out chan<- prometheus.Metric) {
+	data := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{
+		"": m.reg,
+	})
+
+	data.SendSumOfCountersWithLabels(out, m.cacheRequests, "thanos_store_index_cache_requests_total", "item_type")
+	data.SendSumOfCountersWithLabels(out, m.cacheHits, "thanos_store_index_cache_hits_total", "item_type")
+
+	data.SendSumOfCountersWithLabels(out, m.memcachedOperations, "thanos_memcached_operations_total", "operation")
+	data.SendSumOfCountersWithLabels(out, m.memcachedFailures, "thanos_memcached_operation_failures_total", "operation")
+	data.SendSumOfHistogramsWithLabels(out, m.memcachedDuration, "thanos_memcached_operation_duration_seconds", "operation")
+}
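A short sketch of how the collector above is meant to be used: the Thanos cache publishes its native `thanos_*` metrics to a private registry, and `MustNewIndexCacheMetrics` (which panics on an unsupported backend, hence the `MustNew` prefix) re-exports them as `cortex_*` series on the main registry. The wiring below is illustrative, not part of this diff:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// Private registry handed to the Thanos index cache; it receives the
	// native thanos_store_index_cache_* and thanos_memcached_* metrics.
	thanosReg := prometheus.NewRegistry()

	// Re-export the aggregated metrics under cortex_* names on the default registry.
	prometheus.MustRegister(tsdb.MustNewIndexCacheMetrics(tsdb.IndexCacheBackendMemcached, thanosReg))
}
```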
diff --git a/pkg/storage/tsdb/index_cache_metrics_test.go b/pkg/storage/tsdb/index_cache_metrics_test.go
new file mode 100644
index 00000000000..fdde974a085
--- /dev/null
+++ b/pkg/storage/tsdb/index_cache_metrics_test.go
@@ -0,0 +1,293 @@
+package tsdb
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	cacheTypePostings string = "Postings"
+	cacheTypeSeries   string = "Series"
+
+	cacheOpSet      string = "set"
+	cacheOpGetMulti string = "getmulti"
+)
+
+func TestInMemoryIndexCacheMetrics(t *testing.T) {
+	mainReg := prometheus.NewPedanticRegistry()
+	cacheMetrics := NewInMemoryIndexCacheMetrics(populateInMemoryIndexCacheMetrics(5328))
+	mainReg.MustRegister(cacheMetrics)
+
+	//noinspection ALL
+	err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
+		# HELP cortex_querier_blocks_index_cache_items_evicted_total Total number of items that were evicted from the index cache.
+		# TYPE cortex_querier_blocks_index_cache_items_evicted_total counter
+		cortex_querier_blocks_index_cache_items_evicted_total{item_type="Postings"} 5328
+		cortex_querier_blocks_index_cache_items_evicted_total{item_type="Series"} 10656
+
+		# HELP cortex_querier_blocks_index_cache_requests_total Total number of requests to the cache.
+		# TYPE cortex_querier_blocks_index_cache_requests_total counter
+		cortex_querier_blocks_index_cache_requests_total{item_type="Postings"} 15984
+		cortex_querier_blocks_index_cache_requests_total{item_type="Series"} 21312
+
+		# HELP cortex_querier_blocks_index_cache_hits_total Total number of requests to the cache that were a hit.
+		# TYPE cortex_querier_blocks_index_cache_hits_total counter
+		cortex_querier_blocks_index_cache_hits_total{item_type="Postings"} 26640
+		cortex_querier_blocks_index_cache_hits_total{item_type="Series"} 31968
+
+		# HELP cortex_querier_blocks_index_cache_items_added_total Total number of items that were added to the index cache.
+		# TYPE cortex_querier_blocks_index_cache_items_added_total counter
+		cortex_querier_blocks_index_cache_items_added_total{item_type="Postings"} 37296
+		cortex_querier_blocks_index_cache_items_added_total{item_type="Series"} 42624
+
+		# HELP cortex_querier_blocks_index_cache_items Current number of items in the index cache.
+		# TYPE cortex_querier_blocks_index_cache_items gauge
+		cortex_querier_blocks_index_cache_items{item_type="Postings"} 47952
+		cortex_querier_blocks_index_cache_items{item_type="Series"} 53280
+
+		# HELP cortex_querier_blocks_index_cache_items_size_bytes Current byte size of items in the index cache.
+		# TYPE cortex_querier_blocks_index_cache_items_size_bytes gauge
+		cortex_querier_blocks_index_cache_items_size_bytes{item_type="Postings"} 58608
+		cortex_querier_blocks_index_cache_items_size_bytes{item_type="Series"} 63936
+
+		# HELP cortex_querier_blocks_index_cache_total_size_bytes Current byte size of items (both value and key) in the index cache.
+		# TYPE cortex_querier_blocks_index_cache_total_size_bytes gauge
+		cortex_querier_blocks_index_cache_total_size_bytes{item_type="Postings"} 69264
+		cortex_querier_blocks_index_cache_total_size_bytes{item_type="Series"} 74592
+
+		# HELP cortex_querier_blocks_index_cache_items_overflowed_total Total number of items that could not be added to the cache due to being too big.
+		# TYPE cortex_querier_blocks_index_cache_items_overflowed_total counter
+		cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Postings"} 79920
+		cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Series"} 85248
+`))
+	require.NoError(t, err)
+}
+
+// Copied from Thanos, pkg/store/cache/inmemory.go, InMemoryIndexCache struct
+type inMemoryIndexStoreCacheMetrics struct {
+	evicted          *prometheus.CounterVec
+	requests         *prometheus.CounterVec
+	hits             *prometheus.CounterVec
+	added            *prometheus.CounterVec
+	current          *prometheus.GaugeVec
+	currentSize      *prometheus.GaugeVec
+	totalCurrentSize *prometheus.GaugeVec
+	overflow         *prometheus.CounterVec
+}
+
+func newInMemoryIndexStoreCacheMetrics(reg prometheus.Registerer) *inMemoryIndexStoreCacheMetrics {
+	c := inMemoryIndexStoreCacheMetrics{}
+	c.evicted = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_items_evicted_total",
+		Help: "Total number of items that were evicted from the index cache.",
+	}, []string{"item_type"})
+	c.evicted.WithLabelValues(cacheTypePostings)
+	c.evicted.WithLabelValues(cacheTypeSeries)
+
+	c.added = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_items_added_total",
+		Help: "Total number of items that were added to the index cache.",
+	}, []string{"item_type"})
+	c.added.WithLabelValues(cacheTypePostings)
+	c.added.WithLabelValues(cacheTypeSeries)
+
+	c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_requests_total",
+		Help: "Total number of requests to the cache.",
+	}, []string{"item_type"})
+	c.requests.WithLabelValues(cacheTypePostings)
+	c.requests.WithLabelValues(cacheTypeSeries)
+
+	c.overflow = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_items_overflowed_total",
+		Help: "Total number of items that could not be added to the cache due to being too big.",
+	}, []string{"item_type"})
+	c.overflow.WithLabelValues(cacheTypePostings)
+	c.overflow.WithLabelValues(cacheTypeSeries)
+
+	c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_hits_total",
+		Help: "Total number of requests to the cache that were a hit.",
+	}, []string{"item_type"})
+	c.hits.WithLabelValues(cacheTypePostings)
+	c.hits.WithLabelValues(cacheTypeSeries)
+
+	c.current = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "thanos_store_index_cache_items",
+		Help: "Current number of items in the index cache.",
+	}, []string{"item_type"})
+	c.current.WithLabelValues(cacheTypePostings)
+	c.current.WithLabelValues(cacheTypeSeries)
+
+	c.currentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "thanos_store_index_cache_items_size_bytes",
+		Help: "Current byte size of items in the index cache.",
+	}, []string{"item_type"})
+	c.currentSize.WithLabelValues(cacheTypePostings)
+	c.currentSize.WithLabelValues(cacheTypeSeries)
+
+	c.totalCurrentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "thanos_store_index_cache_total_size_bytes",
+		Help: "Current byte size of items (both value and key) in the index cache.",
+	}, []string{"item_type"})
+	c.totalCurrentSize.WithLabelValues(cacheTypePostings)
+	c.totalCurrentSize.WithLabelValues(cacheTypeSeries)
+
+	if reg != nil {
+		reg.MustRegister(c.requests, c.hits, c.added, c.evicted, c.current, c.currentSize, c.totalCurrentSize, c.overflow)
+	}
+
+	return &c
+}
+
+func populateInMemoryIndexCacheMetrics(base float64) *prometheus.Registry {
+	reg := prometheus.NewRegistry()
+	c := newInMemoryIndexStoreCacheMetrics(reg)
+
+	c.evicted.WithLabelValues(cacheTypePostings).Add(base * 1)
+	c.evicted.WithLabelValues(cacheTypeSeries).Add(base * 2)
+	c.requests.WithLabelValues(cacheTypePostings).Add(base * 3)
+	c.requests.WithLabelValues(cacheTypeSeries).Add(base * 4)
+	c.hits.WithLabelValues(cacheTypePostings).Add(base * 5)
+	c.hits.WithLabelValues(cacheTypeSeries).Add(base * 6)
+	c.added.WithLabelValues(cacheTypePostings).Add(base * 7)
+	c.added.WithLabelValues(cacheTypeSeries).Add(base * 8)
+	c.current.WithLabelValues(cacheTypePostings).Set(base * 9)
+	c.current.WithLabelValues(cacheTypeSeries).Set(base * 10)
+	c.currentSize.WithLabelValues(cacheTypePostings).Set(base * 11)
+	c.currentSize.WithLabelValues(cacheTypeSeries).Set(base * 12)
+	c.totalCurrentSize.WithLabelValues(cacheTypePostings).Set(base * 13)
+	c.totalCurrentSize.WithLabelValues(cacheTypeSeries).Set(base * 14)
+	c.overflow.WithLabelValues(cacheTypePostings).Add(base * 15)
+	c.overflow.WithLabelValues(cacheTypeSeries).Add(base * 16)
+
+	return reg
+}
+
+func TestMemcachedIndexCacheMetrics(t *testing.T) {
+	mainReg := prometheus.NewPedanticRegistry()
+	cacheMetrics := NewMemcachedIndexCacheMetrics(populateMemcachedIndexCacheMetrics(1))
+	mainReg.MustRegister(cacheMetrics)
+
+	//noinspection ALL
+	err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
+		# HELP cortex_querier_blocks_index_cache_requests_total Total number of requests to the cache.
+		# TYPE cortex_querier_blocks_index_cache_requests_total counter
+		cortex_querier_blocks_index_cache_requests_total{item_type="Postings"} 1
+		cortex_querier_blocks_index_cache_requests_total{item_type="Series"} 2
+
+		# HELP cortex_querier_blocks_index_cache_hits_total Total number of requests to the cache that were a hit.
+		# TYPE cortex_querier_blocks_index_cache_hits_total counter
+		cortex_querier_blocks_index_cache_hits_total{item_type="Postings"} 3
+		cortex_querier_blocks_index_cache_hits_total{item_type="Series"} 4
+
+		# HELP cortex_querier_blocks_index_cache_memcached_operations_total Total number of operations against memcached.
+		# TYPE cortex_querier_blocks_index_cache_memcached_operations_total counter
+		cortex_querier_blocks_index_cache_memcached_operations_total{operation="set"} 5
+		cortex_querier_blocks_index_cache_memcached_operations_total{operation="getmulti"} 6
+
+		# HELP cortex_querier_blocks_index_cache_memcached_operation_failures_total Total number of operations against memcached that failed.
+		# TYPE cortex_querier_blocks_index_cache_memcached_operation_failures_total counter
+		cortex_querier_blocks_index_cache_memcached_operation_failures_total{operation="set"} 7
+		cortex_querier_blocks_index_cache_memcached_operation_failures_total{operation="getmulti"} 8
+
+		# HELP cortex_querier_blocks_index_cache_memcached_operation_duration_seconds Duration of operations against memcached.
+		# TYPE cortex_querier_blocks_index_cache_memcached_operation_duration_seconds histogram
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.001"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.005"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.01"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.025"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.05"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.1"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.2"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="0.5"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="1"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="set",le="+Inf"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_sum{operation="set"} 0.1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_count{operation="set"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.001"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.005"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.01"} 0
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.025"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.05"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.1"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.2"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="0.5"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="1"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_bucket{operation="getmulti",le="+Inf"} 1
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_sum{operation="getmulti"} 0.025
+		cortex_querier_blocks_index_cache_memcached_operation_duration_seconds_count{operation="getmulti"} 1
+`))
+	require.NoError(t, err)
+}
+
+type memcachedIndexStoreCacheMetrics struct {
+	requests   *prometheus.CounterVec
+	hits       *prometheus.CounterVec
+	operations *prometheus.CounterVec
+	failures   *prometheus.CounterVec
+	duration   *prometheus.HistogramVec
+}
+
+func newMemcachedIndexStoreCacheMetrics(reg prometheus.Registerer) *memcachedIndexStoreCacheMetrics {
+	c := memcachedIndexStoreCacheMetrics{}
+
+	c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_requests_total",
+		Help: "Total number of requests to the cache.",
+	}, []string{"item_type"})
+	c.requests.WithLabelValues(cacheTypePostings)
+	c.requests.WithLabelValues(cacheTypeSeries)
+
+	c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_store_index_cache_hits_total",
+		Help: "Total number of requests to the cache that were a hit.",
+	}, []string{"item_type"})
+	c.hits.WithLabelValues(cacheTypePostings)
+	c.hits.WithLabelValues(cacheTypeSeries)
+
+	c.operations = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_memcached_operations_total",
+		Help: "Total number of operations against memcached.",
+	}, []string{"operation"})
+
+	c.failures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_memcached_operation_failures_total",
+		Help: "Total number of operations against memcached that failed.",
+	}, []string{"operation"})
+
+	c.duration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "thanos_memcached_operation_duration_seconds",
+		Help:    "Duration of operations against memcached.",
+		Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1},
+	}, []string{"operation"})
+
+	if reg != nil {
+		reg.MustRegister(c.requests, c.hits, c.operations, c.failures, c.duration)
+	}
+
+	return &c
+}
+
+func populateMemcachedIndexCacheMetrics(base float64) *prometheus.Registry {
+	reg := prometheus.NewRegistry()
+	c := newMemcachedIndexStoreCacheMetrics(reg)
+
+	c.requests.WithLabelValues(cacheTypePostings).Add(base * 1)
+	c.requests.WithLabelValues(cacheTypeSeries).Add(base * 2)
+	c.hits.WithLabelValues(cacheTypePostings).Add(base * 3)
+	c.hits.WithLabelValues(cacheTypeSeries).Add(base * 4)
+
+	c.operations.WithLabelValues(cacheOpSet).Add(base * 5)
+	c.operations.WithLabelValues(cacheOpGetMulti).Add(base * 6)
+	c.failures.WithLabelValues(cacheOpSet).Add(base * 7)
+	c.failures.WithLabelValues(cacheOpGetMulti).Add(base * 8)
+	c.duration.WithLabelValues(cacheOpSet).Observe(0.1)
+	c.duration.WithLabelValues(cacheOpGetMulti).Observe(0.025)
+
+	return reg
+}
diff --git a/pkg/storage/tsdb/index_cache_test.go b/pkg/storage/tsdb/index_cache_test.go
new file mode 100644
index 00000000000..1f4d63e6065
--- /dev/null
+++ b/pkg/storage/tsdb/index_cache_test.go
@@ -0,0 +1,81 @@
+package tsdb
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+)
+
+func TestIndexCacheConfig_Validate(t *testing.T) {
+	tests := map[string]struct {
+		cfg      IndexCacheConfig
+		expected error
+	}{
+		"default config should pass": {
+			cfg: func() IndexCacheConfig {
+				cfg := IndexCacheConfig{}
+				flagext.DefaultValues(&cfg)
+				return cfg
+			}(),
+		},
+		"unsupported backend should fail": {
+			cfg: IndexCacheConfig{
+				Backend: "xxx",
+			},
+			expected: errUnsupportedIndexCacheBackend,
+		},
+		"no memcached addresses should fail": {
+			cfg: IndexCacheConfig{
+				Backend: "memcached",
+			},
+			expected: errNoIndexCacheAddresses,
+		},
+		"one memcached address should pass": {
+			cfg: IndexCacheConfig{
+				Backend: "memcached",
+				Memcached: MemcachedIndexCacheConfig{
+					Addresses: "dns+localhost:11211",
+				},
+			},
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			assert.Equal(t, testData.expected, testData.cfg.Validate())
+		})
+	}
+}
+
+func TestMemcachedIndexCacheConfig_GetAddresses(t *testing.T) {
+	tests := map[string]struct {
+		cfg      MemcachedIndexCacheConfig
+		expected []string
+	}{
+		"no addresses": {
+			cfg: MemcachedIndexCacheConfig{
+				Addresses: "",
+			},
+			expected: []string{},
+		},
+		"one address": {
+			cfg: MemcachedIndexCacheConfig{
+				Addresses: "dns+localhost:11211",
+			},
+			expected: []string{"dns+localhost:11211"},
+		},
+		"two addresses": {
+			cfg: MemcachedIndexCacheConfig{
+				Addresses: "dns+memcached-1:11211,dns+memcached-2:11211",
+			},
+			expected: []string{"dns+memcached-1:11211", "dns+memcached-2:11211"},
"dns+memcached-2:11211"}, + }, + } + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, testData.cfg.GetAddresses()) + }) + } +} diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index b4aeabb48b2..537d3cfd947 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -254,6 +254,35 @@ func (d MetricFamiliesPerUser) SendSumOfHistograms(out chan<- prometheus.Metric, out <- hd.Metric(desc) } +func (d MetricFamiliesPerUser) SendSumOfHistogramsWithLabels(out chan<- prometheus.Metric, desc *prometheus.Desc, histogramName string, labelNames ...string) { + type histogramResult struct { + data HistogramData + labelValues []string + } + + result := map[string]histogramResult{} + + for _, userMetrics := range d { + metricsPerLabelValue := getMetricsWithLabelNames(userMetrics[histogramName], labelNames) + + for key, mwl := range metricsPerLabelValue { + for _, m := range mwl.metrics { + r := result[key] + if r.labelValues == nil { + r.labelValues = mwl.labelValues + } + + r.data.AddHistogram(m.GetHistogram()) + result[key] = r + } + } + } + + for _, hg := range result { + out <- hg.data.Metric(desc, hg.labelValues...) + } +} + // struct for holding metrics with same label values type metricsWithLabels struct { labelValues []string @@ -405,8 +434,8 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) { } // Return prometheus metric from this histogram data. -func (d *HistogramData) Metric(desc *prometheus.Desc) prometheus.Metric { - return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets) +func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) prometheus.Metric { + return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets, labelValues...) } // Creates new histogram data collector. 
diff --git a/pkg/util/metrics_helper_test.go b/pkg/util/metrics_helper_test.go
index 1bf41493e80..e4fa550c6fa 100644
--- a/pkg/util/metrics_helper_test.go
+++ b/pkg/util/metrics_helper_test.go
@@ -120,42 +120,48 @@ func TestSendSumOfGaugesPerUserWithLabels(t *testing.T) {
 		"user-2": user2Reg,
 	})
 
-	desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one"}, nil)
-	actual, err := collectMetrics(func(out chan prometheus.Metric) {
-		mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_one")
-	})
-	require.NoError(t, err)
-	expected := []*dto.Metric{
-		{Label: makeLabels("label_one", "a", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(180)}},
-		{Label: makeLabels("label_one", "a", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(100)}},
+	{
+		desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one"}, nil)
+		actual, err := collectMetrics(func(out chan prometheus.Metric) {
+			mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_one")
+		})
+		require.NoError(t, err)
+		expected := []*dto.Metric{
+			{Label: makeLabels("label_one", "a", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(180)}},
+			{Label: makeLabels("label_one", "a", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(100)}},
+		}
+		require.ElementsMatch(t, expected, actual)
 	}
-	require.ElementsMatch(t, expected, actual)
 
-	desc = prometheus.NewDesc("test_metric", "", []string{"user", "label_two"}, nil)
-	actual, err = collectMetrics(func(out chan prometheus.Metric) {
-		mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_two")
-	})
-	require.NoError(t, err)
-	expected = []*dto.Metric{
-		{Label: makeLabels("label_two", "b", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(100)}},
-		{Label: makeLabels("label_two", "c", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(80)}},
-		{Label: makeLabels("label_two", "b", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(60)}},
-		{Label: makeLabels("label_two", "c", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(40)}},
+	{
+		desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_two"}, nil)
+		actual, err := collectMetrics(func(out chan prometheus.Metric) {
+			mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_two")
+		})
+		require.NoError(t, err)
+		expected := []*dto.Metric{
+			{Label: makeLabels("label_two", "b", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(100)}},
+			{Label: makeLabels("label_two", "c", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(80)}},
+			{Label: makeLabels("label_two", "b", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(60)}},
+			{Label: makeLabels("label_two", "c", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(40)}},
+		}
+		require.ElementsMatch(t, expected, actual)
 	}
-	require.ElementsMatch(t, expected, actual)
 
-	desc = prometheus.NewDesc("test_metric", "", []string{"user", "label_one", "label_two"}, nil)
-	actual, err = collectMetrics(func(out chan prometheus.Metric) {
-		mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_one", "label_two")
-	})
-	require.NoError(t, err)
-	expected = []*dto.Metric{
-		{Label: makeLabels("label_one", "a", "label_two", "b", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(100)}},
-		{Label: makeLabels("label_one", "a", "label_two", "c", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(80)}},
-		{Label: makeLabels("label_one", "a", "label_two", "b", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(60)}},
-		{Label: makeLabels("label_one", "a", "label_two", "c", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(40)}},
+	{
+		desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one", "label_two"}, nil)
+		actual, err := collectMetrics(func(out chan prometheus.Metric) {
+			mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_one", "label_two")
+		})
+		require.NoError(t, err)
+		expected := []*dto.Metric{
+			{Label: makeLabels("label_one", "a", "label_two", "b", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(100)}},
+			{Label: makeLabels("label_one", "a", "label_two", "c", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(80)}},
+			{Label: makeLabels("label_one", "a", "label_two", "b", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(60)}},
+			{Label: makeLabels("label_one", "a", "label_two", "c", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(40)}},
+		}
+		require.ElementsMatch(t, expected, actual)
 	}
-	require.ElementsMatch(t, expected, actual)
 }
 
 func TestSendMaxOfGauges(t *testing.T) {
@@ -197,6 +203,84 @@
 	require.ElementsMatch(t, expected, actual)
 }
 
+func TestSendSumOfHistogramsWithLabels(t *testing.T) {
+	buckets := []float64{1, 2, 3}
+	user1Metric := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "test_metric", Buckets: buckets}, []string{"label_one", "label_two"})
+	user2Metric := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "test_metric", Buckets: buckets}, []string{"label_one", "label_two"})
+	user1Metric.WithLabelValues("a", "b").Observe(1)
+	user1Metric.WithLabelValues("a", "c").Observe(2)
+	user2Metric.WithLabelValues("a", "b").Observe(3)
+	user2Metric.WithLabelValues("a", "c").Observe(4)
+
+	user1Reg := prometheus.NewRegistry()
+	user2Reg := prometheus.NewRegistry()
+	user1Reg.MustRegister(user1Metric)
+	user2Reg.MustRegister(user2Metric)
+
+	mf := BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{
+		"user-1": user1Reg,
+		"user-2": user2Reg,
+	})
+
+	{
+		desc := prometheus.NewDesc("test_metric", "", []string{"label_one"}, nil)
+		actual, err := collectMetrics(func(out chan prometheus.Metric) {
+			mf.SendSumOfHistogramsWithLabels(out, desc, "test_metric", "label_one")
+		})
+		require.NoError(t, err)
+		expected := []*dto.Metric{
+			{Label: makeLabels("label_one", "a"), Histogram: &dto.Histogram{SampleCount: uint64p(4), SampleSum: float64p(10), Bucket: []*dto.Bucket{
+				{UpperBound: float64p(1), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(2), CumulativeCount: uint64p(2)},
+				{UpperBound: float64p(3), CumulativeCount: uint64p(3)},
+			}}},
+		}
+		require.ElementsMatch(t, expected, actual)
+	}
+
+	{
+		desc := prometheus.NewDesc("test_metric", "", []string{"label_two"}, nil)
+		actual, err := collectMetrics(func(out chan prometheus.Metric) {
+			mf.SendSumOfHistogramsWithLabels(out, desc, "test_metric", "label_two")
+		})
+		require.NoError(t, err)
+		expected := []*dto.Metric{
+			{Label: makeLabels("label_two", "b"), Histogram: &dto.Histogram{SampleCount: uint64p(2), SampleSum: float64p(4), Bucket: []*dto.Bucket{
+				{UpperBound: float64p(1), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(2), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(3), CumulativeCount: uint64p(2)},
+			}}},
+			{Label: makeLabels("label_two", "c"), Histogram: &dto.Histogram{SampleCount: uint64p(2), SampleSum: float64p(6), Bucket: []*dto.Bucket{
+				{UpperBound: float64p(1), CumulativeCount: uint64p(0)},
+				{UpperBound: float64p(2), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(3), CumulativeCount: uint64p(1)},
+			}}},
+		}
+		require.ElementsMatch(t, expected, actual)
+	}
+
+	{
+		desc := prometheus.NewDesc("test_metric", "", []string{"label_one", "label_two"}, nil)
+		actual, err := collectMetrics(func(out chan prometheus.Metric) {
+			mf.SendSumOfHistogramsWithLabels(out, desc, "test_metric", "label_one", "label_two")
+		})
+		require.NoError(t, err)
+		expected := []*dto.Metric{
+			{Label: makeLabels("label_one", "a", "label_two", "b"), Histogram: &dto.Histogram{SampleCount: uint64p(2), SampleSum: float64p(4), Bucket: []*dto.Bucket{
+				{UpperBound: float64p(1), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(2), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(3), CumulativeCount: uint64p(2)},
+			}}},
+			{Label: makeLabels("label_one", "a", "label_two", "c"), Histogram: &dto.Histogram{SampleCount: uint64p(2), SampleSum: float64p(6), Bucket: []*dto.Bucket{
+				{UpperBound: float64p(1), CumulativeCount: uint64p(0)},
+				{UpperBound: float64p(2), CumulativeCount: uint64p(1)},
+				{UpperBound: float64p(3), CumulativeCount: uint64p(1)},
+			}}},
+		}
+		require.ElementsMatch(t, expected, actual)
+	}
+}
+
 func collectMetrics(send func(out chan prometheus.Metric)) ([]*dto.Metric, error) {
 	out := make(chan prometheus.Metric)
@@ -218,3 +302,11 @@ func collectMetrics(send func(out chan prometheus.Metric)) ([]*dto.Metric, error
 	return metrics, nil
 }
+
+func float64p(v float64) *float64 {
+	return &v
+}
+
+func uint64p(v uint64) *uint64 {
+	return &v
+}
diff --git a/pkg/util/strings.go b/pkg/util/strings.go
new file mode 100644
index 00000000000..39868e1d1cb
--- /dev/null
+++ b/pkg/util/strings.go
@@ -0,0 +1,12 @@
+package util
+
+// StringsContain returns true if the search value is within the list of input values.
+func StringsContain(values []string, search string) bool {
+	for _, v := range values {
+		if search == v {
+			return true
+		}
+	}
+
+	return false
+}
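The `Validate` method exercised by `TestIndexCacheConfig_Validate` is not part of this diff; a plausible sketch of how it can be built from `StringsContain` follows. Only the error values and config fields come from the tests above; the `supportedIndexCacheBackends` slice and the overall shape of `Validate` are assumptions:

```go
package tsdb

import "github.com/cortexproject/cortex/pkg/util"

// Hypothetical: the supported-backends slice is assumed, mirroring the
// "Supported values: inmemory, memcached" wording in the config reference.
var supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached}

func (cfg *IndexCacheConfig) Validate() error {
	if !util.StringsContain(supportedIndexCacheBackends, cfg.Backend) {
		return errUnsupportedIndexCacheBackend
	}

	if cfg.Backend == IndexCacheBackendMemcached && cfg.Memcached.Addresses == "" {
		return errNoIndexCacheAddresses
	}

	return nil
}
```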