Add fetched samples and chunks count into querier stats #5198

Merged 2 commits on Mar 9, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@
* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
* [ENHANCEMENT] Update Go version to 1.20.1. #5159
* [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
* [ENHANCEMENT] Query Frontend: Add number of chunks and samples fetched in query stats. #5198
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk. #5000
2 changes: 2 additions & 0 deletions pkg/distributor/query.go
@@ -400,6 +400,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
reqStats.AddFetchedDataBytes(uint64(resp.Size()))
reqStats.AddFetchedChunks(uint64(resp.ChunksCount()))
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))

return resp, nil
}
24 changes: 14 additions & 10 deletions pkg/frontend/transport/handler.go
@@ -62,11 +62,11 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
@@ -88,7 +88,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})
@@ -101,7 +101,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
})
// If the cleaner stops or fails, we will simply not clean the metrics for inactive users.
@@ -230,13 +230,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
userID := tenant.JoinTenantIDs(tenantIDs)
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
numChunks := stats.LoadFetchedChunks()
numSamples := stats.LoadFetchedSamples()
numChunkBytes := stats.LoadFetchedChunkBytes()
numDataBytes := stats.LoadFetchedDataBytes()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

@@ -249,7 +251,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"response_time", queryResponseTime,
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
"fetched_chunks_count", numChunks,
"fetched_samples_count", numSamples,
"fetched_chunks_bytes", numChunkBytes,
"fetched_data_bytes", numDataBytes,
"status_code", statusCode,
}, stats.LoadExtraFields()...)
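With the two new fields, the per-query stats log line emitted by the query frontend also reports chunk and sample counts. For illustration only — the field names come from the diff above, but the values, message text, and surrounding fields are made up:

level=info msg="query stats" user=team-a response_time=1.2s query_wall_time_seconds=0.8 fetched_series_count=120 fetched_chunks_count=480 fetched_samples_count=57600 fetched_chunks_bytes=1048576 fetched_data_bytes=2097152 status_code=200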
20 changes: 20 additions & 0 deletions pkg/ingester/client/custom.go
@@ -1,5 +1,11 @@
package client

import (
"encoding/binary"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
)

// ChunksCount returns the number of chunks in response.
func (m *QueryStreamResponse) ChunksCount() int {
if len(m.Chunkseries) == 0 {
@@ -27,3 +33,17 @@ func (m *QueryStreamResponse) ChunksSize() int {
}
return size
}

// SamplesCount returns the number of samples in the response.
func (m *QueryStreamResponse) SamplesCount() (count int) {
for _, ts := range m.Timeseries {
count += len(ts.Samples)
}
for _, cs := range m.Chunkseries {
for _, c := range cs.Chunks {
if c.Encoding == int32(encoding.PrometheusXorChunk) {
count += int(binary.BigEndian.Uint16(c.Data))
}
}
}
return
}
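SamplesCount can count chunkseries samples without decoding them: a Prometheus XOR chunk stores its sample count in the first two bytes of its payload as a big-endian uint16, which is exactly what the binary.BigEndian.Uint16(c.Data) read above relies on. A self-contained sketch of that layout — the xorSampleCount helper is ours, not part of this PR:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// xorSampleCount reads the sample count straight from a raw XOR chunk
// payload; the first two bytes hold the count as a big-endian uint16.
func xorSampleCount(data []byte) int {
	return int(binary.BigEndian.Uint16(data))
}

func main() {
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		app.Append(int64(i*1000), float64(i)) // (timestamp in ms, value)
	}
	fmt.Println(xorSampleCount(c.Bytes())) // 5, from the header alone
	fmt.Println(c.NumSamples())            // 5, chunkenc's own accessor agrees
}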
18 changes: 18 additions & 0 deletions pkg/querier/blocks_store_queryable.go
@@ -692,16 +692,21 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
}

numSeries := len(mySeries)
numSamples, chunksCount := countSamplesAndChunks(mySeries...)
chunkBytes := countChunkBytes(mySeries...)
dataBytes := countDataBytes(mySeries...)

reqStats.AddFetchedSeries(uint64(numSeries))
reqStats.AddFetchedChunks(chunksCount)
reqStats.AddFetchedSamples(numSamples)
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
reqStats.AddFetchedDataBytes(uint64(dataBytes))

level.Debug(spanLog).Log("msg", "received series from store-gateway",
"instance", c.RemoteAddress(),
"fetched series", numSeries,
"fetched chunks", chunksCount,
"fetched samples", numSamples,
"fetched chunk bytes", chunkBytes,
"fetched data bytes", dataBytes,
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
@@ -1018,6 +1023,19 @@ func countDataBytes(series ...*storepb.Series) (count int) {
return count
}

// countSamplesAndChunks counts the number of samples and chunks in the given series.
func countSamplesAndChunks(series ...*storepb.Series) (samplesCount, chunksCount uint64) {
for _, s := range series {
chunksCount += uint64(len(s.Chunks))
for _, c := range s.Chunks {
if c.Raw != nil {
samplesCount += uint64(c.Raw.XORNumSamples())
}
}
}
return
}

// only retry connection issues
func isRetryableError(err error) bool {
switch status.Code(err) {
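The store-gateway path uses the same header trick through Thanos: c.Raw.XORNumSamples() reads the two-byte count from the storepb chunk data, so samples are tallied without decoding, and non-XOR encodings simply contribute nothing. A hedged equivalence sketch, assuming the thanos-io/thanos storepb import path used elsewhere in this package:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	app.Append(1000, 1.5)
	app.Append(2000, 2.5)

	// XORNumSamples only inspects the chunk header; no samples are decoded.
	raw := storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}
	fmt.Println(raw.XORNumSamples()) // 2
	fmt.Println(c.NumSamples())      // 2, the decoded count agrees
}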
61 changes: 61 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
@@ -1687,3 +1687,64 @@ func valuesFromSeries(name string, series ...labels.Labels) []string {
sort.Strings(values)
return values
}

func TestCountSamplesAndChunks(t *testing.T) {
c := chunkenc.NewXORChunk()
appender, err := c.Appender()
require.NoError(t, err)
samples := 300
for i := 0; i < samples; i++ {
appender.Append(int64(i), float64(i))
}

for i, tc := range []struct {
serieses []*storepb.Series
expectedChunks uint64
expectedSamples uint64
}{
{
serieses: []*storepb.Series{
{
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Data: c.Bytes(),
},
},
},
},
},
expectedSamples: uint64(samples),
expectedChunks: 1,
},
{
serieses: []*storepb.Series{
{
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Data: c.Bytes(),
},
},
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Data: c.Bytes(),
},
},
},
},
},
expectedSamples: uint64(int64(samples) * 2),
expectedChunks: 2,
},
} {
t.Run(fmt.Sprintf("test_case_%d", i), func(t *testing.T) {
samples, chunks := countSamplesAndChunks(tc.serieses...)
require.Equal(t, tc.expectedSamples, samples)
require.Equal(t, tc.expectedChunks, chunks)
})
}
}
36 changes: 35 additions & 1 deletion pkg/querier/stats/stats.go
@@ -148,7 +148,39 @@ func (s *QueryStats) LoadFetchedDataBytes() uint64 {
return atomic.LoadUint64(&s.FetchedDataBytes)
}

// AddFetchedSamples atomically adds count to the fetched-samples total.
func (s *QueryStats) AddFetchedSamples(count uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.FetchedSamplesCount, count)
}

// LoadFetchedSamples atomically reads the fetched-samples total.
func (s *QueryStats) LoadFetchedSamples() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.FetchedSamplesCount)
}

// AddFetchedChunks atomically adds count to the fetched-chunks total.
func (s *QueryStats) AddFetchedChunks(count uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.FetchedChunksCount, count)
}

// LoadFetchedChunks atomically reads the fetched-chunks total.
func (s *QueryStats) LoadFetchedChunks() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.FetchedChunksCount)
}

// Merge the provided Stats into this one.
func (s *QueryStats) Merge(other *QueryStats) {
if s == nil || other == nil {
return
@@ -158,6 +190,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
s.AddFetchedSeries(other.LoadFetchedSeries())
s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes())
s.AddFetchedDataBytes(other.LoadFetchedDataBytes())
s.AddFetchedSamples(other.LoadFetchedSamples())
s.AddFetchedChunks(other.LoadFetchedChunks())
s.AddExtraFields(other.LoadExtraFields()...)
}

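The nil-receiver checks in every accessor make stats collection optional: code paths that run with stats disabled can hold a nil *QueryStats, and every Add/Load call degrades to a no-op, while the atomic operations keep concurrent per-ingester and per-store merges safe. A minimal sketch of the pattern — it assumes the pkg/querier/stats import path and that the zero-value QueryStats is usable, both consistent with the code above:

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/querier/stats"
)

func main() {
	var disabled *stats.QueryStats // nil receiver: all calls below are no-ops
	disabled.AddFetchedSamples(100)
	fmt.Println(disabled.LoadFetchedSamples()) // 0

	perStore := &stats.QueryStats{}
	perStore.AddFetchedChunks(4)
	perStore.AddFetchedSamples(480)

	total := &stats.QueryStats{}
	total.Merge(perStore) // atomic adds make concurrent merges safe
	fmt.Println(total.LoadFetchedChunks(), total.LoadFetchedSamples()) // 4 480
}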