diff --git a/relay-cardinality/src/limiter.rs b/relay-cardinality/src/limiter.rs
index 0549b732467..c8fe4e066e7 100644
--- a/relay-cardinality/src/limiter.rs
+++ b/relay-cardinality/src/limiter.rs
@@ -7,7 +7,7 @@ use std::hash::Hash;
 use relay_statsd::metric;
 
 use crate::statsd::CardinalityLimiterTimers;
-use crate::{OrganizationId, Result};
+use crate::{Error, OrganizationId, Result};
 
 /// Limiter responsible to enforce limits.
 pub trait Limiter {
@@ -109,17 +109,24 @@ impl CardinalityLimiter {
         &self,
         organization: OrganizationId,
         items: Vec<T>,
-    ) -> Result<CardinalityLimits<T>> {
+    ) -> Result<CardinalityLimits<T>, (Vec<T>, Error)> {
         metric!(timer(CardinalityLimiterTimers::CardinalityLimiter), {
             let entries = items.iter().enumerate().filter_map(|(id, item)| {
                 Some(Entry::new(EntryId(id), item.to_scope()?, item.to_hash()))
             });
 
-            let rejections = self
-                .limiter
-                .check_cardinality_limits(organization, entries, self.config.cardinality_limit)?
-                .map(|rejection| rejection.id.0)
-                .collect::<HashSet<_>>();
+            let result = self.limiter.check_cardinality_limits(
+                organization,
+                entries,
+                self.config.cardinality_limit,
+            );
+
+            let rejections = match result {
+                Ok(rejections) => rejections
+                    .map(|rejection| rejection.id.0)
+                    .collect::<HashSet<_>>(),
+                Err(err) => return Err((items, err)),
+            };
 
             Ok(CardinalityLimits {
                 source: items,
diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs
index c4710933ef3..77d60f962f2 100644
--- a/relay-server/src/actors/processor.rs
+++ b/relay-server/src/actors/processor.rs
@@ -10,6 +10,7 @@ use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use flate2::write::{GzEncoder, ZlibEncoder};
 use flate2::Compression;
+use itertools::Either;
 use relay_base_schema::project::{ProjectId, ProjectKey};
 use relay_common::time::UnixTimestamp;
 use relay_config::{Config, HttpEncoding};
@@ -1449,21 +1450,30 @@ impl EnvelopeProcessorService {
         enable_cardinality_limiter: bool,
         _organization_id: u64,
         buckets: Vec<Bucket>,
-    ) -> Result<Box<dyn Iterator<Item = Bucket>>, relay_cardinality::Error> {
+    ) -> impl Iterator<Item = Bucket> {
         if !enable_cardinality_limiter {
-            return Ok(Box::new(buckets.into_iter()));
+            // Use left for original vector of buckets, right for cardinality limited/filtered buckets.
+            return Either::Left(buckets.into_iter());
         }
 
         #[cfg(feature = "processing")]
         if let Some(ref cardinality_limiter) = self.inner.cardinality_limiter {
-            return Ok(Box::new(
-                cardinality_limiter
-                    .check_cardinality_limits(_organization_id, buckets)?
-                    .into_accepted(),
-            ));
+            let limits = cardinality_limiter.check_cardinality_limits(_organization_id, buckets);
+
+            return match limits {
+                Ok(limits) => Either::Right(limits.into_accepted()),
+                Err((buckets, error)) => {
+                    relay_log::error!(
+                        error = &error as &dyn std::error::Error,
+                        "cardinality limiter failed"
+                    );
+
+                    Either::Left(buckets.into_iter())
+                }
+            };
         }
 
-        Ok(Box::new(buckets.into_iter()))
+        Either::<_, relay_cardinality::limiter::Accepted<_>>::Left(buckets.into_iter())
     }
 
     fn handle_encode_metrics(&self, message: EncodeMetrics) {
@@ -1476,20 +1486,11 @@ impl EnvelopeProcessorService {
             enable_cardinality_limiter,
         } = message;
 
-        let buckets = match self.check_cardinality_limits(
+        let buckets = self.check_cardinality_limits(
            enable_cardinality_limiter,
            scoping.organization_id,
            buckets,
-        ) {
-            Ok(buckets) => buckets,
-            Err(error) => {
-                relay_log::error!(
-                    error = &error as &dyn std::error::Error,
-                    "cardinality limiter failed"
-                );
-                return;
-            }
-        };
+        );
 
         let partitions = self.inner.config.metrics_partitions();
         let max_batch_size_bytes = self.inner.config.metrics_max_batch_size_bytes();
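
The shape of this change: check_cardinality_limits now hands the caller's items back alongside the error (Result<_, (Vec<T>, Error)>), so a limiter failure is no longer fatal. The processor logs the error and falls back to forwarding the unfiltered buckets, and itertools::Either unifies the two iterator types so no boxing is needed. Below is a minimal, self-contained sketch of that pattern, assuming only the itertools crate; Bucket, Error, Limits, check_limits, and apply_limits are hypothetical stand-ins for illustration, not Relay's actual types or APIs.

    use std::collections::HashSet;

    use itertools::Either;

    // Hypothetical stand-ins for the types touched by the diff above.
    #[derive(Debug)]
    struct Bucket(u32);

    #[derive(Debug)]
    struct Error;

    // Outcome of a successful check: the original items plus the rejected indices.
    struct Limits {
        source: Vec<Bucket>,
        rejections: HashSet<usize>,
    }

    impl Limits {
        // Consume the result, yielding only the accepted items.
        fn into_accepted(self) -> impl Iterator<Item = Bucket> {
            let rejections = self.rejections;
            self.source
                .into_iter()
                .enumerate()
                .filter_map(move |(i, bucket)| (!rejections.contains(&i)).then_some(bucket))
        }
    }

    // On failure the caller gets its buckets back together with the error,
    // mirroring the Result<_, (Vec<T>, Error)> shape introduced in the diff.
    fn check_limits(buckets: Vec<Bucket>, fail: bool) -> Result<Limits, (Vec<Bucket>, Error)> {
        if fail {
            return Err((buckets, Error));
        }
        // Pretend every odd index exceeded the limit.
        let rejections = (0..buckets.len()).filter(|i| i % 2 == 1).collect();
        Ok(Limits { source: buckets, rejections })
    }

    // Caller-side pattern: log-and-fall-back instead of bailing out, with Either
    // giving both branches a single concrete iterator type.
    fn apply_limits(buckets: Vec<Bucket>, fail: bool) -> impl Iterator<Item = Bucket> {
        match check_limits(buckets, fail) {
            Ok(limits) => Either::Right(limits.into_accepted()),
            Err((buckets, _error)) => Either::Left(buckets.into_iter()),
        }
    }

    fn main() {
        let buckets = vec![Bucket(1), Bucket(2), Bucket(3)];
        // Accepted buckets only (odd indices rejected): [Bucket(1), Bucket(3)]
        println!("{:?}", apply_limits(buckets, false).collect::<Vec<_>>());
    }

Returning the input by value in the error arm is what lets the fallback path reuse the buckets without cloning them up front, which is the point of the (Vec<T>, Error) error type in the diff.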