diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index cdd9ed0afc1..139c24fbb0f 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -135,6 +135,7 @@ func TestDistributor_Push(t *testing.T) { lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds" distributorAppend := "cortex_distributor_ingester_appends_total" distributorAppendFailure := "cortex_distributor_ingester_append_failures_total" + distributorReceivedSamples := "cortex_distributor_received_samples_total" ctx := user.InjectOrgID(context.Background(), "userDistributorPush") type samplesIn struct { @@ -146,6 +147,7 @@ func TestDistributor_Push(t *testing.T) { numIngesters int happyIngesters int samples samplesIn + histogramSamples bool metadata int expectedResponse *cortexpb.WriteResponse expectedError error @@ -276,6 +278,77 @@ func TestDistributor_Push(t *testing.T) { cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1 `, }, + "A push to 3 happy ingesters should succeed, histograms": { + numIngesters: 3, + happyIngesters: 3, + samples: samplesIn{num: 5, startTimestampMs: 123456789000}, + histogramSamples: true, + metadata: 5, + expectedResponse: emptyResponse, + metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, + expectedMetrics: ` + # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. + # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge + cortex_distributor_latest_seen_sample_timestamp_seconds{user="userDistributorPush"} 123456789.004 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_total counter + cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 + cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5 + `, + }, + "A push to 2 happy ingesters should succeed, histograms": { + numIngesters: 3, + happyIngesters: 2, + samples: samplesIn{num: 5, startTimestampMs: 123456789000}, + histogramSamples: true, + metadata: 5, + expectedResponse: emptyResponse, + metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, + expectedMetrics: ` + # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. + # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge + cortex_distributor_latest_seen_sample_timestamp_seconds{user="userDistributorPush"} 123456789.004 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_total counter + cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 + cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5 + `, + }, + "A push to 1 happy ingesters should fail, histograms": { + numIngesters: 3, + happyIngesters: 1, + samples: samplesIn{num: 10, startTimestampMs: 123456789000}, + histogramSamples: true, + expectedError: errFail, + metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, + expectedMetrics: ` + # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. 
+ # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge + cortex_distributor_latest_seen_sample_timestamp_seconds{user="userDistributorPush"} 123456789.009 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_total counter + cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 + cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 10 + `, + }, + "A push exceeding burst size should fail, histograms": { + numIngesters: 3, + happyIngesters: 3, + samples: samplesIn{num: 25, startTimestampMs: 123456789000}, + histogramSamples: true, + metadata: 5, + expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"), + metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, + expectedMetrics: ` + # HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user. + # TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge + cortex_distributor_latest_seen_sample_timestamp_seconds{user="userDistributorPush"} 123456789.024 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_total counter + cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0 + cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25 + `, + }, } { for _, shardByAllLabels := range []bool{true, false} { tc := tc @@ -297,7 +370,12 @@ func TestDistributor_Push(t *testing.T) { errFail: tc.ingesterError, }) - request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata) + var request *cortexpb.WriteRequest + if !tc.histogramSamples { + request = makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0) + } else { + request = makeWriteRequest(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num) + } response, err := ds[0].Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) assert.Equal(t, status.Code(tc.expectedError), status.Code(err)) @@ -554,37 +632,45 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { for testName, testData := range tests { testData := testData - t.Run(testName, func(t *testing.T) { - t.Parallel() - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.IngestionRateStrategy = testData.ingestionRateStrategy - limits.IngestionRate = testData.ingestionRate - limits.IngestionBurstSize = testData.ingestionBurstSize - - // Start all expected distributors - distributors, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: testData.distributors, - shardByAllLabels: true, - limits: limits, - }) + for _, enableHistogram := range []bool{false, true} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(enableHistogram)), func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionRateStrategy = testData.ingestionRateStrategy + limits.IngestionRate = testData.ingestionRate + limits.IngestionBurstSize = testData.ingestionBurstSize - // Push samples in multiple requests to the first distributor - for _, push := range testData.pushes { - request := makeWriteRequest(0, 
push.samples, push.metadata) - response, err := distributors[0].Push(ctx, request) + // Start all expected distributors + distributors, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: testData.distributors, + shardByAllLabels: true, + limits: limits, + }) - if push.expectedError == nil { - assert.Equal(t, emptyResponse, response) - assert.Nil(t, err) - } else { - assert.Nil(t, response) - assert.Equal(t, push.expectedError, err) + // Push samples in multiple requests to the first distributor + for _, push := range testData.pushes { + var request *cortexpb.WriteRequest + if !enableHistogram { + request = makeWriteRequest(0, push.samples, push.metadata, 0) + } else { + request = makeWriteRequest(0, 0, push.metadata, push.samples) + } + response, err := distributors[0].Push(ctx, request) + + if push.expectedError == nil { + assert.Equal(t, emptyResponse, response) + assert.Nil(t, err) + } else { + assert.Nil(t, response) + assert.Equal(t, push.expectedError, err) + } } - } - }) + }) + } } } @@ -620,7 +706,7 @@ func TestPush_QuorumError(t *testing.T) { ingesters[2].failResp.Store(httpgrpc.Errorf(429, "Throttling")) for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 30, 20) + request := makeWriteRequest(0, 30, 20, 10) _, err := d.Push(ctx, request) status, ok := status.FromError(err) require.True(t, ok) @@ -633,7 +719,7 @@ func TestPush_QuorumError(t *testing.T) { ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError")) for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 300, 200) + request := makeWriteRequest(0, 300, 200, 10) _, err := d.Push(ctx, request) status, ok := status.FromError(err) require.True(t, ok) @@ -646,7 +732,7 @@ func TestPush_QuorumError(t *testing.T) { ingesters[2].happy.Store(true) for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 30, 20) + request := makeWriteRequest(0, 30, 20, 10) _, err := d.Push(ctx, request) status, ok := status.FromError(err) require.True(t, ok) @@ -659,7 +745,7 @@ func TestPush_QuorumError(t *testing.T) { ingesters[2].happy.Store(true) for i := 0; i < 1; i++ { - request := makeWriteRequest(0, 30, 20) + request := makeWriteRequest(0, 30, 20, 10) _, err := d.Push(ctx, request) require.NoError(t, err) } @@ -690,7 +776,7 @@ func TestPush_QuorumError(t *testing.T) { } for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 30, 20) + request := makeWriteRequest(0, 30, 20, 10) _, err := d.Push(ctx, request) require.Error(t, err) status, ok := status.FromError(err) @@ -809,45 +895,53 @@ func TestDistributor_PushInstanceLimits(t *testing.T) { for testName, testData := range tests { testData := testData - t.Run(testName, func(t *testing.T) { - t.Parallel() - limits := &validation.Limits{} - flagext.DefaultValues(limits) + for _, enableHistogram := range []bool{true, false} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(enableHistogram)), func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) - // Start all expected distributors - distributors, _, regs, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - shardByAllLabels: true, - limits: limits, - maxInflightRequests: testData.inflightLimit, - maxIngestionRate: testData.ingestionRateLimit, - }) + // Start all expected distributors + distributors, _, regs, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + 
numDistributors: 1, + shardByAllLabels: true, + limits: limits, + maxInflightRequests: testData.inflightLimit, + maxIngestionRate: testData.ingestionRateLimit, + }) - d := distributors[0] - d.inflightPushRequests.Add(int64(testData.preInflight)) - d.ingestionRate.Add(int64(testData.preRateSamples)) + d := distributors[0] + d.inflightPushRequests.Add(int64(testData.preInflight)) + d.ingestionRate.Add(int64(testData.preRateSamples)) - d.ingestionRate.Tick() + d.ingestionRate.Tick() - for _, push := range testData.pushes { - request := makeWriteRequest(0, push.samples, push.metadata) - _, err := d.Push(ctx, request) + for _, push := range testData.pushes { + var request *cortexpb.WriteRequest + if enableHistogram { + request = makeWriteRequest(0, 0, push.metadata, push.samples) + } else { + request = makeWriteRequest(0, push.samples, push.metadata, 0) + } + _, err := d.Push(ctx, request) - if push.expectedError == nil { - assert.Nil(t, err) - } else { - assert.Equal(t, push.expectedError, err) - } + if push.expectedError == nil { + assert.Nil(t, err) + } else { + assert.Equal(t, push.expectedError, err) + } - d.ingestionRate.Tick() + d.ingestionRate.Tick() - if testData.expectedMetrics != "" { - assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(testData.expectedMetrics), testData.metricNames...)) + if testData.expectedMetrics != "" { + assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(testData.expectedMetrics), testData.metricNames...)) + } } - } - }) + }) + } } } @@ -904,40 +998,43 @@ func TestDistributor_PushHAInstances(t *testing.T) { for _, shardByAllLabels := range []bool{true, false} { tc := tc shardByAllLabels := shardByAllLabels - t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) { - t.Parallel() - var limits validation.Limits - flagext.DefaultValues(&limits) - limits.AcceptHASamples = true - limits.MaxLabelValueLength = 15 - - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - shardByAllLabels: shardByAllLabels, - limits: &limits, - enableTracker: tc.enableTracker, - }) + for _, enableHistogram := range []bool{true, false} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v, histogram=%v)", i, shardByAllLabels, enableHistogram), func(t *testing.T) { + t.Parallel() + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.AcceptHASamples = true + limits.MaxLabelValueLength = 15 + + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: shardByAllLabels, + limits: &limits, + enableTracker: tc.enableTracker, + }) - d := ds[0] + d := ds[0] - userID, err := tenant.TenantID(ctx) - assert.NoError(t, err) - err = d.HATracker.CheckReplica(ctx, userID, tc.cluster, tc.acceptedReplica, time.Now()) - assert.NoError(t, err) + userID, err := tenant.TenantID(ctx) + assert.NoError(t, err) + err = d.HATracker.CheckReplica(ctx, userID, tc.cluster, tc.acceptedReplica, time.Now()) + assert.NoError(t, err) - request := makeWriteRequestHA(tc.samples, tc.testReplica, tc.cluster) - response, err := d.Push(ctx, request) - assert.Equal(t, tc.expectedResponse, response) + request := makeWriteRequestHA(tc.samples, tc.testReplica, tc.cluster, enableHistogram) + response, err := d.Push(ctx, request) + assert.Equal(t, tc.expectedResponse, response) - httpResp, ok := httpgrpc.HTTPResponseFromError(err) - if ok { - assert.Equal(t, tc.expectedCode, httpResp.Code) - } else if 
tc.expectedCode != 0 { - assert.Fail(t, "expected HTTP status code", tc.expectedCode) - } - }) + httpResp, ok := httpgrpc.HTTPResponseFromError(err) + if ok { + assert.Equal(t, tc.expectedCode, httpResp.Code) + } else if tc.expectedCode != 0 { + assert.Fail(t, "expected HTTP status code", tc.expectedCode) + } + }) + } } } } @@ -1087,7 +1184,7 @@ func TestDistributor_PushQuery(t *testing.T) { shuffleShardSize: shuffleShardSize, }) - request := makeWriteRequest(0, tc.samples, tc.metadata) + request := makeWriteRequest(0, tc.samples, tc.metadata, 0) writeResponse, err := ds[0].Push(ctx, request) assert.Equal(t, &cortexpb.WriteResponse{}, writeResponse) assert.Nil(t, err) @@ -1119,250 +1216,277 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac t.Parallel() const maxChunksLimit = 30 // Chunks are duplicated due to replication factor. - ctx := user.InjectOrgID(context.Background(), "user") - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.MaxChunksPerQuery = maxChunksLimit + for _, histogram := range []bool{true, false} { + ctx := user.InjectOrgID(context.Background(), "user") + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.MaxChunksPerQuery = maxChunksLimit - // Prepare distributors. - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - shardByAllLabels: true, - limits: limits, - }) + // Prepare distributors. + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + }) - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, 0)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, 0)) - // Push a number of series below the max chunks limit. Each series has 1 sample, - // so expect 1 chunk per series when querying back. - initialSeries := maxChunksLimit / 3 - writeReq := makeWriteRequest(0, initialSeries, 0) - writeRes, err := ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + // Push a number of series below the max chunks limit. Each series has 1 sample, + // so expect 1 chunk per series when querying back. + initialSeries := maxChunksLimit / 3 + var writeReq *cortexpb.WriteRequest + if histogram { + writeReq = makeWriteRequest(0, 0, 0, initialSeries) + } else { + writeReq = makeWriteRequest(0, initialSeries, 0, 0) + } + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - allSeriesMatchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), - } + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } - // Since the number of series (and thus chunks) is equal to the limit (but doesn't - // exceed it), we expect a query running on all series to succeed. - queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) - assert.Len(t, queryRes.Chunkseries, initialSeries) + // Since the number of series (and thus chunks) is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) 
+ require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, initialSeries) - // Push more series to exceed the limit once we'll query back all series. - writeReq = &cortexpb.WriteRequest{} - for i := 0; i < maxChunksLimit; i++ { - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0), - ) - } + // Push more series to exceed the limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + for i := 0; i < maxChunksLimit; i++ { + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0, histogram), + ) + } - writeRes, err = ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Since the number of series (and thus chunks) is exceeding to the limit, we expect - // a query running on all series to fail. - _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.Error(t, err) - assert.Contains(t, err.Error(), "the query hit the max number of chunks limit") + // Since the number of series (and thus chunks) is exceeding to the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Contains(t, err.Error(), "the query hit the max number of chunks limit") + } } func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) { t.Parallel() const maxSeriesLimit = 10 - ctx := user.InjectOrgID(context.Background(), "user") - limits := &validation.Limits{} - flagext.DefaultValues(limits) - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0)) + for _, histogram := range []bool{true, false} { + ctx := user.InjectOrgID(context.Background(), "user") + limits := &validation.Limits{} + flagext.DefaultValues(limits) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0)) - // Prepare distributors. - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - shardByAllLabels: true, - limits: limits, - }) + // Prepare distributors. + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + }) - // Push a number of series below the max series limit. - initialSeries := maxSeriesLimit - writeReq := makeWriteRequest(0, initialSeries, 0) - writeRes, err := ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + // Push a number of series below the max series limit. 
+ initialSeries := maxSeriesLimit + var writeReq *cortexpb.WriteRequest + if histogram { + writeReq = makeWriteRequest(0, 0, 0, initialSeries) + } else { + writeReq = makeWriteRequest(0, initialSeries, 0, 0) + } - allSeriesMatchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), - } + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Since the number of series is equal to the limit (but doesn't - // exceed it), we expect a query running on all series to succeed. - queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) - assert.Len(t, queryRes.Chunkseries, initialSeries) + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } - // Push more series to exceed the limit once we'll query back all series. - writeReq = &cortexpb.WriteRequest{} - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0), - ) + // Since the number of series is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, initialSeries) - writeRes, err = ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + // Push more series to exceed the limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0, histogram), + ) - // Since the number of series is exceeding the limit, we expect - // a query running on all series to fail. - _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.Error(t, err) - assert.Contains(t, err.Error(), "max number of series limit") + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + // Since the number of series is exceeding the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Contains(t, err.Error(), "max number of series limit") + } } func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIsReached(t *testing.T) { t.Parallel() const seriesToAdd = 10 - ctx := user.InjectOrgID(context.Background(), "user") - limits := &validation.Limits{} - flagext.DefaultValues(limits) - - // Prepare distributors. - // Use replication factor of 2 to always read all the chunks from both ingesters, - // this guarantees us to always read the same chunks and have a stable test. - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shardByAllLabels: true, - limits: limits, - replicationFactor: 2, - }) + for _, histogram := range []bool{true, false} { + ctx := user.InjectOrgID(context.Background(), "user") + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + // Prepare distributors. 
+ // Use replication factor of 2 to always read all the chunks from both ingesters, + // this guarantees us to always read the same chunks and have a stable test. + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + replicationFactor: 2, + }) - allSeriesMatchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), - } - // Push a single series to allow us to calculate the chunk size to calculate the limit for the test. - writeReq := &cortexpb.WriteRequest{} - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0), - ) - writeRes, err := ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) - chunkSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } + // Push a single series to allow us to calculate the chunk size to calculate the limit for the test. + writeReq := &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0, histogram), + ) + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + chunkSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) - // Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size. - var responseChunkSize = chunkSizeResponse.ChunksSize() - var maxBytesLimit = (seriesToAdd) * responseChunkSize + // Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size. + var responseChunkSize = chunkSizeResponse.ChunksSize() + var maxBytesLimit = (seriesToAdd) * responseChunkSize - // Update the limiter with the calculated limits. - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0, 0)) + // Update the limiter with the calculated limits. + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0, 0)) - // Push a number of series below the max chunk bytes limit. Subtract one for the series added above. - writeReq = makeWriteRequest(0, seriesToAdd-1, 0) - writeRes, err = ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + // Push a number of series below the max chunk bytes limit. Subtract one for the series added above. + if histogram { + writeReq = makeWriteRequest(0, 0, 0, seriesToAdd-1) + } else { + writeReq = makeWriteRequest(0, seriesToAdd-1, 0, 0) + } + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Since the number of chunk bytes is equal to the limit (but doesn't - // exceed it), we expect a query running on all series to succeed. - queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) 
- require.NoError(t, err) - assert.Len(t, queryRes.Chunkseries, seriesToAdd) + // Since the number of chunk bytes is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, seriesToAdd) - // Push another series to exceed the chunk bytes limit once we'll query back all series. - writeReq = &cortexpb.WriteRequest{} - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0), - ) + // Push another series to exceed the chunk bytes limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0, histogram), + ) - writeRes, err = ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Since the aggregated chunk size is exceeding the limit, we expect - // a query running on all series to fail. - _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.Error(t, err) - assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit))) + // Since the aggregated chunk size is exceeding the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit))) + } } func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsReached(t *testing.T) { t.Parallel() const seriesToAdd = 10 - ctx := user.InjectOrgID(context.Background(), "user") - limits := &validation.Limits{} - flagext.DefaultValues(limits) + for _, histogram := range []bool{true, false} { + ctx := user.InjectOrgID(context.Background(), "user") + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + // Prepare distributors. + // Use replication factor of 2 to always read all the chunks from both ingesters, + // this guarantees us to always read the same chunks and have a stable test. + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + replicationFactor: 2, + }) - // Prepare distributors. - // Use replication factor of 2 to always read all the chunks from both ingesters, - // this guarantees us to always read the same chunks and have a stable test. - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shardByAllLabels: true, - limits: limits, - replicationFactor: 2, - }) + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } + // Push a single series to allow us to calculate the label size to calculate the limit for the test. 
+ writeReq := &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0, histogram), + ) - allSeriesMatchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), - } - // Push a single series to allow us to calculate the label size to calculate the limit for the test. - writeReq := &cortexpb.WriteRequest{} - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0), - ) - writeRes, err := ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) - dataSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + dataSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) - // Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size. - var dataSize = dataSizeResponse.Size() - var maxBytesLimit = (seriesToAdd) * dataSize * 2 // Multiplying by RF because the limit is applied before de-duping. + // Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size. + var dataSize = dataSizeResponse.Size() + var maxBytesLimit = (seriesToAdd) * dataSize * 2 // Multiplying by RF because the limit is applied before de-duping. - // Update the limiter with the calculated limits. - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, maxBytesLimit)) + // Update the limiter with the calculated limits. + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, maxBytesLimit)) - // Push a number of series below the max chunk bytes limit. Subtract one for the series added above. - writeReq = makeWriteRequest(0, seriesToAdd-1, 0) - writeRes, err = ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + // Push a number of series below the max chunk bytes limit. Subtract one for the series added above. + if histogram { + writeReq = makeWriteRequest(0, 0, 0, seriesToAdd-1) + } else { + writeReq = makeWriteRequest(0, seriesToAdd-1, 0, 0) + } + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Since the number of chunk bytes is equal to the limit (but doesn't - // exceed it), we expect a query running on all series to succeed. - queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) - assert.Len(t, queryRes.Chunkseries, seriesToAdd) + // Since the number of chunk bytes is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, seriesToAdd) - // Push another series to exceed the chunk bytes limit once we'll query back all series. 
- writeReq = &cortexpb.WriteRequest{} - writeReq.Timeseries = append(writeReq.Timeseries, - makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0), - ) + // Push another series to exceed the chunk bytes limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0, histogram), + ) - writeRes, err = ds[0].Push(ctx, writeReq) - assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Since the aggregated chunk size is exceeding the limit, we expect - // a query running on all series to fail. - _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.Error(t, err) - assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit))) + // Since the aggregated chunk size is exceeding the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit))) + } } func TestDistributor_Push_LabelRemoval(t *testing.T) { @@ -1423,32 +1547,34 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { } for _, tc := range cases { - var err error - var limits validation.Limits - flagext.DefaultValues(&limits) - limits.DropLabels = tc.removeLabels - limits.AcceptHASamples = tc.removeReplica - - ds, ingesters, _, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shardByAllLabels: true, - limits: &limits, - }) + for _, histogram := range []bool{true, false} { + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.DropLabels = tc.removeLabels + limits.AcceptHASamples = tc.removeReplica - // Push the series to the distributor - req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1) - _, err = ds[0].Push(ctx, req) - require.NoError(t, err) + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + + // Push the series to the distributor + req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1, histogram) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) - // Since each test pushes only 1 series, we do expect the ingester - // to have received exactly 1 series - for i := range ingesters { - timeseries := ingesters[i].series() - assert.Equal(t, 1, len(timeseries)) - for _, v := range timeseries { - assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) + for _, v := range timeseries { + assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + } } } } @@ -1490,7 +1616,7 @@ func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) }) // Push the series to the distributor - req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1) + req := 
mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1, false) _, err = ds[0].Push(ctx, req) require.Error(t, err) assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error()) @@ -1587,7 +1713,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * }) // Push the series to the distributor - req := mockWriteRequest([]labels.Labels{testData.inputSeries}, 1, 1) + req := mockWriteRequest([]labels.Labels{testData.inputSeries}, 1, 1, false) _, err := ds[0].Push(ctx, req) require.NoError(t, err) @@ -1639,25 +1765,28 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) { for testName, tc := range tests { tc := tc - t.Run(testName, func(t *testing.T) { - t.Parallel() - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shuffleShardSize: 1, - skipLabelNameValidation: tc.skipLabelNameValidationCfg, + for _, histogram := range []bool{true, false} { + histogram := histogram + t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(histogram)), func(t *testing.T) { + t.Parallel() + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shuffleShardSize: 1, + skipLabelNameValidation: tc.skipLabelNameValidationCfg, + }) + req := mockWriteRequest([]labels.Labels{tc.inputLabels}, 42, 100000, histogram) + req.SkipLabelNameValidation = tc.skipLabelNameValidationReq + _, err := ds[0].Push(ctx, req) + if tc.errExpected { + fromError, _ := status.FromError(err) + assert.Equal(t, tc.errMessage, fromError.Message()) + } else { + assert.Nil(t, err) + } }) - req := mockWriteRequest([]labels.Labels{tc.inputLabels}, 42, 100000) - req.SkipLabelNameValidation = tc.skipLabelNameValidationReq - _, err := ds[0].Push(ctx, req) - if tc.errExpected { - fromError, _ := status.FromError(err) - assert.Equal(t, tc.errMessage, fromError.Message()) - } else { - assert.Nil(t, err) - } - }) + } } } @@ -2117,32 +2246,34 @@ func TestSlowQueries(t *testing.T) { func TestDistributor_MetricsForLabelMatchers_SingleSlowIngester(t *testing.T) { t.Parallel() - // Create distributor - ds, ing, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - shardByAllLabels: true, - shuffleShardEnabled: true, - shuffleShardSize: 3, - replicationFactor: 3, - }) + for _, histogram := range []bool{true, false} { + // Create distributor + ds, ing, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + shuffleShardEnabled: true, + shuffleShardSize: 3, + replicationFactor: 3, + }) - ing[2].queryDelay = 50 * time.Millisecond + ing[2].queryDelay = 50 * time.Millisecond - ctx := user.InjectOrgID(context.Background(), "test") + ctx := user.InjectOrgID(context.Background(), "test") - now := model.Now() + now := model.Now() - for i := 0; i < 100; i++ { - req := mockWriteRequest([]labels.Labels{{{Name: labels.MetricName, Value: "test"}, {Name: "app", Value: "m"}, {Name: "uniq8", Value: strconv.Itoa(i)}}}, 1, now.Unix()) - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) - } + for i := 0; i < 100; i++ { + req := mockWriteRequest([]labels.Labels{{{Name: labels.MetricName, Value: "test"}, {Name: "app", Value: "m"}, {Name: "uniq8", Value: strconv.Itoa(i)}}}, 1, now.Unix(), histogram) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } - for i := 0; i < 50; i++ { - _, err := ds[0].MetricsForLabelMatchers(ctx, now, now, mustNewMatcher(labels.MatchEqual, 
model.MetricNameLabel, "test")) - require.NoError(t, err) + for i := 0; i < 50; i++ { + _, err := ds[0].MetricsForLabelMatchers(ctx, now, now, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test")) + require.NoError(t, err) + } } } @@ -2152,7 +2283,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { fixtures := []struct { lbls labels.Labels - value float64 + value int64 timestamp int64 }{ {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}}, 1, 100000}, @@ -2282,61 +2413,64 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { for testName, testData := range tests { testData := testData - t.Run(testName, func(t *testing.T) { - t.Parallel() - now := model.Now() - - // Create distributor - ds, ingesters, _, _ := prepare(t, prepConfig{ - numIngesters: numIngesters, - happyIngesters: numIngesters, - numDistributors: 1, - shardByAllLabels: true, - shuffleShardEnabled: testData.shuffleShardEnabled, - shuffleShardSize: testData.shuffleShardSize, - }) - - // Push fixtures - ctx := user.InjectOrgID(context.Background(), "test") - ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter) + for _, histogram := range []bool{true, false} { + histogram := histogram + t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(histogram)), func(t *testing.T) { + t.Parallel() + now := model.Now() - for _, series := range fixtures { - req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp) - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) - } + // Create distributor + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + shardByAllLabels: true, + shuffleShardEnabled: testData.shuffleShardEnabled, + shuffleShardSize: testData.shuffleShardSize, + }) - { - metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...) + // Push fixtures + ctx := user.InjectOrgID(context.Background(), "test") + ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter) - if testData.expectedErr != nil { - assert.ErrorIs(t, err, testData.expectedErr) - return + for _, series := range fixtures { + req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp, histogram) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) } - require.NoError(t, err) - assert.ElementsMatch(t, testData.expectedResult, metrics) + { + metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...) - // Check how many ingesters have been queried. - // Due to the quorum the distributor could cancel the last request towards ingesters - // if all other ones are successful, so we're good either has been queried X or X-1 - // ingesters. - assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers")) - } + if testData.expectedErr != nil { + assert.ErrorIs(t, err, testData.expectedErr) + return + } - { - metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, testData.matchers...) - if testData.expectedErr != nil { - assert.ErrorIs(t, err, testData.expectedErr) - return + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedResult, metrics) + + // Check how many ingesters have been queried. 
+ // Due to the quorum the distributor could cancel the last request towards ingesters + // if all other ones are successful, so we're good either has been queried X or X-1 + // ingesters. + assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers")) } - require.NoError(t, err) - assert.ElementsMatch(t, testData.expectedResult, metrics) + { + metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, testData.matchers...) + if testData.expectedErr != nil { + assert.ErrorIs(t, err, testData.expectedErr) + return + } - assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchersStream")) - } - }) + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedResult, metrics) + + assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchersStream")) + } + }) + } } } @@ -2474,7 +2608,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) { // Push metadata ctx := user.InjectOrgID(context.Background(), "test") - req := makeWriteRequest(0, 0, 10) + req := makeWriteRequest(0, 0, 10, 0) _, err := ds[0].Push(ctx, req) require.NoError(t, err) @@ -2501,32 +2635,27 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher { return m } -func mockWriteRequest(lbls []labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest { - samples := make([]cortexpb.Sample, len(lbls)) - for i := range lbls { - samples[i] = cortexpb.Sample{ - TimestampMs: timestampMs, - Value: value, +func mockWriteRequest(lbls []labels.Labels, value int64, timestampMs int64, histogram bool) *cortexpb.WriteRequest { + var ( + samples []cortexpb.Sample + histograms []cortexpb.Histogram + ) + if histogram { + histograms = make([]cortexpb.Histogram, len(lbls)) + for i := range lbls { + histograms[i] = cortexpb.HistogramToHistogramProto(timestampMs, histogram_util.GenerateTestHistogram(int(value))) } - } - - return cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) -} - -// nolint:unused -func mockHistogramWriteRequest(lbls []labels.Labels, value int, timestampMs int64, float bool) *cortexpb.WriteRequest { - histograms := make([]cortexpb.Histogram, len(lbls)) - for i := range lbls { - if float { - fh := histogram_util.GenerateTestFloatHistogram(value) - histograms[i] = cortexpb.FloatHistogramToHistogramProto(timestampMs, fh) - continue + } else { + samples = make([]cortexpb.Sample, len(lbls)) + for i := range lbls { + samples[i] = cortexpb.Sample{ + TimestampMs: timestampMs, + Value: float64(value), + } } - h := histogram_util.GenerateTestHistogram(value) - histograms[i] = cortexpb.HistogramToHistogramProto(timestampMs, h) } - return cortexpb.ToWriteRequest(lbls, nil, nil, histograms, cortexpb.API) + return cortexpb.ToWriteRequest(lbls, samples, nil, histograms, cortexpb.API) } type prepConfig struct { @@ -2714,7 +2843,7 @@ func stopAll(ds []*Distributor, r *ring.Ring) { r.StopAsync() } -func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortexpb.WriteRequest { +func makeWriteRequest(startTimestampMs int64, samples int, metadata int, histograms int) *cortexpb.WriteRequest { request := &cortexpb.WriteRequest{} for i := 0; i < samples; i++ { request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseries( @@ -2722,7 +2851,16 @@ func makeWriteRequest(startTimestampMs int64, samples int, metadata int) 
*cortex {Name: model.MetricNameLabel, Value: "foo"}, {Name: "bar", Value: "baz"}, {Name: "sample", Value: fmt.Sprintf("%d", i)}, - }, startTimestampMs+int64(i), float64(i))) + }, startTimestampMs+int64(i), i, false)) + } + + for i := 0; i < histograms; i++ { + request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseries( + []cortexpb.LabelAdapter{ + {Name: model.MetricNameLabel, Value: "foo"}, + {Name: "bar", Value: "baz"}, + {Name: "histogram", Value: fmt.Sprintf("%d", i)}, + }, startTimestampMs+int64(i), i, true)) } for i := 0; i < metadata; i++ { @@ -2737,21 +2875,24 @@ func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortex return request } -func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts int64, value float64) cortexpb.PreallocTimeseries { - return cortexpb.PreallocTimeseries{ +func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts int64, value int, histogram bool) cortexpb.PreallocTimeseries { + t := cortexpb.PreallocTimeseries{ TimeSeries: &cortexpb.TimeSeries{ Labels: labels, - Samples: []cortexpb.Sample{ - { - Value: value, - TimestampMs: ts, - }, - }, }, } + if histogram { + t.Histograms = append(t.Histograms, cortexpb.HistogramToHistogramProto(ts, histogram_util.GenerateTestHistogram(value))) + } else { + t.Samples = append(t.Samples, cortexpb.Sample{ + TimestampMs: ts, + Value: float64(value), + }) + } + return t } -func makeWriteRequestHA(samples int, replica, cluster string) *cortexpb.WriteRequest { +func makeWriteRequestHA(samples int, replica, cluster string, histogram bool) *cortexpb.WriteRequest { request := &cortexpb.WriteRequest{} for i := 0; i < samples; i++ { ts := cortexpb.PreallocTimeseries{ @@ -2765,11 +2906,17 @@ func makeWriteRequestHA(samples int, replica, cluster string) *cortexpb.WriteReq }, }, } - ts.Samples = []cortexpb.Sample{ - { - Value: float64(i), - TimestampMs: int64(i), - }, + if histogram { + ts.Histograms = []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(int64(i), histogram_util.GenerateTestHistogram(i)), + } + } else { + ts.Samples = []cortexpb.Sample{ + { + Value: float64(i), + TimestampMs: int64(i), + }, + } } request.Timeseries = append(request.Timeseries, ts) } @@ -3207,12 +3354,15 @@ func TestDistributorValidation(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") now := model.Now() future, past := now.Add(5*time.Hour), now.Add(-25*time.Hour) + testHistogram := histogram_util.GenerateTestHistogram(1) + testFloatHistogram := histogram_util.GenerateTestFloatHistogram(1) for i, tc := range []struct { - metadata []*cortexpb.MetricMetadata - labels []labels.Labels - samples []cortexpb.Sample - err error + metadata []*cortexpb.MetricMetadata + labels []labels.Labels + samples []cortexpb.Sample + histograms []cortexpb.Histogram + err error }{ // Test validation passes. { @@ -3222,6 +3372,9 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(now), Value: 1, }}, + histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(int64(now), testHistogram), + }, }, // Test validation fails for very old samples. { @@ -3273,6 +3426,30 @@ func TestDistributorValidation(t *testing.T) { }}, err: httpgrpc.Errorf(http.StatusBadRequest, `metadata missing metric name`), }, + // Test maximum labels names per series for histogram samples. 
+ { + labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}, {Name: "foo2", Value: "bar2"}}}, + histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(int64(now), testHistogram), + }, + err: httpgrpc.Errorf(http.StatusBadRequest, `series has too many labels (actual: 3, limit: 2) series: 'testmetric{foo2="bar2", foo="bar"}'`), + }, + // Test validation fails for very old histogram samples. + { + labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, + histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(int64(past), testHistogram), + }, + err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past), + }, + // Test validation fails for histogram samples from the future. + { + labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, + histograms: []cortexpb.Histogram{ + cortexpb.FloatHistogramToHistogramProto(int64(future), testFloatHistogram), + }, + err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too new: %d metric: "testmetric"`, future), + }, } { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { @@ -3293,7 +3470,7 @@ func TestDistributorValidation(t *testing.T) { limits: &limits, }) - _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(tc.labels, tc.samples, tc.metadata, nil, cortexpb.API)) + _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(tc.labels, tc.samples, tc.metadata, tc.histograms, cortexpb.API)) require.Equal(t, tc.err, err) }) } @@ -3465,36 +3642,39 @@ func TestDistributor_Push_Relabel(t *testing.T) { for _, tc := range cases { tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - var err error - var limits validation.Limits - flagext.DefaultValues(&limits) - limits.MetricRelabelConfigs = tc.metricRelabelConfigs + for _, enableHistogram := range []bool{false, true} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("%s, histogram=%s", tc.name, strconv.FormatBool(enableHistogram)), func(t *testing.T) { + t.Parallel() + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.MetricRelabelConfigs = tc.metricRelabelConfigs - ds, ingesters, _, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shardByAllLabels: true, - limits: &limits, - }) + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) - // Push the series to the distributor - req := mockWriteRequest(tc.inputSeries, 1, 1) - _, err = ds[0].Push(ctx, req) - require.NoError(t, err) + // Push the series to the distributor + req := mockWriteRequest(tc.inputSeries, 1, 1, enableHistogram) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) - // Since each test pushes only 1 series, we do expect the ingester - // to have received exactly 1 series - for i := range ingesters { - timeseries := ingesters[i].series() - assert.Equal(t, 1, len(timeseries)) - for _, v := range timeseries { - assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) + for _, v := range timeseries { + assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + } } - } - }) + }) + } } } @@ 
-3560,7 +3740,7 @@ func TestDistributor_Push_EmptyLabel(t *testing.T) { }) // Push the series to the distributor - req := mockWriteRequest(tc.inputSeries, 1, 1) + req := mockWriteRequest(tc.inputSeries, 1, 1, false) _, err = ds[0].Push(ctx, req) require.NoError(t, err) @@ -3613,7 +3793,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing }) // Push the series to the distributor - req := mockWriteRequest(inputSeries, 1, 1) + req := mockWriteRequest(inputSeries, 1, 1, false) ctx := user.InjectOrgID(context.Background(), "userDistributorPushRelabelDropWillExportMetricOfDroppedSamples") _, err = ds[0].Push(ctx, req) require.NoError(t, err)
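The hunks above change the test helpers so that `makeWriteRequest` takes a fourth, histogram-count argument and `mockWriteRequest` takes a trailing histogram flag (with its value parameter now `int64`). Below is a minimal usage sketch of the updated signatures, not part of the patch itself; it assumes it sits in the same distributor test package as distributor_test.go, so the unexported helpers and the package's existing imports (`cortexpb`, `labels`) are in scope, and the function name `buildTestRequests` is purely illustrative.

```go
// buildTestRequests is a hypothetical helper showing how the updated
// test-request builders from this diff are called.
func buildTestRequests() []*cortexpb.WriteRequest {
	// makeWriteRequest(startTimestampMs, samples, metadata, histograms):
	// the new fourth argument is the number of native-histogram series
	// to generate in addition to float samples and metadata.
	floatReq := makeWriteRequest(0, 5, 2, 0)     // 5 float samples, 2 metadata, no histograms
	histogramReq := makeWriteRequest(0, 0, 2, 5) // no float samples, 2 metadata, 5 histograms

	// mockWriteRequest(lbls, value, timestampMs, histogram): the trailing
	// bool selects native-histogram samples instead of float samples.
	series := []labels.Labels{{{Name: labels.MetricName, Value: "test"}}}
	floatMock := mockWriteRequest(series, 1, 100000, false)    // float sample
	histogramMock := mockWriteRequest(series, 1, 100000, true) // native-histogram sample

	return []*cortexpb.WriteRequest{floatReq, histogramReq, floatMock, histogramMock}
}
```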