@@ -77,6 +77,7 @@ type Config struct {
7777 ClientCleanupPeriod time.Duration
7878 IngestionRateLimit float64
7979 IngestionBurstSize int
80+ WaitForAllIngesters bool
8081
8182 // for testing
8283 ingesterClientFactory func (addr string , cfg ingester_client.Config ) (client.IngesterClient , error )
@@ -92,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9293 flag .DurationVar (& cfg .ClientCleanupPeriod , "distributor.client-cleanup-period" , 15 * time .Second , "How frequently to clean up clients for ingesters that have gone away." )
9394 flag .Float64Var (& cfg .IngestionRateLimit , "distributor.ingestion-rate-limit" , 25000 , "Per-user ingestion rate limit in samples per second." )
9495 flag .IntVar (& cfg .IngestionBurstSize , "distributor.ingestion-burst-size" , 50000 , "Per-user allowed ingestion burst size (in number of samples)." )
96+ flag .BoolVar (& cfg .WaitForAllIngesters , "distributor.wait-for-all-ingesters" , false , "Turning this on will no longer short circuit ingester writes once a minimum quorum is reached." )
9597}
9698
9799// New constructs a new Distributor
@@ -250,8 +252,10 @@ func tokenFor(userID string, name []byte) uint32 {
250252type sampleTracker struct {
251253 labels []client.LabelPair
252254 sample client.Sample
255+ total int
253256 minSuccess int
254257 maxFailures int
258+ finished int32
255259 succeeded int32
256260 failed int32
257261}
@@ -316,6 +320,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
316320 // We need a response from a quorum of ingesters, which is n/2 + 1.
317321 minSuccess := (len (ingesters [i ]) / 2 ) + 1
318322 samples [i ].minSuccess = minSuccess
323+ samples [i ].total = len (ingesters [i ])
319324 samples [i ].maxFailures = len (ingesters [i ]) - minSuccess
320325
321326 // Skip those that have not heartbeated in a while. NB these are still
@@ -387,20 +392,44 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
387392 // The use of atomic increments here guarantees only a single sendSamples
388393 // goroutine will write to either channel.
389394 for i := range sampleTrackers {
390- if err != nil {
391- if atomic .AddInt32 (& sampleTrackers [i ].failed , 1 ) <= int32 (sampleTrackers [i ].maxFailures ) {
392- continue
393- }
395+ if d .cfg .WaitForAllIngesters {
396+ waitForAll (err , sampleTrackers [i ], pushTracker )
397+ } else {
398+ shortCircuit (err , sampleTrackers [i ], pushTracker )
399+ }
400+ }
401+ }
402+
403+ func shortCircuit (err error , sampleTracker * sampleTracker , pushTracker * pushTracker ) {
404+ if err != nil {
405+ if atomic .AddInt32 (& sampleTracker .failed , 1 ) <= int32 (sampleTracker .maxFailures ) {
406+ return
407+ }
408+ if atomic .AddInt32 (& pushTracker .samplesFailed , 1 ) == 1 {
409+ pushTracker .err <- err
410+ }
411+ } else {
412+ if atomic .AddInt32 (& sampleTracker .succeeded , 1 ) != int32 (sampleTracker .minSuccess ) {
413+ return
414+ }
415+ if atomic .AddInt32 (& pushTracker .samplesPending , - 1 ) == 0 {
416+ pushTracker .done <- struct {}{}
417+ }
418+ }
419+ }
420+
421+ func waitForAll (err error , sampleTracker * sampleTracker , pushTracker * pushTracker ) {
422+ if err != nil {
423+ if atomic .AddInt32 (& sampleTracker .failed , 1 ) > int32 (sampleTracker .maxFailures ) {
394424 if atomic .AddInt32 (& pushTracker .samplesFailed , 1 ) == 1 {
395425 pushTracker .err <- err
396426 }
397- } else {
398- if atomic .AddInt32 (& sampleTrackers [i ].succeeded , 1 ) != int32 (sampleTrackers [i ].minSuccess ) {
399- continue
400- }
401- if atomic .AddInt32 (& pushTracker .samplesPending , - 1 ) == 0 {
402- pushTracker .done <- struct {}{}
403- }
427+ }
428+ }
429+
430+ if atomic .AddInt32 (& sampleTracker .finished , 1 ) == int32 (sampleTracker .total ) {
431+ if atomic .AddInt32 (& pushTracker .samplesPending , - 1 ) == 0 {
432+ pushTracker .done <- struct {}{}
404433 }
405434 }
406435}
0 commit comments