Skip to content

Commit 9084659

Browse files
authored
Enforced -querier.max-query-lookback in the query-frontend for range queries (#3458)
* Enforced -querier.max-query-lookback in the query-frontend for range queries. Signed-off-by: Marco Pracucci <[email protected]>
* Addressed review comments. Signed-off-by: Marco Pracucci <[email protected]>
1 parent f0c81bb commit 9084659

18 files changed

+402
-131
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* [ENHANCEMENT] Enforced keepalive on all gRPC clients used for inter-service communication. #3431
1212
* [ENHANCEMENT] Added `cortex_alertmanager_config_hash` metric to expose hash of Alertmanager Config loaded per user. #3388
1313
* [ENHANCEMENT] Query-Frontend / Query-Scheduler: New component called "Query-Scheduler" has been introduced. Query-Scheduler is simply a queue of requests, moved outside of Query-Frontend. This allows Query-Frontend to be scaled separately from number of queues. To make Query-Frontend and Querier use Query-Scheduler, they need to be started with `-frontend.scheduler-address` and `-querier.scheduler-address` options respectively. #3374
14-
* [ENHANCEMENT] Querier: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis. #3452
14+
* [ENHANCEMENT] Query-frontend / Querier / Ruler: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis and is enforced in the query-frontend, querier and ruler. #3452 #3458
1515
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
1616
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
1717
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452

docs/blocks-storage/querier.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ blocks_storage:
414414
[max_get_multi_concurrency: <int> | default = 100]
415415
416416
# The maximum number of keys a single underlying get operation should
417-
# run. If more keys are specified, internally keys are splitted into
417+
# run. If more keys are specified, internally keys are split into
418418
# multiple batches and fetched concurrently, honoring the max
419419
# concurrency. If set to 0, the max batch size is unlimited.
420420
# CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-get-multi-batch-size
@@ -465,7 +465,7 @@ blocks_storage:
465465
[max_get_multi_concurrency: <int> | default = 100]
466466
467467
# The maximum number of keys a single underlying get operation should
468-
# run. If more keys are specified, internally keys are splitted into
468+
# run. If more keys are specified, internally keys are split into
469469
# multiple batches and fetched concurrently, honoring the max
470470
# concurrency. If set to 0, the max batch size is unlimited.
471471
# CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-get-multi-batch-size
@@ -531,7 +531,7 @@ blocks_storage:
531531
[max_get_multi_concurrency: <int> | default = 100]
532532
533533
# The maximum number of keys a single underlying get operation should
534-
# run. If more keys are specified, internally keys are splitted into
534+
# run. If more keys are specified, internally keys are split into
535535
# multiple batches and fetched concurrently, honoring the max
536536
# concurrency. If set to 0, the max batch size is unlimited.
537537
# CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-get-multi-batch-size

docs/blocks-storage/store-gateway.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ blocks_storage:
469469
[max_get_multi_concurrency: <int> | default = 100]
470470
471471
# The maximum number of keys a single underlying get operation should
472-
# run. If more keys are specified, internally keys are splitted into
472+
# run. If more keys are specified, internally keys are split into
473473
# multiple batches and fetched concurrently, honoring the max
474474
# concurrency. If set to 0, the max batch size is unlimited.
475475
# CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-get-multi-batch-size
@@ -520,7 +520,7 @@ blocks_storage:
520520
[max_get_multi_concurrency: <int> | default = 100]
521521
522522
# The maximum number of keys a single underlying get operation should
523-
# run. If more keys are specified, internally keys are splitted into
523+
# run. If more keys are specified, internally keys are split into
524524
# multiple batches and fetched concurrently, honoring the max
525525
# concurrency. If set to 0, the max batch size is unlimited.
526526
# CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-get-multi-batch-size
@@ -586,7 +586,7 @@ blocks_storage:
586586
[max_get_multi_concurrency: <int> | default = 100]
587587
588588
# The maximum number of keys a single underlying get operation should
589-
# run. If more keys are specified, internally keys are splitted into
589+
# run. If more keys are specified, internally keys are split into
590590
# multiple batches and fetched concurrently, honoring the max
591591
# concurrency. If set to 0, the max batch size is unlimited.
592592
# CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-get-multi-batch-size

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3049,7 +3049,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
30493049
[max_chunks_per_query: <int> | default = 2000000]
30503050
30513051
# Limit how long back data (series and metadata) can be queried, up until
3052-
# <lookback> duration ago. 0 to disable.
3052+
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
3053+
# and ruler. If the requested time range is outside the allowed range, the
3054+
# request will not fail but will be manipulated to only query data within the
3055+
# allowed time range. 0 to disable.
30533056
# CLI flag: -querier.max-query-lookback
30543057
[max_query_lookback: <duration> | default = 0s]
30553058
@@ -3059,7 +3062,7 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
30593062
# CLI flag: -store.max-query-length
30603063
[max_query_length: <duration> | default = 0s]
30613064
3062-
# Maximum number of queries will be scheduled in parallel by the frontend.
3065+
# Maximum number of split queries will be scheduled in parallel by the frontend.
30633066
# CLI flag: -querier.max-query-parallelism
30643067
[max_query_parallelism: <int> | default = 14]
30653068
@@ -3589,7 +3592,7 @@ bucket_store:
35893592
[max_get_multi_concurrency: <int> | default = 100]
35903593
35913594
# The maximum number of keys a single underlying get operation should run.
3592-
# If more keys are specified, internally keys are splitted into multiple
3595+
# If more keys are specified, internally keys are split into multiple
35933596
# batches and fetched concurrently, honoring the max concurrency. If set
35943597
# to 0, the max batch size is unlimited.
35953598
# CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-get-multi-batch-size
@@ -3640,7 +3643,7 @@ bucket_store:
36403643
[max_get_multi_concurrency: <int> | default = 100]
36413644
36423645
# The maximum number of keys a single underlying get operation should run.
3643-
# If more keys are specified, internally keys are splitted into multiple
3646+
# If more keys are specified, internally keys are split into multiple
36443647
# batches and fetched concurrently, honoring the max concurrency. If set
36453648
# to 0, the max batch size is unlimited.
36463649
# CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-get-multi-batch-size
@@ -3705,7 +3708,7 @@ bucket_store:
37053708
[max_get_multi_concurrency: <int> | default = 100]
37063709
37073710
# The maximum number of keys a single underlying get operation should run.
3708-
# If more keys are specified, internally keys are splitted into multiple
3711+
# If more keys are specified, internally keys are split into multiple
37093712
# batches and fetched concurrently, honoring the max concurrency. If set
37103713
# to 0, the max batch size is unlimited.
37113714
# CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-get-multi-batch-size

pkg/querier/querier.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,8 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
512512
// Make sure to log it in traces to ease debugging.
513513
level.Debug(spanlogger.FromContext(ctx)).Log(
514514
"msg", "the end time of the query has been manipulated because of the 'max query into future' setting",
515-
"original", origEndTime, "updated", endTime)
515+
"original", util.FormatTimeModel(origEndTime),
516+
"updated", util.FormatTimeModel(endTime))
516517

517518
if endTime.Before(startTime) {
518519
return 0, 0, errEmptyTimeRange
@@ -527,7 +528,8 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
527528
// Make sure to log it in traces to ease debugging.
528529
level.Debug(spanlogger.FromContext(ctx)).Log(
529530
"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
530-
"original", origStartTime, "updated", startTime)
531+
"original", util.FormatTimeModel(origStartTime),
532+
"updated", util.FormatTimeModel(startTime))
531533

532534
if endTime.Before(startTime) {
533535
return 0, 0, errEmptyTimeRange

pkg/querier/queryrange/limits.go

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,107 +5,92 @@ import (
55
"net/http"
66
"time"
77

8+
"github.com/go-kit/kit/log/level"
89
"github.com/prometheus/prometheus/pkg/timestamp"
910
"github.com/weaveworks/common/httpgrpc"
1011
"github.com/weaveworks/common/user"
1112

13+
"github.com/cortexproject/cortex/pkg/util"
14+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
1215
"github.com/cortexproject/cortex/pkg/util/validation"
1316
)
1417

1518
// Limits allows us to specify per-tenant runtime limits on the behavior of
1619
// the query handling code.
1720
type Limits interface {
21+
// MaxQueryLookback returns the max lookback period of queries.
22+
MaxQueryLookback(userID string) time.Duration
23+
24+
// MaxQueryLength returns the limit of the length (in time) of a query.
1825
MaxQueryLength(string) time.Duration
26+
27+
// MaxQueryParallelism returns the limit to the number of split queries the
28+
// frontend will process in parallel.
1929
MaxQueryParallelism(string) int
30+
31+
// MaxCacheFreshness returns the period after which results are cacheable,
32+
// to prevent caching of very recent results.
2033
MaxCacheFreshness(string) time.Duration
2134
}
2235

23-
type limits struct {
36+
type limitsMiddleware struct {
2437
Limits
2538
next Handler
2639
}
2740

28-
// LimitsMiddleware creates a new Middleware that invalidates large queries based on Limits interface.
29-
func LimitsMiddleware(l Limits) Middleware {
41+
// NewLimitsMiddleware creates a new Middleware that enforces query limits.
42+
func NewLimitsMiddleware(l Limits) Middleware {
3043
return MiddlewareFunc(func(next Handler) Handler {
31-
return limits{
44+
return limitsMiddleware{
3245
next: next,
3346
Limits: l,
3447
}
3548
})
3649
}
3750

38-
func (l limits) Do(ctx context.Context, r Request) (Response, error) {
39-
userid, err := user.ExtractOrgID(ctx)
51+
func (l limitsMiddleware) Do(ctx context.Context, r Request) (Response, error) {
52+
log, ctx := spanlogger.New(ctx, "limits")
53+
defer log.Finish()
54+
55+
userID, err := user.ExtractOrgID(ctx)
4056
if err != nil {
4157
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
4258
}
4359

44-
maxQueryLen := l.MaxQueryLength(userid)
45-
queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
46-
if maxQueryLen > 0 && queryLen > maxQueryLen {
47-
return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLen)
48-
}
49-
return l.next.Do(ctx, r)
50-
}
60+
// Clamp the time range based on the max query lookback.
61+
if maxQueryLookback := l.MaxQueryLookback(userID); maxQueryLookback > 0 {
62+
minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))
5163

52-
// RequestResponse contains a request response and the respective request that was used.
53-
type RequestResponse struct {
54-
Request Request
55-
Response Response
56-
}
64+
if r.GetEnd() < minStartTime {
65+
// The request is fully outside the allowed range, so we can return an
66+
// empty response.
67+
level.Debug(log).Log(
68+
"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
69+
"reqStart", util.FormatTimeMillis(r.GetStart()),
70+
"redEnd", util.FormatTimeMillis(r.GetEnd()),
71+
"maxQueryLookback", maxQueryLookback)
5772

58-
// DoRequests executes a list of requests in parallel. The limits parameters is used to limit parallelism per single request.
59-
func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits Limits) ([]RequestResponse, error) {
60-
userid, err := user.ExtractOrgID(ctx)
61-
if err != nil {
62-
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
63-
}
73+
return NewEmptyPrometheusResponse(), nil
74+
}
6475

65-
// If one of the requests fail, we want to be able to cancel the rest of them.
66-
ctx, cancel := context.WithCancel(ctx)
67-
defer cancel()
76+
if r.GetStart() < minStartTime {
77+
// Replace the start time in the request.
78+
level.Debug(log).Log(
79+
"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
80+
"original", util.FormatTimeMillis(r.GetStart()),
81+
"updated", util.FormatTimeMillis(minStartTime))
6882

69-
// Feed all requests to a bounded intermediate channel to limit parallelism.
70-
intermediate := make(chan Request)
71-
go func() {
72-
for _, req := range reqs {
73-
intermediate <- req
83+
r = r.WithStartEnd(minStartTime, r.GetEnd())
7484
}
75-
close(intermediate)
76-
}()
77-
78-
respChan, errChan := make(chan RequestResponse), make(chan error)
79-
parallelism := limits.MaxQueryParallelism(userid)
80-
if parallelism > len(reqs) {
81-
parallelism = len(reqs)
82-
}
83-
for i := 0; i < parallelism; i++ {
84-
go func() {
85-
for req := range intermediate {
86-
resp, err := downstream.Do(ctx, req)
87-
if err != nil {
88-
errChan <- err
89-
} else {
90-
respChan <- RequestResponse{req, resp}
91-
}
92-
}
93-
}()
9485
}
9586

96-
resps := make([]RequestResponse, 0, len(reqs))
97-
var firstErr error
98-
for range reqs {
99-
select {
100-
case resp := <-respChan:
101-
resps = append(resps, resp)
102-
case err := <-errChan:
103-
if firstErr == nil {
104-
cancel()
105-
firstErr = err
106-
}
87+
// Enforce the max query length.
88+
if maxQueryLength := l.MaxQueryLength(userID); maxQueryLength > 0 {
89+
queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
90+
if queryLen > maxQueryLength {
91+
return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength)
10792
}
10893
}
10994

110-
return resps, firstErr
95+
return l.next.Do(ctx, r)
11196
}

0 commit comments

Comments
 (0)