
Commit 50c9ccf

Batch Iterator optimization (#5237)
* Batch optimization
* Add test back
* Testing multiple scrape intervals
* No assumption
* Using max chunk ts
* Test with scrape 10
* Rename method
* Comments
* Using next
* Change test name
* Changelog/comments

Signed-off-by: Alan Protasio <[email protected]>
1 parent f694529 commit 50c9ccf

7 files changed: +88 -7 lines changed


CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,8 @@
 
 ## master / unreleased
 
+* [ENHANCEMENT] Querier: Batch Iterator optimization to prevent traversing it multiple times when query range steps do not overlap. #5237
+
 ## 1.15.0 in progress
 
 * [CHANGE] Storage: Make Max exemplars config per tenant instead of global configuration. #5016

pkg/querier/batch/batch.go

Lines changed: 14 additions & 0 deletions
@@ -43,6 +43,9 @@ type iterator interface {
 	// Seek or Next have returned true.
 	AtTime() int64
 
+	// MaxCurrentChunkTime returns the max time on the current chunk.
+	MaxCurrentChunkTime() int64
+
 	// Batch returns the current batch. Must only be called after Seek or Next
 	// have returned true.
 	Batch() promchunk.Batch
@@ -98,6 +101,17 @@ func (a *iteratorAdapter) Seek(t int64) bool {
 			a.curr.Index++
 		}
 		return true
+	} else if t <= a.underlying.MaxCurrentChunkTime() {
+		// In this case, some timestamp inside the current underlying chunk can fulfill the seek.
+		// In this case we will call next until we find the sample as it will be faster than calling
+		// `a.underlying.Seek` directly as this would cause the iterator to start from the beginning of the chunk.
+		// See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45
+		//      https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95
+		for a.Next() {
+			if t <= a.curr.Timestamps[a.curr.Index] {
+				return true
+			}
+		}
 	}
 }
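The inline comment above is the heart of the change: when the seek target still falls inside the chunk the iterator is currently positioned on, stepping forward with Next is cheaper than re-seeking, because the underlying chunk iterator restarts from the beginning of its chunk on every Seek. Below is a minimal, self-contained sketch of that pattern; the type and method names (sketchIterator, seekFromChunkStart) are illustrative only and do not match the real Cortex iterators.

package main

import "fmt"

// sketchIterator is a toy iterator over sorted millisecond timestamps grouped
// into fixed-size "chunks". It illustrates the decision made by
// iteratorAdapter.Seek above: if the seek target is still inside the current
// chunk (t <= MaxCurrentChunkTime), walk forward with Next instead of
// re-seeking, which would restart from the chunk start.
type sketchIterator struct {
	timestamps []int64 // all samples, sorted ascending
	chunkSize  int     // samples per chunk
	idx        int     // current sample; -1 before the first Next
}

// Next advances to the next sample.
func (it *sketchIterator) Next() bool {
	it.idx++
	return it.idx < len(it.timestamps)
}

// AtTime returns the timestamp of the current sample.
func (it *sketchIterator) AtTime() int64 { return it.timestamps[it.idx] }

// MaxCurrentChunkTime returns the last timestamp of the chunk containing the
// current sample, mirroring chunkIterator.MaxCurrentChunkTime in the diff.
func (it *sketchIterator) MaxCurrentChunkTime() int64 {
	end := (it.idx/it.chunkSize+1)*it.chunkSize - 1
	if end >= len(it.timestamps) {
		end = len(it.timestamps) - 1
	}
	return it.timestamps[end]
}

// seekFromChunkStart models the expensive path: the underlying chunk iterator
// can only be re-positioned by scanning again from the beginning of its chunk.
func (it *sketchIterator) seekFromChunkStart(t int64) bool {
	it.idx = (it.idx / it.chunkSize) * it.chunkSize // rewind to the chunk start
	for ; it.idx < len(it.timestamps); it.idx++ {
		if it.timestamps[it.idx] >= t {
			return true
		}
	}
	return false
}

// Seek takes the cheap forward walk when t is known to fall inside the
// current chunk, and falls back to the chunk-restarting path otherwise.
func (it *sketchIterator) Seek(t int64) bool {
	if it.idx >= 0 && t <= it.MaxCurrentChunkTime() {
		for it.AtTime() < t {
			if !it.Next() {
				return false
			}
		}
		return true
	}
	return it.seekFromChunkStart(t)
}

func main() {
	it := &sketchIterator{timestamps: []int64{10, 20, 30, 40, 50, 60}, chunkSize: 3, idx: -1}
	it.Next()                             // position on 10
	fmt.Println(it.Seek(30), it.AtTime()) // true 30: served by the forward walk
	fmt.Println(it.Seek(55), it.AtTime()) // true 60: outside the current chunk, slow path
}

In the real code the adapter reaches MaxCurrentChunkTime through nonOverlappingIterator and mergeIterator (see the files below), but the decision being made is the same as in this sketch.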

pkg/querier/batch/batch_test.go

Lines changed: 54 additions & 5 deletions
@@ -35,7 +35,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
 			scenario.duplicationFactor,
 			scenario.enc.String())
 
-		chunks := createChunks(b, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
+		chunks := createChunks(b, step, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
 
 		b.Run(name, func(b *testing.B) {
 			b.ReportAllocs()
@@ -55,10 +55,59 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
 	}
 }
 
+func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) {
+	scenarios := []struct {
+		numChunks          int
+		numSamplesPerChunk int
+		duplicationFactor  int
+		seekStep           time.Duration
+		scrapeInterval     time.Duration
+		enc                promchunk.Encoding
+	}{
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second / 2, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 2, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 10, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 30, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 50, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 100, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 200, enc: promchunk.PrometheusXorChunk},
+
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second / 2, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 2, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 10, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 30, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 50, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 100, enc: promchunk.PrometheusXorChunk},
+		{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 200, enc: promchunk.PrometheusXorChunk},
+	}
+
+	for _, scenario := range scenarios {
+		name := fmt.Sprintf("scrapeInterval %vs seekStep: %vs",
+			scenario.scrapeInterval.Seconds(),
+			scenario.seekStep.Seconds())
+
+		chunks := createChunks(b, scenario.scrapeInterval, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
+
+		b.Run(name, func(b *testing.B) {
+			b.ReportAllocs()
+
+			for n := 0; n < b.N; n++ {
+				it := NewChunkMergeIterator(chunks, 0, 0)
+				i := int64(0)
+				for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone {
+					i++
+				}
+			}
+		})
+	}
+}
+
 func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
 	t.Parallel()
-	chunkOne := mkChunk(t, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
-	chunkTwo := mkChunk(t, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
+	chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
+	chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
 	chunks := []chunk.Chunk{chunkOne, chunkTwo}
 
 	sut := NewChunkMergeIterator(chunks, 0, 0)
@@ -72,13 +121,13 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
 	require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
 }
 
-func createChunks(b *testing.B, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
+func createChunks(b *testing.B, step time.Duration, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
 	result := make([]chunk.Chunk, 0, numChunks)
 
 	for d := 0; d < duplicationFactor; d++ {
 		for c := 0; c < numChunks; c++ {
 			minTime := step * time.Duration(c*numSamplesPerChunk)
-			result = append(result, mkChunk(b, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
+			result = append(result, mkChunk(b, step, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
 		}
 	}
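Assuming the repository's standard module layout, the new benchmark can be run with the usual Go tooling, e.g. `go test -run '^$' -bench BenchmarkNewChunkMergeIterator_Seek -benchmem ./pkg/querier/batch/`. Each sub-benchmark name encodes the scrape interval and seek step, so the effect of seek steps smaller than, equal to, and much larger than the chunk span can be compared directly.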

pkg/querier/batch/chunk.go

Lines changed: 4 additions & 0 deletions
@@ -21,6 +21,10 @@ func (i *chunkIterator) reset(chunk GenericChunk) {
 	i.batch.Index = 0
 }
 
+func (i *chunkIterator) MaxCurrentChunkTime() int64 {
+	return i.chunk.MaxTime
+}
+
 // Seek advances the iterator forward to the value at or after
 // the given timestamp.
 func (i *chunkIterator) Seek(t int64, size int) bool {

pkg/querier/batch/chunk_test.go

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) {
 	}
 }
 
-func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
+func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
 	metric := labels.Labels{
 		{Name: model.MetricNameLabel, Value: "foo"},
 	}
@@ -65,7 +65,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
 }
 
 func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) GenericChunk {
-	ck := mkChunk(t, from, points, enc)
+	ck := mkChunk(t, step, from, points, enc)
 	return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator)
 }

pkg/querier/batch/merge.go

Lines changed: 8 additions & 0 deletions
@@ -128,6 +128,14 @@ func (c *mergeIterator) AtTime() int64 {
 	return c.batches[0].Timestamps[0]
 }
 
+func (c *mergeIterator) MaxCurrentChunkTime() int64 {
+	if len(c.h) < 1 {
+		return -1
+	}
+
+	return c.h[0].MaxCurrentChunkTime()
+}
+
 func (c *mergeIterator) Batch() promchunk.Batch {
 	return c.batches[0]
 }
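A note on the -1 sentinel (an inference from the diff, not something it states): timestamps in this code are Unix milliseconds, so any real seek target t satisfies t > -1. Returning -1 when the heap is empty therefore makes the `t <= a.underlying.MaxCurrentChunkTime()` guard in iteratorAdapter.Seek evaluate to false, silently disabling the fast path rather than producing an incorrect seek.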

pkg/querier/batch/non_overlapping.go

Lines changed: 4 additions & 0 deletions
@@ -32,6 +32,10 @@ func (it *nonOverlappingIterator) Seek(t int64, size int) bool {
 	}
 }
 
+func (it *nonOverlappingIterator) MaxCurrentChunkTime() int64 {
+	return it.iter.MaxCurrentChunkTime()
+}
+
 func (it *nonOverlappingIterator) Next(size int) bool {
 	for {
 		if it.iter.Next(size) {
