@@ -1122,19 +1122,21 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1122
1122
// Keep track of some stats which are tracked only if the samples will be
1123
1123
// successfully committed
1124
1124
var (
1125
- succeededSamplesCount = 0
1126
- failedSamplesCount = 0
1127
- succeededExemplarsCount = 0
1128
- failedExemplarsCount = 0
1129
- startAppend = time .Now ()
1130
- sampleOutOfBoundsCount = 0
1131
- sampleOutOfOrderCount = 0
1132
- sampleTooOldCount = 0
1133
- newValueForTimestampCount = 0
1134
- perUserSeriesLimitCount = 0
1135
- perLabelSetSeriesLimitCount = 0
1136
- perMetricSeriesLimitCount = 0
1137
- nativeHistogramCount = 0
1125
+ succeededSamplesCount = 0
1126
+ failedSamplesCount = 0
1127
+ succeededHistogramsCount = 0
1128
+ failedHistogramsCount = 0
1129
+ succeededExemplarsCount = 0
1130
+ failedExemplarsCount = 0
1131
+ startAppend = time .Now ()
1132
+ sampleOutOfBoundsCount = 0
1133
+ sampleOutOfOrderCount = 0
1134
+ sampleTooOldCount = 0
1135
+ newValueForTimestampCount = 0
1136
+ perUserSeriesLimitCount = 0
1137
+ perLabelSetSeriesLimitCount = 0
1138
+ perMetricSeriesLimitCount = 0
1139
+ discardedNativeHistogramCount = 0
1138
1140
1139
1141
updateFirstPartial = func (errFn func () error ) {
1140
1142
if firstPartialErr == nil {
@@ -1215,6 +1217,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1215
1217
1216
1218
// To find out if any sample was added to this series, we keep old value.
1217
1219
oldSucceededSamplesCount := succeededSamplesCount
1220
+ // To find out if any histogram was added to this series, we keep old value.
1221
+ oldSucceededHistogramsCount := succeededHistogramsCount
1218
1222
1219
1223
for _ , s := range ts .Samples {
1220
1224
var err error
@@ -1266,19 +1270,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1266
1270
1267
1271
if ref != 0 {
1268
1272
if _ , err = app .AppendHistogram (ref , copiedLabels , hp .TimestampMs , h , fh ); err == nil {
1269
- succeededSamplesCount ++
1273
+ succeededHistogramsCount ++
1270
1274
continue
1271
1275
}
1272
1276
} else {
1273
1277
// Copy the label set because both TSDB and the active series tracker may retain it.
1274
1278
copiedLabels = cortexpb .FromLabelAdaptersToLabelsWithCopy (ts .Labels )
1275
1279
if ref , err = app .AppendHistogram (0 , copiedLabels , hp .TimestampMs , h , fh ); err == nil {
1276
- succeededSamplesCount ++
1280
+ succeededHistogramsCount ++
1277
1281
continue
1278
1282
}
1279
1283
}
1280
1284
1281
- failedSamplesCount ++
1285
+ failedHistogramsCount ++
1282
1286
1283
1287
if rollback := handleAppendFailure (err , hp .TimestampMs , ts .Labels , copiedLabels ); ! rollback {
1284
1288
continue
@@ -1290,12 +1294,12 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1290
1294
return nil , wrapWithUser (err , userID )
1291
1295
}
1292
1296
} else {
1293
- nativeHistogramCount += len (ts .Histograms )
1297
+ discardedNativeHistogramCount += len (ts .Histograms )
1294
1298
}
1295
-
1296
- if i .cfg .ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
1299
+ shouldUpdateSeries := ( succeededSamplesCount > oldSucceededSamplesCount ) || ( succeededHistogramsCount > oldSucceededHistogramsCount )
1300
+ if i .cfg .ActiveSeriesMetricsEnabled && shouldUpdateSeries {
1297
1301
db .activeSeries .UpdateSeries (tsLabels , tsLabelsHash , startAppend , func (l labels.Labels ) labels.Labels {
1298
- // we must already have copied the labels if succeededSamplesCount has been incremented.
1302
+ // we must already have copied the labels if succeededSamplesCount or succeededHistogramsCount has been incremented.
1299
1303
return copiedLabels
1300
1304
})
1301
1305
}
@@ -1343,8 +1347,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1343
1347
}
1344
1348
i .TSDBState .appenderCommitDuration .Observe (time .Since (startCommit ).Seconds ())
1345
1349
1346
- // If only invalid samples are pushed, don't change "last update", as TSDB was not modified.
1347
- if succeededSamplesCount > 0 {
1350
+ // If only invalid samples or histograms are pushed, don't change "last update", as TSDB was not modified.
1351
+ if succeededSamplesCount > 0 || succeededHistogramsCount > 0 {
1348
1352
db .setLastUpdate (time .Now ())
1349
1353
}
1350
1354
@@ -1353,6 +1357,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1353
1357
// which will be converted into an HTTP 5xx and the client should/will retry.
1354
1358
i .metrics .ingestedSamples .Add (float64 (succeededSamplesCount ))
1355
1359
i .metrics .ingestedSamplesFail .Add (float64 (failedSamplesCount ))
1360
+ i .metrics .ingestedHistograms .Add (float64 (succeededHistogramsCount ))
1361
+ i .metrics .ingestedHistogramsFail .Add (float64 (failedHistogramsCount ))
1356
1362
i .metrics .ingestedExemplars .Add (float64 (succeededExemplarsCount ))
1357
1363
i .metrics .ingestedExemplarsFail .Add (float64 (failedExemplarsCount ))
1358
1364
@@ -1378,20 +1384,20 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1378
1384
i .validateMetrics .DiscardedSamples .WithLabelValues (perLabelsetSeriesLimit , userID ).Add (float64 (perLabelSetSeriesLimitCount ))
1379
1385
}
1380
1386
1381
- if ! i .cfg .BlocksStorageConfig .TSDB .EnableNativeHistograms && nativeHistogramCount > 0 {
1382
- i .validateMetrics .DiscardedSamples .WithLabelValues (nativeHistogramSample , userID ).Add (float64 (nativeHistogramCount ))
1387
+ if ! i .cfg .BlocksStorageConfig .TSDB .EnableNativeHistograms && discardedNativeHistogramCount > 0 {
1388
+ i .validateMetrics .DiscardedSamples .WithLabelValues (nativeHistogramSample , userID ).Add (float64 (discardedNativeHistogramCount ))
1383
1389
}
1384
1390
1385
1391
// Distributor counts both samples, metadata and histograms, so for consistency ingester does the same.
1386
- i .ingestionRate .Add (int64 (succeededSamplesCount + ingestedMetadata ))
1392
+ i .ingestionRate .Add (int64 (succeededSamplesCount + succeededHistogramsCount + ingestedMetadata ))
1387
1393
1388
1394
switch req .Source {
1389
1395
case cortexpb .RULE :
1390
- db .ingestedRuleSamples .Add (int64 (succeededSamplesCount ))
1396
+ db .ingestedRuleSamples .Add (int64 (succeededSamplesCount + succeededHistogramsCount ))
1391
1397
case cortexpb .API :
1392
1398
fallthrough
1393
1399
default :
1394
- db .ingestedAPISamples .Add (int64 (succeededSamplesCount ))
1400
+ db .ingestedAPISamples .Add (int64 (succeededSamplesCount + succeededHistogramsCount ))
1395
1401
}
1396
1402
1397
1403
if firstPartialErr != nil {
@@ -1400,7 +1406,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1400
1406
if errors .As (firstPartialErr , & ve ) {
1401
1407
code = ve .code
1402
1408
}
1403
- level .Debug (logutil .WithContext (ctx , i .logger )).Log ("msg" , "partial failures to push" , "totalSamples" , succeededSamplesCount + failedSamplesCount , "failedSamples" , failedSamplesCount , "firstPartialErr" , firstPartialErr )
1409
+ level .Debug (logutil .WithContext (ctx , i .logger )).Log ("msg" , "partial failures to push" , "totalSamples" , succeededSamplesCount + failedSamplesCount , "failedSamples" , failedSamplesCount , "totalHistograms" , succeededHistogramsCount + failedHistogramsCount , "failedHistograms" , failedHistogramsCount , " firstPartialErr" , firstPartialErr )
1404
1410
return & cortexpb.WriteResponse {}, httpgrpc .Errorf (code , wrapWithUser (firstPartialErr , userID ).Error ())
1405
1411
}
1406
1412
0 commit comments