diff --git a/relay-server/src/endpoints/attachments.rs b/relay-server/src/endpoints/attachments.rs
index 07eb941df44..5c3ea441dc7 100644
--- a/relay-server/src/endpoints/attachments.rs
+++ b/relay-server/src/endpoints/attachments.rs
@@ -39,8 +39,10 @@ pub async fn handle(
     meta: RequestMeta,
     Path(path): Path,
     multipart: ConstrainedMultipart,
-) -> Result {
+) -> axum::response::Result {
     let envelope = extract_envelope(meta, path, multipart, state.config()).await?;
-    common::handle_envelope(&state, envelope).await?;
+    common::handle_envelope(&state, envelope)
+        .await?
+        .ignore_rate_limits();
     Ok(StatusCode::CREATED)
 }
diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs
index b723746d97e..8932a438fc5 100644
--- a/relay-server/src/endpoints/common.rs
+++ b/relay-server/src/endpoints/common.rs
@@ -9,7 +9,7 @@ use relay_statsd::metric;
 use serde::Deserialize;
 
 use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
-use crate::managed::ManagedEnvelope;
+use crate::managed::{Managed, Rejected};
 use crate::service::ServiceState;
 use crate::services::buffer::ProjectKeyPair;
 use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome};
@@ -269,31 +269,40 @@ pub fn event_id_from_items(items: &Items) -> Result, BadStoreReq
 /// returned and the envelope is not queued.
 fn queue_envelope(
     state: &ServiceState,
-    mut managed_envelope: ManagedEnvelope,
-) -> Result<(), BadStoreRequest> {
-    let envelope = managed_envelope.envelope_mut();
-
+    mut envelope: Managed<Box<Envelope>>,
+) -> Result<(), Rejected<BadStoreRequest>> {
     if state.config().relay_mode() != RelayMode::Proxy {
         // Remove metrics from the envelope and queue them directly on the project's `Aggregator`.
         // In proxy mode, we cannot aggregate metrics because we may not have a project ID.
         let is_metric = |i: &Item| matches!(i.ty(), ItemType::Statsd | ItemType::MetricBuckets);
-        let metric_items = envelope.take_items_by(is_metric);
-        if !metric_items.is_empty() {
-            relay_log::trace!("sending metrics into processing queue");
+        let metrics;
+        (envelope, metrics) = envelope.split_once(|mut envelope| {
+            let metrics = envelope.take_items_by(is_metric).into_vec();
+            (envelope, metrics)
+        });
+
+        metrics.accept(|metrics| {
+            if metrics.is_empty() {
+                return;
+            }
+
             state.processor().send(ProcessMetrics {
-                data: MetricData::Raw(metric_items.into_vec()),
+                data: MetricData::Raw(metrics),
                 received_at: envelope.received_at(),
                 sent_at: envelope.sent_at(),
                 project_key: envelope.meta().public_key(),
                 source: BucketSource::from_meta(envelope.meta()),
             });
-        }
+        })
     }
 
-    let pkp = ProjectKeyPair::from_envelope(&*envelope);
-    if !state.envelope_buffer(pkp).try_push(managed_envelope) {
-        return Err(BadStoreRequest::QueueFailed);
+    let pkp = ProjectKeyPair::from_envelope(&envelope);
+    if let Err(envelope) = state.envelope_buffer(pkp).try_push(envelope) {
+        return Err(envelope.reject_err((
+            Outcome::Invalid(DiscardReason::Internal),
+            BadStoreRequest::QueueFailed,
+        )));
     }
 
     Ok(())
@@ -310,66 +319,110 @@ fn queue_envelope(
 pub async fn handle_envelope(
     state: &ServiceState,
     envelope: Box<Envelope>,
-) -> Result<Option<EventId>, BadStoreRequest> {
+) -> Result<HandledEnvelope, Rejected<BadStoreRequest>> {
     emit_envelope_metrics(&envelope);
 
+    let mut envelope = Managed::from_envelope(envelope, state.outcome_aggregator().clone());
+
     if state.memory_checker().check_memory().is_exceeded() {
-        return Err(BadStoreRequest::QueueFailed);
+        return Err(envelope.reject_err((
+            Outcome::Invalid(DiscardReason::Internal),
+            BadStoreRequest::QueueFailed,
+        )));
     };
 
-    let mut managed_envelope = ManagedEnvelope::new(envelope, state.outcome_aggregator().clone());
-
     // If configured, remove unknown items at the very beginning. If the envelope is
     // empty, we fail the request with a special control flow error to skip checks and
     // queueing, that still results in a `200 OK` response.
-    utils::remove_unknown_items(state.config(), &mut managed_envelope);
-
-    let event_id = managed_envelope.envelope().event_id();
-    if managed_envelope.envelope().is_empty() {
-        managed_envelope.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope));
-        return Ok(event_id);
+    utils::remove_unknown_items(state.config(), &mut envelope);
+
+    let event_id = envelope.event_id();
+    if envelope.is_empty() {
+        return Ok(HandledEnvelope {
+            event_id,
+            rate_limits: Default::default(),
+        });
     }
 
-    let project_key = managed_envelope.envelope().meta().public_key();
+    let project_key = envelope.meta().public_key();
 
     // Prefetch sampling project key, current spooling implementations rely on this behavior.
     //
     // To be changed once spool v1 has been removed.
-    if let Some(sampling_project_key) = managed_envelope.envelope().sampling_key()
+    if let Some(sampling_project_key) = envelope.sampling_key()
         && sampling_project_key != project_key
     {
         state.project_cache_handle().fetch(sampling_project_key);
     }
 
-    let checked = state
+    let rate_limits = state
         .project_cache_handle()
         .get(project_key)
-        .check_envelope(managed_envelope)
+        .check_envelope(&mut envelope)
         .await
-        .map_err(BadStoreRequest::EventRejected)?;
+        .map_err(|err| err.map(BadStoreRequest::EventRejected))?;
 
-    let Some(mut managed_envelope) = checked.envelope else {
-        // All items have been removed from the envelope.
-        return Err(BadStoreRequest::RateLimited(checked.rate_limits));
-    };
+    if envelope.is_empty() {
+        return Err(envelope.reject_err((None, BadStoreRequest::RateLimited(rate_limits))));
+    }
 
-    if let Err(offender) =
-        utils::check_envelope_size_limits(state.config(), managed_envelope.envelope())
-    {
-        managed_envelope.reject(Outcome::Invalid(DiscardReason::TooLarge(offender)));
-        return Err(BadStoreRequest::Overflow(offender));
+    if let Err(offender) = utils::check_envelope_size_limits(state.config(), &envelope) {
+        return Err(envelope.reject_err((
+            Outcome::Invalid(DiscardReason::TooLarge(offender)),
+            BadStoreRequest::Overflow(offender),
+        )));
     }
 
-    queue_envelope(state, managed_envelope)?;
+    queue_envelope(state, envelope)?;
 
-    if checked.rate_limits.is_limited() {
+    Ok(HandledEnvelope {
+        event_id,
         // Even if some envelope items have been queued, there might be active rate limits on
         // other items. Communicate these rate limits to the downstream (Relay or SDK).
-        //
-        // See `IntoResponse` implementation of `BadStoreRequest`.
-        Err(BadStoreRequest::RateLimited(checked.rate_limits))
-    } else {
-        Ok(event_id)
+        rate_limits,
+    })
 }
+
+/// A successfully handled envelope, returned by [`handle_envelope`].
+#[derive(Debug)]
+#[must_use = "rate limits of a handled envelope must be used"]
+pub struct HandledEnvelope {
+    /// The event id of the envelope.
+    pub event_id: Option<EventId>,
+    /// All active, but not necessarily enforced, rate limits.
+    ///
+    /// These rate limits should always be communicated to the client on envelope endpoints.
+    ///
+    /// See also: [`BadStoreRequest::RateLimited`].
+    pub rate_limits: RateLimits,
+}
+
+impl HandledEnvelope {
+    /// Ensures all active rate limits are handled as an error.
+    ///
+    /// This is legacy behaviour where active rate limits are returned as an error instead of
+    /// being added to the usual response.
+    /// In this legacy behaviour, the event id is only returned when there are no active rate
+    /// limits.
+    ///
+    /// This function simplifies the legacy handling by turning active rate limits back into an error.
+    pub fn ensure_rate_limits(self) -> Result<Option<EventId>, BadStoreRequest> {
+        if self.rate_limits.is_limited() {
+            return Err(BadStoreRequest::RateLimited(self.rate_limits));
+        }
+        Ok(self.event_id)
+    }
+
+    /// Explicitly ignores contained active rate limits.
+    ///
+    /// Endpoints which choose not to propagate active rate limits should use this method to
+    /// state that fact explicitly.
+    ///
+    /// Most endpoints ignore active rate limits; they mainly matter for envelope-based endpoints.
+    ///
+    /// Note: enforced rate limits are still returned as an error from [`handle_envelope`].
+    pub fn ignore_rate_limits(self) -> Option<EventId> {
+        self.event_id
+    }
+}
diff --git a/relay-server/src/endpoints/envelope.rs b/relay-server/src/endpoints/envelope.rs
index 647231f0582..3c60f144436 100644
--- a/relay-server/src/endpoints/envelope.rs
+++ b/relay-server/src/endpoints/envelope.rs
@@ -116,9 +116,11 @@ struct StoreResponse {
 async fn handle(
     state: ServiceState,
     params: EnvelopeParams,
-) -> Result {
+) -> axum::response::Result {
     let envelope = params.extract_envelope()?;
-    let id = common::handle_envelope(&state, envelope).await?;
+    let id = common::handle_envelope(&state, envelope)
+        .await?
+        .ensure_rate_limits()?;
     Ok(Json(StoreResponse { id }))
 }
diff --git a/relay-server/src/endpoints/integrations/otlp.rs b/relay-server/src/endpoints/integrations/otlp.rs
index 6b4590b5d8c..5d776a7bbd6 100644
--- a/relay-server/src/endpoints/integrations/otlp.rs
+++ b/relay-server/src/endpoints/integrations/otlp.rs
@@ -43,7 +43,9 @@ mod traces {
             .with_required_feature(Feature::OtelTracesEndpoint)
             .build();
 
-        common::handle_envelope(&state, envelope).await?;
+        common::handle_envelope(&state, envelope)
+            .await?
+            .ignore_rate_limits();
 
         Ok(StatusCode::ACCEPTED)
     }
@@ -72,7 +74,9 @@ mod logs {
             .with_required_feature(Feature::OtelLogsEndpoint)
             .build();
 
-        common::handle_envelope(&state, envelope).await?;
+        common::handle_envelope(&state, envelope)
+            .await?
+            .ignore_rate_limits();
 
         Ok(StatusCode::ACCEPTED)
     }
diff --git a/relay-server/src/endpoints/integrations/vercel.rs b/relay-server/src/endpoints/integrations/vercel.rs
index 7e8372142b5..6a092188ecd 100644
--- a/relay-server/src/endpoints/integrations/vercel.rs
+++ b/relay-server/src/endpoints/integrations/vercel.rs
@@ -40,7 +40,9 @@ mod logs {
             .with_required_feature(Feature::VercelLogDrainEndpoint)
             .build();
 
-        common::handle_envelope(&state, envelope).await?;
+        common::handle_envelope(&state, envelope)
+            .await?
+            .ignore_rate_limits();
 
         Ok(StatusCode::ACCEPTED)
     }
diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs
index df649d31fbe..6377c1156a1 100644
--- a/relay-server/src/endpoints/minidump.rs
+++ b/relay-server/src/endpoints/minidump.rs
@@ -233,7 +233,10 @@ async fn handle(
     let id = envelope.event_id();
 
     // Never respond with a 429 since clients often retry these
-    match common::handle_envelope(&state, envelope).await {
+    match common::handle_envelope(&state, envelope)
+        .await
+        .map_err(|err| err.into_inner())
+    {
         Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
         Err(error) => return Err(error.into()),
     };
diff --git a/relay-server/src/endpoints/monitor.rs b/relay-server/src/endpoints/monitor.rs
index 3ac3a7a1ca2..591b7001632 100644
--- a/relay-server/src/endpoints/monitor.rs
+++ b/relay-server/src/endpoints/monitor.rs
@@ -66,7 +66,10 @@ async fn handle(
     envelope.add_item(item);
 
     // Never respond with a 429
-    match common::handle_envelope(&state, envelope).await {
+    match common::handle_envelope(&state, envelope)
+        .await
+        .map_err(|err| err.into_inner())
+    {
         Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
         Err(error) => return Err(error.into()),
     };
diff --git a/relay-server/src/endpoints/nel.rs b/relay-server/src/endpoints/nel.rs
index 3badc3e566f..73248653d04 100644
--- a/relay-server/src/endpoints/nel.rs
+++ b/relay-server/src/endpoints/nel.rs
@@ -40,7 +40,7 @@ async fn handle(
     state: ServiceState,
     mime: Mime,
     params: NelReportParams,
-) -> Result {
+) -> axum::response::Result {
     if !is_nel_mime(mime) {
         return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
     }
@@ -55,7 +55,10 @@ async fn handle(
         envelope.add_item(report_item);
     }
 
-    common::handle_envelope(&state, envelope).await?;
+    common::handle_envelope(&state, envelope)
+        .await?
+        .ignore_rate_limits();
+
     Ok(().into_response())
 }
diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs
index 3f7d63ed985..4ebb55dcac0 100644
--- a/relay-server/src/endpoints/playstation.rs
+++ b/relay-server/src/endpoints/playstation.rs
@@ -121,7 +121,10 @@ async fn handle(
     let id = envelope.event_id();
 
     // Never respond with a 429 since clients often retry these
-    match common::handle_envelope(&state, envelope).await {
+    match common::handle_envelope(&state, envelope)
+        .await
+        .map_err(|err| err.into_inner())
+    {
         Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
         Err(error) => return Err(error.into()),
     };
diff --git a/relay-server/src/endpoints/security_report.rs b/relay-server/src/endpoints/security_report.rs
index 76cd8e338ec..572460fe236 100644
--- a/relay-server/src/endpoints/security_report.rs
+++ b/relay-server/src/endpoints/security_report.rs
@@ -95,13 +95,16 @@ async fn handle(
     state: ServiceState,
     mime: Mime,
     params: SecurityReportParams,
-) -> Result {
+) -> axum::response::Result {
     if !is_security_mime(mime) {
         return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
    }
 
     let envelope = params.extract_envelope()?;
-    common::handle_envelope(&state, envelope).await?;
+    common::handle_envelope(&state, envelope)
+        .await?
+        .ignore_rate_limits();
+
     Ok(().into_response())
 }
diff --git a/relay-server/src/endpoints/store.rs b/relay-server/src/endpoints/store.rs
index d9e052480be..c396955c21c 100644
--- a/relay-server/src/endpoints/store.rs
+++ b/relay-server/src/endpoints/store.rs
@@ -108,16 +108,21 @@ async fn handle_post(
     meta: RequestMeta,
     content_type: RawContentType,
     body: Bytes,
-) -> Result {
+) -> axum::response::Result {
     let envelope = match content_type.as_ref() {
-        envelope::CONTENT_TYPE => Envelope::parse_request(body, meta)?,
+        envelope::CONTENT_TYPE => {
+            Envelope::parse_request(body, meta).map_err(BadStoreRequest::InvalidEnvelope)?
+        }
         _ => parse_event(body, meta, state.config())?,
     };
 
     if envelope.is_internal() {
-        return Err(BadStoreRequest::InternalEnvelope);
+        return Err(BadStoreRequest::InternalEnvelope.into());
     }
 
-    let id = common::handle_envelope(&state, envelope).await?;
+    let id = common::handle_envelope(&state, envelope)
+        .await?
+        .ensure_rate_limits()?;
+
     Ok(axum::Json(PostResponse { id }).into_response())
 }
 
@@ -140,9 +145,11 @@ async fn handle_get(
     state: ServiceState,
     meta: RequestMeta,
     Query(query): Query,
-) -> Result {
+) -> axum::response::Result {
     let envelope = parse_event(query.sentry_data.into(), meta, state.config())?;
-    common::handle_envelope(&state, envelope).await?;
+    common::handle_envelope(&state, envelope)
+        .await?
+        .ensure_rate_limits()?;
     Ok(([(header::CONTENT_TYPE, "image/gif")], PIXEL))
 }
diff --git a/relay-server/src/endpoints/unreal.rs b/relay-server/src/endpoints/unreal.rs
index 8ba74c8bd98..871fd078103 100644
--- a/relay-server/src/endpoints/unreal.rs
+++ b/relay-server/src/endpoints/unreal.rs
@@ -57,7 +57,10 @@ async fn handle(
     let id = envelope.event_id();
 
     // Never respond with a 429 since clients often retry these
-    match common::handle_envelope(&state, envelope).await {
+    match common::handle_envelope(&state, envelope)
+        .await
+        .map_err(|err| err.into_inner())
+    {
         Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
         Err(error) => return Err(error),
     };
diff --git a/relay-server/src/managed/counted.rs b/relay-server/src/managed/counted.rs
index 6ed87e3efab..a6730592d7c 100644
--- a/relay-server/src/managed/counted.rs
+++ b/relay-server/src/managed/counted.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+
 use relay_event_schema::protocol::{
     OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2, TraceMetric,
 };
@@ -29,6 +31,12 @@ impl Counted for () {
     }
 }
 
+impl Counted for (DataCategory, usize) {
+    fn quantities(&self) -> Quantities {
+        smallvec::smallvec![*self]
+    }
+}
+
 impl Counted for Item {
     fn quantities(&self) -> Quantities {
         self.quantities()
@@ -175,3 +183,19 @@ where
         self.as_ref().quantities()
     }
 }
+
+impl<T> Counted for Vec<T>
+where
+    T: Counted,
+{
+    fn quantities(&self) -> Quantities {
+        self.iter()
+            .flat_map(|c| c.quantities())
+            .fold(BTreeMap::new(), |mut acc, (category, quantity)| {
+                *acc.entry(category).or_default() += quantity;
+                acc
+            })
+            .into_iter()
+            .collect()
+    }
+}
diff --git a/relay-server/src/managed/managed.rs b/relay-server/src/managed/managed.rs
index f194e8e3035..496f44d3026 100644
--- a/relay-server/src/managed/managed.rs
+++ b/relay-server/src/managed/managed.rs
@@ -46,6 +46,14 @@ impl OutcomeError for Outcome {
     }
 }
 
+impl OutcomeError for Option<Outcome> {
+    type Error = ();
+
+    fn consume(self) -> (Option<Outcome>, Self::Error) {
+        (self, ()).consume()
+    }
+}
+
 impl<E> OutcomeError for (Outcome, E) {
     type Error = E;
@@ -119,6 +127,15 @@ where
     }
 }
 
+impl<T> axum::response::IntoResponse for Rejected<T>
+where
+    T: axum::response::IntoResponse,
+{
+    fn into_response(self) -> axum::response::Response {
+        self.0.into_response()
+    }
+}
+
 /// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
 pub struct Managed {
     value: T,
     done: AtomicBool,
 }
+impl Managed<()> {
+    /// Creates a managed instance from an unmanaged envelope.
+    pub fn from_envelope(
+        envelope: Box<Envelope>,
+        outcome_aggregator: Addr<TrackOutcome>,
+    ) -> Managed<Box<Envelope>> {
+        let meta = Arc::new(Meta {
+            outcome_aggregator,
+            received_at: envelope.received_at(),
+            scoping: envelope.meta().get_partial_scoping().into_scoping(),
+            event_id: envelope.event_id(),
+            remote_addr: envelope.meta().remote_addr(),
+        });
+
+        Managed::from_parts(envelope, meta)
+    }
+}
+
 impl Managed {
-    /// Creates a new managed instance with a `value` from a [`ManagedEnvelope`].
+    /// Derives a new managed instance with a `value` from a [`ManagedEnvelope`].
     ///
     /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
     /// like received time or scoping.
-    pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
+    pub fn derive_from(envelope: &ManagedEnvelope, value: T) -> Self {
         Self::from_parts(
             value,
             Arc::new(Meta {
@@ -162,6 +197,20 @@ impl Managed {
         self.meta.scoping
     }
 
+    /// Updates the scoping stored in this context.
+    ///
+    /// Special care has to be taken when items contained in the managed instance also store a
+    /// scoping. Such a scoping will **not** be updated.
+    ///
+    /// Conversions between `Managed<Box<Envelope>>` and `ManagedEnvelope` transfer the scoping
+    /// correctly.
+    ///
+    /// See also: [`ManagedEnvelope::scope`].
+    pub fn scope(&mut self, scoping: Scoping) {
+        let meta = Arc::make_mut(&mut self.meta);
+        meta.scoping = scoping;
+    }
+
     /// Splits [`Self`] into two other [`Managed`] items.
     ///
     /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
@@ -493,14 +542,17 @@ impl Managed {
         debug_assert!(!self.is_done());
 
         let (outcome, error) = error.consume();
-        if let Some(outcome) = outcome {
-            self.do_reject(outcome);
-        }
+        self.do_reject(outcome);
 
         Rejected(error)
     }
 
-    fn do_reject(&self, outcome: Outcome) {
-        if !self.done.fetch_or(true, Ordering::Relaxed) {
+    fn do_reject(&self, outcome: Option<Outcome>) {
+        // If item was already handled, do nothing.
+        if self.done.fetch_or(true, Ordering::Relaxed) {
+            return;
+        }
+
+        if let Some(outcome) = outcome {
             for (category, quantity) in self.value.quantities() {
                 self.meta.track_outcome(outcome.clone(), category, quantity);
             }
@@ -541,7 +593,7 @@ impl Managed {
 
 impl Drop for Managed {
     fn drop(&mut self) {
-        self.do_reject(Outcome::Invalid(DiscardReason::Internal));
+        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
     }
 }
 
@@ -584,6 +636,7 @@ impl std::ops::Deref for Managed {
 }
 
 /// Internal metadata attached with a [`Managed`] instance.
+#[derive(Debug, Clone)]
 struct Meta {
     /// Outcome aggregator service.
     outcome_aggregator: Addr<TrackOutcome>,
diff --git a/relay-server/src/middlewares/mod.rs b/relay-server/src/middlewares/mod.rs
index 34a0036145e..3cf949be3c9 100644
--- a/relay-server/src/middlewares/mod.rs
+++ b/relay-server/src/middlewares/mod.rs
@@ -7,6 +7,7 @@
 //! See the server startup in [`HttpServer`](crate::services::server::HttpServer) for where these
 //! middlewares are registered.
 
+mod body_timing;
 mod cors;
 mod decompression;
 mod handle_panic;
@@ -14,8 +15,6 @@ mod metrics;
 mod normalize_path;
 mod trace;
 
-mod body_timing;
-
 pub use self::body_timing::*;
 pub use self::cors::*;
 pub use self::decompression::*;
diff --git a/relay-server/src/processing/check_ins/mod.rs b/relay-server/src/processing/check_ins/mod.rs
index ac9aa01ffc3..478e48e0e93 100644
--- a/relay-server/src/processing/check_ins/mod.rs
+++ b/relay-server/src/processing/check_ins/mod.rs
@@ -75,7 +75,7 @@ impl processing::Processor for CheckInsProcessor {
             .into_vec();
 
         let work = SerializedCheckIns { headers, check_ins };
-        Some(Managed::from_envelope(envelope, work))
+        Some(Managed::derive_from(envelope, work))
     }
 
     async fn process(
diff --git a/relay-server/src/processing/logs/mod.rs b/relay-server/src/processing/logs/mod.rs
index 79e3e723d89..b0c603f6cd6 100644
--- a/relay-server/src/processing/logs/mod.rs
+++ b/relay-server/src/processing/logs/mod.rs
@@ -129,7 +129,7 @@ impl processing::Processor for LogsProcessor {
             logs,
             integrations,
         };
-        Some(Managed::from_envelope(envelope, work))
+        Some(Managed::derive_from(envelope, work))
     }
 
     async fn process(
diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs
index 484a3eabe53..d74834343c2 100644
--- a/relay-server/src/processing/sessions/mod.rs
+++ b/relay-server/src/processing/sessions/mod.rs
@@ -81,7 +81,7 @@ impl processing::Processor for SessionsProcessor {
             updates,
             aggregates,
         };
-        Some(Managed::from_envelope(envelope, work))
+        Some(Managed::derive_from(envelope, work))
     }
 
     async fn process(
diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs
index 355c192e3f9..37a9461e0d8 100644
--- a/relay-server/src/processing/spans/mod.rs
+++ b/relay-server/src/processing/spans/mod.rs
@@ -136,7 +136,7 @@ impl processing::Processor for SpansProcessor {
             legacy,
             integrations,
         };
-        Some(Managed::from_envelope(envelope, work))
+        Some(Managed::derive_from(envelope, work))
     }
 
     async fn process(
diff --git a/relay-server/src/processing/trace_metrics/mod.rs b/relay-server/src/processing/trace_metrics/mod.rs
index af06fc03e3a..bb1dae5b87e 100644
--- a/relay-server/src/processing/trace_metrics/mod.rs
+++ b/relay-server/src/processing/trace_metrics/mod.rs
@@ -108,7 +108,7 @@ impl processing::Processor for TraceMetricsProcessor {
         }
 
         let work = SerializedTraceMetrics { headers, metrics };
-        Some(Managed::from_envelope(envelope, work))
+        Some(Managed::derive_from(envelope, work))
     }
 
     async fn process(
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index cbac8862ac8..32812f469f1 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -26,12 +26,12 @@ use crate::services::outcome::DiscardReason;
 use crate::services::outcome::Outcome;
 use crate::services::outcome::TrackOutcome;
 use crate::services::processor::{EnvelopeProcessor, ProcessEnvelope};
-use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle, ProjectChange};
+use crate::services::projects::cache::{ProjectCacheHandle, ProjectChange};
 use crate::statsd::RelayCounters;
 use crate::MemoryChecker;
 use crate::MemoryStat;
 
-use crate::managed::ManagedEnvelope;
+use crate::managed::{Managed, ManagedEnvelope};
 
 // pub for benchmarks
 pub use envelope_buffer::EnvelopeBufferError;
@@ -192,13 +192,13 @@ impl ObservableEnvelopeBuffer {
     /// Attempts to push an envelope into the envelope buffer.
     ///
     /// Returns `false`, if the envelope buffer does not have enough capacity.
-    pub fn try_push(&self, mut envelope: ManagedEnvelope) -> bool {
+    pub fn try_push(&self, envelope: Managed<Box<Envelope>>) -> Result<(), Managed<Box<Envelope>>> {
         if self.has_capacity() {
+            let envelope = envelope.into();
             self.addr.send(EnvelopeBuffer::Push(envelope));
-            true
+            Ok(())
         } else {
-            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
-            false
+            Err(envelope)
         }
     }
 
@@ -530,20 +530,21 @@ impl EnvelopeBufferService {
         let sampling_project_info = sampling_project_info
             .filter(|info| info.organization_id == own_project_info.organization_id);
 
-        let managed_envelope = ManagedEnvelope::new(envelope, services.outcome_aggregator.clone());
+        let mut managed_envelope =
+            Managed::from_envelope(envelope, services.outcome_aggregator.clone());
 
-        let Ok(CheckedEnvelope {
-            envelope: Some(managed_envelope),
-            ..
-        }) = own_project.check_envelope(managed_envelope).await
-        else {
+        if own_project
+            .check_envelope(&mut managed_envelope)
+            .await
+            .is_err()
+        {
             // Outcomes are emitted by `check_envelope`.
             return Ok(());
         };
 
         let reservoir_counters = own_project.reservoir_counters().clone();
         services.envelope_processor.send(ProcessEnvelope {
-            envelope: managed_envelope,
+            envelope: managed_envelope.into(),
             project_info: own_project_info.clone(),
             rate_limits: own_project.rate_limits().current_limits(),
             sampling_project_info: sampling_project_info.clone(),
diff --git a/relay-server/src/services/projects/cache/mod.rs b/relay-server/src/services/projects/cache/mod.rs
index c3f94e298a8..9099f50b1fd 100644
--- a/relay-server/src/services/projects/cache/mod.rs
+++ b/relay-server/src/services/projects/cache/mod.rs
@@ -4,5 +4,5 @@ mod service;
 mod state;
 
 pub use self::handle::ProjectCacheHandle;
-pub use self::project::{CheckedEnvelope, Project};
+pub use self::project::Project;
 pub use self::service::{ProjectCache, ProjectCacheService, ProjectChange};
diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs
index 9502c8eaf0e..658755e021f 100644
--- a/relay-server/src/services/projects/cache/project.rs
+++ b/relay-server/src/services/projects/cache/project.rs
@@ -5,8 +5,9 @@ use relay_dynamic_config::Feature;
 use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
 use relay_sampling::evaluation::ReservoirCounters;
 
+use crate::Envelope;
 use crate::envelope::ItemType;
-use crate::managed::ManagedEnvelope;
+use crate::managed::{Managed, Rejected};
 use crate::services::outcome::{DiscardReason, Outcome};
 use crate::services::projects::cache::state::SharedProject;
 use crate::services::projects::project::ProjectState;
@@ -48,17 +49,21 @@ impl<'a> Project<'a> {
     /// - Validate origins and public keys
     /// - Quotas with a limit of `0`
     /// - Cached rate limits
+    ///
+    /// Removes rate limited items from the envelope and returns active rate limits.
     pub async fn check_envelope(
         &self,
-        mut envelope: ManagedEnvelope,
-    ) -> Result<CheckedEnvelope, DiscardReason> {
+        envelope: &mut Managed<Box<Envelope>>,
+    ) -> Result<RateLimits, Rejected<DiscardReason>> {
         let state = match self.state() {
            ProjectState::Enabled(state) => Some(Arc::clone(state)),
            ProjectState::Disabled => {
                // TODO(jjbayer): We should refactor this function to either return a Result or
                // handle envelope rejections internally, but not both.
-                envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
-                return Err(DiscardReason::ProjectId);
+                let err = envelope
+                    .reject_err(Outcome::Invalid(DiscardReason::ProjectId))
+                    .map(|_| DiscardReason::ProjectId);
+                return Err(err);
             }
             ProjectState::Pending => None,
         };
@@ -66,12 +71,13 @@
         let mut scoping = envelope.scoping();
 
         if let Some(ref state) = state {
-            scoping = state.scope_request(envelope.envelope().meta());
+            scoping = state.scope_request(envelope.meta());
             envelope.scope(scoping);
-            if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
-                envelope.reject(Outcome::Invalid(reason));
-                return Err(reason);
+            if let Err(reason) = state.check_envelope(envelope, self.config) {
+                return Err(envelope
+                    .reject_err(Outcome::Invalid(reason))
+                    .map(|_| reason));
             }
         }
@@ -83,9 +89,8 @@
             async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
         });
 
-        let (mut enforcement, mut rate_limits) = envelope_limiter
-            .compute(envelope.envelope_mut(), &scoping)
-            .await?;
+        let (mut enforcement, mut rate_limits) =
+            envelope_limiter.compute(envelope, &scoping).await?;
 
         let check_nested_spans = state
             .as_ref()
@@ -95,54 +100,31 @@
        // spans to correctly emit negative outcomes in case the transaction itself is dropped.
        if check_nested_spans {
            relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
-                sync_spans_to_enforcement(&envelope, &mut enforcement);
+                sync_spans_to_enforcement(envelope, &mut enforcement);
            });
        }
 
-        enforcement.apply_with_outcomes(&mut envelope);
-
-        envelope.update();
+        enforcement.apply_to_managed(envelope);
 
        // Special case: Expose active rate limits for all metric namespaces if there is at least
        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
-        if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
+        if envelope.items().any(|i| i.ty().is_metrics()) {
            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
            metrics_scoping.namespace = MetricNamespaceScoping::Any;
            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
        }
 
-        let envelope = if envelope.envelope().is_empty() {
-            // Individual rate limits have already been issued above
-            envelope.reject(Outcome::RateLimited(None));
-            None
-        } else {
-            Some(envelope)
-        };
-
-        Ok(CheckedEnvelope {
-            envelope,
-            rate_limits,
-        })
+        Ok(rate_limits)
     }
 }
 
-/// A checked envelope and associated rate limits.
-///
-/// Items violating the rate limits have been removed from the envelope. If all items are removed
-/// from the envelope, `None` is returned in place of the envelope.
-#[derive(Debug)]
-pub struct CheckedEnvelope {
-    pub envelope: Option<ManagedEnvelope>,
-    pub rate_limits: RateLimits,
-}
 
 /// Adds category limits for the nested spans inside a transaction.
 ///
 /// On the fast path of rate limiting, we do not have nested spans of a transaction extracted
 /// as top-level spans, thus if we limited a transaction, we want to count and emit negative
 /// outcomes for each of the spans nested inside that transaction.
-fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
+fn sync_spans_to_enforcement(envelope: &Envelope, enforcement: &mut Enforcement) {
     if !enforcement.is_event_active() {
         return;
     }
@@ -164,14 +146,13 @@ fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enfor
 }
 
 /// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
-fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
+fn count_nested_spans(envelope: &Envelope) -> usize {
     #[derive(Debug, serde::Deserialize)]
     struct PartialEvent {
         spans: crate::utils::SeqCount,
     }
 
     envelope
-        .envelope()
         .items()
         .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
         .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
@@ -277,9 +258,9 @@ mod tests {
 
         let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
 
-        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone());
+        let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator.clone());
 
-        project.check_envelope(managed_envelope).await.unwrap();
+        project.check_envelope(&mut managed_envelope).await.unwrap();
         drop(outcome_aggregator);
 
         let expected = [
diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs
index b66ca004f58..23bdaba5ddf 100644
--- a/relay-server/src/utils/rate_limits.rs
+++ b/relay-server/src/utils/rate_limits.rs
@@ -10,7 +10,7 @@ use relay_quotas::{
 
 use crate::envelope::{Envelope, Item, ItemType};
 use crate::integrations::Integration;
-use crate::managed::ManagedEnvelope;
+use crate::managed::{Managed, ManagedEnvelope};
 use crate::services::outcome::Outcome;
 
 /// Name of the rate limits header.
@@ -506,6 +506,26 @@ impl Enforcement {
         self.track_outcomes(envelope);
     }
 
+    /// Applies the [`Enforcement`] to the [`Envelope`] by removing all items that were rate limited
+    /// and emitting outcomes for each rate limited category.
+    ///
+    /// Works exactly like [`Self::apply_with_outcomes`], but operates on [`Managed`] instead of
+    /// [`ManagedEnvelope`].
+    pub fn apply_to_managed(self, envelope: &mut Managed<Box<Envelope>>) {
+        envelope.modify(|envelope, records| {
+            envelope.retain_items(|item| self.retain_item(item));
+
+            // This is a workaround for `sync_spans_to_enforcement`, where Relay modifies the
+            // enforcement in a way that makes the outcome counts no longer verifiable.
+            records.lenient(DataCategory::Span);
+            records.lenient(DataCategory::SpanIndexed);
+
+            for (outcome, category, quantity) in self.get_outcomes() {
+                records.reject_err(outcome, (category, quantity))
+            }
+        });
+    }
+
     /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
     fn retain_item(&self, item: &mut Item) -> bool {
         // Remove event items and all items that depend on this event
@@ -666,7 +686,7 @@ where
     /// clients are allowed to continue sending them.
     pub async fn compute(
         mut self,
-        envelope: &mut Envelope,
+        envelope: &Envelope,
         scoping: &'a Scoping,
     ) -> Result<(Enforcement, RateLimits), E> {
         let mut summary = EnvelopeSummary::compute(envelope);
diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs
index 3d01fdc3589..d8de95ad851 100644
--- a/relay-server/src/utils/sizes.rs
+++ b/relay-server/src/utils/sizes.rs
@@ -2,7 +2,7 @@ use relay_config::Config;
 
 use crate::envelope::{AttachmentType, Envelope, ItemType};
 use crate::integrations::Integration;
-use crate::managed::{ItemAction, ManagedEnvelope};
+use crate::managed::Managed;
 use crate::services::outcome::{DiscardAttachmentType, DiscardItemType};
 
 /// Checks for size limits of items in this envelope.
@@ -147,20 +147,33 @@ pub fn check_envelope_size_limits(
 ///
 /// If Relay is configured to drop unknown items, this function removes them from the Envelope. All
 /// known items will be retained.
-pub fn remove_unknown_items(config: &Config, envelope: &mut ManagedEnvelope) {
-    if !config.accept_unknown_items() {
-        envelope.retain_items(|item| match item.ty() {
-            ItemType::Unknown(ty) => {
-                relay_log::debug!("dropping unknown item of type '{ty}'");
-                ItemAction::DropSilently
-            }
-            _ => match item.attachment_type() {
-                Some(AttachmentType::Unknown(ty)) => {
-                    relay_log::debug!("dropping unknown attachment of type '{ty}'");
-                    ItemAction::DropSilently
+pub fn remove_unknown_items(config: &Config, envelope: &mut Managed<Box<Envelope>>) {
+    if config.accept_unknown_items() {
+        return;
+    }
+
+    envelope.modify(|envelope, records| {
+        envelope.retain_items(|item| {
+            let retain = match item.ty() {
+                ItemType::Unknown(ty) => {
+                    relay_log::debug!("dropping unknown item of type '{ty}'");
+                    false
                 }
-                _ => ItemAction::Keep,
-            },
+                _ => match item.attachment_type() {
+                    Some(AttachmentType::Unknown(ty)) => {
+                        relay_log::debug!("dropping unknown attachment of type '{ty}'");
+                        false
+                    }
+                    _ => true,
+                },
+            };
+
+            if !retain {
+                // Reject items silently, without outcomes.
+                records.reject_err(None, &*item);
+            }
+
+            retain
         });
-    }
+    });
 }
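
The patch leaves the decision about active rate limits to each endpoint. A minimal usage sketch (not part of the patch; the handler name is hypothetical and `state`/`envelope` are assumed to come from the usual extractors) of how an endpoint consumes the result of `handle_envelope` after this change:

// Hypothetical handler; mirrors the pattern used by the envelope and store endpoints above.
async fn example_handle(
    state: ServiceState,
    envelope: Box<Envelope>,
) -> axum::response::Result<StatusCode> {
    // Envelope-style endpoints surface active rate limits as an error (429) again:
    let _event_id = common::handle_envelope(&state, envelope)
        .await?
        .ensure_rate_limits()?;

    // Fire-and-forget endpoints (OTLP, NEL, security reports, ...) instead acknowledge and
    // drop the active rate limits while keeping their success status:
    // common::handle_envelope(&state, envelope).await?.ignore_rate_limits();

    Ok(StatusCode::CREATED)
}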
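
The new `Counted` impls presumably support the `split_once`/`accept` flow in `queue_envelope` and the `(category, quantity)` records in `apply_to_managed`. A small sketch of the aggregation behaviour, with `DataCategory::Span` used purely as an illustrative category:

use relay_quotas::DataCategory;

// `(DataCategory, usize)` counts as itself; `Vec<T>` sums element quantities per category,
// so rejecting this value would track a single outcome with quantity 5.
let items: Vec<(DataCategory, usize)> = vec![(DataCategory::Span, 2), (DataCategory::Span, 3)];
assert_eq!(items.quantities().as_slice(), &[(DataCategory::Span, 5)]);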
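
Finally, a sketch of the new `try_push` contract: instead of returning `false` and emitting the outcome itself, the buffer hands the managed envelope back on failure and the caller decides how to reject it, as `queue_envelope` does:

// `state`, `pkp` and `envelope` as in `queue_envelope` above.
if let Err(envelope) = state.envelope_buffer(pkp).try_push(envelope) {
    // The caller owns the rejection: emit an internal-invalid outcome and map it to the
    // HTTP-facing error.
    return Err(envelope.reject_err((
        Outcome::Invalid(DiscardReason::Internal),
        BadStoreRequest::QueueFailed,
    )));
}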