Skip to content

Commit e9319eb

Browse files
Modifying cortexproject#4422 to retry only for GET requests. Retrying AlertManager UnaryPath GET requests on the next replica if one fails.
Signed-off-by: Krishna Teja Puttagunta <[email protected]>
1 parent c815b3c commit e9319eb

File tree

3 files changed

+66
-13
lines changed

3 files changed

+66
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,3 +2088,4 @@ This release has several exciting features, the most notable of them being setti
20882088
* [FEATURE] You can specify "heap ballast" to reduce Go GC Churn #1489
20892089
* [BUGFIX] HA Tracker no longer always makes a request to Consul/Etcd when a request is not from the active replica #1516
20902090
* [BUGFIX] Queries are now correctly cancelled by the query-frontend #1508
2091+
* [ENHANCEMENT] AlertManager: Retry Alertmanager GET requests (Get Alertmanager status, Get Alertmanager Receivers) on the next replica on error #4840

pkg/alertmanager/distributor.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,14 +254,41 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
254254
defer sp.Finish()
255255
// Until we have a mechanism to combine the results from multiple alertmanagers,
256256
// we forward the request to only one of the alertmanagers.
257-
amDesc := replicationSet.Instances[rand.Intn(len(replicationSet.Instances))]
258-
resp, err := d.doRequest(ctx, amDesc, req)
259-
if err != nil {
260-
respondFromError(err, w, logger)
261-
return
257+
258+
instances := replicationSet.Instances
259+
// Randomize the list of instances to not always query the same one.
260+
rand.Shuffle(len(instances), func(i, j int) {
261+
instances[i], instances[j] = instances[j], instances[i]
262+
263+
})
264+
finalInstances := make([]ring.InstanceDesc, 0)
265+
if req.GetMethod() == "GET" && d.isUnaryReadPath(r.URL.Path) {
266+
finalInstances = instances
267+
268+
} else {
269+
// Use only one random instance unless the request is a GET on a unary read path.
270+
finalInstances = append(finalInstances, instances[0])
271+
}
272+
273+
var lastErr error
274+
for _, instance := range finalInstances {
275+
resp, err := d.doRequest(ctx, instance, req)
276+
// storing the last error message
277+
if err != nil {
278+
lastErr = err
279+
}
280+
281+
// Return on the first succeeded request
282+
if err == nil {
283+
respondFromHTTPGRPCResponse(w, resp)
284+
return
285+
}
286+
}
287+
// Respond with the last error if the loop finishes without a successful request.
288+
if lastErr != nil {
289+
respondFromError(lastErr, w, logger)
262290
}
263291

264-
respondFromHTTPGRPCResponse(w, resp)
265292
}
266293

267294
func respondFromError(err error, w http.ResponseWriter, logger log.Logger) {

pkg/alertmanager/distributor_test.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/stretchr/testify/require"
2020
"github.com/weaveworks/common/httpgrpc"
2121
"github.com/weaveworks/common/user"
22+
"go.uber.org/atomic"
2223
"google.golang.org/grpc"
2324
"google.golang.org/grpc/health/grpc_health_v1"
2425

@@ -37,6 +38,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
3738
name string
3839
numAM, numHappyAM int
3940
replicationFactor int
41+
failedReqCount int
4042
isRead bool
4143
isDelete bool
4244
expStatusCode int
@@ -196,6 +198,25 @@ func TestDistributor_DistributeRequest(t *testing.T) {
196198
expStatusCode: http.StatusOK,
197199
expectedTotalCalls: 1,
198200
route: "/status",
201+
}, {
202+
name: "Read /status should try all alert managers on error",
203+
numAM: 3,
204+
numHappyAM: 0,
205+
replicationFactor: 3,
206+
isRead: true,
207+
expStatusCode: http.StatusInternalServerError,
208+
expectedTotalCalls: 3,
209+
route: "/status",
210+
}, {
211+
name: "Read /status is sent to 3 AM when 2 are not happy",
212+
numAM: 3,
213+
numHappyAM: 3,
214+
failedReqCount: 2,
215+
replicationFactor: 3,
216+
isRead: true,
217+
expStatusCode: http.StatusOK,
218+
expectedTotalCalls: 3,
219+
route: "/status",
199220
}, {
200221
name: "Write /status not supported",
201222
numAM: 5,
@@ -229,7 +250,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
229250
for _, c := range cases {
230251
t.Run(c.name, func(t *testing.T) {
231252
route := "/alertmanager/api/v1" + c.route
232-
d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor, c.responseBody)
253+
d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.failedReqCount, c.replicationFactor, c.responseBody)
233254
t.Cleanup(cleanup)
234255

235256
ctx := user.InjectOrgID(context.Background(), "1")
@@ -306,20 +327,21 @@ func TestDistributor_IsPathSupported(t *testing.T) {
306327

307328
for path, isSupported := range supported {
308329
t.Run(path, func(t *testing.T) {
309-
d, _, cleanup := prepare(t, 1, 1, 1, []byte{})
330+
d, _, cleanup := prepare(t, 1, 1, 0, 1, []byte{})
310331
t.Cleanup(cleanup)
311332
require.Equal(t, isSupported, d.IsPathSupported(path))
312333
})
313334
}
314335
}
315336

316-
func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBody []byte) (*Distributor, []*mockAlertmanager, func()) {
337+
func prepare(t *testing.T, numAM, numHappyAM, totalFailedReq, replicationFactor int, responseBody []byte) (*Distributor, []*mockAlertmanager, func()) {
317338
ams := []*mockAlertmanager{}
339+
failedReqCount := atomic.NewInt32(int32(totalFailedReq))
318340
for i := 0; i < numHappyAM; i++ {
319-
ams = append(ams, newMockAlertmanager(i, true, responseBody))
341+
ams = append(ams, newMockAlertmanager(i, true, failedReqCount, responseBody))
320342
}
321343
for i := numHappyAM; i < numAM; i++ {
322-
ams = append(ams, newMockAlertmanager(i, false, responseBody))
344+
ams = append(ams, newMockAlertmanager(i, false, failedReqCount, responseBody))
323345
}
324346

325347
// Use a real ring with a mock KV store to test ring RF logic.
@@ -383,14 +405,16 @@ type mockAlertmanager struct {
383405
myAddr string
384406
happy bool
385407
responseBody []byte
408+
failedReqCount *atomic.Int32
386409
}
387410

388-
func newMockAlertmanager(idx int, happy bool, responseBody []byte) *mockAlertmanager {
411+
func newMockAlertmanager(idx int, happy bool, failedReqCount *atomic.Int32, responseBody []byte) *mockAlertmanager {
389412
return &mockAlertmanager{
390413
receivedRequests: make(map[string]map[int]int),
391414
myAddr: fmt.Sprintf("127.0.0.1:%05d", 10000+idx),
392415
happy: happy,
393416
responseBody: responseBody,
417+
failedReqCount: failedReqCount,
394418
}
395419
}
396420

@@ -409,7 +433,8 @@ func (am *mockAlertmanager) HandleRequest(_ context.Context, in *httpgrpc.HTTPRe
409433
am.receivedRequests[path] = m
410434
}
411435

412-
if am.happy {
436+
failedCountLocal := am.failedReqCount.Dec()
437+
if am.happy && failedCountLocal < 0 {
413438
m[http.StatusOK]++
414439
return &httpgrpc.HTTPResponse{
415440
Code: http.StatusOK,

0 commit comments

Comments
 (0)