Skip to content

Cancel abandoned operations in ReplicationSet.Do() #3178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
* [BUGFIX] Configs: prevent validation of templates to fail when using template functions. #3157
* [BUGFIX] Configuring the S3 URL with an `@` but without username and password doesn't enable the AWS static credentials anymore. #3170
* [BUGFIX] Limit errors on ranged queries (`api/v1/query_range`) no longer return a status code `500` but `422` instead. #3167
* [BUGFIX] No-longer-needed ingester operations from queries are now canceled. #3178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just merged a PR to cut the CHANGELOG because of the upcoming 1.4.0 release. Could you rebase master and move your CHANGELOG entry to the top, under ## master / unreleased, please?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in queriers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the same comment, but Bryan correctly told it's intended to be "queries" because it affects both queriers and rulers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it confused two of us, so perhaps better wording like "operations caused by queries" would improve it?


## 1.3.0 / 2020-08-21

Expand Down
16 changes: 8 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time
}

// ForAllIngesters runs f, in parallel, for all ingesters
func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
replicationSet, err := d.ingestersRing.GetAll(ring.Read)
if err != nil {
return nil, err
Expand All @@ -613,13 +613,13 @@ func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f fun
replicationSet.MaxErrors = 0
}

return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}

return f(client.(ingester_client.IngesterClient))
return f(ctx, client.(ingester_client.IngesterClient))
})
}

Expand All @@ -628,7 +628,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
req := &client.LabelValuesRequest{
LabelName: string(labelName),
}
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.LabelValues(ctx, req)
})
if err != nil {
Expand All @@ -652,7 +652,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
// LabelNames returns all of the label names.
func (d *Distributor) LabelNames(ctx context.Context) ([]string, error) {
req := &client.LabelNamesRequest{}
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.LabelNames(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -684,7 +684,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return nil, err
}

resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.MetricsForLabelMatchers(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -712,7 +712,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -746,7 +746,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
req := &client.UserStatsRequest{}
resps, err := d.ForAllIngesters(ctx, true, func(client client.IngesterClient) (interface{}, error) {
resps, err := d.ForAllIngesters(ctx, true, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.UserStats(ctx, req)
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (d *Distributor) queryPrep(ctx context.Context, from, to model.Time, matche
func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) {
// Fetch samples from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -133,7 +133,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
Expand Down
12 changes: 5 additions & 7 deletions pkg/ring/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ type ReplicationSet struct {

// Do function f in parallel for all replicas in the set, erroring is we exceed
// MaxErrors and returning early otherwise.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*IngesterDesc) (interface{}, error)) ([]interface{}, error) {
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *IngesterDesc) (interface{}, error)) ([]interface{}, error) {
var (
errs = make(chan error, len(r.Ingesters))
resultsChan = make(chan interface{}, len(r.Ingesters))
minSuccess = len(r.Ingesters) - r.MaxErrors
done = make(chan struct{})
forceStart = make(chan struct{}, r.MaxErrors)
)
defer func() {
close(done)
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for i := range r.Ingesters {
go func(i int, ing *IngesterDesc) {
Expand All @@ -33,13 +31,13 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*Ing
after := time.NewTimer(delay)
defer after.Stop()
select {
case <-done:
case <-ctx.Done():
return
case <-forceStart:
case <-after.C:
}
}
result, err := f(ing)
result, err := f(ctx, ing)
if err != nil {
errs <- err
} else {
Expand Down