Merged

Commits (29)
9008e16 Added ID to the querier. (pstibrany, Aug 28, 2020)
6947054 Extended frontend protocol to add request type. (pstibrany, Aug 31, 2020)
78c3713 Frontend now asks querier for its ID before running process loop. (pstibrany, Aug 31, 2020)
55d98fe Close gRPC connection when stopping manager. (pstibrany, Sep 1, 2020)
2629b98 Shuffle shard queriers between users. (pstibrany, Sep 1, 2020)
96924dc Added MaxQueriersPerUser to overrides. (pstibrany, Sep 1, 2020)
2b9dcab Fixes. (pstibrany, Sep 1, 2020)
d23a825 Fix querier.id default value. (pstibrany, Sep 1, 2020)
813f2f2 CHANGELOG.md (pstibrany, Sep 1, 2020)
dd81676 Make lint happy. (pstibrany, Sep 1, 2020)
31cc2d2 Fix protos. (pstibrany, Sep 1, 2020)
0f0b343 Fixed bug in getNextRequestForQuerier, modified benchmarks to add que… (pstibrany, Sep 2, 2020)
6ee9f37 Fixed spelling. (pstibrany, Sep 2, 2020)
36caae2 Move metrics increment in register/unregister querier connection. (pstibrany, Sep 2, 2020)
b90323a When select queriers for tenant, use shuffling. (pstibrany, Sep 2, 2020)
736ca66 Use rand numbers to find queriers. (pstibrany, Sep 2, 2020)
cb9b616 Fixed docs. (pstibrany, Sep 2, 2020)
45a1f7f Add integration test for sharding queriers. (pstibrany, Sep 2, 2020)
c66e3da Use shuffling for selecting queriers per user. (pstibrany, Sep 11, 2020)
35b81e8 Updated documentation. (pstibrany, Sep 11, 2020)
0927013 Fixed docs after rebase. (pstibrany, Sep 11, 2020)
32820ec Unify seed computation with subring PR. (pstibrany, Sep 17, 2020)
fde0808 Added unit test to verify that selected queriers are unique and come … (pstibrany, Sep 17, 2020)
84217e6 Review feedback. (pstibrany, Sep 17, 2020)
874a548 Mention shuffle sharding in v1 guarantees. (pstibrany, Sep 17, 2020)
729cc27 Merge branch 'master' into query-frontend-sharding (pracucci, Sep 17, 2020)
6501e26 Review feedback. (pstibrany, Sep 17, 2020)
2058c50 Fix flag after master merge. (pstibrany, Sep 17, 2020)
78cf6b0 Fixed TestQueuesConsistency test. (pstibrany, Sep 17, 2020)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using the per-user limit `max_queriers_per_user`), each user's requests will be handled by a different set of queriers. #3113
* [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
* [BUGFIX] Handle hash-collisions in the query path. #3192
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2414,6 +2414,11 @@ The `frontend_worker_config` configures the worker - running within the Cortex q
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 10s]

# Querier ID, sent to frontend service to identify requests from the same
# querier. Defaults to hostname.
# CLI flag: -querier.id
[id: <string> | default = ""]

grpc_client_config:
# gRPC client max receive message size (bytes).
# CLI flag: -querier.frontend-client.grpc-max-recv-msg-size
@@ -2831,6 +2836,15 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness: <duration> | default = 1m]

# Maximum number of queriers that can handle requests for a single user. If set
# to 0, or to a value higher than the number of available queriers, *all*
# queriers will handle requests for the user. Each frontend will select the same
# set of queriers for the same user (given that all queriers are connected to
# all frontends). This option only works with queriers connecting to the
# query-frontend, not when using the downstream URL.
# CLI flag: -frontend.max-queriers-per-user
[max_queriers_per_user: <int> | default = 0]

Contributor:
In the PR for store-gateway shuffle sharding, a config called sharding_strategy was introduced. Is that something to be added here as well?

Contributor Author (pstibrany):
Store-gateways can use different sharding strategies. In the case of queriers, no sharding was done before (which corresponds to using max_queriers_per_user: 0), and only shuffle-sharding is available (if max_queriers_per_user is greater than 0). We can introduce a sharding_strategy option too, but personally I don't see a need for it here.

Contributor (@pracucci, Sep 16, 2020):
Food for thought (not having a strong opinion): I agree it's not required here, but could help with config consistency.

Contributor Author (pstibrany):
> Food for thought (not having a strong opinion): I agree it's not required here, but could help with config consistency.

If we want to be consistent with other config options, we should also support the "default" (non-shuffle-sharding) strategy with a non-zero shard size.

Contributor:
Let's keep it as is for now. We marked this feature as experimental, which will allow us to eventually fine-tune the config before marking it stable.


# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration
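As a concrete illustration of the two new options documented above, here is a hedged configuration sketch. The flag `-frontend.max-queriers-per-user`, the per-tenant limit `max_queriers_per_user`, and `-querier.id` come from this PR's documentation; the overrides-file wiring shown (e.g. via `-limits.per-user-override-config`) is an assumption about a typical Cortex deployment and may differ from yours.

# Query-frontend: by default spread each tenant over at most 10 queriers
# (0 keeps the pre-existing behaviour of using all queriers).
#   -frontend.max-queriers-per-user=10
#
# Querier: stable identity reported to the frontend; defaults to the hostname when empty.
#   -querier.id=querier-1

# Per-tenant overrides file (assumed): tenant-a is limited to at most 2 queriers.
overrides:
  tenant-a:
    max_queriers_per_user: 2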
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -47,3 +47,4 @@ Currently experimental features are:
- gRPC Store.
- Querier support for querying chunks and blocks store at the same time.
- Tracking of active series and exporting them as metrics (`-ingester.active-series-metrics-enabled` and related flags)
- Shuffle-sharding of queriers in the query-frontend (i.e. use of `-frontend.max-queriers-per-user` flag with non-zero value).
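
To illustrate the shuffle-sharding idea listed above, below is a minimal Go sketch, not the actual frontend queue code from this PR: it assumes a stable per-tenant seed and a sorted list of connected querier IDs, shuffles that list with a seeded RNG, and keeps the first maxQueriers entries, so every frontend picks the same subset for the same tenant. The helper names (shuffleShardQueriers, seedForUser) are hypothetical; the real implementation differs in detail (for example, the PR unifies seed computation with the subring PR).

package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sort"
)

// seedForUser derives a stable seed from the tenant ID, so every query-frontend
// selects the same querier subset for the same tenant. (Hypothetical helper.)
func seedForUser(userID string) int64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(userID))
	return int64(h.Sum64())
}

// shuffleShardQueriers returns up to maxQueriers querier IDs for the tenant.
// 0, or a value not lower than the number of queriers, means "all queriers".
func shuffleShardQueriers(userID string, querierIDs []string, maxQueriers int) []string {
	if maxQueriers <= 0 || maxQueriers >= len(querierIDs) {
		return querierIDs
	}

	// Work on a sorted copy so the result depends only on the seed and the set of queriers.
	ids := append([]string(nil), querierIDs...)
	sort.Strings(ids)

	rnd := rand.New(rand.NewSource(seedForUser(userID)))
	rnd.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })

	return ids[:maxQueriers]
}

func main() {
	queriers := []string{"querier-1", "querier-2", "querier-3", "querier-4"}
	// The same tenant maps to the same single querier on every call,
	// which is what the integration test below relies on.
	fmt.Println(shuffleShardQueriers("tenant-a", queriers, 1))
	fmt.Println(shuffleShardQueriers("tenant-a", queriers, 1))
}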
155 changes: 155 additions & 0 deletions integration/querier_sharding_test.go
@@ -0,0 +1,155 @@
// +build requires_docker

package integration

import (
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2e"
	e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestQuerierSharding(t *testing.T) {
	runQuerierShardingTest(t, true)
}

func TestQuerierNoSharding(t *testing.T) {
	runQuerierShardingTest(t, false)
}

func runQuerierShardingTest(t *testing.T, sharding bool) {
	// Going too high starts hitting the file descriptor limit, since we run all queries concurrently.
	const numQueries = 100

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	memcached := e2ecache.NewMemcached()
	consul := e2edb.NewConsul()
	require.NoError(t, s.StartAndWaitReady(consul, memcached))

	minio := e2edb.NewMinio(9000, BlocksStorageFlags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	flags := BlocksStorageFlags

	flags = mergeFlags(flags, map[string]string{
		"-querier.cache-results":                       "true",
		"-querier.split-queries-by-interval":           "24h",
		"-querier.query-ingesters-within":              "12h", // Required by the test on query /series out of ingesters time range
		"-frontend.memcached.addresses":                "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
		"-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors.
	})

	if sharding {
		// Use only a single querier for each user.
		flags["-frontend.max-queriers-per-user"] = "1"
	}

	// Start Cortex components.
	queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
	ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), "", flags, "")
	distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), "", flags, "")

	require.NoError(t, s.Start(queryFrontend))

	querier1 := e2ecortex.NewQuerierWithConfigFile("querier-1", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")
	querier2 := e2ecortex.NewQuerierWithConfigFile("querier-2", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")

	require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
	require.NoError(t, s.WaitReady(queryFrontend))

	// Wait until distributor and queriers have updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, querier1.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Push a series for each user to Cortex.
	now := time.Now()

	distClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
	require.NoError(t, err)

	var series []prompb.TimeSeries
	series, expectedVector := generateSeries("series_1", now)

	res, err := distClient.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Send both queriers a single query, so that they both initialize their cortex_querier_request_duration_seconds metrics.
	for _, q := range []*e2ecortex.CortexService{querier1, querier2} {
		c, err := e2ecortex.NewClient("", q.HTTPEndpoint(), "", "", userID)
		require.NoError(t, err)

		_, err = c.Query("series_1", now)
		require.NoError(t, err)
	}

	wg := sync.WaitGroup{}

	// Run all queries concurrently to get better distribution of requests between queriers.
	for i := 0; i < numQueries; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
			require.NoError(t, err)

			result, err := c.Query("series_1", now)
			require.NoError(t, err)
			require.Equal(t, model.ValVector, result.Type())
			assert.Equal(t, expectedVector, result.(model.Vector))
		}()
	}

	wg.Wait()

	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numQueries), "cortex_query_frontend_queries_total"))

	// Verify that only a single querier handled all the queries when sharding is enabled; otherwise queries should have been fairly distributed across queriers.
	q1Values, err := querier1.SumMetrics([]string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)
	require.NoError(t, err)
	require.Len(t, q1Values, 1)

	q2Values, err := querier2.SumMetrics([]string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)
	require.NoError(t, err)
	require.Len(t, q2Values, 1)

	total := q1Values[0] + q2Values[0]
	diff := q1Values[0] - q2Values[0]
	if diff < 0 {
		diff = -diff
	}

	require.Equal(t, float64(numQueries), total-2) // Remove the 2 requests used for metrics initialization.

	if sharding {
		require.Equal(t, float64(numQueries), diff)
	} else {
		require.InDelta(t, 0, diff, numQueries*0.20) // Both queriers should have handled a roughly equal number of requests, with some allowed delta.
	}

	// Ensure no service-specific metrics prefix is used by the wrong service.
	assertServiceMetricsPrefixes(t, Distributor, distributor)
	assertServiceMetricsPrefixes(t, Ingester, ingester)
	assertServiceMetricsPrefixes(t, Querier, querier1)
	assertServiceMetricsPrefixes(t, Querier, querier2)
	assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
}
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
@@ -405,7 +405,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
		}
	}

-	t.Frontend, err = frontend.New(t.Cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
+	t.Frontend, err = frontend.New(t.Cfg.Frontend, t.Overrides, util.Logger, prometheus.DefaultRegisterer)
	if err != nil {
		return
	}
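
For context on the one-line change above: frontend.New now also receives the overrides, so the frontend's queue can look up the per-tenant max_queriers_per_user limit when deciding which queriers may serve a tenant's requests. The sketch below only illustrates the shape of that dependency; the interface and method names are assumptions for illustration and may not match the frontend package exactly.

package main

import "fmt"

// Limits is a hypothetical stand-in for the per-tenant overrides passed into
// frontend.New above; the real interface in the frontend package may differ.
type Limits interface {
	// MaxQueriersPerUser returns the limit for a tenant (0 = use all queriers).
	MaxQueriersPerUser(userID string) int
}

// staticLimits applies the same limit to every tenant.
type staticLimits struct{ maxQueriers int }

func (l staticLimits) MaxQueriersPerUser(string) int { return l.maxQueriers }

func main() {
	var limits Limits = staticLimits{maxQueriers: 1}
	// The frontend queue would consult this per tenant when enqueueing requests.
	fmt.Println(limits.MaxQueriersPerUser("tenant-a"))
}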