diff --git a/CHANGELOG.md b/CHANGELOG.md
index 39ce61d8f69..cb930e7c6c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## master / unreleased
 * [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290
 * [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
+* [ENHANCEMENT] Distributor/Ingester: Add spans on the push path. #5319
 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 92698d5009e..f0603a6ac63 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -555,6 +555,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 		return nil, err
 	}
 
+	span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.Push")
+	defer span.Finish()
+
 	// We will report *this* request in the error too.
 	inflight := d.inflightPushRequests.Inc()
 	defer d.inflightPushRequests.Dec()
@@ -572,9 +575,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	now := time.Now()
 	d.activeUsers.UpdateUserTimestamp(userID, now)
 
-	source := util.GetSourceIPsFromOutgoingCtx(ctx)
-
-	var firstPartialErr error
 	removeReplica := false
 
 	numSamples := 0
@@ -589,16 +589,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	// Count the total number of metadata in.
 	d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
 
-	// A WriteRequest can only contain series or metadata but not both. This might change in the future.
-	// For each timeseries or samples, we compute a hash to distribute across ingesters;
-	// check each sample/metadata and discard if outside limits.
-	validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
-	validatedMetadata := make([]*cortexpb.MetricMetadata, 0, len(req.Metadata))
-	metadataKeys := make([]uint32, 0, len(req.Metadata))
-	seriesKeys := make([]uint32, 0, len(req.Timeseries))
-	validatedSamples := 0
-	validatedExemplars := 0
-
 	// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
 	limits := d.limits.GetOverridesForUser(userID)
 
@@ -628,6 +618,136 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 		}
 	}
 
+	// A WriteRequest can only contain series or metadata but not both. This might change in the future.
+	seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
+	if err != nil {
+		return nil, err
+	}
+	metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)
+
+	d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
+	d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
+	d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
+
+	if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
+		// Ensure the request slice is reused if there's no series or metadata passing the validation.
+		cortexpb.ReuseSlice(req.Timeseries)
+
+		return &cortexpb.WriteResponse{}, firstPartialErr
+	}
+
+	totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
+	if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
+		// Ensure the request slice is reused if the request is rate limited.
+		cortexpb.ReuseSlice(req.Timeseries)
+
+		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
+		validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
+		validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
+		// Return a 429 here to tell the client it is going too fast.
+		// Client may discard the data or slow down and re-send.
+		// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
+		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
+	}
+
+	// totalN includes samples, exemplars and metadata. The ingester follows the same pattern when computing its ingestion rate.
+	d.ingestionRate.Add(int64(totalN))
+
+	subRing := d.ingestersRing
+
+	// Obtain a subring if required.
+	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
+		subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
+	}
+
+	keys := append(seriesKeys, metadataKeys...)
+	initialMetadataIndex := len(seriesKeys)
+
+	err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cortexpb.WriteResponse{}, firstPartialErr
+}
+
+func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
+	span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
+	defer span.Finish()
+
+	// Use a background context to make sure all ingesters get samples even if we return early
+	localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
+	localCtx = user.InjectOrgID(localCtx, userID)
+	if sp := opentracing.SpanFromContext(ctx); sp != nil {
+		localCtx = opentracing.ContextWithSpan(localCtx, sp)
+	}
+	// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
+	if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
+		localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
+	}
+	// Get clientIP(s) from the context and add them to localCtx
+	source := util.GetSourceIPsFromOutgoingCtx(ctx)
+	localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)
+
+	op := ring.WriteNoExtend
+	if d.cfg.ExtendWrites {
+		op = ring.Write
+	}
+
+	return ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
+		timeseries := make([]cortexpb.PreallocTimeseries, 0, len(indexes))
+		var metadata []*cortexpb.MetricMetadata
+
+		for _, i := range indexes {
+			if i >= initialMetadataIndex {
+				metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
+			} else {
+				timeseries = append(timeseries, validatedTimeseries[i])
+			}
+		}
+
+		return d.send(localCtx, ingester, timeseries, metadata, req.Source)
+	}, func() {
+		cortexpb.ReuseSlice(req.Timeseries)
+		cancel()
+	})
+}
+
+func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *validation.Limits, userID string, firstPartialErr error) ([]uint32, []*cortexpb.MetricMetadata, error) {
+	validatedMetadata := make([]*cortexpb.MetricMetadata, 0, len(req.Metadata))
+	metadataKeys := make([]uint32, 0, len(req.Metadata))
+
+	for _, m := range req.Metadata {
+		err := validation.ValidateMetadata(limits, userID, m)
+
+		if err != nil {
+			if firstPartialErr == nil {
+				firstPartialErr = err
+			}
+
+			continue
+		}
+
+		metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
+		validatedMetadata = append(validatedMetadata, m)
+	}
+	return metadataKeys, validatedMetadata, firstPartialErr
+}
+
+func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, error, error) {
+	pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
+	defer pSpan.Finish()
+
+	// For each timeseries or sample, we compute a hash to distribute across ingesters;
+	// check each sample and discard if outside limits.
+	validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
+	seriesKeys := make([]uint32, 0, len(req.Timeseries))
+	validatedSamples := 0
+	validatedExemplars := 0
+	skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
+
+	var firstPartialErr error
+
 	latestSampleTimestampMs := int64(0)
 	defer func() {
 		// Update this metric even in case of errors.
@@ -638,7 +758,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 
 	// For each timeseries, compute a hash to distribute across ingesters;
 	// check each sample and discard if outside limits.
-	skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
 	for _, ts := range req.Timeseries {
 
 		// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
@@ -690,7 +809,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 		// label and dropped labels (if any)
 		key, err := d.tokenForLabels(userID, ts.Labels)
 		if err != nil {
-			return nil, err
+			return nil, nil, 0, 0, nil, err
 		}
 
 		validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)
@@ -712,100 +831,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 		validatedSamples += len(ts.Samples)
 		validatedExemplars += len(ts.Exemplars)
 	}
-
-	for _, m := range req.Metadata {
-		err := validation.ValidateMetadata(limits, userID, m)
-
-		if err != nil {
-			if firstPartialErr == nil {
-				firstPartialErr = err
-			}
-
-			continue
-		}
-
-		metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
-		validatedMetadata = append(validatedMetadata, m)
-	}
-
-	d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
-	d.receivedExemplars.WithLabelValues(userID).Add((float64(validatedExemplars)))
-	d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
-
-	if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
-		// Ensure the request slice is reused if there's no series or metadata passing the validation.
-		cortexpb.ReuseSlice(req.Timeseries)
-
-		return &cortexpb.WriteResponse{}, firstPartialErr
-	}
-
-	totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
-	if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
-		// Ensure the request slice is reused if the request is rate limited.
-		cortexpb.ReuseSlice(req.Timeseries)
-
-		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
-		validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
-		validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
-		// Return a 429 here to tell the client it is going too fast.
-		// Client may discard the data or slow down and re-send.
-		// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
-		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
-	}
-
-	// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
-	d.ingestionRate.Add(int64(totalN))
-
-	subRing := d.ingestersRing
-
-	// Obtain a subring if required.
-	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
-		subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
-	}
-
-	// Use a background context to make sure all ingesters get samples even if we return early
-	localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
-	localCtx = user.InjectOrgID(localCtx, userID)
-	if sp := opentracing.SpanFromContext(ctx); sp != nil {
-		localCtx = opentracing.ContextWithSpan(localCtx, sp)
-	}
-	// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
-	if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
-		localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
-	}
-	// Get clientIP(s) from Context and add it to localCtx
-	localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)
-
-	keys := append(seriesKeys, metadataKeys...)
-	initialMetadataIndex := len(seriesKeys)
-
-	op := ring.WriteNoExtend
-	if d.cfg.ExtendWrites {
-		op = ring.Write
-	}
-
-	err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
-		timeseries := make([]cortexpb.PreallocTimeseries, 0, len(indexes))
-		var metadata []*cortexpb.MetricMetadata
-
-		for _, i := range indexes {
-			if i >= initialMetadataIndex {
-				metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
-			} else {
-				timeseries = append(timeseries, validatedTimeseries[i])
-			}
-		}
-
-		return d.send(localCtx, ingester, timeseries, metadata, req.Source)
-	}, func() {
-		cortexpb.ReuseSlice(req.Timeseries)
-		cancel()
-	})
-
-	if err != nil {
-		return nil, err
-	}
-	return &cortexpb.WriteResponse{}, firstPartialErr
+	return seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, nil
 }
 
 func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 263e5c4cc57..c2aab448322 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -20,6 +20,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/gogo/status"
 	"github.com/oklog/ulid"
+	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -936,6 +937,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 		return nil, err
 	}
 
+	span, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.Push")
+	defer span.Finish()
+
 	// We will report *this* request in the error too.
 	inflight := i.inflightPushRequests.Inc()
 	defer i.inflightPushRequests.Dec()
diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go
index db3aa4800fe..5278cfecdbb 100644
--- a/pkg/ring/batch.go
+++ b/pkg/ring/batch.go
@@ -70,6 +70,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
 		cleanup()
 		return fmt.Errorf("DoBatch: InstancesCount <= 0")
 	}
+	expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.InstancesCount()
 	itemTrackers := make([]itemTracker, len(keys))
 	instances := make(map[string]instance, r.InstancesCount())
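A few sketches of the patterns this diff leans on follow. First, the span-per-push pattern itself: both `Distributor.Push` and `Ingester.Push` now open an OpenTracing span as soon as the request is admitted and finish it when the method returns, so the validation, batching, and ingestion steps all show up as children under one trace. This is a minimal runnable sketch of that pattern, not Cortex code: `handlePush` and `process` are hypothetical stand-ins, and with no tracer registered the library falls back to its built-in no-op tracer.

```go
package main

import (
	"context"
	"fmt"

	opentracing "github.com/opentracing/opentracing-go"
)

// handlePush is a hypothetical stand-in for Distributor.Push / Ingester.Push.
// StartSpanFromContext creates a child of whatever span is already in ctx
// (or a root span if there is none) and returns a new context carrying it.
func handlePush(ctx context.Context, samples []float64) error {
	span, ctx := opentracing.StartSpanFromContext(ctx, "handlePush")
	defer span.Finish()

	// Tags are purely illustrative; the PR relies on the span name alone.
	span.SetTag("samples", len(samples))

	return process(ctx, samples) // downstream calls inherit the span via ctx
}

func process(ctx context.Context, samples []float64) error {
	// A child span nested under handlePush, mirroring how doBatch and
	// prepareSeriesKeys hang off Distributor.Push in the diff.
	child, _ := opentracing.StartSpanFromContext(ctx, "process")
	defer child.Finish()
	return nil
}

func main() {
	// With no tracer registered, the default no-op tracer is used, so this
	// runs (and traces nothing) out of the box.
	fmt.Println(handlePush(context.Background(), []float64{1, 2, 3}))
}
```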
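Second, how `doBatch` tells series and metadata apart after `keys := append(seriesKeys, metadataKeys...)`: the two key sets are distinguished purely by position, so any batch index at or above `initialMetadataIndex` refers to metadata. A self-contained sketch of that index arithmetic (the names here are mine, not the ring package's):

```go
package main

import "fmt"

// splitIndexes mirrors the arithmetic inside doBatch's callback:
// keys[0:initialMetadataIndex] came from series and the rest from metadata,
// so index i maps to metadata[i-initialMetadataIndex] when
// i >= initialMetadataIndex, and to series[i] otherwise.
func splitIndexes(indexes []int, initialMetadataIndex int, series, metadata []string) (ts, md []string) {
	for _, i := range indexes {
		if i >= initialMetadataIndex {
			md = append(md, metadata[i-initialMetadataIndex])
		} else {
			ts = append(ts, series[i])
		}
	}
	return ts, md
}

func main() {
	series := []string{"s0", "s1", "s2"} // 3 series keys
	metadata := []string{"m0", "m1"}     // 2 metadata keys appended after them
	initialMetadataIndex := len(series)  // = 3

	// Suppose the ring assigned batch items 1, 3 and 4 to one ingester:
	ts, md := splitIndexes([]int{1, 3, 4}, initialMetadataIndex, series, metadata)
	fmt.Println(ts, md) // [s1] [m0 m1]
}
```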
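Third, the two-error convention of `prepareSeriesKeys`: it returns a `firstPartialErr` that records the first per-series validation failure without stopping the push (the valid part of the request still goes out, and the error is surfaced alongside the final `WriteResponse`), plus an ordinary `err` that aborts the request immediately, as a `tokenForLabels` failure does. A toy sketch of that convention, with made-up validation rules:

```go
package main

import (
	"errors"
	"fmt"
)

var errTooOld = errors.New("sample too old")

// validateAll mirrors the firstPartialErr/err split in prepareSeriesKeys:
// a per-item failure (zero, here) only records the first partial error and
// skips the item, while an infrastructure failure (negative, here) aborts
// the whole request. Both rules are illustrative only.
func validateAll(values []int) (valid []int, firstPartialErr, err error) {
	for _, v := range values {
		if v < 0 {
			return nil, nil, fmt.Errorf("fatal: bad value %d", v)
		}
		if v == 0 {
			if firstPartialErr == nil {
				firstPartialErr = errTooOld // remember, but keep going
			}
			continue
		}
		valid = append(valid, v)
	}
	return valid, firstPartialErr, nil
}

func main() {
	valid, partial, err := validateAll([]int{3, 0, 7})
	fmt.Println(valid, partial, err) // [3 7] sample too old <nil>
}
```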
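Finally, the `batch.go` hunk precomputes a capacity hint. Each key is written to roughly `ReplicationFactor()` instances, so `len(keys) * (ReplicationFactor() + 1) / InstancesCount()` deliberately overestimates (the `+ 1`) how many item trackers any single instance will see; the hunk is cut off before the use site, but presumably the estimate pre-sizes per-instance buffers so they rarely regrow. Worked through with assumed numbers:

```go
package main

import "fmt"

func main() {
	keys, replicationFactor, instances := 3000, 3, 50

	// Each key lands on ~replicationFactor instances; the +1 pads the
	// estimate so per-instance tracker slices rarely need to reallocate.
	expectedTrackers := keys * (replicationFactor + 1) / instances
	fmt.Println(expectedTrackers) // 240, vs. an exact average of 3000*3/50 = 180
}
```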