Skip to content

Commit 8a417e3

Browse files
authored
Reuse prom serieset from Thanos (#5844)
* reuse prom serieset from Thanos Signed-off-by: Ben Ye <[email protected]> * fix lint Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent 1072572 commit 8a417e3

File tree

15 files changed

+5558
-556
lines changed

15 files changed

+5558
-556
lines changed

pkg/querier/block.go

Lines changed: 13 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,8 @@
11
package querier
22

33
import (
4-
"math"
5-
"sort"
6-
7-
"github.com/pkg/errors"
84
"github.com/prometheus/prometheus/model/labels"
9-
"github.com/prometheus/prometheus/storage"
10-
"github.com/prometheus/prometheus/tsdb/chunkenc"
11-
"github.com/prometheus/prometheus/util/annotations"
12-
"github.com/thanos-io/thanos/pkg/store/labelpb"
135
"github.com/thanos-io/thanos/pkg/store/storepb"
14-
15-
"github.com/cortexproject/cortex/pkg/querier/iterators"
16-
"github.com/cortexproject/cortex/pkg/querier/series"
176
)
187

198
func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMatcher {
@@ -40,180 +29,28 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa
4029
return converted
4130
}
4231

43-
// Implementation of storage.SeriesSet, based on individual responses from store client.
44-
type blockQuerierSeriesSet struct {
45-
series []*storepb.Series
46-
warnings annotations.Annotations
47-
48-
// next response to process
49-
next int
50-
51-
currSeries storage.Series
32+
// storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series.
33+
type storeSeriesSet struct {
34+
series []*storepb.Series
35+
i int
5236
}
5337

54-
func (bqss *blockQuerierSeriesSet) Next() bool {
55-
bqss.currSeries = nil
38+
func newStoreSeriesSet(s []*storepb.Series) *storeSeriesSet {
39+
return &storeSeriesSet{series: s, i: -1}
40+
}
5641

57-
if bqss.next >= len(bqss.series) {
42+
func (s *storeSeriesSet) Next() bool {
43+
if s.i >= len(s.series)-1 {
5844
return false
5945
}
60-
61-
currLabels := labelpb.ZLabelsToPromLabels(bqss.series[bqss.next].Labels)
62-
currChunks := bqss.series[bqss.next].Chunks
63-
64-
bqss.next++
65-
66-
// Merge chunks for current series. Chunks may come in multiple responses, but as soon
67-
// as the response has chunks for a new series, we can stop searching. Series are sorted.
68-
// See documentation for StoreClient.Series call for details.
69-
for bqss.next < len(bqss.series) && labels.Compare(currLabels, labelpb.ZLabelsToPromLabels(bqss.series[bqss.next].Labels)) == 0 {
70-
currChunks = append(currChunks, bqss.series[bqss.next].Chunks...)
71-
bqss.next++
72-
}
73-
74-
bqss.currSeries = newBlockQuerierSeries(currLabels, currChunks)
46+
s.i++
7547
return true
7648
}
7749

78-
func (bqss *blockQuerierSeriesSet) At() storage.Series {
79-
return bqss.currSeries
80-
}
81-
82-
func (bqss *blockQuerierSeriesSet) Err() error {
50+
func (*storeSeriesSet) Err() error {
8351
return nil
8452
}
8553

86-
func (bqss *blockQuerierSeriesSet) Warnings() annotations.Annotations {
87-
return bqss.warnings
88-
}
89-
90-
// newBlockQuerierSeries makes a new blockQuerierSeries. Input labels must be already sorted by name.
91-
func newBlockQuerierSeries(lbls []labels.Label, chunks []storepb.AggrChunk) *blockQuerierSeries {
92-
sort.Slice(chunks, func(i, j int) bool {
93-
return chunks[i].MinTime < chunks[j].MinTime
94-
})
95-
96-
return &blockQuerierSeries{labels: lbls, chunks: chunks}
97-
}
98-
99-
type blockQuerierSeries struct {
100-
labels labels.Labels
101-
chunks []storepb.AggrChunk
102-
}
103-
104-
func (bqs *blockQuerierSeries) Labels() labels.Labels {
105-
return bqs.labels
106-
}
107-
108-
func (bqs *blockQuerierSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
109-
if len(bqs.chunks) == 0 {
110-
// should not happen in practice, but we have a unit test for it
111-
return series.NewErrIterator(errors.New("no chunks"))
112-
}
113-
114-
its := make([]chunkenc.Iterator, 0, len(bqs.chunks))
115-
116-
for _, c := range bqs.chunks {
117-
// Ignore if the current chunk is not XOR chunk.
118-
if c.Raw == nil {
119-
continue
120-
}
121-
ch, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
122-
if err != nil {
123-
return series.NewErrIterator(errors.Wrapf(err, "failed to initialize chunk from XOR encoded raw data (series: %v min time: %d max time: %d)", bqs.Labels(), c.MinTime, c.MaxTime))
124-
}
125-
126-
it := ch.Iterator(nil)
127-
its = append(its, it)
128-
}
129-
130-
return iterators.NewCompatibleChunksIterator(newBlockQuerierSeriesIterator(bqs.Labels(), its))
131-
}
132-
133-
func newBlockQuerierSeriesIterator(labels labels.Labels, its []chunkenc.Iterator) *blockQuerierSeriesIterator {
134-
return &blockQuerierSeriesIterator{labels: labels, iterators: its, lastT: math.MinInt64}
135-
}
136-
137-
// blockQuerierSeriesIterator implements a series iterator on top
138-
// of a list of time-sorted, non-overlapping chunks.
139-
type blockQuerierSeriesIterator struct {
140-
// only used for error reporting
141-
labels labels.Labels
142-
143-
iterators []chunkenc.Iterator
144-
i int
145-
lastT int64
146-
}
147-
148-
func (it *blockQuerierSeriesIterator) Seek(t int64) bool {
149-
// We generally expect the chunks already to be cut down
150-
// to the range we are interested in. There's not much to be gained from
151-
// hopping across chunks so we just call next until we reach t.
152-
for {
153-
ct, _ := it.At()
154-
if ct >= t {
155-
return true
156-
}
157-
if !it.Next() {
158-
return false
159-
}
160-
}
161-
}
162-
163-
func (it *blockQuerierSeriesIterator) At() (int64, float64) {
164-
if it.i >= len(it.iterators) {
165-
return 0, 0
166-
}
167-
168-
t, v := it.iterators[it.i].At()
169-
it.lastT = t
170-
return t, v
171-
}
172-
173-
func (it *blockQuerierSeriesIterator) Next() bool {
174-
if it.i >= len(it.iterators) {
175-
return false
176-
}
177-
178-
if it.iterators[it.i].Next() != chunkenc.ValNone {
179-
return true
180-
}
181-
if it.iterators[it.i].Err() != nil {
182-
return false
183-
}
184-
185-
for {
186-
it.i++
187-
188-
if it.i >= len(it.iterators) {
189-
return false
190-
}
191-
192-
// we must advance iterator first, to see if it has any samples.
193-
// Seek will call At() as its first operation.
194-
if it.iterators[it.i].Next() == chunkenc.ValNone {
195-
if it.iterators[it.i].Err() != nil {
196-
return false
197-
}
198-
199-
// Found empty iterator without error, skip it.
200-
continue
201-
}
202-
203-
// Chunks are guaranteed to be ordered but not generally guaranteed to not overlap.
204-
// We must ensure to skip any overlapping range between adjacent chunks.
205-
return it.Seek(it.lastT + 1)
206-
}
207-
}
208-
209-
func (it *blockQuerierSeriesIterator) Err() error {
210-
if it.i >= len(it.iterators) {
211-
return nil
212-
}
213-
214-
err := it.iterators[it.i].Err()
215-
if err != nil {
216-
return errors.Wrapf(err, "cannot iterate chunk for series: %v", it.labels)
217-
}
218-
return nil
54+
func (s *storeSeriesSet) At() (labels.Labels, []storepb.AggrChunk) {
55+
return s.series[s.i].PromLabels(), s.series[s.i].Chunks
21956
}

0 commit comments

Comments
 (0)