Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions relay-server/src/endpoints/attachments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ pub async fn handle(
meta: RequestMeta,
Path(path): Path<AttachmentPath>,
multipart: ConstrainedMultipart,
) -> Result<impl IntoResponse, BadStoreRequest> {
) -> axum::response::Result<impl IntoResponse> {
let envelope = extract_envelope(meta, path, multipart, state.config()).await?;
common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ignore_rate_limits();
Ok(StatusCode::CREATED)
}
141 changes: 97 additions & 44 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use relay_statsd::metric;
use serde::Deserialize;

use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::managed::ManagedEnvelope;
use crate::managed::{Managed, Rejected};
use crate::service::ServiceState;
use crate::services::buffer::ProjectKeyPair;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome};
Expand Down Expand Up @@ -269,31 +269,40 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreReq
/// returned and the envelope is not queued.
fn queue_envelope(
state: &ServiceState,
mut managed_envelope: ManagedEnvelope,
) -> Result<(), BadStoreRequest> {
let envelope = managed_envelope.envelope_mut();

mut envelope: Managed<Box<Envelope>>,
) -> Result<(), Rejected<BadStoreRequest>> {
if state.config().relay_mode() != RelayMode::Proxy {
// Remove metrics from the envelope and queue them directly on the project's `Aggregator`.
// In proxy mode, we cannot aggregate metrics because we may not have a project ID.
let is_metric = |i: &Item| matches!(i.ty(), ItemType::Statsd | ItemType::MetricBuckets);
let metric_items = envelope.take_items_by(is_metric);

if !metric_items.is_empty() {
relay_log::trace!("sending metrics into processing queue");
let metrics;
(envelope, metrics) = envelope.split_once(|mut envelope| {
let metrics = envelope.take_items_by(is_metric).into_vec();
(envelope, metrics)
});

metrics.accept(|metrics| {
if metrics.is_empty() {
return;
}

state.processor().send(ProcessMetrics {
data: MetricData::Raw(metric_items.into_vec()),
data: MetricData::Raw(metrics),
received_at: envelope.received_at(),
sent_at: envelope.sent_at(),
project_key: envelope.meta().public_key(),
source: BucketSource::from_meta(envelope.meta()),
});
}
})
}

let pkp = ProjectKeyPair::from_envelope(&*envelope);
if !state.envelope_buffer(pkp).try_push(managed_envelope) {
return Err(BadStoreRequest::QueueFailed);
let pkp = ProjectKeyPair::from_envelope(&envelope);
if let Err(envelope) = state.envelope_buffer(pkp).try_push(envelope) {
return Err(envelope.reject_err((
Outcome::Invalid(DiscardReason::Internal),
BadStoreRequest::QueueFailed,
)));
}

Ok(())
Expand All @@ -310,66 +319,110 @@ fn queue_envelope(
pub async fn handle_envelope(
state: &ServiceState,
envelope: Box<Envelope>,
) -> Result<Option<EventId>, BadStoreRequest> {
) -> Result<HandledEnvelope, Rejected<BadStoreRequest>> {
emit_envelope_metrics(&envelope);

let mut envelope = Managed::from_envelope(envelope, state.outcome_aggregator().clone());

if state.memory_checker().check_memory().is_exceeded() {
return Err(BadStoreRequest::QueueFailed);
return Err(envelope.reject_err((
Outcome::Invalid(DiscardReason::Internal),
BadStoreRequest::QueueFailed,
)));
};

let mut managed_envelope = ManagedEnvelope::new(envelope, state.outcome_aggregator().clone());

// If configured, remove unknown items at the very beginning. If the envelope is
// empty, we fail the request with a special control flow error to skip checks and
// queueing, that still results in a `200 OK` response.
utils::remove_unknown_items(state.config(), &mut managed_envelope);

let event_id = managed_envelope.envelope().event_id();
if managed_envelope.envelope().is_empty() {
managed_envelope.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope));
return Ok(event_id);
utils::remove_unknown_items(state.config(), &mut envelope);

let event_id = envelope.event_id();
if envelope.is_empty() {
return Ok(HandledEnvelope {
event_id,
rate_limits: Default::default(),
});
}

let project_key = managed_envelope.envelope().meta().public_key();
let project_key = envelope.meta().public_key();

// Prefetch sampling project key, current spooling implementations rely on this behavior.
//
// To be changed once spool v1 has been removed.
if let Some(sampling_project_key) = managed_envelope.envelope().sampling_key()
if let Some(sampling_project_key) = envelope.sampling_key()
&& sampling_project_key != project_key
{
state.project_cache_handle().fetch(sampling_project_key);
}

let checked = state
let rate_limits = state
.project_cache_handle()
.get(project_key)
.check_envelope(managed_envelope)
.check_envelope(&mut envelope)
.await
.map_err(BadStoreRequest::EventRejected)?;
.map_err(|err| err.map(BadStoreRequest::EventRejected))?;

let Some(mut managed_envelope) = checked.envelope else {
// All items have been removed from the envelope.
return Err(BadStoreRequest::RateLimited(checked.rate_limits));
};
if envelope.is_empty() {
return Err(envelope.reject_err((None, BadStoreRequest::RateLimited(rate_limits))));
}

if let Err(offender) =
utils::check_envelope_size_limits(state.config(), managed_envelope.envelope())
{
managed_envelope.reject(Outcome::Invalid(DiscardReason::TooLarge(offender)));
return Err(BadStoreRequest::Overflow(offender));
if let Err(offender) = utils::check_envelope_size_limits(state.config(), &envelope) {
return Err(envelope.reject_err((
Outcome::Invalid(DiscardReason::TooLarge(offender)),
BadStoreRequest::Overflow(offender),
)));
}

queue_envelope(state, managed_envelope)?;
queue_envelope(state, envelope)?;

if checked.rate_limits.is_limited() {
Ok(HandledEnvelope {
event_id,
// Even if some envelope items have been queued, there might be active rate limits on
// other items. Communicate these rate limits to the downstream (Relay or SDK).
//
// See `IntoResponse` implementation of `BadStoreRequest`.
Err(BadStoreRequest::RateLimited(checked.rate_limits))
} else {
Ok(event_id)
rate_limits,
})
}

/// A successfully handled envelope, returned by [`handle_envelope`].
#[derive(Debug)]
#[must_use = "rate limits of a handled envelope must be used"]
pub struct HandledEnvelope {
/// The event id of the envelope.
pub event_id: Option<EventId>,
/// All active, but not necessarily enforced, rate limits.
///
/// These rate limits should always be communicated to the client on envelope endpoints.
///
/// See also: [`BadStoreRequest::RateLimited`].
pub rate_limits: RateLimits,
}

impl HandledEnvelope {
/// Ensures all active rate limits are handled as an error.
///
/// This is legacy behaviour where active rate limits are returned as an error, instead of
/// being added to the usual response.
/// The event id in this legacy behaviour is only returned when there are no active rate
/// limits.
///
/// The functions simplifies this legacy handling by turning rate limits into an error again.
pub fn ensure_rate_limits(self) -> Result<Option<EventId>, BadStoreRequest> {
if self.rate_limits.is_limited() {
return Err(BadStoreRequest::RateLimited(self.rate_limits));
}
Ok(self.event_id)
}

/// Explicitly ignores contained active rate limits.
///
/// Endpoints which choose to not propagate active rate limits, should use this method to
/// explicitly state the fact they do not propagate the rate limits.
///
/// Most endpoints ignore active rate limits, they are mostly used in envelope based endpoints.
///
/// Note: enforced rate limits are still returned as an error from [`handle_envelope`].
pub fn ignore_rate_limits(self) -> Option<EventId> {
self.event_id
}
}

Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/endpoints/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ struct StoreResponse {
async fn handle(
state: ServiceState,
params: EnvelopeParams,
) -> Result<impl IntoResponse, BadStoreRequest> {
) -> axum::response::Result<impl IntoResponse> {
let envelope = params.extract_envelope()?;
let id = common::handle_envelope(&state, envelope).await?;
let id = common::handle_envelope(&state, envelope)
.await?
.ensure_rate_limits()?;
Ok(Json(StoreResponse { id }))
}

Expand Down
8 changes: 6 additions & 2 deletions relay-server/src/endpoints/integrations/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ mod traces {
.with_required_feature(Feature::OtelTracesEndpoint)
.build();

common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ignore_rate_limits();

Ok(StatusCode::ACCEPTED)
}
Expand Down Expand Up @@ -72,7 +74,9 @@ mod logs {
.with_required_feature(Feature::OtelLogsEndpoint)
.build();

common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ignore_rate_limits();

Ok(StatusCode::ACCEPTED)
}
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/endpoints/integrations/vercel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ mod logs {
.with_required_feature(Feature::VercelLogDrainEndpoint)
.build();

common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ignore_rate_limits();

Ok(StatusCode::ACCEPTED)
}
Expand Down
5 changes: 4 additions & 1 deletion relay-server/src/endpoints/minidump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ async fn handle(
let id = envelope.event_id();

// Never respond with a 429 since clients often retry these
match common::handle_envelope(&state, envelope).await {
match common::handle_envelope(&state, envelope)
.await
.map_err(|err| err.into_inner())
{
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
Err(error) => return Err(error.into()),
};
Expand Down
5 changes: 4 additions & 1 deletion relay-server/src/endpoints/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ async fn handle(
envelope.add_item(item);

// Never respond with a 429
match common::handle_envelope(&state, envelope).await {
match common::handle_envelope(&state, envelope)
.await
.map_err(|err| err.into_inner())
{
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
Err(error) => return Err(error.into()),
};
Expand Down
7 changes: 5 additions & 2 deletions relay-server/src/endpoints/nel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn handle(
state: ServiceState,
mime: Mime,
params: NelReportParams,
) -> Result<impl IntoResponse, BadStoreRequest> {
) -> axum::response::Result<impl IntoResponse> {
if !is_nel_mime(mime) {
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
}
Expand All @@ -55,7 +55,10 @@ async fn handle(
envelope.add_item(report_item);
}

common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ignore_rate_limits();

Ok(().into_response())
}

Expand Down
5 changes: 4 additions & 1 deletion relay-server/src/endpoints/playstation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ async fn handle(
let id = envelope.event_id();

// Never respond with a 429 since clients often retry these
match common::handle_envelope(&state, envelope).await {
match common::handle_envelope(&state, envelope)
.await
.map_err(|err| err.into_inner())
{
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
Err(error) => return Err(error.into()),
};
Expand Down
7 changes: 5 additions & 2 deletions relay-server/src/endpoints/security_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ async fn handle(
state: ServiceState,
mime: Mime,
params: SecurityReportParams,
) -> Result<impl IntoResponse, BadStoreRequest> {
) -> axum::response::Result<impl IntoResponse> {
if !is_security_mime(mime) {
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
}

let envelope = params.extract_envelope()?;
common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ignore_rate_limits();

Ok(().into_response())
}

Expand Down
19 changes: 13 additions & 6 deletions relay-server/src/endpoints/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,21 @@ async fn handle_post(
meta: RequestMeta,
content_type: RawContentType,
body: Bytes,
) -> Result<impl IntoResponse, BadStoreRequest> {
) -> axum::response::Result<impl IntoResponse> {
let envelope = match content_type.as_ref() {
envelope::CONTENT_TYPE => Envelope::parse_request(body, meta)?,
envelope::CONTENT_TYPE => {
Envelope::parse_request(body, meta).map_err(BadStoreRequest::InvalidEnvelope)?
}
_ => parse_event(body, meta, state.config())?,
};
if envelope.is_internal() {
return Err(BadStoreRequest::InternalEnvelope);
return Err(BadStoreRequest::InternalEnvelope.into());
}

let id = common::handle_envelope(&state, envelope).await?;
let id = common::handle_envelope(&state, envelope)
.await?
.ensure_rate_limits()?;

Ok(axum::Json(PostResponse { id }).into_response())
}

Expand All @@ -140,9 +145,11 @@ async fn handle_get(
state: ServiceState,
meta: RequestMeta,
Query(query): Query<GetQuery>,
) -> Result<impl IntoResponse, BadStoreRequest> {
) -> axum::response::Result<impl IntoResponse> {
let envelope = parse_event(query.sentry_data.into(), meta, state.config())?;
common::handle_envelope(&state, envelope).await?;
common::handle_envelope(&state, envelope)
.await?
.ensure_rate_limits()?;
Ok(([(header::CONTENT_TYPE, "image/gif")], PIXEL))
}

Expand Down
5 changes: 4 additions & 1 deletion relay-server/src/endpoints/unreal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ async fn handle(
let id = envelope.event_id();

// Never respond with a 429 since clients often retry these
match common::handle_envelope(&state, envelope).await {
match common::handle_envelope(&state, envelope)
.await
.map_err(|err| err.into_inner())
{
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
Err(error) => return Err(error),
};
Expand Down
Loading
Loading