Skip to content

Allow ingesting native histograms #5986

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [CHANGE] Ingester: Remove `-querier.query-store-for-labels-enabled` flag. Querying long-term store for labels is always enabled. #5984
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1441,4 +1441,8 @@ blocks_storage:
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]

# [EXPERIMENTAL] True to enable native histogram.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# CLI flag: -blocks-storage.tsdb.enable-native-histograms
[enable_native_histograms: <boolean> | default = false]
```
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1566,4 +1566,8 @@ blocks_storage:
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]

# [EXPERIMENTAL] True to enable native histogram.
# CLI flag: -blocks-storage.tsdb.enable-native-histograms
[enable_native_histograms: <boolean> | default = false]
```
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,10 @@ tsdb:
# be out-of-order.
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
[out_of_order_cap_max: <int> | default = 32]

# [EXPERIMENTAL] True to enable native histogram.
# CLI flag: -blocks-storage.tsdb.enable-native-histograms
[enable_native_histograms: <boolean> | default = false]
```

### `compactor_config`
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ Currently experimental features are:
- OTLP Receiver
- Persistent tokens in the Ruler Ring:
- `-ruler.ring.tokens-file-path` (path) CLI flag
- Native Histograms
- Ingestion can be enabled by setting `-blocks-storage.tsdb.enable-native-histograms=true` on Ingester.
10 changes: 7 additions & 3 deletions pkg/cortexpb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
)

// ToWriteRequest converts matched slices of Labels, Samples and Metadata into a WriteRequest proto.
// ToWriteRequest converts matched slices of Labels, Samples, Metadata and Histograms into a WriteRequest proto.
// It gets timeseries from the pool, so ReuseSlice() should be called when done.
func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMetadata, histograms []Histogram, source WriteRequest_SourceEnum) *WriteRequest {
req := &WriteRequest{
Expand All @@ -27,13 +27,17 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMe
Source: source,
}

for i, s := range samples {
i := 0
for i < len(samples) || i < len(histograms) {
ts := TimeseriesFromPool()
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
ts.Samples = append(ts.Samples, s)
if i < len(samples) {
ts.Samples = append(ts.Samples, samples[i])
}
if i < len(histograms) {
ts.Histograms = append(ts.Histograms, histograms[i])
}
i++
req.Timeseries = append(req.Timeseries, PreallocTimeseries{TimeSeries: ts})
}

Expand Down
120 changes: 120 additions & 0 deletions pkg/cortexpb/histograms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cortexpb

import "github.com/prometheus/prometheus/model/histogram"

func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L626-L645
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the provided proto message.
// The caller has to make sure that the proto message represents a float histogram and not an
// integer histogram, or it panics.
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L647-L667
func FloatHistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}

// HistogramToHistogramProto converts a (normal integer) Histogram to its protobuf message type.
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L709-L723
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
TimestampMs: timestamp,
}
}

// FloatHistogramToHistogramProto converts a float Histogram to a normal
// Histogram's protobuf message type.
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L725-L739
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
TimestampMs: timestamp,
}
}

func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
68 changes: 43 additions & 25 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ const (
// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
mergeSlicesParallelism = 8

sampleMetricTypeFloat = "float"
sampleMetricTypeHistogram = "histogram"
)

// Distributor is a storage.SampleAppender and a client.Querier which
Expand Down Expand Up @@ -276,7 +279,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Namespace: "cortex",
Name: "distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user"}),
}, []string{"user", "type"}),
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_exemplars_total",
Expand All @@ -291,7 +294,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Namespace: "cortex",
Name: "distributor_samples_in_total",
Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.",
}, []string{"user"}),
}, []string{"user", "type"}),
incomingExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_exemplars_in_total",
Expand Down Expand Up @@ -428,10 +431,12 @@ func (d *Distributor) cleanupInactiveUser(userID string) {

d.HATracker.CleanupHATrackerMetricsForUser(userID)

d.receivedSamples.DeleteLabelValues(userID)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingSamples.DeleteLabelValues(userID)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.incomingExemplars.DeleteLabelValues(userID)
d.incomingMetadata.DeleteLabelValues(userID)
d.nonHASamples.DeleteLabelValues(userID)
Expand Down Expand Up @@ -547,7 +552,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
// Only alloc when data present
samples = make([]cortexpb.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := validation.ValidateSample(d.validateMetrics, limits, userID, ts.Labels, s); err != nil {
if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, ts.Labels, s.TimestampMs); err != nil {
return emptyPreallocSeries, err
}
samples = append(samples, s)
Expand All @@ -574,8 +579,13 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
if len(ts.Histograms) > 0 {
// Only alloc when data present
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
// TODO(yeya24): we need to have validations for native histograms
// at some point. Skip validations for now.
for _, h := range ts.Histograms {
// TODO(yeya24): add other validations for native histogram.
// For example, Prometheus scrape has bucket limit and schema check.
if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, ts.Labels, h.TimestampMs); err != nil {
return emptyPreallocSeries, err
}
}
histograms = append(histograms, ts.Histograms...)
}

Expand Down Expand Up @@ -607,14 +617,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now)

numSamples := 0
numFloatSamples := 0
numHistogramSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)
numFloatSamples += len(ts.Samples)
numHistogramSamples += len(ts.Histograms)
numExemplars += len(ts.Exemplars)
}
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numFloatSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
Expand Down Expand Up @@ -642,31 +655,32 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

if errors.Is(err, ha.ReplicasNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples))
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
}

if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return nil, err
}
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
if !removeReplica {
d.nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples))
}
}

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
if err != nil {
return nil, err
}
metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)

d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples))
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

Expand All @@ -677,18 +691,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return &cortexpb.WriteResponse{}, firstPartialErr
}

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
totalSamples := validatedFloatSamples + validatedHistogramSamples
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
Expand Down Expand Up @@ -810,15 +825,16 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
return metadataKeys, validatedMetadata, firstPartialErr
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, error, error) {
func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
defer pSpan.Finish()

// For each timeseries or samples, we compute a hash to distribute across ingesters;
// check each sample/metadata and discard if outside limits.
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
seriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
validatedFloatSamples := 0
validatedHistogramSamples := 0
validatedExemplars := 0

var firstPartialErr error
Expand All @@ -839,7 +855,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
if len(ts.Samples) > 0 {
latestSampleTimestampMs = max(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}
// TODO(yeya24): use timestamp of the latest native histogram in the series as well.
if len(ts.Histograms) > 0 {
latestSampleTimestampMs = max(latestSampleTimestampMs, ts.Histograms[len(ts.Histograms)-1].TimestampMs)
}

if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
Expand Down Expand Up @@ -885,7 +903,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// label and dropped labels (if any)
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, nil, 0, 0, nil, err
return nil, nil, 0, 0, 0, nil, err
}
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)

Expand All @@ -904,11 +922,11 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write

seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
// TODO(yeya24): add histogram samples as well when supported.
validatedSamples += len(ts.Samples)
validatedFloatSamples += len(ts.Samples)
validatedHistogramSamples += len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
}
return seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, nil
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
}

func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
Expand Down
Loading
Loading