Skip to content

Commit e8e1c9c

Browse files
authored
Distributor: fix bug in merging exemplars (#4583)
We need to add the merged value back to the map. Extract merging as a separate function so it can be tested. Adapt the existing test to cover multiple series. Signed-off-by: Bryan Boreham <[email protected]>
1 parent 1b7b749 commit e8e1c9c

File tree

3 files changed

+64
-32
lines changed

3 files changed

+64
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* [ENHANCEMENT] Updated Prometheus to latest. Includes changes from prometheus#9239, adding 15 new functions. Multiple TSDB bugfixes prometheus#9438 & prometheus#9381. #4524
1313
* [ENHANCEMENT] Query Frontend: Add setting `-frontend.forward-headers-list` in frontend to configure the set of headers from the requests to be forwarded to downstream requests. #4486
1414
* [BUGFIX] AlertManager: remove stale template files. #4495
15+
* [BUGFIX] Distributor: fix bug in query-exemplar where some results would get dropped. #4582
1516

1617
## 1.11.0 2021-11-25
1718

pkg/distributor/query.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
251251
return nil, err
252252
}
253253

254-
// Merge results from replication set.
254+
return mergeExemplarQueryResponses(results), nil
255+
}
256+
257+
func mergeExemplarQueryResponses(results []interface{}) *ingester_client.ExemplarQueryResponse {
255258
var keys []string
256259
exemplarResults := make(map[string]cortexpb.TimeSeries)
257260
for _, result := range results {
@@ -262,9 +265,11 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
262265
if !ok {
263266
exemplarResults[lbls] = ts
264267
keys = append(keys, lbls)
268+
} else {
269+
// Merge in any missing values from another ingesters exemplars for this series.
270+
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
271+
exemplarResults[lbls] = e
265272
}
266-
// Merge in any missing values from another ingesters exemplars for this series.
267-
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
268273
}
269274
}
270275

@@ -276,7 +281,7 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
276281
result[i] = exemplarResults[k]
277282
}
278283

279-
return &ingester_client.ExemplarQueryResponse{Timeseries: result}, nil
284+
return &ingester_client.ExemplarQueryResponse{Timeseries: result}
280285
}
281286

282287
// queryIngesterStream queries the ingesters using the new streaming API.

pkg/distributor/query_test.go

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package distributor
22

33
import (
4+
"fmt"
45
"testing"
56
"time"
67

@@ -9,6 +10,7 @@ import (
910
"github.com/stretchr/testify/require"
1011

1112
"github.com/cortexproject/cortex/pkg/cortexpb"
13+
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
1214
)
1315

1416
func TestMergeSamplesIntoFirstDuplicates(t *testing.T) {
@@ -110,51 +112,75 @@ func TestMergeSamplesIntoFirstNilB(t *testing.T) {
110112
require.Equal(t, b, a)
111113
}
112114

113-
func TestMergeExemplarSets(t *testing.T) {
115+
func TestMergeExemplars(t *testing.T) {
114116
now := timestamp.FromTime(time.Now())
115117
exemplar1 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-1")), TimestampMs: now, Value: 1}
116118
exemplar2 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-2")), TimestampMs: now + 1, Value: 2}
117119
exemplar3 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-3")), TimestampMs: now + 4, Value: 3}
118120
exemplar4 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now + 8, Value: 7}
119121
exemplar5 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now, Value: 7}
120-
121-
for _, c := range []struct {
122-
exemplarsA []cortexpb.Exemplar
123-
exemplarsB []cortexpb.Exemplar
124-
expected []cortexpb.Exemplar
122+
labels1 := []cortexpb.LabelAdapter{{Name: "label1", Value: "foo1"}}
123+
labels2 := []cortexpb.LabelAdapter{{Name: "label1", Value: "foo2"}}
124+
125+
for i, c := range []struct {
126+
seriesA []cortexpb.TimeSeries
127+
seriesB []cortexpb.TimeSeries
128+
expected []cortexpb.TimeSeries
129+
nonReversible bool
125130
}{
126131
{
127-
exemplarsA: []cortexpb.Exemplar{},
128-
exemplarsB: []cortexpb.Exemplar{},
129-
expected: []cortexpb.Exemplar{},
130-
},
131-
{
132-
exemplarsA: []cortexpb.Exemplar{exemplar1},
133-
exemplarsB: []cortexpb.Exemplar{},
134-
expected: []cortexpb.Exemplar{exemplar1},
132+
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
133+
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
134+
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
135135
},
136136
{
137-
exemplarsA: []cortexpb.Exemplar{},
138-
exemplarsB: []cortexpb.Exemplar{exemplar1},
139-
expected: []cortexpb.Exemplar{exemplar1},
137+
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
138+
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
139+
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
140140
},
141141
{
142-
exemplarsA: []cortexpb.Exemplar{exemplar1},
143-
exemplarsB: []cortexpb.Exemplar{exemplar1},
144-
expected: []cortexpb.Exemplar{exemplar1},
142+
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
143+
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
144+
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
145145
},
146146
{
147-
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3},
148-
exemplarsB: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4},
149-
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4},
147+
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}}},
148+
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4}}},
149+
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}},
150150
},
151151
{ // Ensure that when there are exemplars with duplicate timestamps, the first one wins.
152-
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3},
153-
exemplarsB: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4},
154-
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4},
152+
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}}},
153+
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4}}},
154+
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}},
155+
nonReversible: true,
156+
},
157+
{ // Disjoint exemplars on two different series.
158+
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}}},
159+
seriesB: []cortexpb.TimeSeries{{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}},
160+
expected: []cortexpb.TimeSeries{
161+
{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}},
162+
{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}},
163+
},
164+
{ // Second input adds to first on one series.
165+
seriesA: []cortexpb.TimeSeries{
166+
{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}},
167+
{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3}}},
168+
seriesB: []cortexpb.TimeSeries{{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar4}}},
169+
expected: []cortexpb.TimeSeries{
170+
{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}},
171+
{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}},
155172
},
156173
} {
157-
e := mergeExemplarSets(c.exemplarsA, c.exemplarsB)
158-
require.Equal(t, c.expected, e)
174+
t.Run(fmt.Sprint("test", i), func(t *testing.T) {
175+
rA := &ingester_client.ExemplarQueryResponse{Timeseries: c.seriesA}
176+
rB := &ingester_client.ExemplarQueryResponse{Timeseries: c.seriesB}
177+
e := mergeExemplarQueryResponses([]interface{}{rA, rB})
178+
require.Equal(t, c.expected, e.Timeseries)
179+
if !c.nonReversible {
180+
// Check the other way round too
181+
e = mergeExemplarQueryResponses([]interface{}{rB, rA})
182+
require.Equal(t, c.expected, e.Timeseries)
183+
}
184+
})
159185
}
160186
}

0 commit comments

Comments
 (0)