Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions relay-cardinality/src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::hash::Hash;
use relay_statsd::metric;

use crate::statsd::CardinalityLimiterTimers;
use crate::{OrganizationId, Result};
use crate::{Error, OrganizationId, Result};

/// Limiter responsible to enforce limits.
pub trait Limiter {
Expand Down Expand Up @@ -109,17 +109,24 @@ impl<T: Limiter> CardinalityLimiter<T> {
&self,
organization: OrganizationId,
items: Vec<I>,
) -> Result<CardinalityLimits<I>> {
) -> Result<CardinalityLimits<I>, (Vec<I>, Error)> {
metric!(timer(CardinalityLimiterTimers::CardinalityLimiter), {
let entries = items.iter().enumerate().filter_map(|(id, item)| {
Some(Entry::new(EntryId(id), item.to_scope()?, item.to_hash()))
});

let rejections = self
.limiter
.check_cardinality_limits(organization, entries, self.config.cardinality_limit)?
.map(|rejection| rejection.id.0)
.collect::<BTreeSet<usize>>();
let result = self.limiter.check_cardinality_limits(
organization,
entries,
self.config.cardinality_limit,
);

let rejections = match result {
Ok(rejections) => rejections
.map(|rejection| rejection.id.0)
.collect::<BTreeSet<usize>>(),
Err(err) => return Err((items, err)),
};

Ok(CardinalityLimits {
source: items,
Expand Down
39 changes: 20 additions & 19 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;
use itertools::Either;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding};
Expand Down Expand Up @@ -1449,21 +1450,30 @@ impl EnvelopeProcessorService {
enable_cardinality_limiter: bool,
_organization_id: u64,
buckets: Vec<Bucket>,
) -> Result<Box<dyn Iterator<Item = Bucket>>, relay_cardinality::Error> {
) -> impl Iterator<Item = Bucket> {
if !enable_cardinality_limiter {
return Ok(Box::new(buckets.into_iter()));
// Use left for original vector of buckets, right for cardinality limited/filtered buckets.
return Either::Left(buckets.into_iter());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC Left is for buckets that are accepted and Right for those that are rate-limited. Is this correct? Is it worth documenting it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, but it doesn't have to be like that, Left and Right can contain any iterator, it's just a helper to be able to return 2 distinct iterator types as one concrete type

}

#[cfg(feature = "processing")]
if let Some(ref cardinality_limiter) = self.inner.cardinality_limiter {
return Ok(Box::new(
cardinality_limiter
.check_cardinality_limits(_organization_id, buckets)?
.into_accepted(),
));
let limits = cardinality_limiter.check_cardinality_limits(_organization_id, buckets);

return match limits {
Ok(limits) => Either::Right(limits.into_accepted()),
Err((buckets, error)) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"cardinality limiter failed"
);

Either::Left(buckets.into_iter())
}
};
}

Ok(Box::new(buckets.into_iter()))
Either::<_, relay_cardinality::limiter::Accepted<_>>::Left(buckets.into_iter())
}

fn handle_encode_metrics(&self, message: EncodeMetrics) {
Expand All @@ -1476,20 +1486,11 @@ impl EnvelopeProcessorService {
enable_cardinality_limiter,
} = message;

let buckets = match self.check_cardinality_limits(
let buckets = self.check_cardinality_limits(
enable_cardinality_limiter,
scoping.organization_id,
buckets,
) {
Ok(buckets) => buckets,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"cardinality limiter failed"
);
return;
}
};
);

let partitions = self.inner.config.metrics_partitions();
let max_batch_size_bytes = self.inner.config.metrics_max_batch_size_bytes();
Expand Down