Skip to content

Commit 50884d9

Browse files
committed
Batch Optimization
Signed-off-by: Alan Protasio <[email protected]>
1 parent f694529 commit 50884d9

File tree

3 files changed

+102
-22
lines changed

3 files changed

+102
-22
lines changed

pkg/querier/batch/batch.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package batch
22

33
import (
4+
"time"
5+
46
"github.com/cortexproject/cortex/pkg/chunk"
57
"github.com/cortexproject/cortex/pkg/chunk/encoding"
68
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
@@ -98,6 +100,24 @@ func (a *iteratorAdapter) Seek(t int64) bool {
98100
a.curr.Index++
99101
}
100102
return true
103+
} else {
104+
// In this case, t is after the end of the current batch. Here we try to calculate if we are seeking to samples
106+
// in the same chunks, and if so, we forward the iterator to the right point in time - we do this because it
107+
// is more efficient than the seek call
107+
approxNumberOfSamples := model.Time(t).Sub(model.Time(a.curr.Timestamps[a.curr.Length-1])) / (30 * time.Second)
108+
if approxNumberOfSamples < 60 {
109+
for a.underlying.Next(promchunk.BatchSize) {
110+
a.curr = a.underlying.Batch()
111+
if t <= a.curr.Timestamps[a.curr.Length-1] {
112+
// In this case, some timestamp between current sample and end of batch can fulfill
113+
// the seek. Let's find it.
114+
for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] {
115+
a.curr.Index++
116+
}
117+
return true
118+
}
119+
}
120+
}
101121
}
102122
}
103123

pkg/querier/batch/batch_test.go

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,102 @@ import (
1313
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
1414
)
1515

16-
func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
16+
//func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
17+
// scenarios := []struct {
18+
// numChunks int
19+
// numSamplesPerChunk int
20+
// duplicationFactor int
21+
// enc promchunk.Encoding
22+
// }{
23+
// {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
24+
// {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
25+
// {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
26+
// {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
27+
// {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
28+
// {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
29+
// }
30+
//
31+
// for _, scenario := range scenarios {
32+
// name := fmt.Sprintf("chunks: %d samples per chunk: %d duplication factor: %d encoding: %s",
33+
// scenario.numChunks,
34+
// scenario.numSamplesPerChunk,
35+
// scenario.duplicationFactor,
36+
// scenario.enc.String())
37+
//
38+
// chunks := createChunks(b, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
39+
//
40+
// b.Run(name, func(b *testing.B) {
41+
// b.ReportAllocs()
42+
//
43+
// for n := 0; n < b.N; n++ {
44+
// it := NewChunkMergeIterator(chunks, 0, 0)
45+
// for it.Next() != chunkenc.ValNone {
46+
// it.At()
47+
// }
48+
//
49+
// // Ensure no error occurred.
50+
// if it.Err() != nil {
51+
// b.Fatal(it.Err().Error())
52+
// }
53+
// }
54+
// })
55+
// }
56+
//}
57+
58+
func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) {
59+
scrapeInterval := 30 * time.Second
60+
1761
scenarios := []struct {
1862
numChunks int
1963
numSamplesPerChunk int
2064
duplicationFactor int
65+
seekStep time.Duration
2166
enc promchunk.Encoding
2267
}{
23-
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
24-
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
25-
{numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
26-
{numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
27-
{numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
28-
{numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
68+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval / 2, enc: promchunk.PrometheusXorChunk},
69+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval, enc: promchunk.PrometheusXorChunk},
70+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 2, enc: promchunk.PrometheusXorChunk},
71+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 10, enc: promchunk.PrometheusXorChunk},
72+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 100, enc: promchunk.PrometheusXorChunk},
73+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 1000, enc: promchunk.PrometheusXorChunk},
74+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval / 2, enc: promchunk.PrometheusXorChunk},
75+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval, enc: promchunk.PrometheusXorChunk},
76+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 2, enc: promchunk.PrometheusXorChunk},
77+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 10, enc: promchunk.PrometheusXorChunk},
78+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 100, enc: promchunk.PrometheusXorChunk},
79+
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 1000, enc: promchunk.PrometheusXorChunk},
80+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval / 2, enc: promchunk.PrometheusXorChunk},
81+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval, enc: promchunk.PrometheusXorChunk},
82+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 2, enc: promchunk.PrometheusXorChunk},
83+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 10, enc: promchunk.PrometheusXorChunk},
84+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 100, enc: promchunk.PrometheusXorChunk},
85+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 1, seekStep: scrapeInterval * 1000, enc: promchunk.PrometheusXorChunk},
86+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval / 2, enc: promchunk.PrometheusXorChunk},
87+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval, enc: promchunk.PrometheusXorChunk},
88+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 2, enc: promchunk.PrometheusXorChunk},
89+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 10, enc: promchunk.PrometheusXorChunk},
90+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 100, enc: promchunk.PrometheusXorChunk},
91+
{numChunks: 100, numSamplesPerChunk: 120, duplicationFactor: 3, seekStep: scrapeInterval * 1000, enc: promchunk.PrometheusXorChunk},
2992
}
3093

3194
for _, scenario := range scenarios {
32-
name := fmt.Sprintf("chunks: %d samples per chunk: %d duplication factor: %d encoding: %s",
95+
name := fmt.Sprintf("chunks: %d samples per chunk: %d duplication factor: %d seekStep %vs encoding: %s",
3396
scenario.numChunks,
3497
scenario.numSamplesPerChunk,
3598
scenario.duplicationFactor,
99+
scenario.seekStep.Seconds(),
36100
scenario.enc.String())
37101

38-
chunks := createChunks(b, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
102+
chunks := createChunks(b, scrapeInterval, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
39103

40104
b.Run(name, func(b *testing.B) {
41105
b.ReportAllocs()
42106

43107
for n := 0; n < b.N; n++ {
44108
it := NewChunkMergeIterator(chunks, 0, 0)
45-
for it.Next() != chunkenc.ValNone {
46-
it.At()
47-
}
48-
49-
// Ensure no error occurred.
50-
if it.Err() != nil {
51-
b.Fatal(it.Err().Error())
109+
i := int64(0)
110+
for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone {
111+
i++
52112
}
53113
}
54114
})
@@ -57,8 +117,8 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
57117

58118
func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
59119
t.Parallel()
60-
chunkOne := mkChunk(t, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
61-
chunkTwo := mkChunk(t, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
120+
chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
121+
chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
62122
chunks := []chunk.Chunk{chunkOne, chunkTwo}
63123

64124
sut := NewChunkMergeIterator(chunks, 0, 0)
@@ -72,13 +132,13 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
72132
require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
73133
}
74134

75-
func createChunks(b *testing.B, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
135+
func createChunks(b *testing.B, step time.Duration, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
76136
result := make([]chunk.Chunk, 0, numChunks)
77137

78138
for d := 0; d < duplicationFactor; d++ {
79139
for c := 0; c < numChunks; c++ {
80140
minTime := step * time.Duration(c*numSamplesPerChunk)
81-
result = append(result, mkChunk(b, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
141+
result = append(result, mkChunk(b, step, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
82142
}
83143
}
84144

pkg/querier/batch/chunk_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) {
4444
}
4545
}
4646

47-
func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
47+
func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
4848
metric := labels.Labels{
4949
{Name: model.MetricNameLabel, Value: "foo"},
5050
}
@@ -65,7 +65,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
6565
}
6666

6767
func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) GenericChunk {
68-
ck := mkChunk(t, from, points, enc)
68+
ck := mkChunk(t, step, from, points, enc)
6969
return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator)
7070
}
7171

0 commit comments

Comments
 (0)