@@ -477,15 +477,15 @@ func removeLabel(labelName string, labels *[]cortexpb.LabelAdapter) {
477477// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
478478// and an error that indicates whether we want to accept samples based on the cluster/replica found in ts.
479479// nil for the error means accept the sample.
480- func (d * Distributor ) checkSample (ctx context.Context , userID , cluster , replica string ) (removeReplicaLabel bool , _ error ) {
480+ func (d * Distributor ) checkSample (ctx context.Context , userID , cluster , replica string , limits * validation. Overrides ) (removeReplicaLabel bool , _ error ) {
481481 // If the sample doesn't have either HA label, accept it.
482482 // At the moment we want to accept these samples by default.
483483 if cluster == "" || replica == "" {
484484 return false , nil
485485 }
486486
487487 // If replica label is too long, don't use it. We accept the sample here, but it will fail validation later anyway.
488- if len (replica ) > d . limits .MaxLabelValueLength (userID ) {
488+ if len (replica ) > limits .MaxLabelValueLength (userID ) {
489489 return false , nil
490490 }
491491
@@ -503,9 +503,10 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
503503// any are configured to be dropped for the user ID.
504504// Returns the validated series with it's labels/samples, and any error.
505505// The returned error may retain the series labels.
506- func (d * Distributor ) validateSeries (ts cortexpb.PreallocTimeseries , userID string , skipLabelNameValidation bool ) (cortexpb.PreallocTimeseries , validation.ValidationError ) {
506+ func (d * Distributor ) validateSeries (ts cortexpb.PreallocTimeseries , userID string , skipLabelNameValidation bool , limits * validation. Overrides ) (cortexpb.PreallocTimeseries , validation.ValidationError ) {
507507 d .labelsHistogram .Observe (float64 (len (ts .Labels )))
508- if err := validation .ValidateLabels (d .limits , userID , ts .Labels , skipLabelNameValidation ); err != nil {
508+
509+ if err := validation .ValidateLabels (limits , userID , ts .Labels , skipLabelNameValidation ); err != nil {
509510 return emptyPreallocSeries , err
510511 }
511512
@@ -514,7 +515,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
514515 // Only alloc when data present
515516 samples = make ([]cortexpb.Sample , 0 , len (ts .Samples ))
516517 for _ , s := range ts .Samples {
517- if err := validation .ValidateSample (d . limits , userID , ts .Labels , s ); err != nil {
518+ if err := validation .ValidateSample (limits , userID , ts .Labels , s ); err != nil {
518519 return emptyPreallocSeries , err
519520 }
520521 samples = append (samples , s )
@@ -598,9 +599,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
598599 validatedSamples := 0
599600 validatedExemplars := 0
600601
601- if d .limits .AcceptHASamples (userID ) && len (req .Timeseries ) > 0 {
602- cluster , replica := findHALabels (d .limits .HAReplicaLabel (userID ), d .limits .HAClusterLabel (userID ), req .Timeseries [0 ].Labels )
603- removeReplica , err = d .checkSample (ctx , userID , cluster , replica )
602+ // Cache user limit with overrides.
603+ limits , _ := validation .NewOverrides (* (d .limits .GetOverridesForUser (userID )), nil )
604+
605+ if limits .AcceptHASamples (userID ) && len (req .Timeseries ) > 0 {
606+ cluster , replica := findHALabels (limits .HAReplicaLabel (userID ), limits .HAClusterLabel (userID ), req .Timeseries [0 ].Labels )
607+ removeReplica , err = d .checkSample (ctx , userID , cluster , replica , limits )
604608 if err != nil {
605609 // Ensure the request slice is reused if the series get deduped.
606610 cortexpb .ReuseSlice (req .Timeseries )
@@ -634,13 +638,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
634638
635639 // For each timeseries, compute a hash to distribute across ingesters;
636640 // check each sample and discard if outside limits.
641+
642+ skipLabelNameValidation := d .cfg .SkipLabelNameValidation || req .GetSkipLabelNameValidation ()
637643 for _ , ts := range req .Timeseries {
638644 // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
639645 if len (ts .Samples ) > 0 {
640646 latestSampleTimestampMs = util_math .Max64 (latestSampleTimestampMs , ts .Samples [len (ts .Samples )- 1 ].TimestampMs )
641647 }
642648
643- if mrc := d . limits .MetricRelabelConfigs (userID ); len (mrc ) > 0 {
649+ if mrc := limits .MetricRelabelConfigs (userID ); len (mrc ) > 0 {
644650 l := relabel .Process (cortexpb .FromLabelAdaptersToLabels (ts .Labels ), mrc ... )
645651 if len (l ) == 0 {
646652 // all labels are gone, samples will be discarded
@@ -657,10 +663,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
657663 // storing series in Cortex. If we kept the replica label we would end up with another series for the same
658664 // series we're trying to dedupe when HA tracking moves over to a different replica.
659665 if removeReplica {
660- removeLabel (d . limits .HAReplicaLabel (userID ), & ts .Labels )
666+ removeLabel (limits .HAReplicaLabel (userID ), & ts .Labels )
661667 }
662668
663- for _ , labelName := range d . limits .DropLabels (userID ) {
669+ for _ , labelName := range limits .DropLabels (userID ) {
664670 removeLabel (labelName , & ts .Labels )
665671 }
666672
@@ -686,9 +692,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
686692 if err != nil {
687693 return nil , err
688694 }
689-
690- skipLabelNameValidation := d .cfg .SkipLabelNameValidation || req .GetSkipLabelNameValidation ()
691- validatedSeries , validationErr := d .validateSeries (ts , userID , skipLabelNameValidation )
695+ validatedSeries , validationErr := d .validateSeries (ts , userID , skipLabelNameValidation , limits )
692696
693697 // Errors in validation are considered non-fatal, as one series in a request may contain
694698 // invalid data but all the remaining series could be perfectly valid.
@@ -710,7 +714,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
710714 }
711715
712716 for _ , m := range req .Metadata {
713- err := validation .ValidateMetadata (d . limits , userID , m )
717+ err := validation .ValidateMetadata (limits , userID , m )
714718
715719 if err != nil {
716720 if firstPartialErr == nil {
@@ -756,7 +760,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
756760
757761 // Obtain a subring if required.
758762 if d .cfg .ShardingStrategy == util .ShardingStrategyShuffle {
759- subRing = d .ingestersRing .ShuffleShard (userID , d . limits .IngestionTenantShardSize (userID ))
763+ subRing = d .ingestersRing .ShuffleShard (userID , limits .IngestionTenantShardSize (userID ))
760764 }
761765
762766 keys := append (seriesKeys , metadataKeys ... )
0 commit comments