diff --git a/CHANGELOG.md b/CHANGELOG.md index 384fe240506..49ef11c3eb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * [ENHANCEMENT] Add retry logic to S3 bucket client. #5135 * [ENHANCEMENT] Update Go version to 1.20.1. #5159 * [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193 +* [ENHANCEMENT] Query Frontend: Add number of chunks and samples fetched in query stats. #5198 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000 diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 6543185b2eb..f54bb737fa6 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -400,6 +400,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries))) reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize())) reqStats.AddFetchedDataBytes(uint64(resp.Size())) + reqStats.AddFetchedChunks(uint64(resp.ChunksCount())) + reqStats.AddFetchedSamples(uint64(resp.SamplesCount())) return resp, nil } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 7763f79ea98..ee45a45b967 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -62,11 +62,11 @@ type Handler struct { roundTripper http.RoundTripper // Metrics. - querySeconds *prometheus.CounterVec - querySeries *prometheus.CounterVec - queryBytes *prometheus.CounterVec - queryDataBytes *prometheus.CounterVec - activeUsers *util.ActiveUsersCleanupService + querySeconds *prometheus.CounterVec + querySeries *prometheus.CounterVec + queryChunkBytes *prometheus.CounterVec + queryDataBytes *prometheus.CounterVec + activeUsers *util.ActiveUsersCleanupService } // NewHandler creates a new frontend handler. @@ -88,7 +88,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Number of series fetched to execute a query.", }, []string{"user"}) - h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_fetched_chunks_bytes_total", Help: "Size of all chunks fetched to execute a query in bytes.", }, []string{"user"}) @@ -101,7 +101,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) h.querySeries.DeleteLabelValues(user) - h.queryBytes.DeleteLabelValues(user) + h.queryChunkBytes.DeleteLabelValues(user) h.queryDataBytes.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. @@ -230,13 +230,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer userID := tenant.JoinTenantIDs(tenantIDs) wallTime := stats.LoadWallTime() numSeries := stats.LoadFetchedSeries() - numBytes := stats.LoadFetchedChunkBytes() + numChunks := stats.LoadFetchedChunks() + numSamples := stats.LoadFetchedSamples() + numChunkBytes := stats.LoadFetchedChunkBytes() numDataBytes := stats.LoadFetchedDataBytes() // Track stats. 
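+	// The new chunk and sample counts are surfaced in the query stats log below;
+	// no per-tenant metrics are exported for them yet.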
 	f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
 	f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
-	f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
+	f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
 	f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
 	f.activeUsers.UpdateUserTimestamp(userID, time.Now())

@@ -249,7 +251,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
 		"response_time", queryResponseTime,
 		"query_wall_time_seconds", wallTime.Seconds(),
 		"fetched_series_count", numSeries,
-		"fetched_chunks_bytes", numBytes,
+		"fetched_chunks_count", numChunks,
+		"fetched_samples_count", numSamples,
+		"fetched_chunks_bytes", numChunkBytes,
 		"fetched_data_bytes", numDataBytes,
 		"status_code", statusCode,
 	}, stats.LoadExtraFields()...)
diff --git a/pkg/ingester/client/custom.go b/pkg/ingester/client/custom.go
index e1c0af148c6..f174d670437 100644
--- a/pkg/ingester/client/custom.go
+++ b/pkg/ingester/client/custom.go
@@ -1,5 +1,11 @@
 package client

+import (
+	"encoding/binary"
+
+	"github.com/cortexproject/cortex/pkg/chunk/encoding"
+)
+
 // ChunksCount returns the number of chunks in response.
 func (m *QueryStreamResponse) ChunksCount() int {
 	if len(m.Chunkseries) == 0 {
@@ -27,3 +33,17 @@ func (m *QueryStreamResponse) ChunksSize() int {
 	}
 	return size
 }
+
+// SamplesCount returns the total number of samples in the response.
+func (m *QueryStreamResponse) SamplesCount() (count int) {
+	for _, ts := range m.Timeseries {
+		count += len(ts.Samples)
+	}
+	for _, cs := range m.Chunkseries {
+		for _, c := range cs.Chunks {
+			if c.Encoding == int32(encoding.PrometheusXorChunk) {
+				// A Prometheus XOR chunk stores its sample count in the first two bytes of the payload, big endian.
+				count += int(binary.BigEndian.Uint16(c.Data))
+			}
+		}
+	}
+	return
+}
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index 841d7fdf6a3..468c3b8e970 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -692,16 +692,21 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 			}

 			numSeries := len(mySeries)
+			numSamples, chunksCount := countSamplesAndChunks(mySeries...)
 			chunkBytes := countChunkBytes(mySeries...)
 			dataBytes := countDataBytes(mySeries...)

 			reqStats.AddFetchedSeries(uint64(numSeries))
+			reqStats.AddFetchedChunks(chunksCount)
+			reqStats.AddFetchedSamples(numSamples)
 			reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
 			reqStats.AddFetchedDataBytes(uint64(dataBytes))

 			level.Debug(spanLog).Log("msg", "received series from store-gateway",
 				"instance", c.RemoteAddress(),
 				"fetched series", numSeries,
+				"fetched chunks", chunksCount,
+				"fetched samples", numSamples,
 				"fetched chunk bytes", chunkBytes,
 				"fetched data bytes", dataBytes,
 				"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
@@ -1018,6 +1023,19 @@ func countDataBytes(series ...*storepb.Series) (count int) {
 	return count
 }

+// countSamplesAndChunks returns the total number of samples and the total number of chunks in the given series.
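+// Every chunk is counted; samples are only tallied from chunks that carry raw XOR data.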
+func countSamplesAndChunks(series ...*storepb.Series) (samplesCount, chunksCount uint64) { + for _, s := range series { + chunksCount += uint64(len(s.Chunks)) + for _, c := range s.Chunks { + if c.Raw != nil { + samplesCount += uint64(c.Raw.XORNumSamples()) + } + } + } + return +} + // only retry connection issues func isRetryableError(err error) bool { switch status.Code(err) { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index a0ccf602796..fb0b91752f5 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1687,3 +1687,64 @@ func valuesFromSeries(name string, series ...labels.Labels) []string { sort.Strings(values) return values } + +func TestCountSamplesAndChunks(t *testing.T) { + c := chunkenc.NewXORChunk() + appender, err := c.Appender() + require.NoError(t, err) + samples := 300 + for i := 0; i < samples; i++ { + appender.Append(int64(i), float64(i)) + } + + for i, tc := range []struct { + serieses []*storepb.Series + expectedChunks uint64 + expectedSamples uint64 + }{ + { + serieses: []*storepb.Series{ + { + Chunks: []storepb.AggrChunk{ + { + Raw: &storepb.Chunk{ + Type: storepb.Chunk_XOR, + Data: c.Bytes(), + }, + }, + }, + }, + }, + expectedSamples: uint64(samples), + expectedChunks: 1, + }, + { + serieses: []*storepb.Series{ + { + Chunks: []storepb.AggrChunk{ + { + Raw: &storepb.Chunk{ + Type: storepb.Chunk_XOR, + Data: c.Bytes(), + }, + }, + { + Raw: &storepb.Chunk{ + Type: storepb.Chunk_XOR, + Data: c.Bytes(), + }, + }, + }, + }, + }, + expectedSamples: uint64(int64(samples) * 2), + expectedChunks: 2, + }, + } { + t.Run(fmt.Sprintf("test_case_%d", i), func(t *testing.T) { + samples, chunks := countSamplesAndChunks(tc.serieses...) + require.Equal(t, tc.expectedSamples, samples) + require.Equal(t, tc.expectedChunks, chunks) + }) + } +} diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index d5c71f86ed4..94a386c9c0b 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -148,7 +148,39 @@ func (s *QueryStats) LoadFetchedDataBytes() uint64 { return atomic.LoadUint64(&s.FetchedDataBytes) } -// Merge the provide Stats into this one. +func (s *QueryStats) AddFetchedSamples(count uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.FetchedSamplesCount, count) +} + +func (s *QueryStats) LoadFetchedSamples() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.FetchedSamplesCount) +} + +func (s *QueryStats) AddFetchedChunks(count uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.FetchedChunksCount, count) +} + +func (s *QueryStats) LoadFetchedChunks() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.FetchedChunksCount) +} + +// Merge the provided Stats into this one. func (s *QueryStats) Merge(other *QueryStats) { if s == nil || other == nil { return @@ -158,6 +190,8 @@ func (s *QueryStats) Merge(other *QueryStats) { s.AddFetchedSeries(other.LoadFetchedSeries()) s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes()) s.AddFetchedDataBytes(other.LoadFetchedDataBytes()) + s.AddFetchedSamples(other.LoadFetchedSamples()) + s.AddFetchedChunks(other.LoadFetchedChunks()) s.AddExtraFields(other.LoadExtraFields()...) 
} diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 39a1775ad6c..405dc3bbee7 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -42,6 +42,10 @@ type Stats struct { FetchedDataBytes uint64 `protobuf:"varint,4,opt,name=fetched_data_bytes,json=fetchedDataBytes,proto3" json:"fetched_data_bytes,omitempty"` // Extra fields to be reported on the stats log ExtraFields map[string]string `protobuf:"bytes,5,rep,name=extra_fields,json=extraFields,proto3" json:"extra_fields,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // The number of chunks fetched for the query + FetchedChunksCount uint64 `protobuf:"varint,6,opt,name=fetched_chunks_count,json=fetchedChunksCount,proto3" json:"fetched_chunks_count,omitempty"` + // The number of samples fetched for the query + FetchedSamplesCount uint64 `protobuf:"varint,7,opt,name=fetched_samples_count,json=fetchedSamplesCount,proto3" json:"fetched_samples_count,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -111,6 +115,20 @@ func (m *Stats) GetExtraFields() map[string]string { return nil } +func (m *Stats) GetFetchedChunksCount() uint64 { + if m != nil { + return m.FetchedChunksCount + } + return 0 +} + +func (m *Stats) GetFetchedSamplesCount() uint64 { + if m != nil { + return m.FetchedSamplesCount + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") proto.RegisterMapType((map[string]string)(nil), "stats.Stats.ExtraFieldsEntry") @@ -119,31 +137,33 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 377 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x91, 0xbf, 0x6e, 0xea, 0x30, - 0x18, 0xc5, 0x63, 0x20, 0x57, 0xe0, 0xdc, 0x81, 0xeb, 0xcb, 0x10, 0x90, 0xae, 0x41, 0x77, 0x62, - 0xa8, 0x42, 0x45, 0x97, 0xaa, 0x95, 0x2a, 0xc4, 0x9f, 0x3e, 0x40, 0xe8, 0xd4, 0x25, 0x32, 0xc4, - 0x84, 0x88, 0x10, 0x57, 0x89, 0xd3, 0x36, 0x5b, 0x1f, 0xa1, 0x63, 0x1f, 0xa1, 0x8f, 0xc2, 0xc8, - 0x88, 0x54, 0xa9, 0x2d, 0x61, 0xe9, 0xc8, 0x23, 0x54, 0xb6, 0x83, 0x2a, 0xb1, 0xf9, 0xfb, 0x7e, - 0xe7, 0x1c, 0xe9, 0x3b, 0x86, 0x46, 0xcc, 0x09, 0x8f, 0xad, 0xbb, 0x88, 0x71, 0x86, 0x74, 0x39, - 0x34, 0x6a, 0x1e, 0xf3, 0x98, 0xdc, 0x74, 0xc4, 0x4b, 0xc1, 0x06, 0xf6, 0x18, 0xf3, 0x02, 0xda, - 0x91, 0xd3, 0x24, 0x99, 0x75, 0xdc, 0x24, 0x22, 0xdc, 0x67, 0x61, 0xce, 0xeb, 0xc7, 0x9c, 0x84, - 0xa9, 0x42, 0xff, 0xdf, 0x0a, 0x50, 0x1f, 0x8b, 0x68, 0xd4, 0x83, 0x95, 0x07, 0x12, 0x04, 0x0e, - 0xf7, 0x97, 0xd4, 0x04, 0x2d, 0xd0, 0x36, 0xba, 0x75, 0x4b, 0x19, 0xad, 0x83, 0xd1, 0x1a, 0xe6, - 0xc1, 0xfd, 0xf2, 0xea, 0xbd, 0xa9, 0xbd, 0x7c, 0x34, 0x81, 0x5d, 0x16, 0xae, 0x1b, 0x7f, 0x49, - 0xd1, 0x29, 0xac, 0xcd, 0x28, 0x9f, 0xce, 0xa9, 0xeb, 0xc4, 0x34, 0xf2, 0x69, 0xec, 0x4c, 0x59, - 0x12, 0x72, 0xb3, 0xd0, 0x02, 0xed, 0x92, 0x8d, 0x72, 0x36, 0x96, 0x68, 0x20, 0x08, 0xb2, 0xe0, - 0xdf, 0x83, 0x63, 0x3a, 0x4f, 0xc2, 0x85, 0x33, 0x49, 0x39, 0x8d, 0xcd, 0xa2, 0x34, 0xfc, 0xc9, - 0xd1, 0x40, 0x90, 0xbe, 0x00, 0xe8, 0x04, 0x1e, 0x52, 0x1c, 0x97, 0x70, 0x92, 0xcb, 0x4b, 0x52, - 0x5e, 0xcd, 0xc9, 0x90, 0x70, 0xa2, 0xd4, 0x3d, 0xf8, 0x9b, 0x3e, 0xf2, 0x88, 0x38, 0x33, 0x9f, - 0x06, 0x6e, 0x6c, 0xea, 0xad, 0x62, 0xdb, 0xe8, 0xfe, 0xb3, 0x54, 0xaf, 0xf2, 0x6a, 0x6b, 0x24, - 0x04, 0xd7, 0x92, 0x8f, 0x42, 0x1e, 0xa5, 0xb6, 0x41, 0x7f, 0x36, 0x8d, 0x2b, 0x58, 0x3d, 0x16, - 0xa0, 0x2a, 0x2c, 0x2e, 0x68, 0x2a, 0x1b, 
0xaa, 0xd8, 0xe2, 0x89, 0x6a, 0x50, 0xbf, 0x27, 0x41, - 0x42, 0xe5, 0xa1, 0x15, 0x5b, 0x0d, 0x17, 0x85, 0x73, 0xd0, 0xbf, 0x5c, 0x6f, 0xb1, 0xb6, 0xd9, - 0x62, 0x6d, 0xbf, 0xc5, 0xe0, 0x29, 0xc3, 0xe0, 0x35, 0xc3, 0x60, 0x95, 0x61, 0xb0, 0xce, 0x30, - 0xf8, 0xcc, 0x30, 0xf8, 0xca, 0xb0, 0xb6, 0xcf, 0x30, 0x78, 0xde, 0x61, 0x6d, 0xbd, 0xc3, 0xda, - 0x66, 0x87, 0xb5, 0x5b, 0xf5, 0xd7, 0x93, 0x5f, 0xb2, 0xf5, 0xb3, 0xef, 0x00, 0x00, 0x00, 0xff, - 0xff, 0xe2, 0xdd, 0x06, 0xc8, 0x08, 0x02, 0x00, 0x00, + // 411 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x92, 0xcf, 0xae, 0xd2, 0x40, + 0x14, 0xc6, 0x3b, 0x94, 0x22, 0x4c, 0x5d, 0xe0, 0x88, 0x49, 0x21, 0x71, 0x20, 0xae, 0x58, 0x98, + 0x62, 0x70, 0x63, 0x34, 0x31, 0x84, 0x3f, 0x3e, 0x40, 0x71, 0xe5, 0xa6, 0x19, 0xe8, 0x50, 0x1a, + 0x4a, 0x87, 0xb4, 0x53, 0xb5, 0x3b, 0x1f, 0xc1, 0xa5, 0x8f, 0xe0, 0x6b, 0xb8, 0x63, 0xc9, 0x92, + 0x95, 0x4a, 0xd9, 0xb8, 0xe4, 0x11, 0xcc, 0xcc, 0xb4, 0xdc, 0x7b, 0xd9, 0xcd, 0x99, 0xdf, 0xf9, + 0x4e, 0xce, 0xf7, 0xcd, 0x40, 0x33, 0xe1, 0x84, 0x27, 0xf6, 0x2e, 0x66, 0x9c, 0x21, 0x43, 0x16, + 0x9d, 0x96, 0xcf, 0x7c, 0x26, 0x6f, 0x06, 0xe2, 0xa4, 0x60, 0x07, 0xfb, 0x8c, 0xf9, 0x21, 0x1d, + 0xc8, 0x6a, 0x91, 0xae, 0x06, 0x5e, 0x1a, 0x13, 0x1e, 0xb0, 0xa8, 0xe0, 0xed, 0x5b, 0x4e, 0xa2, + 0x4c, 0xa1, 0x17, 0xbf, 0x74, 0x68, 0xcc, 0xc5, 0x68, 0x34, 0x82, 0x8d, 0x2f, 0x24, 0x0c, 0x5d, + 0x1e, 0x6c, 0xa9, 0x05, 0x7a, 0xa0, 0x6f, 0x0e, 0xdb, 0xb6, 0x12, 0xda, 0xa5, 0xd0, 0x9e, 0x16, + 0x83, 0xc7, 0xf5, 0xfd, 0xef, 0xae, 0xf6, 0xe3, 0x4f, 0x17, 0x38, 0x75, 0xa1, 0xfa, 0x18, 0x6c, + 0x29, 0x7a, 0x05, 0x5b, 0x2b, 0xca, 0x97, 0x6b, 0xea, 0xb9, 0x09, 0x8d, 0x03, 0x9a, 0xb8, 0x4b, + 0x96, 0x46, 0xdc, 0xaa, 0xf4, 0x40, 0xbf, 0xea, 0xa0, 0x82, 0xcd, 0x25, 0x9a, 0x08, 0x82, 0x6c, + 0xf8, 0xb4, 0x54, 0x2c, 0xd7, 0x69, 0xb4, 0x71, 0x17, 0x19, 0xa7, 0x89, 0xa5, 0x4b, 0xc1, 0x93, + 0x02, 0x4d, 0x04, 0x19, 0x0b, 0x80, 0x5e, 0xc2, 0x72, 0x8a, 0xeb, 0x11, 0x4e, 0x8a, 0xf6, 0xaa, + 0x6c, 0x6f, 0x16, 0x64, 0x4a, 0x38, 0x51, 0xdd, 0x23, 0xf8, 0x98, 0x7e, 0xe5, 0x31, 0x71, 0x57, + 0x01, 0x0d, 0xbd, 0xc4, 0x32, 0x7a, 0x7a, 0xdf, 0x1c, 0x3e, 0xb7, 0x55, 0xae, 0xd2, 0xb5, 0x3d, + 0x13, 0x0d, 0x1f, 0x24, 0x9f, 0x45, 0x3c, 0xce, 0x1c, 0x93, 0xde, 0xdd, 0xdc, 0x77, 0x24, 0xf7, + 0x2b, 0x1d, 0xd5, 0x1e, 0x38, 0x92, 0x0b, 0x16, 0x8e, 0x86, 0xf0, 0xd9, 0x35, 0x03, 0xb2, 0xdd, + 0x85, 0xd7, 0x10, 0x1e, 0x49, 0x49, 0x69, 0x77, 0xae, 0x98, 0xd4, 0x74, 0xde, 0xc3, 0xe6, 0xed, + 0x1a, 0xa8, 0x09, 0xf5, 0x0d, 0xcd, 0xe4, 0x3b, 0x34, 0x1c, 0x71, 0x44, 0x2d, 0x68, 0x7c, 0x26, + 0x61, 0x4a, 0x65, 0x9c, 0x0d, 0x47, 0x15, 0x6f, 0x2b, 0x6f, 0xc0, 0xf8, 0xdd, 0xe1, 0x84, 0xb5, + 0xe3, 0x09, 0x6b, 0x97, 0x13, 0x06, 0xdf, 0x72, 0x0c, 0x7e, 0xe6, 0x18, 0xec, 0x73, 0x0c, 0x0e, + 0x39, 0x06, 0x7f, 0x73, 0x0c, 0xfe, 0xe5, 0x58, 0xbb, 0xe4, 0x18, 0x7c, 0x3f, 0x63, 0xed, 0x70, + 0xc6, 0xda, 0xf1, 0x8c, 0xb5, 0x4f, 0xea, 0x47, 0x2d, 0x6a, 0xf2, 0x6d, 0x5f, 0xff, 0x0f, 0x00, + 0x00, 0xff, 0xff, 0xda, 0x89, 0xe2, 0x3c, 0x6e, 0x02, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -185,13 +205,19 @@ func (this *Stats) Equal(that interface{}) bool { return false } } + if this.FetchedChunksCount != that1.FetchedChunksCount { + return false + } + if this.FetchedSamplesCount != that1.FetchedSamplesCount { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 11) s = append(s, "&stats.Stats{") s = append(s, 
"WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") @@ -210,6 +236,8 @@ func (this *Stats) GoString() string { if this.ExtraFields != nil { s = append(s, "ExtraFields: "+mapStringForExtraFields+",\n") } + s = append(s, "FetchedChunksCount: "+fmt.Sprintf("%#v", this.FetchedChunksCount)+",\n") + s = append(s, "FetchedSamplesCount: "+fmt.Sprintf("%#v", this.FetchedSamplesCount)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -241,6 +269,16 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.FetchedSamplesCount != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedSamplesCount)) + i-- + dAtA[i] = 0x38 + } + if m.FetchedChunksCount != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedChunksCount)) + i-- + dAtA[i] = 0x30 + } if len(m.ExtraFields) > 0 { for k := range m.ExtraFields { v := m.ExtraFields[k] @@ -322,6 +360,12 @@ func (m *Stats) Size() (n int) { n += mapEntrySize + 1 + sovStats(uint64(mapEntrySize)) } } + if m.FetchedChunksCount != 0 { + n += 1 + sovStats(uint64(m.FetchedChunksCount)) + } + if m.FetchedSamplesCount != 0 { + n += 1 + sovStats(uint64(m.FetchedSamplesCount)) + } return n } @@ -351,6 +395,8 @@ func (this *Stats) String() string { `FetchedChunkBytes:` + fmt.Sprintf("%v", this.FetchedChunkBytes) + `,`, `FetchedDataBytes:` + fmt.Sprintf("%v", this.FetchedDataBytes) + `,`, `ExtraFields:` + mapStringForExtraFields + `,`, + `FetchedChunksCount:` + fmt.Sprintf("%v", this.FetchedChunksCount) + `,`, + `FetchedSamplesCount:` + fmt.Sprintf("%v", this.FetchedSamplesCount) + `,`, `}`, }, "") return s @@ -609,6 +655,44 @@ func (m *Stats) Unmarshal(dAtA []byte) error { } m.ExtraFields[mapkey] = mapvalue iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FetchedChunksCount", wireType) + } + m.FetchedChunksCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FetchedChunksCount |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FetchedSamplesCount", wireType) + } + m.FetchedSamplesCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FetchedSamplesCount |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index d61d00f7573..7e8ba2003ea 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -22,4 +22,8 @@ message Stats { uint64 fetched_data_bytes = 4; // Extra fields to be reported on the stats log map extra_fields = 5; + // The number of chunks fetched for the query + uint64 fetched_chunks_count = 6; + // The number of samples fetched for the query + uint64 fetched_samples_count = 7; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index 3883cb88525..d9658acc2fb 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -97,6 +97,42 @@ func TestStats_AddFetchedDataBytes(t *testing.T) { }) } +func TestStats_AddFetchedChunks(t *testing.T) { + t.Parallel() + t.Run("add and load chunks", func(t *testing.T) { + 
stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddFetchedChunks(4096) + stats.AddFetchedChunks(4096) + + assert.Equal(t, uint64(8192), stats.LoadFetchedChunks()) + }) + + t.Run("add and load chunks nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.AddFetchedChunks(1024) + + assert.Equal(t, uint64(0), stats.LoadFetchedChunks()) + }) +} + +func TestStats_AddFetchedSamples(t *testing.T) { + t.Parallel() + t.Run("add and load samples", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddFetchedSamples(4096) + stats.AddFetchedSamples(4096) + + assert.Equal(t, uint64(8192), stats.LoadFetchedSamples()) + }) + + t.Run("add and load samples nil receiver", func(t *testing.T) { + var stats *QueryStats + stats.AddFetchedSamples(1024) + + assert.Equal(t, uint64(0), stats.LoadFetchedSamples()) + }) +} + func TestStats_Merge(t *testing.T) { t.Parallel() t.Run("merge two stats objects", func(t *testing.T) {
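
Reviewer note, not part of the diff: SamplesCount in pkg/ingester/client/custom.go relies on a Prometheus XOR chunk's payload beginning with its sample count as a big-endian uint16, which is why it can read binary.BigEndian.Uint16(c.Data) without decoding the chunk. A minimal standalone sketch of that header layout, assuming the upstream tsdb/chunkenc package already used by the tests in this diff:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	for i := 0; i < 300; i++ {
		app.Append(int64(i), float64(i)) // (timestamp, value)
	}
	// The first two bytes of the chunk payload are the sample count —
	// the same field SamplesCount decodes from c.Data.
	fmt.Println(binary.BigEndian.Uint16(c.Bytes())) // 300
	fmt.Println(c.NumSamples())                     // 300
}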