66 "io"
77 "math/rand"
88 "net/http"
9+ "net/textproto"
910 "path"
1011 "strings"
1112 "sync"
@@ -39,10 +40,12 @@ type Distributor struct {
3940 alertmanagerClientsPool ClientsPool
4041
4142 logger log.Logger
43+
44+ targetHeaders []string
4245}
4346
4447// NewDistributor constructs a new Distributor
45- func NewDistributor (cfg ClientConfig , maxRecvMsgSize int64 , alertmanagersRing * ring.Ring , alertmanagerClientsPool ClientsPool , logger log.Logger , reg prometheus.Registerer ) (d * Distributor , err error ) {
48+ func NewDistributor (cfg ClientConfig , maxRecvMsgSize int64 , alertmanagersRing * ring.Ring , alertmanagerClientsPool ClientsPool , targetHeaders [] string , logger log.Logger , reg prometheus.Registerer ) (d * Distributor , err error ) {
4649 if alertmanagerClientsPool == nil {
4750 alertmanagerClientsPool = newAlertmanagerClientsPool (client .NewRingServiceDiscovery (alertmanagersRing ), cfg , logger , reg )
4851 }
@@ -53,6 +56,7 @@ func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *r
5356 maxRecvMsgSize : maxRecvMsgSize ,
5457 alertmanagerRing : alertmanagersRing ,
5558 alertmanagerClientsPool : alertmanagerClientsPool ,
59+ targetHeaders : targetHeaders ,
5660 }
5761
5862 d .Service = services .NewBasicService (nil , d .running , nil )
@@ -170,6 +174,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
170174 var responses []* httpgrpc.HTTPResponse
171175 var responsesMtx sync.Mutex
172176 grpcHeaders := httpToHttpgrpcHeaders (r .Header )
177+
173178 err = ring .DoBatch (r .Context (), RingOp , d .alertmanagerRing , []uint32 {shardByUser (userID )}, func (am ring.InstanceDesc , _ []int ) error {
174179 // Use a background context to make sure all alertmanagers get the request even if we return early.
175180 localCtx := opentracing .ContextWithSpan (user .InjectOrgID (context .Background (), userID ), opentracing .SpanFromContext (r .Context ()))
@@ -305,6 +310,23 @@ func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *
305310 return nil , errors .Wrapf (err , "failed to get alertmanager client from pool (alertmanager address: %s)" , am .Addr )
306311 }
307312
313+ headers := make (map [string ]string , 0 )
314+ for _ , h := range req .Headers {
315+ headers [h .Key ] = h .Values [0 ]
316+ }
317+
318+ headerMap := make (map [string ]string , 0 )
319+ // Remove non-existent header.
320+ for _ , header := range d .targetHeaders {
321+ if v , ok := headers [textproto .CanonicalMIMEHeaderKey (header )]; ok {
322+ headerMap [header ] = v
323+ }
324+ }
325+
326+ if len (headerMap ) > 0 {
327+ ctx = util_log .ContextWithHeaderMap (ctx , headerMap )
328+ }
329+
308330 return amClient .HandleRequest (ctx , req )
309331}
310332
0 commit comments