diff --git a/relay-conventions/sentry-conventions b/relay-conventions/sentry-conventions
index 5cb2bb6c12..4c7cda093a 160000
--- a/relay-conventions/sentry-conventions
+++ b/relay-conventions/sentry-conventions
@@ -1 +1 @@
-Subproject commit 5cb2bb6c12ab979227f2c928cb6071021c58c0ac
+Subproject commit 4c7cda093a241389073046f9f4e15a8ac2ccd0da
diff --git a/relay-server/src/envelope/mod.rs b/relay-server/src/envelope/mod.rs
index 98b0a1bb92..6fa4443d39 100644
--- a/relay-server/src/envelope/mod.rs
+++ b/relay-server/src/envelope/mod.rs
@@ -196,6 +196,16 @@ impl EnvelopeHeaders {
         }
     }
 
+    /// Overrides the dynamic sampling context in envelope headers.
+    pub fn set_dsc(&mut self, dsc: DynamicSamplingContext) {
+        self.trace = Some(ErrorBoundary::Ok(dsc));
+    }
+
+    /// Removes the dynamic sampling context from envelope headers.
+    pub fn remove_dsc(&mut self) {
+        self.trace = None;
+    }
+
     /// Returns the timestamp when the event has been sent, according to the SDK.
     pub fn sent_at(&self) -> Option<DateTime<Utc>> {
         self.sent_at
@@ -441,12 +451,12 @@ impl Envelope {
 
     /// Overrides the dynamic sampling context in envelope headers.
     pub fn set_dsc(&mut self, dsc: DynamicSamplingContext) {
-        self.headers.trace = Some(ErrorBoundary::Ok(dsc));
+        self.headers.set_dsc(dsc);
     }
 
     /// Removes the dynamic sampling context from envelope headers.
     pub fn remove_dsc(&mut self) {
-        self.headers.trace = None;
+        self.headers.remove_dsc();
     }
 
     /// Features required to process this envelope.
@@ -1221,7 +1231,7 @@ mod tests {
             );
             *Envelope::parse_bytes(bytes).unwrap()
         };
-        envelope.set_dsc(dsc.clone());
+        envelope.headers.set_dsc(dsc.clone());
 
         assert_eq!(
             envelope.dsc().unwrap().transaction.as_ref().unwrap(),
diff --git a/relay-server/src/managed/counted.rs b/relay-server/src/managed/counted.rs
index 6ed87e3efa..1c64f52a21 100644
--- a/relay-server/src/managed/counted.rs
+++ b/relay-server/src/managed/counted.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+
 use relay_event_schema::protocol::{
     OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2, TraceMetric,
 };
@@ -29,6 +31,27 @@ impl Counted for () {
     }
 }
 
+impl<T: Counted> Counted for Option<T> {
+    fn quantities(&self) -> Quantities {
+        match self {
+            Some(inner) => inner.quantities(),
+            None => Quantities::new(),
+        }
+    }
+}
+
+impl<T, S> Counted for (T, S)
+where
+    T: Counted,
+    S: Counted,
+{
+    fn quantities(&self) -> Quantities {
+        let mut v = self.0.quantities();
+        v.extend(self.1.quantities());
+        v
+    }
+}
+
 impl Counted for Item {
     fn quantities(&self) -> Quantities {
         self.quantities()
@@ -175,3 +198,27 @@ where
         self.as_ref().quantities()
     }
 }
+
+impl<T: Counted> Counted for Vec<T> {
+    fn quantities(&self) -> Quantities {
+        let mut quantities = BTreeMap::new();
+        for element in self {
+            for (category, size) in element.quantities() {
+                *quantities.entry(category).or_default() += size;
+            }
+        }
+        quantities.into_iter().collect()
+    }
+}
+
+impl<T: Counted, const N: usize> Counted for SmallVec<[T; N]> {
+    fn quantities(&self) -> Quantities {
+        let mut quantities = BTreeMap::new();
+        for element in self {
+            for (category, size) in element.quantities() {
+                *quantities.entry(category).or_default() += size;
+            }
+        }
+        quantities.into_iter().collect()
+    }
+}
diff --git a/relay-server/src/managed/managed.rs b/relay-server/src/managed/managed.rs
index f194e8e303..c9b5a3fe60 100644
--- a/relay-server/src/managed/managed.rs
+++ b/relay-server/src/managed/managed.rs
@@ -15,7 +15,9 @@ use relay_system::Addr;
 use smallvec::SmallVec;
 
 use crate::Envelope;
+use crate::envelope::Item;
 use crate::managed::{Counted, ManagedEnvelope, Quantities};
+use crate::processing::CountRateLimited;
 use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::services::processor::ProcessingError;
 
@@ -152,6 +154,21 @@ impl<T: Counted> Managed<T> {
         Managed::from_parts(other, Arc::clone(&self.meta))
     }
 
+    /// Merges the current managed item with another managed item.
+    ///
+    /// The merged tuple uses the meta of `self`. It is the responsibility of the caller to make
+    /// sure that this matches the `meta` of `other`.
+    pub fn merge<S>(self, other: Managed<S>) -> Managed<(T, S)>
+    where
+        S: Counted,
+    {
+        let (this, meta) = self.destructure();
+        let (other, other_meta) = other.destructure();
+        debug_assert!(Arc::ptr_eq(&meta, &other_meta));
+
+        Managed::from_parts((this, other), meta)
+    }
+
     /// Original received timestamp.
     pub fn received_at(&self) -> DateTime<Utc> {
         self.meta.received_at
@@ -162,6 +179,13 @@ impl<T: Counted> Managed<T> {
         self.meta.scoping
     }
 
+    /// Returns the address of the outcome aggregator.
+    ///
+    /// NOTE: This should not be exposed; it only exists for the transition period.
+    pub fn outcome_addr(&self) -> &Addr<TrackOutcome> {
+        &self.meta.outcome_aggregator
+    }
+
     /// Splits [`Self`] into two other [`Managed`] items.
     ///
     /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
@@ -560,6 +584,13 @@ impl<T: fmt::Debug> fmt::Debug for Managed<T> {
     }
 }
 
+impl<T: Counted> Managed<Option<T>> {
+    /// Transposes a [`Managed`] optional value into an optional [`Managed`] value.
+    pub fn transpose(self) -> Option<Managed<T>> {
+        let (o, meta) = self.destructure();
+        o.map(|t| Managed::from_parts(t, meta))
+    }
+}
+
 impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
     fn from(value: Managed<Box<Envelope>>) -> Self {
         let (value, meta) = value.destructure();
diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs
index ae05848688..cbb9f53de7 100644
--- a/relay-server/src/processing/mod.rs
+++ b/relay-server/src/processing/mod.rs
@@ -27,6 +27,7 @@ pub mod logs;
 pub mod sessions;
 pub mod spans;
 pub mod trace_metrics;
+pub mod transactions;
 pub mod utils;
 
 /// A processor, for an arbitrary unit of work extracted from an envelope.
@@ -47,7 +48,9 @@ pub trait Processor {
     /// Extracts a [`Self::UnitOfWork`] from a [`ManagedEnvelope`].
     ///
     /// This is infallible, if a processor wants to report an error,
-    /// it should return a [`Self::UnitOfWork`] which later, can produce an error when being
+    /// it should return a [`Self::UnitOfWork`] which can later produce an error when being
+    /// processed.
+    ///
+    /// Returns `None` if nothing in the envelope concerns this processor.
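+    ///
+    /// A minimal call-site sketch (hypothetical `processor`, `managed`, and `ctx` values):
+    ///
+    /// ```ignore
+    /// if let Some(work) = processor.prepare_envelope(&mut managed) {
+    ///     let output = processor.process(work, ctx).await;
+    /// }
+    /// ```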
     fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::UnitOfWork>>;
diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs
index 3c97400137..60cb71ad25 100644
--- a/relay-server/src/processing/spans/mod.rs
+++ b/relay-server/src/processing/spans/mod.rs
@@ -22,7 +22,7 @@ mod filter;
 mod integrations;
 mod process;
 #[cfg(feature = "processing")]
-mod store;
+pub mod store;
 mod validate;
 
 type Result<T, E = Error> = std::result::Result<T, E>;
diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs
new file mode 100644
index 0000000000..785857e748
--- /dev/null
+++ b/relay-server/src/processing/transactions/mod.rs
@@ -0,0 +1,706 @@
+use std::sync::Arc;
+
+use relay_base_schema::events::EventType;
+use relay_dynamic_config::{ErrorBoundary, Feature};
+use relay_event_normalization::GeoIpLookup;
+use relay_event_schema::protocol::{Event, Metrics, SpanV2};
+use relay_protocol::{Annotated, Empty};
+use relay_quotas::{DataCategory, RateLimits};
+#[cfg(feature = "processing")]
+use relay_redis::AsyncRedisClient;
+use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
+use relay_statsd::metric;
+use smallvec::smallvec;
+
+use crate::Envelope;
+use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType};
+use crate::managed::{
+    Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
+};
+use crate::processing::transactions::profile::Profile;
+use crate::processing::utils::event::{
+    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_type,
+};
+use crate::processing::utils::transaction::ExtractMetricsContext;
+use crate::processing::{Forward, Processor, QuotaRateLimiter, RateLimited, utils};
+use crate::services::outcome::{DiscardReason, Outcome};
+use crate::services::processor::{ProcessingError, ProcessingExtractedMetrics};
+#[cfg(feature = "processing")]
+use crate::services::store::{Store, StoreEnvelope};
+use crate::statsd::{RelayCounters, RelayTimers};
+use crate::utils::{SamplingResult, should_filter};
+
+#[cfg(feature = "processing")]
+use crate::managed::TypedEnvelope;
+#[cfg(feature = "processing")]
+use crate::services::processor::{ProcessingGroup, TransactionGroup};
+
+mod process;
+pub mod profile;
+pub mod spans;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("invalid JSON")]
+    InvalidJson(#[from] serde_json::Error),
+    #[error("envelope processor failed")]
+    ProcessingFailed(#[from] ProcessingError),
+    #[error("rate limited")]
+    RateLimited(RateLimits),
+}
+
+impl From<RateLimits> for Error {
+    fn from(value: RateLimits) -> Self {
+        Self::RateLimited(value)
+    }
+}
+
+impl OutcomeError for Error {
+    type Error = Self;
+
+    fn consume(self) -> (Option<Outcome>, Self::Error) {
+        let outcome = match &self {
+            Self::InvalidJson(_) => Outcome::Invalid(DiscardReason::InvalidJson),
+            Self::ProcessingFailed(e) => match e {
+                ProcessingError::InvalidTransaction => {
+                    Outcome::Invalid(DiscardReason::InvalidTransaction)
+                }
+                ProcessingError::EventFiltered(key) => Outcome::Filtered(key.clone()),
+                _other => {
+                    relay_log::error!(
+                        error = &self as &dyn std::error::Error,
+                        "internal error: transaction processing failed"
+                    );
+                    Outcome::Invalid(DiscardReason::Internal)
+                }
+            },
+            Self::RateLimited(limits) => {
+                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
+                Outcome::RateLimited(reason_code)
+            }
+        };
+        (Some(outcome), self)
+    }
+}
+
+/// A processor for transactions.
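+///
+/// Covers the full transaction pipeline implemented below: payload deserialization,
+/// normalization, inbound filtering, dynamic sampling, PII scrubbing, metric and span
+/// extraction, and rate limiting.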
+pub struct TransactionProcessor {
+    limiter: Arc<QuotaRateLimiter>,
+    geoip_lookup: GeoIpLookup,
+    #[cfg(feature = "processing")]
+    quotas_client: Option<AsyncRedisClient>,
+    reservoir_counters: ReservoirCounters,
+}
+
+impl Processor for TransactionProcessor {
+    type UnitOfWork = SerializedTransaction;
+
+    type Output = TransactionOutput;
+
+    type Error = Error;
+
+    fn prepare_envelope(
+        &self,
+        envelope: &mut ManagedEnvelope,
+    ) -> Option<Managed<Self::UnitOfWork>> {
+        let headers = envelope.envelope().headers().clone();
+
+        // The envelope might contain only a profile as a leftover after dynamic sampling.
+        // In this case, `transaction` is `None`.
+        let transaction = envelope
+            .envelope_mut()
+            .take_item_by(|item| matches!(*item.ty(), ItemType::Transaction));
+
+        // Attachments are only allowed if a transaction exists.
+        let attachments = match transaction {
+            Some(_) => envelope
+                .envelope_mut()
+                .take_items_by(|item| matches!(*item.ty(), ItemType::Attachment)),
+            None => smallvec::smallvec![], // no attachments allowed.
+        };
+
+        // A profile is only allowed if a transaction exists, or if it is marked as not sampled,
+        // in which case it is a leftover from a transaction that was dropped by dynamic sampling.
+        let profile = envelope.envelope_mut().take_item_by(|item| {
+            matches!(*item.ty(), ItemType::Profile) && (transaction.is_some() || !item.sampled())
+        });
+
+        if transaction.is_none() && profile.is_none() && attachments.is_empty() {
+            return None;
+        }
+
+        let work = SerializedTransaction {
+            headers,
+            transaction,
+            attachments,
+            profile,
+        };
+
+        Some(Managed::from_envelope(envelope, work))
+    }
+
+    async fn process(
+        &self,
+        work: Managed<Self::UnitOfWork>,
+        mut ctx: super::Context<'_>,
+    ) -> Result<super::Output<Self::Output>, Rejected<Self::Error>> {
+        let project_id = work.scoping().project_id;
+        let transaction = work.transaction.as_ref();
+        let flags = Flags {
+            metrics_extracted: transaction.is_some_and(|i| i.metrics_extracted()),
+            spans_extracted: transaction.is_some_and(|i| i.spans_extracted()),
+            fully_normalized: work.headers.meta().request_trust().is_trusted()
+                && work
+                    .transaction
+                    .as_ref()
+                    .is_some_and(Item::fully_normalized),
+        };
+        let mut metrics = Metrics::default();
+        let mut extracted_metrics = ProcessingExtractedMetrics::new();
+
+        let mut work = work.try_map(|w, _| {
+            let SerializedTransaction {
+                headers,
+                transaction: transaction_item,
+                attachments,
+                profile,
+            } = w;
+            let mut transaction = Annotated::empty();
+            if let Some(transaction_item) = transaction_item.as_ref() {
+                transaction = metric!(timer(RelayTimers::EventProcessingDeserialize), {
+                    Annotated::<Event>::from_json_bytes(&transaction_item.payload())
+                })?;
+                if let Some(event) = transaction.value_mut() {
+                    event.ty = EventType::Transaction.into();
+                }
+            }
+            Ok::<_, Error>(ExpandedTransaction {
+                headers,
+                transaction: Transaction(transaction),
+                flags,
+                attachments,
+                profile,
+                extracted_spans: vec![],
+            })
+        })?;
+
+        let mut profile_id = None;
+        let mut run_dynamic_sampling = false;
+        work.try_modify(|work, record_keeper| {
+            if let Some(profile_item) = work.profile.as_mut() {
+                let feature = Feature::Profiling;
+                if should_filter(ctx.config, ctx.project_info, feature) {
+                    record_keeper.reject_err(
+                        Outcome::Invalid(DiscardReason::FeatureDisabled(feature)),
+                        work.profile.take(),
+                    );
+                } else if work.transaction.0.value().is_none() && profile_item.sampled() {
+                    // A profile with `sampled=true` should never be without a transaction.
+                    record_keeper.reject_err(
+                        Outcome::Invalid(DiscardReason::Profiling("missing_transaction")),
+                        work.profile.take(),
+                    );
+                } else {
+                    match relay_profiling::parse_metadata(&profile_item.payload(), project_id) {
+                        Ok(id) => {
+                            profile_id = Some(id);
+                        }
+                        Err(err) => {
+                            record_keeper.reject_err(
+                                Outcome::Invalid(DiscardReason::Profiling(
+                                    relay_profiling::discard_reason(err),
+                                )),
+                                work.profile.take(),
+                            );
+                        }
+                    }
+                }
+            }
+
+            let event = &mut work.transaction.0;
+            profile::transfer_id(event, profile_id);
+
+            utils::dsc::validate_and_set_dsc(&mut work.headers, event, &mut ctx);
+
+            utils::event::finalize(
+                &work.headers,
+                event,
+                work.attachments.iter(),
+                &mut metrics,
+                &ctx.config,
+            );
+
+            work.flags.fully_normalized = utils::event::normalize(
+                &work.headers,
+                event,
+                EventFullyNormalized(work.flags.fully_normalized),
+                project_id,
+                &ctx,
+                &self.geoip_lookup,
+            )?
+            .0;
+
+            let filter_run = utils::event::filter(&work.headers, event, &ctx)
+                .map_err(ProcessingError::EventFiltered)?;
+
+            // Always run dynamic sampling on processing Relays,
+            // but delay the decision until inbound filters have been fully processed.
+            // Also, we require transaction metrics to be enabled before sampling.
+            run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
+                || ctx.config.processing_enabled())
+                && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
+
+            Ok::<_, Error>(())
+        })?;
+
+        let sampling_result = match run_dynamic_sampling {
+            true => {
+                #[allow(unused_mut)]
+                let mut reservoir = ReservoirEvaluator::new(Arc::clone(&self.reservoir_counters));
+                #[cfg(feature = "processing")]
+                if let Some(quotas_client) = self.quotas_client.as_ref() {
+                    reservoir.set_redis(work.scoping().organization_id, quotas_client);
+                }
+                utils::dynamic_sampling::run(
+                    work.headers.dsc(),
+                    &work.transaction.0,
+                    &ctx,
+                    Some(&reservoir),
+                )
+                .await
+            }
+            false => SamplingResult::Pending,
+        };
+
+        relay_statsd::metric!(
+            counter(RelayCounters::SamplingDecision) += 1,
+            decision = sampling_result.decision().as_str(),
+            item = "transaction"
+        );
+
+        let server_sample_rate = sampling_result.sample_rate();
+
+        if let Some(outcome) = sampling_result.into_dropped_outcome() {
+            // Process profiles before dropping the transaction, if necessary.
+            // Before metric extraction to make sure the profile count is reflected correctly.
+            work.try_modify(|work, r| {
+                if let Some(profile) = work.profile.as_mut() {
+                    profile.set_sampled(false);
+                    let result = profile::process(
+                        profile,
+                        work.headers.meta().client_addr(),
+                        work.transaction.0.value(),
+                        &ctx,
+                    );
+                    if let Err(outcome) = result {
+                        r.reject_err(outcome, work.profile.take());
+                    }
+                }
+                // Extract metrics here, we're about to drop the event/transaction.
+                work.flags.metrics_extracted = utils::transaction::extract_metrics(
+                    &mut work.transaction.0,
+                    &mut extracted_metrics,
+                    ExtractMetricsContext {
+                        dsc: work.headers.dsc(),
+                        project_id,
+                        ctx: &ctx,
+                        sampling_decision: SamplingDecision::Drop,
+                        metrics_extracted: EventMetricsExtracted(work.flags.metrics_extracted),
+                        spans_extracted: work.flags.spans_extracted,
+                    },
+                )?
+                .0;
+                Ok::<_, Error>(())
+            })?;
+
+            let (work, profile) = work.split_once(|mut work| {
+                let profile = work.profile.take();
+                (work, profile)
+            });
+
+            // Reject everything but the profile.
+            // FIXME: track non-extracted spans as well.
+            //  -> Type `TransactionWithEmbeddedSpans`
+            let _ = work.reject_err(outcome);
+
+            // If we have a profile left, we need to make sure there is quota for this profile.
+            let profile = profile.transpose();
+            if let Some(profile) = profile {
+                let mut profile = profile.map(|p, _| Profile(p));
+                self.limiter.enforce_quotas(&mut profile, ctx).await?;
+            }
+
+            let metrics = work.wrap(extracted_metrics.into_inner());
+            return Ok(super::Output {
+                main: Some(TransactionOutput(work)),
+                metrics: Some(metrics),
+            });
+        }
+
+        // let _post_ds = cogs.start_category("post_ds"); // FIXME
+
+        // Need to scrub the transaction before extracting spans.
+        //
+        // Unconditionally scrub to make sure PII is removed as early as possible.
+        work.try_modify(|work, _| {
+            utils::event::scrub(&mut work.transaction.0, ctx.project_info)?;
+            utils::attachments::scrub(work.attachments.iter_mut(), ctx.project_info);
+            Ok::<_, Error>(())
+        })?;
+
+        if cfg!(feature = "processing") && ctx.config.processing_enabled() {
+            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
+            let mut profile_id = None;
+            work.try_modify(|work, r| {
+                if let Some(profile) = work.profile.as_mut() {
+                    let result = profile::process(
+                        profile,
+                        work.headers.meta().client_addr(),
+                        work.transaction.0.value(),
+                        &ctx,
+                    );
+                    match result {
+                        Err(outcome) => {
+                            r.reject_err(outcome, work.profile.take());
+                        }
+                        Ok(p) => profile_id = Some(p),
+                    }
+                }
+
+                profile::transfer_id(&mut work.transaction.0, profile_id);
+                profile::scrub_profiler_id(&mut work.transaction.0);
+
+                // Always extract metrics in processing Relays for sampled items.
+                work.flags.metrics_extracted = utils::transaction::extract_metrics(
+                    &mut work.transaction.0,
+                    &mut extracted_metrics,
+                    ExtractMetricsContext {
+                        dsc: work.headers.dsc(),
+                        project_id,
+                        ctx: &ctx,
+                        sampling_decision: SamplingDecision::Keep,
+                        metrics_extracted: EventMetricsExtracted(work.flags.metrics_extracted),
+                        spans_extracted: work.flags.spans_extracted,
+                    },
+                )?
+                .0;
+
+                if let Some(results) = spans::extract_from_event(
+                    work.headers.dsc(),
+                    &work.transaction.0,
+                    ctx.global_config,
+                    ctx.config,
+                    server_sample_rate,
+                    EventMetricsExtracted(work.flags.metrics_extracted),
+                    SpansExtracted(work.flags.spans_extracted),
+                ) {
+                    work.flags.spans_extracted = true;
+                    for result in results {
+                        match result {
+                            Ok(item) => work.extracted_spans.push(item),
+                            Err(_) => r.reject_err(
+                                Outcome::Invalid(DiscardReason::InvalidSpan),
+                                SpanV2::default(),
+                            ),
+                        }
+                    }
+                }
+
+                Ok::<_, Error>(())
+            })?;
+        }
+
+        self.limiter.enforce_quotas(&mut work, ctx).await?;
+
+        if ctx.config.processing_enabled() && !work.flags.fully_normalized {
+            relay_log::error!(
+                tags.project = %project_id,
+                tags.ty = event_type(&work.transaction.0).map(|e| e.to_string()).unwrap_or("none".to_owned()),
+                "ingested event without normalizing"
+            );
+        };
+
+        let metrics = work.wrap(extracted_metrics.into_inner());
+        Ok(super::Output {
+            main: Some(TransactionOutput(work)),
+            metrics: Some(metrics),
+        })
+    }
+}
+
+/// A transaction in its serialized state, as transported in an envelope.
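+///
+/// A sketch of the item layout this type covers (derived from `prepare_envelope`):
+///
+/// ```text
+/// transaction: 0..1 `ItemType::Transaction`
+/// attachments: 0..n `ItemType::Attachment` (only allowed with a transaction)
+/// profile:     0..1 `ItemType::Profile` (requires a transaction, unless marked unsampled)
+/// ```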
+#[derive(Debug)]
+pub struct SerializedTransaction {
+    headers: EnvelopeHeaders,
+    transaction: Option<Item>,
+    attachments: smallvec::SmallVec<[Item; 3]>,
+    profile: Option<Item>,
+}
+
+impl SerializedTransaction {
+    fn items(&self) -> impl Iterator<Item = &Item> {
+        let Self {
+            headers: _,
+            transaction,
+            attachments,
+            profile,
+        } = self;
+        transaction
+            .iter()
+            .chain(attachments.iter())
+            .chain(profile.iter())
+    }
+}
+
+impl Counted for SerializedTransaction {
+    fn quantities(&self) -> Quantities {
+        let mut quantities = Quantities::new();
+        // IDEA: `#[derive(Counted)]`
+        for item in self.items() {
+            // NOTE: This assumes non-overlapping item quantities.
+            quantities.extend(item.quantities());
+        }
+        quantities
+    }
+}
+
+#[derive(Debug)]
+pub struct ExpandedTransaction {
+    headers: EnvelopeHeaders,
+    transaction: Transaction, // might be empty
+    flags: Flags,
+    attachments: smallvec::SmallVec<[Item; 3]>,
+    profile: Option<Item>,
+    extracted_spans: Vec<Item>,
+}
+
+impl ExpandedTransaction {
+    fn serialize_envelope(self) -> Result<Box<Envelope>, serde_json::Error> {
+        let Self {
+            headers,
+            transaction: Transaction(event),
+            flags:
+                Flags {
+                    metrics_extracted,
+                    spans_extracted,
+                    fully_normalized,
+                },
+            attachments,
+            profile,
+            extracted_spans,
+        } = self;
+
+        let mut items = smallvec![];
+        if !event.is_empty() {
+            let data = metric!(timer(RelayTimers::EventProcessingSerialization), {
+                event.to_json()?
+            });
+            let mut item = Item::new(ItemType::Transaction);
+            item.set_payload(ContentType::Json, data);
+
+            item.set_metrics_extracted(metrics_extracted);
+            item.set_spans_extracted(spans_extracted);
+            item.set_fully_normalized(fully_normalized);
+
+            items.push(item);
+        }
+        items.extend(attachments);
+        items.extend(profile);
+        items.extend(extracted_spans);
+
+        Ok(Envelope::from_parts(headers, items))
+    }
+}
+
+impl Counted for ExpandedTransaction {
+    fn quantities(&self) -> Quantities {
+        let mut quantities = Quantities::new();
+        let Self {
+            headers: _,
+            transaction,
+            flags: _, // TODO: might be used to conditionally count embedded spans.
+            attachments,
+            profile,
+            extracted_spans,
+        } = self;
+
+        quantities.extend(transaction.quantities());
+        quantities.extend(attachments.quantities());
+        quantities.extend(profile.quantities());
+        quantities.extend(extracted_spans.quantities());
+
+        quantities
+    }
+}
+
+impl RateLimited for Managed<ExpandedTransaction> {
+    type Error = Error;
+
+    async fn enforce<T>(
+        &mut self,
+        mut rate_limiter: T,
+        _ctx: super::Context<'_>,
+    ) -> Result<(), Rejected<Self::Error>>
+    where
+        T: super::RateLimiter,
+    {
+        let scoping = self.scoping();
+
+        let ExpandedTransaction {
+            headers: _,
+            transaction,
+            flags: _,
+            attachments,
+            profile,
+            extracted_spans,
+        } = self.as_ref();
+
+        // If there is a transaction limit, drop everything.
+        // This also affects profiles that lost their transaction due to sampling.
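+        // Note that `Transaction::quantities` reports both `DataCategory::Transaction`
+        // and `DataCategory::TransactionIndexed`, so a limit on either category rejects
+        // the entire unit of work.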
+        for (category, quantity) in transaction.quantities() {
+            let limits = rate_limiter
+                .try_consume(scoping.item(category), quantity)
+                .await;
+
+            if !limits.is_empty() {
+                return Err(self.reject_err(Error::from(limits)));
+            }
+        }
+
+        let attachment_quantities = attachments.quantities();
+        let span_quantities = extracted_spans.quantities();
+
+        // Check profile limits:
+        for (category, quantity) in profile.quantities() {
+            let limits = rate_limiter
+                .try_consume(scoping.item(category), quantity)
+                .await;
+
+            if !limits.is_empty() {
+                self.modify(|this, record_keeper| {
+                    record_keeper.reject_err(Error::from(limits), this.profile.take());
+                });
+            }
+        }
+
+        // Check attachment limits:
+        for (category, quantity) in attachment_quantities {
+            let limits = rate_limiter
+                .try_consume(scoping.item(category), quantity)
+                .await;
+
+            if !limits.is_empty() {
+                self.modify(|this, record_keeper| {
+                    record_keeper
+                        .reject_err(Error::from(limits), std::mem::take(&mut this.attachments));
+                });
+            }
+        }
+
+        // Check span limits:
+        for (category, quantity) in span_quantities {
+            let limits = rate_limiter
+                .try_consume(scoping.item(category), quantity)
+                .await;
+
+            if !limits.is_empty() {
+                self.modify(|this, record_keeper| {
+                    record_keeper.reject_err(
+                        Error::from(limits),
+                        std::mem::take(&mut this.extracted_spans),
+                    );
+                });
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+struct Flags {
+    metrics_extracted: bool,
+    spans_extracted: bool,
+    fully_normalized: bool,
+}
+
+#[derive(Debug)]
+struct Transaction(Annotated<Event>);
+
+impl Counted for Transaction {
+    fn quantities(&self) -> Quantities {
+        match self.0.value() {
+            Some(_) => smallvec![
+                (DataCategory::TransactionIndexed, 1),
+                (DataCategory::Transaction, 1),
+            ],
+            None => smallvec![],
+        }
+    }
+}
+
+// pub struct CountSpans<'a>(&'a Event);
+
+// impl Counted for CountSpans<'_> {
+//     fn quantities(&self) -> Quantities {
+//         let mut quantities = self.0.quantities();
+//         quantities.extend(self.0.spans.quantities());
+//         quantities
+//     }
+// }
+
+#[derive(Debug)]
+pub struct TransactionOutput(Managed<ExpandedTransaction>);
+
+impl Forward for TransactionOutput {
+    fn serialize_envelope(
+        self,
+        _ctx: super::ForwardContext<'_>,
+    ) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
+        self.0.try_map(|output, _| {
+            output
+                .serialize_envelope()
+                .map_err(drop)
+                .with_outcome(Outcome::Invalid(DiscardReason::Internal))
+        })
+    }
+
+    #[cfg(feature = "processing")]
+    fn forward_store(
+        self,
+        s: &relay_system::Addr<Store>,
+        _ctx: super::ForwardContext<'_>,
+    ) -> Result<(), Rejected<()>> {
+        let Self(output) = self;
+        let outcome_addr = output.outcome_addr().clone();
+
+        // Pass ownership to the store service.
+        output
+            .try_accept(|output| {
+                // TODO: split out spans here.
+                let envelope = output
+                    .serialize_envelope()
+                    .map_err(|_| Outcome::Invalid(DiscardReason::Internal))?;
+
+                let envelope = ManagedEnvelope::new(envelope, outcome_addr);
+                let envelope = TypedEnvelope::<TransactionGroup>::try_from((
+                    envelope,
+                    ProcessingGroup::Transaction,
+                ))
+                .map_err(|_| Outcome::Invalid(DiscardReason::Internal))?
+                .into_processed();
+
+                s.send(StoreEnvelope { envelope });
+                Ok::<_, Outcome>(())
+            })
+            .map_err(|e| e.map(|_| ()))
+    }
+}
diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs
new file mode 100644
index 0000000000..8b13789179
--- /dev/null
+++ b/relay-server/src/processing/transactions/process.rs
@@ -0,0 +1 @@
+
diff --git a/relay-server/src/processing/transactions/profile.rs b/relay-server/src/processing/transactions/profile.rs
new file mode 100644
index 0000000000..061a38e908
--- /dev/null
+++ b/relay-server/src/processing/transactions/profile.rs
@@ -0,0 +1,248 @@
+//! Profile-related processor code.
+use std::net::IpAddr;
+
+use relay_config::Config;
+use relay_dynamic_config::{Feature, GlobalConfig};
+use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
+use relay_filter::ProjectFiltersConfig;
+use relay_profiling::{ProfileError, ProfileId};
+use relay_protocol::Annotated;
+#[cfg(feature = "processing")]
+use relay_protocol::{Getter, Remark, RemarkType};
+
+use crate::envelope::{ContentType, Item, ItemType};
+use crate::managed::{Counted, Managed};
+use crate::processing::transactions::Error;
+use crate::processing::{Context, CountRateLimited};
+use crate::services::outcome::{DiscardReason, Outcome};
+
+pub struct Profile(pub Item);
+
+impl Counted for Profile {
+    fn quantities(&self) -> crate::managed::Quantities {
+        self.0.quantities()
+    }
+}
+
+impl CountRateLimited for Managed<Profile> {
+    type Error = Error;
+}
+
+/// Transfers the profile ID from the profile item to the transaction item.
+///
+/// The profile ID may be `None` when the envelope does not contain a profile;
+/// in that case the profile context is removed.
+/// Some SDKs send transactions with profile IDs but omit the profile in the envelope.
+pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) {
+    let Some(event) = event.value_mut() else {
+        return;
+    };
+
+    match profile_id {
+        Some(profile_id) => {
+            let contexts = event.contexts.get_or_insert_with(Contexts::new);
+            contexts.add(ProfileContext {
+                profile_id: Annotated::new(profile_id),
+                ..ProfileContext::default()
+            });
+        }
+        None => {
+            if let Some(contexts) = event.contexts.value_mut()
+                && let Some(profile_context) = contexts.get_mut::<ProfileContext>()
+            {
+                profile_context.profile_id = Annotated::empty();
+            }
+        }
+    }
+}
+
+/// Processes a profile and sets the profile ID in the profile context on the transaction if successful.
+#[must_use]
+pub fn process(
+    profile: &mut Item,
+    client_ip: Option<IpAddr>,
+    event: Option<&Event>,
+    ctx: &Context,
+) -> Result<ProfileId, Outcome> {
+    debug_assert_eq!(profile.ty(), &ItemType::Profile);
+    let filter_settings = &ctx.project_info.config.filter_settings;
+    let profiling_enabled = ctx.project_info.has_feature(Feature::Profiling);
+
+    if !profiling_enabled {
+        return Err(Outcome::Invalid(DiscardReason::FeatureDisabled(
+            Feature::Profiling,
+        )));
+    }
+
+    let Some(event) = event else {
+        return Err(Outcome::Invalid(DiscardReason::NoEventPayload));
+    };
+
+    expand_profile(
+        profile,
+        event,
+        ctx.config,
+        client_ip,
+        filter_settings,
+        ctx.global_config,
+    )
+}
+
+/// Strips the profiler_id from the transaction's profile context if the transaction lasts less than 20ms.
+///
+/// This is necessary because if the transaction lasts less than 19.8ms, we know that the respective
+/// profile data won't have enough samples to be of any use, hence we "unlink" the profile from the
+/// transaction.
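+///
+/// The 19.8ms cutoff is assumed to be the shortest duration for which the sampling
+/// profiler can record more than a single sample.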
+#[cfg(feature = "processing")]
+pub fn scrub_profiler_id(event: &mut Annotated<Event>) {
+    let Some(event) = event.value_mut() else {
+        return;
+    };
+    let transaction_duration = event
+        .get_value("event.duration")
+        .and_then(|duration| duration.as_f64());
+
+    if !transaction_duration.is_some_and(|duration| duration < 19.8) {
+        return;
+    }
+    if let Some(contexts) = event.contexts.value_mut().as_mut()
+        && let Some(profiler_id) = contexts
+            .get_mut::<ProfileContext>()
+            .map(|ctx| &mut ctx.profiler_id)
+    {
+        let id = std::mem::take(profiler_id.value_mut());
+        let remark = Remark::new(RemarkType::Removed, "transaction_duration");
+        profiler_id.meta_mut().add_remark(remark);
+        profiler_id.meta_mut().set_original_value(id);
+    }
+}
+
+/// Transfers transaction metadata to the profile and checks its size.
+fn expand_profile(
+    item: &mut Item,
+    event: &Event,
+    config: &Config,
+    client_ip: Option<IpAddr>,
+    filter_settings: &ProjectFiltersConfig,
+    global_config: &GlobalConfig,
+) -> Result<ProfileId, Outcome> {
+    match relay_profiling::expand_profile(
+        &item.payload(),
+        event,
+        client_ip,
+        filter_settings,
+        global_config,
+    ) {
+        Ok((id, payload)) => {
+            if payload.len() <= config.max_profile_size() {
+                item.set_payload(ContentType::Json, payload);
+                Ok(id)
+            } else {
+                Err(Outcome::Invalid(DiscardReason::Profiling(
+                    relay_profiling::discard_reason(ProfileError::ExceedSizeLimit),
+                )))
+            }
+        }
+        Err(ProfileError::Filtered(filter_stat_key)) => Err(Outcome::Filtered(filter_stat_key)),
+        Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
+            relay_profiling::discard_reason(err),
+        ))),
+    }
+}
+
+#[cfg(test)]
+#[cfg(feature = "processing")]
+mod tests {
+    use super::*;
+
+    use chrono::{Duration, TimeZone, Utc};
+    use relay_base_schema::events::EventType;
+    use relay_event_schema::protocol::EventId;
+    use relay_protocol::get_value;
+    use uuid::Uuid;
+
+    #[test]
+    fn test_scrub_profiler_id_should_not_be_stripped() {
+        let mut contexts = Contexts::new();
+        contexts.add(ProfileContext {
+            profiler_id: Annotated::new(EventId(
+                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
+            )),
+            ..Default::default()
+        });
+        let mut event: Annotated<Event> = Annotated::new(Event {
+            ty: Annotated::new(EventType::Transaction),
+            start_timestamp: Annotated::new(
+                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
+            ),
+            timestamp: Annotated::new(
+                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
+                    .unwrap()
+                    .checked_add_signed(Duration::milliseconds(20))
+                    .unwrap()
+                    .into(),
+            ),
+            contexts: Annotated::new(contexts),
+            ..Default::default()
+        });
+
+        scrub_profiler_id(&mut event);
+
+        let profile_context = get_value!(event.contexts)
+            .unwrap()
+            .get::<ProfileContext>()
+            .unwrap();
+
+        assert!(
+            !profile_context
+                .profiler_id
+                .meta()
+                .iter_remarks()
+                .any(|remark| remark.rule_id == *"transaction_duration"
+                    && remark.ty == RemarkType::Removed)
+        )
+    }
+
+    #[cfg(feature = "processing")]
+    #[test]
+    fn test_scrub_profiler_id_should_be_stripped() {
+        let mut contexts = Contexts::new();
+        contexts.add(ProfileContext {
+            profiler_id: Annotated::new(EventId(
+                Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(),
+            )),
+            ..Default::default()
+        });
+        let mut event: Annotated<Event> = Annotated::new(Event {
+            ty: Annotated::new(EventType::Transaction),
+            start_timestamp: Annotated::new(
+                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(),
+            ),
+            timestamp: Annotated::new(
+                Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
+                    .unwrap()
+                    .checked_add_signed(Duration::milliseconds(15))
+                    .unwrap()
+                    .into(),
+            ),
+            contexts: Annotated::new(contexts),
..Default::default() + }); + + scrub_profiler_id(&mut event); + + let profile_context = get_value!(event.contexts) + .unwrap() + .get::() + .unwrap(); + + assert!( + profile_context + .profiler_id + .meta() + .iter_remarks() + .any(|remark| remark.rule_id == *"transaction_duration" + && remark.ty == RemarkType::Removed) + ) + } +} diff --git a/relay-server/src/processing/transactions/spans.rs b/relay-server/src/processing/transactions/spans.rs new file mode 100644 index 0000000000..2f40b14836 --- /dev/null +++ b/relay-server/src/processing/transactions/spans.rs @@ -0,0 +1,434 @@ +use std::error::Error; + +use crate::envelope::{ContentType, Item, ItemType}; +use crate::managed::RecordKeeper; +use crate::processing::utils::event::{EventMetricsExtracted, SpansExtracted, event_type}; +use crate::services::outcome::{DiscardReason, Outcome}; + +use crate::services::projects::project::ProjectInfo; +use crate::{processing, utils}; +use chrono::DateTime; +use relay_base_schema::events::EventType; +use relay_config::Config; +use relay_dynamic_config::GlobalConfig; +use relay_event_schema::protocol::{Event, Measurement, Measurements, Span, SpanV2}; +use relay_metrics::{FractionUnit, MetricNamespace, MetricUnit}; +use relay_protocol::{Annotated, Empty}; +use relay_sampling::DynamicSamplingContext; + +#[allow(clippy::too_many_arguments)] +pub fn extract_from_event( + dsc: Option<&DynamicSamplingContext>, + event: &Annotated, + global_config: &GlobalConfig, + config: &Config, + server_sample_rate: Option, + event_metrics_extracted: EventMetricsExtracted, + spans_extracted: SpansExtracted, +) -> Option>> { + // Only extract spans from transactions (not errors). + if event_type(event) != Some(EventType::Transaction) { + return None; + }; + + if spans_extracted.0 { + return None; + } + + if let Some(sample_rate) = global_config.options.span_extraction_sample_rate + && utils::sample(sample_rate).is_discard() + { + return None; + } + + let client_sample_rate = dsc.and_then(|ctx| ctx.sample_rate); + + let event = event.value()?; + + let transaction_span = processing::utils::transaction::extract_segment_span( + event, + config + .aggregator_config_for(MetricNamespace::Spans) + .max_tag_value_length, + &[], + )?; + + let mut results = vec![]; + + // Add child spans. + if let Some(spans) = event.spans.value() { + for span in spans { + let inner_span = span.value()?; + // HACK: clone the span to set the segment_id. This should happen + // as part of normalization once standalone spans reach wider adoption. + let mut new_span = inner_span.clone(); + new_span.is_segment = Annotated::new(false); + new_span.is_remote = Annotated::new(false); + new_span.received = transaction_span.received.clone(); + new_span.segment_id = transaction_span.segment_id.clone(); + new_span.platform = transaction_span.platform.clone(); + + // If a profile is associated with the transaction, also associate it with its + // child spans. 
+            new_span.profile_id = transaction_span.profile_id.clone();
+
+            results.push(make_span_item(
+                new_span,
+                config,
+                client_sample_rate,
+                server_sample_rate,
+                event_metrics_extracted.0,
+            ));
+        }
+    }
+
+    results.push(make_span_item(
+        transaction_span,
+        config,
+        client_sample_rate,
+        server_sample_rate,
+        event_metrics_extracted.0,
+    ));
+
+    Some(results)
+}
+
+fn make_span_item(
+    mut span: Span,
+    config: &Config,
+    client_sample_rate: Option<f64>,
+    server_sample_rate: Option<f64>,
+    metrics_extracted: bool,
+) -> Result<Item, ()> {
+    add_sample_rate(
+        &mut span.measurements,
+        "client_sample_rate",
+        client_sample_rate,
+    );
+    add_sample_rate(
+        &mut span.measurements,
+        "server_sample_rate",
+        server_sample_rate,
+    );
+
+    let mut span = Annotated::new(span);
+
+    validate(&mut span)
+        .inspect_err(|e| {
+            relay_log::error!(
+                error = e as &dyn Error,
+                span = ?span,
+                source = "event",
+                "invalid span"
+            );
+        })
+        .map_err(|_| ())?;
+
+    let mut item = create_span_item(span, config)?;
+    // If metrics extraction happened for the event, it also happened for its spans:
+    item.set_metrics_extracted(metrics_extracted);
+
+    relay_log::trace!("Adding span to envelope");
+    Ok(item)
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum ValidationError {
+    #[error("empty span")]
+    EmptySpan,
+    #[error("span is missing `trace_id`")]
+    MissingTraceId,
+    #[error("span is missing `span_id`")]
+    MissingSpanId,
+    #[error("span is missing `timestamp`")]
+    MissingTimestamp,
+    #[error("span is missing `start_timestamp`")]
+    MissingStartTimestamp,
+    #[error("span end must be after start")]
+    EndBeforeStartTimestamp,
+    #[error("span is missing `exclusive_time`")]
+    MissingExclusiveTime,
+}
+
+/// We do not extract or ingest spans with missing fields if those fields are required on the Kafka topic.
+pub fn validate(span: &mut Annotated<Span>) -> Result<(), ValidationError> {
+    let inner = span
+        .value_mut()
+        .as_mut()
+        .ok_or(ValidationError::EmptySpan)?;
+    let Span {
+        exclusive_time,
+        tags,
+        sentry_tags,
+        start_timestamp,
+        timestamp,
+        span_id,
+        trace_id,
+        ..
+    } = inner;
+
+    trace_id.value().ok_or(ValidationError::MissingTraceId)?;
+    span_id.value().ok_or(ValidationError::MissingSpanId)?;
+
+    match (start_timestamp.value(), timestamp.value()) {
+        (Some(start), Some(end)) if end < start => Err(ValidationError::EndBeforeStartTimestamp),
+        (Some(_), Some(_)) => Ok(()),
+        (_, None) => Err(ValidationError::MissingTimestamp),
+        (None, _) => Err(ValidationError::MissingStartTimestamp),
+    }?;
+
+    exclusive_time
+        .value()
+        .ok_or(ValidationError::MissingExclusiveTime)?;
+
+    if let Some(sentry_tags) = sentry_tags.value_mut() {
+        if sentry_tags
+            .group
+            .value()
+            .is_some_and(|s| s.len() > 16 || s.chars().any(|c| !c.is_ascii_hexdigit()))
+        {
+            sentry_tags.group.set_value(None);
+        }
+
+        if sentry_tags
+            .status_code
+            .value()
+            .is_some_and(|s| s.parse::<u16>().is_err())
+        {
+            sentry_tags.status_code.set_value(None);
+        }
+    }
+    if let Some(tags) = tags.value_mut() {
+        tags.retain(|_, value| !value.value().is_empty())
+    }
+
+    Ok(())
+}
+
+pub fn create_span_item(span: Annotated<Span>, config: &Config) -> Result<Item, ()> {
+    let mut new_item = Item::new(ItemType::Span);
+    if cfg!(feature = "processing") && config.processing_enabled() {
+        let span_v2 = span.map_value(relay_spans::span_v1_to_span_v2);
+        let payload = match span_v2.to_json() {
+            Ok(payload) => payload,
+            Err(err) => {
+                relay_log::error!("failed to serialize span V2: {err}");
+                return Err(());
+            }
+        };
+        if let Some(trace_id) = span_v2.value().and_then(|s| s.trace_id.value()) {
+            new_item.set_routing_hint(*trace_id.as_ref());
+        }
+
+        new_item.set_payload(ContentType::Json, payload);
+    } else {
+        let payload = match span.to_json() {
+            Ok(payload) => payload,
+            Err(err) => {
+                relay_log::error!("failed to serialize span: {err}");
+                return Err(());
+            }
+        };
+        new_item.set_payload(ContentType::Json, payload);
+    }
+
+    Ok(new_item)
+}
+
+fn add_sample_rate(measurements: &mut Annotated<Measurements>, name: &str, value: Option<f64>) {
+    let value = match value {
+        Some(value) if value > 0.0 => value,
+        _ => return,
+    };
+
+    let measurement = Annotated::new(Measurement {
+        value: Annotated::try_from(value),
+        unit: MetricUnit::Fraction(FractionUnit::Ratio).into(),
+    });
+
+    measurements
+        .get_or_insert_with(Measurements::default)
+        .insert(name.to_owned(), measurement);
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::collections::BTreeMap;
+    use std::sync::Arc;
+
+    use bytes::Bytes;
+    use chrono::DateTime;
+    use relay_dynamic_config::GlobalConfig;
+    use relay_event_schema::protocol::{
+        Context, ContextInner, Contexts, Span, Timestamp, TraceContext,
+    };
+    use relay_system::Addr;
+
+    use crate::Envelope;
+    use crate::managed::{ManagedEnvelope, TypedEnvelope};
+    use crate::services::processor::{ProcessingGroup, TransactionGroup};
+    use crate::services::projects::project::ProjectInfo;
+
+    use super::*;
+
+    fn params() -> (
+        TypedEnvelope<TransactionGroup>,
+        Annotated<Event>,
+        Arc<ProjectInfo>,
+    ) {
+        let bytes = Bytes::from(
+            r#"{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b","sample_rate":"0.2"}}
+{"type":"transaction"}
+{}
+"#,
+        );
+
+        let dummy_envelope = Envelope::parse_bytes(bytes).unwrap();
+        let project_info = Arc::new(ProjectInfo::default());
+
+        let event = Event {
+            ty: EventType::Transaction.into(),
+            start_timestamp: Timestamp(DateTime::from_timestamp(0, 0).unwrap()).into(),
+            timestamp: Timestamp(DateTime::from_timestamp(1, 0).unwrap()).into(),
+            contexts: Contexts(BTreeMap::from([(
+                "trace".into(),
+                ContextInner(Context::Trace(Box::new(TraceContext {
Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()), + span_id: Annotated::new("fa90fdead5f74053".parse().unwrap()), + exclusive_time: 1000.0.into(), + ..Default::default() + }))) + .into(), + )])) + .into(), + ..Default::default() + }; + + let managed_envelope = ManagedEnvelope::new(dummy_envelope, Addr::dummy()); + let managed_envelope = (managed_envelope, ProcessingGroup::Transaction) + .try_into() + .unwrap(); + + let event = Annotated::from(event); + + (managed_envelope, event, project_info) + } + + #[test] + fn extract_sampled_default() { + let global_config = GlobalConfig::default(); + assert!(global_config.options.span_extraction_sample_rate.is_none()); + let (mut managed_envelope, event, _) = params(); + let spans = extract_from_event( + None, + &event, + &global_config, + &Default::default(), + None, + EventMetricsExtracted(false), + SpansExtracted(false), + ) + .unwrap(); + assert!( + spans + .iter() + .any(|item| item.as_ref().unwrap().ty() == &ItemType::Span), + "{:?}", + managed_envelope.envelope() + ); + } + + #[test] + fn extract_sampled_explicit() { + let mut global_config = GlobalConfig::default(); + global_config.options.span_extraction_sample_rate = Some(1.0); + let (mut managed_envelope, event, _) = params(); + let spans = extract_from_event( + None, + &event, + &global_config, + &Default::default(), + None, + EventMetricsExtracted(false), + SpansExtracted(false), + ) + .unwrap(); + assert!( + spans + .iter() + .any(|item| item.as_ref().unwrap().ty() == &ItemType::Span), + "{:?}", + managed_envelope.envelope() + ); + } + + #[test] + fn extract_sampled_dropped() { + let mut global_config = GlobalConfig::default(); + global_config.options.span_extraction_sample_rate = Some(0.0); + let (mut managed_envelope, event, _) = params(); + let spans = extract_from_event( + None, + &event, + &global_config, + &Default::default(), + None, + EventMetricsExtracted(false), + SpansExtracted(false), + ) + .unwrap(); + assert!( + !spans + .iter() + .any(|item| item.as_ref().unwrap().ty() == &ItemType::Span), + "{:?}", + managed_envelope.envelope() + ); + } + + #[test] + fn extract_sample_rates() { + let mut global_config = GlobalConfig::default(); + global_config.options.span_extraction_sample_rate = Some(1.0); // force enable + let (mut managed_envelope, event, _) = params(); // client sample rate is 0.2 + let spans = extract_from_event( + None, + &event, + &global_config, + &Default::default(), + Some(0.1), + EventMetricsExtracted(false), + SpansExtracted(false), + ) + .unwrap(); + + let span = spans + .into_iter() + .find(|item| item.as_ref().unwrap().ty() == &ItemType::Span) + .unwrap() + .unwrap(); + + let span = Annotated::::from_json_bytes(&span.payload()).unwrap(); + let measurements = span.value().and_then(|s| s.measurements.value()); + + insta::assert_debug_snapshot!(measurements, @r###" + Some( + Measurements( + { + "client_sample_rate": Measurement { + value: 0.2, + unit: Fraction( + Ratio, + ), + }, + "server_sample_rate": Measurement { + value: 0.1, + unit: Fraction( + Ratio, + ), + }, + }, + ), + ) + "###); + } +} diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs new file mode 100644 index 0000000000..0e7ac50002 --- /dev/null +++ b/relay-server/src/processing/utils/attachments.rs @@ -0,0 +1,204 @@ +use std::error::Error; +use std::time::Instant; + +use relay_pii::{PiiAttachmentsProcessor, SelectorPathItem, SelectorSpec}; +use relay_statsd::metric; + +use crate::envelope::{AttachmentType, 
+use crate::statsd::RelayTimers;
+
+use crate::services::projects::project::ProjectInfo;
+use relay_dynamic_config::Feature;
+
+/// Apply data privacy rules to attachments in the envelope.
+///
+/// This only applies the new PII rules that explicitly select `ValueType::Binary` or one of the
+/// attachment types. When special attachments are detected, these are scrubbed with custom
+/// logic; otherwise the entire attachment is treated as a single binary blob.
+pub fn scrub<'a>(attachments: impl Iterator<Item = &'a mut Item>, project_info: &ProjectInfo) {
+    if let Some(ref config) = project_info.config.pii_config {
+        let view_hierarchy_scrubbing_enabled = project_info
+            .config
+            .features
+            .has(Feature::ViewHierarchyScrubbing);
+        for item in attachments {
+            debug_assert_eq!(item.ty(), &ItemType::Attachment);
+            if view_hierarchy_scrubbing_enabled
+                && item.attachment_type() == Some(&AttachmentType::ViewHierarchy)
+            {
+                scrub_view_hierarchy(item, config)
+            } else if item.attachment_type() == Some(&AttachmentType::Minidump) {
+                scrub_minidump(item, config)
+            } else if item.ty() == &ItemType::Attachment && has_simple_attachment_selector(config) {
+                // We temporarily only scrub attachments of projects that have at least one simple
+                // attachment rule, such as `$attachments.'foo.txt'`.
+                // After we have assessed the impact on performance, we can relax this condition.
+                scrub_attachment(item, config)
+            }
+        }
+    }
+}
+
+fn scrub_minidump(item: &mut crate::envelope::Item, config: &relay_pii::PiiConfig) {
+    debug_assert_eq!(item.attachment_type(), Some(&AttachmentType::Minidump));
+    let filename = item.filename().unwrap_or_default();
+    let mut payload = item.payload().to_vec();
+
+    let processor = PiiAttachmentsProcessor::new(config.compiled());
+
+    // Minidump scrubbing can fail if the minidump cannot be parsed. In this case, we
+    // must be conservative and treat it as a plain attachment. Under extreme
+    // conditions, this could destroy stack memory.
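+    // The error branch below therefore falls back to the generic attachment scrubber
+    // over the raw bytes instead of dropping the minidump.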
+ let start = Instant::now(); + match processor.scrub_minidump(filename, &mut payload) { + Ok(modified) => { + metric!( + timer(RelayTimers::MinidumpScrubbing) = start.elapsed(), + status = if modified { "ok" } else { "n/a" }, + ); + } + Err(scrub_error) => { + metric!( + timer(RelayTimers::MinidumpScrubbing) = start.elapsed(), + status = "error" + ); + relay_log::debug!( + error = &scrub_error as &dyn Error, + "failed to scrub minidump", + ); + metric!( + timer(RelayTimers::AttachmentScrubbing), + attachment_type = "minidump", + { + processor.scrub_attachment(filename, &mut payload); + } + ) + } + } + + let content_type = item + .content_type() + .unwrap_or(&ContentType::Minidump) + .clone(); + + item.set_payload(content_type, payload); +} + +fn scrub_view_hierarchy(item: &mut crate::envelope::Item, config: &relay_pii::PiiConfig) { + let processor = PiiAttachmentsProcessor::new(config.compiled()); + + let payload = item.payload(); + let start = Instant::now(); + match processor.scrub_json(&payload) { + Ok(output) => { + metric!( + timer(RelayTimers::ViewHierarchyScrubbing) = start.elapsed(), + status = "ok" + ); + let content_type = item.content_type().unwrap_or(&ContentType::Json).clone(); + item.set_payload(content_type, output); + } + Err(e) => { + relay_log::debug!(error = &e as &dyn Error, "failed to scrub view hierarchy",); + metric!( + timer(RelayTimers::ViewHierarchyScrubbing) = start.elapsed(), + status = "error" + ) + } + } +} + +fn has_simple_attachment_selector(config: &relay_pii::PiiConfig) -> bool { + for application in &config.applications { + if let SelectorSpec::Path(vec) = &application.0 { + let Some([a, b]) = vec.get(0..2) else { + continue; + }; + if matches!( + a, + SelectorPathItem::Type(relay_event_schema::processor::ValueType::Attachments) + ) && matches!(b, SelectorPathItem::Key(_)) + { + return true; + } + } + } + false +} + +fn scrub_attachment(item: &mut crate::envelope::Item, config: &relay_pii::PiiConfig) { + let filename = item.filename().unwrap_or_default(); + let mut payload = item.payload().to_vec(); + + let processor = PiiAttachmentsProcessor::new(config.compiled()); + let attachment_type_tag = match item.attachment_type() { + Some(t) => t.to_string(), + None => "".to_owned(), + }; + metric!( + timer(RelayTimers::AttachmentScrubbing), + attachment_type = &attachment_type_tag, + { + processor.scrub_attachment(filename, &mut payload); + } + ); + + item.set_payload_without_content_type(payload); +} + +#[cfg(test)] +mod tests { + use relay_pii::PiiConfig; + + use super::*; + + #[test] + fn matches_attachment_selector() { + let config = r#"{ + "rules": {"0": {"type": "ip", "redaction": {"method": "remove"}}}, + "applications": {"$attachments.'foo.txt'": ["0"]} + }"#; + let config: PiiConfig = serde_json::from_str(config).unwrap(); + assert!(has_simple_attachment_selector(&config)); + } + + #[test] + fn does_not_match_wildcard() { + let config = r#"{ + "rules": {}, + "applications": {"$attachments.**":["0"]} + }"#; + let config: PiiConfig = serde_json::from_str(config).unwrap(); + assert!(!has_simple_attachment_selector(&config)); + } + + #[test] + fn does_not_match_empty() { + let config = r#"{ + "rules": {}, + "applications": {} + }"#; + let config: PiiConfig = serde_json::from_str(config).unwrap(); + assert!(!has_simple_attachment_selector(&config)); + } + + #[test] + fn does_not_match_something_else() { + let config = r#"{ + "rules": {}, + "applications": { + "**": ["0"] + } + }"#; + let config: PiiConfig = serde_json::from_str(config).unwrap(); + 
assert!(!has_simple_attachment_selector(&config));
+    }
+}
diff --git a/relay-server/src/processing/utils/dsc.rs b/relay-server/src/processing/utils/dsc.rs
new file mode 100644
index 0000000000..9d3850c4e7
--- /dev/null
+++ b/relay-server/src/processing/utils/dsc.rs
@@ -0,0 +1,101 @@
+use relay_base_schema::{events::EventType, project::ProjectKey};
+use relay_event_schema::protocol::{Event, TraceContext};
+use relay_protocol::Annotated;
+use relay_sampling::{DynamicSamplingContext, dsc::TraceUserContext};
+
+use crate::envelope::EnvelopeHeaders;
+use crate::processing::Context;
+
+/// Ensures there is a valid dynamic sampling context and corresponding project state.
+///
+/// The dynamic sampling context (DSC) specifies the project_key of the project that initiated
+/// the trace. That project state should have been loaded previously by the project cache and is
+/// available on the `ProcessEnvelopeState`. This precondition can be violated in the following
+/// cases:
+///
+///  - There is no DSC in the envelope headers. This occurs with older or third-party SDKs.
+///  - The project key does not exist. This can happen if the project key was disabled, the
+///    project removed, or in rare cases when a project from another Sentry instance is referred
+///    to.
+///  - The project key refers to a project from another organization. In this case the project
+///    cache does not resolve the state and instead leaves it blank.
+///  - The project state could not be fetched. This is a runtime error, but in this case Relay
+///    should fall back to the next-best sampling rule set.
+///
+/// In all of the above cases, this function will compute a new DSC using information from the
+/// event payload, similar to how SDKs do this. The sampling project information is also switched
+/// to the event's own project information.
+///
+/// If there is no transaction event in the envelope, this function will do nothing.
+///
+/// Afterwards, `ctx.sampling_project_info` refers to the project that drives sampling: the root
+/// project of the trace, or the event's own project if the DSC had to be recomputed.
+pub fn validate_and_set_dsc(
+    headers: &mut EnvelopeHeaders,
+    event: &Annotated<Event>,
+    ctx: &mut Context,
+) {
+    let original_dsc = headers.dsc();
+    if original_dsc.is_some() && ctx.sampling_project_info.is_some() {
+        return;
+    }
+
+    // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event`
+    // below already checks for the event type.
+    if let Some(event) = event.value()
+        && let Some(key_config) = ctx.project_info.get_public_key_config()
+        && let Some(mut dsc) = dsc_from_event(key_config.public_key, event)
+    {
+        // All other information in the DSC must be discarded, but the sample rate was
+        // actually applied by the client and is therefore correct.
+        let original_sample_rate = original_dsc.and_then(|dsc| dsc.sample_rate);
+        dsc.sample_rate = dsc.sample_rate.or(original_sample_rate);
+
+        headers.set_dsc(dsc);
+
+        ctx.sampling_project_info = Some(ctx.project_info);
+        return;
+    }
+
+    // If we cannot compute a new DSC but the old one is incorrect, we need to remove it.
+    headers.remove_dsc();
+}
+
+/// Computes a dynamic sampling context from a transaction event.
+///
+/// Returns `None` if the passed event is not a transaction event, or if it does not contain a
+/// trace ID in its trace context. All optional fields in the dynamic sampling context are
+/// populated with the corresponding attributes from the event payload if they are available.
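+///
+/// The copied fields are `release`, `environment`, `transaction`, and the user's
+/// `id` and `segment`.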
+///
+/// Since sampling information is not available in the event payload, the `sample_rate` field
+/// cannot be set when computing the dynamic sampling context from a transaction event.
+pub fn dsc_from_event(public_key: ProjectKey, event: &Event) -> Option<DynamicSamplingContext> {
+    if event.ty.value() != Some(&EventType::Transaction) {
+        return None;
+    }
+
+    let trace = event.context::<TraceContext>()?;
+    let trace_id = *trace.trace_id.value()?;
+    let user = event.user.value();
+
+    Some(DynamicSamplingContext {
+        trace_id,
+        public_key,
+        release: event.release.as_str().map(str::to_owned),
+        environment: event.environment.value().cloned(),
+        transaction: event.transaction.value().cloned(),
+        replay_id: None,
+        sample_rate: None,
+        user: TraceUserContext {
+            user_segment: user
+                .and_then(|u| u.segment.value().cloned())
+                .unwrap_or_default(),
+            user_id: user
+                .and_then(|u| u.id.as_str())
+                .unwrap_or_default()
+                .to_owned(),
+        },
+        sampled: None,
+        other: Default::default(),
+    })
+}
diff --git a/relay-server/src/processing/utils/dynamic_sampling.rs b/relay-server/src/processing/utils/dynamic_sampling.rs
index 31e69185b6..b25af8400e 100644
--- a/relay-server/src/processing/utils/dynamic_sampling.rs
+++ b/relay-server/src/processing/utils/dynamic_sampling.rs
@@ -15,7 +15,7 @@ use crate::utils::SamplingResult;
 /// Computes the sampling decision on the incoming event
 pub async fn run(
     dsc: Option<&DynamicSamplingContext>,
-    event: &mut Annotated<Event>,
+    event: &Annotated<Event>,
     ctx: &Context<'_>,
     reservoir: Option<&ReservoirEvaluator<'_>>,
 ) -> SamplingResult {
diff --git a/relay-server/src/processing/utils/event.rs b/relay-server/src/processing/utils/event.rs
index 89ced5d1f6..3e7d3780c2 100644
--- a/relay-server/src/processing/utils/event.rs
+++ b/relay-server/src/processing/utils/event.rs
@@ -8,6 +8,7 @@ use std::sync::OnceLock;
 use chrono::Duration as SignedDuration;
 use relay_auth::RelayVersion;
 use relay_base_schema::events::EventType;
+use relay_base_schema::project::ProjectId;
 use relay_config::Config;
 use relay_config::NormalizationLevel;
 use relay_dynamic_config::Feature;
@@ -24,6 +25,7 @@ use relay_event_schema::protocol::IpAddr;
 use relay_event_schema::protocol::{Event, Metrics, OtelContext, RelayInfo};
 use relay_filter::FilterStatKey;
 use relay_metrics::MetricNamespace;
+use relay_pii::PiiProcessor;
 use relay_protocol::Annotated;
 use relay_protocol::Empty;
 use relay_quotas::DataCategory;
@@ -33,6 +35,7 @@ use crate::constants::DEFAULT_EVENT_RETENTION;
 use crate::envelope::{Envelope, EnvelopeHeaders, Item};
 use crate::processing::Context;
 use crate::services::processor::{MINIMUM_CLOCK_DRIFT, ProcessingError};
+use crate::services::projects::project::ProjectInfo;
 use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
 use crate::utils::{self};
 
@@ -185,6 +188,7 @@ pub fn normalize(
     headers: &EnvelopeHeaders,
     event: &mut Annotated<Event>,
     mut event_fully_normalized: EventFullyNormalized,
+    project_id: ProjectId,
     ctx: &Context,
     geoip_lookup: &GeoIpLookup,
 ) -> Result<EventFullyNormalized, ProcessingError> {
@@ -244,13 +248,6 @@ pub fn normalize(
         );
     }
 
-    // TODO: duplicated from `EnvelopeProcessorService::process`. We should pass project_id
- let project_id = ctx - .project_info - .project_id - .or_else(|| headers.meta().project_id()) - .ok_or(ProcessingError::MissingProjectId)?; let normalization_config = NormalizationConfig { project_id: Some(project_id.value()), client: request_meta.client().map(str::to_owned), @@ -429,6 +426,39 @@ fn has_unprintable_fields(event: &Annotated<Event>) -> bool { } } +/// Apply data privacy rules to the event payload. +/// +/// This uses both the general `datascrubbing_settings`, as well as the PII rules. +pub fn scrub( + event: &mut Annotated<Event>, + project_info: &ProjectInfo, +) -> Result<(), ProcessingError> { + let config = &project_info.config; + + if config.datascrubbing_settings.scrub_data + && let Some(event) = event.value_mut() + { + relay_pii::scrub_graphql(event); + } + + metric!(timer(RelayTimers::EventProcessingPii), { + if let Some(ref config) = config.pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + processor::process_value(event, &mut processor, ProcessingState::root())?; + } + let pii_config = config + .datascrubbing_settings + .pii_config() + .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?; + if let Some(config) = pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + processor::process_value(event, &mut processor, ProcessingState::root())?; + } + }); + + Ok(()) +} + #[cfg(feature = "processing")] #[cfg(test)] mod tests { diff --git a/relay-server/src/processing/utils/mod.rs b/relay-server/src/processing/utils/mod.rs index 4681dc0508..2810f6c078 100644 --- a/relay-server/src/processing/utils/mod.rs +++ b/relay-server/src/processing/utils/mod.rs @@ -1,3 +1,5 @@ +pub mod attachments; +pub mod dsc; pub mod dynamic_sampling; pub mod event; #[cfg(feature = "processing")] diff --git a/relay-server/src/processing/utils/transaction.rs b/relay-server/src/processing/utils/transaction.rs index d4035a5464..e681070c6e 100644 --- a/relay-server/src/processing/utils/transaction.rs +++ b/relay-server/src/processing/utils/transaction.rs @@ -38,8 +38,8 @@ pub struct ExtractMetricsContext<'a> { pub project_id: ProjectId, pub ctx: &'a Context<'a>, pub sampling_decision: SamplingDecision, - pub event_metrics_extracted: EventMetricsExtracted, - pub spans_extracted: SpansExtracted, + pub metrics_extracted: EventMetricsExtracted, + pub spans_extracted: bool, } /// Extract transaction metrics. @@ -53,7 +53,7 @@ pub fn extract_metrics( project_id, ctx, sampling_decision, - event_metrics_extracted, + metrics_extracted: event_metrics_extracted, spans_extracted, } = ctx; @@ -120,7 +120,7 @@ pub fn extract_metrics( } // If spans were already extracted for an event, we rely on span processing to extract metrics. - let extract_spans = !spans_extracted.0 + let extract_spans = !spans_extracted && crate::utils::sample( ctx.global_config .options diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 77f424ab90..1701b4b6ff 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -660,6 +660,10 @@ impl ProcessingExtractedMetrics { } } + /// Returns the contained extracted metrics. + pub fn into_inner(self) -> ExtractedMetrics { + self.metrics + } + /// Extends the contained metrics with [`ExtractedMetrics`].
pub fn extend( &mut self, @@ -1349,12 +1353,13 @@ impl EnvelopeProcessorService { &mut event, attachments, &mut metrics, - &self.inner.config, + &ctx.config, )?; event_fully_normalized = processing::utils::event::normalize( managed_envelope.envelope().headers(), &mut event, event_fully_normalized, + project_id, &ctx, &self.inner.geoip_lookup, )?; @@ -1383,7 +1388,7 @@ impl EnvelopeProcessorService { .await?; if event.value().is_some() { - event::scrub(&mut event, ctx.project_info)?; + processing::utils::event::scrub(&mut event, ctx.project_info)?; event::serialize( managed_envelope, &mut event, @@ -1394,7 +1399,11 @@ impl EnvelopeProcessorService { event::emit_feedback_metrics(managed_envelope.envelope()); } - attachment::scrub(managed_envelope, ctx.project_info); + let attachments = managed_envelope + .envelope_mut() + .items_mut() + .filter(|i| i.ty() == &ItemType::Attachment); + processing::utils::attachments::scrub(attachments, ctx.project_info); if self.inner.config.processing_enabled() && !event_fully_normalized.0 { relay_log::error!( @@ -1408,8 +1417,6 @@ } /// Processes only transactions and transaction-related items. - #[allow(unused_assignments)] - #[allow(clippy::too_many_arguments)] async fn process_transactions( &self, managed_envelope: &mut TypedEnvelope<TransactionGroup>, @@ -1450,7 +1457,7 @@ project_id, ctx.project_info, ); - profile::transfer_id(&mut event, profile_id); + processing::transactions::profile::transfer_id(&mut event, profile_id); ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc( managed_envelope, @@ -1475,6 +1482,7 @@ managed_envelope.envelope().headers(), &mut event, event_fully_normalized, + project_id, &ctx, &self.inner.geoip_lookup, )?; @@ -1543,8 +1551,8 @@ project_id, ctx: &ctx, sampling_decision: SamplingDecision::Drop, - event_metrics_extracted, - spans_extracted, + metrics_extracted: event_metrics_extracted, + spans_extracted: spans_extracted.0, }, )?; @@ -1576,9 +1584,13 @@ // Need to scrub the transaction before extracting spans. // // Unconditionally scrub to make sure PII is removed as early as possible. - event::scrub(&mut event, ctx.project_info)?; + processing::utils::event::scrub(&mut event, ctx.project_info)?; - attachment::scrub(managed_envelope, ctx.project_info); + let attachments = managed_envelope + .envelope_mut() + .items_mut() + .filter(|i| i.ty() == &ItemType::Attachment); + processing::utils::attachments::scrub(attachments, ctx.project_info); if_processing!(self.inner.config, { // Process profiles before extracting metrics, to make sure they are removed if they are invalid. @@ -1589,8 +1601,8 @@ ctx.config, ctx.project_info, ); - profile::transfer_id(&mut event, profile_id); - profile::scrub_profiler_id(&mut event); + processing::transactions::profile::transfer_id(&mut event, profile_id); + processing::transactions::profile::scrub_profiler_id(&mut event); // Always extract metrics in processing Relays for sampled items.
event_metrics_extracted = processing::utils::transaction::extract_metrics( @@ -1601,30 +1613,39 @@ project_id, ctx: &ctx, sampling_decision: SamplingDecision::Keep, - event_metrics_extracted, - spans_extracted, + metrics_extracted: event_metrics_extracted, + spans_extracted: spans_extracted.0, }, )?; - spans_extracted = span::extract_from_event( - managed_envelope, + if let Some(spans) = processing::transactions::spans::extract_from_event( + managed_envelope.envelope().dsc(), &event, ctx.global_config, ctx.config, server_sample_rate, event_metrics_extracted, spans_extracted, - ); + ) { + spans_extracted = SpansExtracted(true); + for item in spans { + match item { + Ok(item) => managed_envelope.envelope_mut().add_item(item), + Err(()) => managed_envelope.track_outcome( + Outcome::Invalid(DiscardReason::InvalidSpan), + DataCategory::SpanIndexed, + 1, + ), + // TODO: also `DataCategory::Span`? + } + } + } }); event = self .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx) .await?; - if_processing!(self.inner.config, { - event = span::maybe_discard_transaction(managed_envelope, event, ctx.project_info); - }); - // Event may have been dropped because of a quota and the envelope can be empty. if event.value().is_some() { event::serialize( @@ -1702,7 +1723,11 @@ .await?; report::process_user_reports(managed_envelope); - attachment::scrub(managed_envelope, ctx.project_info); + let attachments = managed_envelope + .envelope_mut() + .items_mut() + .filter(|i| i.ty() == &ItemType::Attachment); + processing::utils::attachments::scrub(attachments, ctx.project_info); Ok(Some(extracted_metrics)) } diff --git a/relay-server/src/services/processor/attachment.rs b/relay-server/src/services/processor/attachment.rs index 3131191bbf..e28b2554d9 100644 --- a/relay-server/src/services/processor/attachment.rs +++ b/relay-server/src/services/processor/attachment.rs @@ -1,17 +1,8 @@ //! Attachments processor code. -use std::error::Error; -use std::time::Instant; - -use relay_pii::{PiiAttachmentsProcessor, SelectorPathItem, SelectorSpec}; -use relay_statsd::metric; - -use crate::envelope::{AttachmentType, ContentType, ItemType}; -use crate::statsd::RelayTimers; +use crate::envelope::AttachmentType; use crate::managed::TypedEnvelope; -use crate::services::projects::project::ProjectInfo; -use relay_dynamic_config::Feature; #[cfg(feature = "processing")] use { crate::services::processor::{ErrorGroup, EventFullyNormalized}, @@ -52,188 +43,3 @@ pub fn create_placeholders( None } - -/// Apply data privacy rules to attachments in the envelope. -/// -/// This only applies the new PII rules that explicitly select `ValueType::Binary` or one of the -/// attachment types. When special attachments are detected, these are scrubbed with custom -/// logic; otherwise the entire attachment is treated as a single binary blob.
-pub fn scrub<Group: EventProcessing>(managed_envelope: &mut TypedEnvelope<Group>, project_info: &ProjectInfo) { - let envelope = managed_envelope.envelope_mut(); - if let Some(ref config) = project_info.config.pii_config { - let view_hierarchy_scrubbing_enabled = project_info - .config - .features - .has(Feature::ViewHierarchyScrubbing); - for item in envelope.items_mut() { - if view_hierarchy_scrubbing_enabled - && item.attachment_type() == Some(&AttachmentType::ViewHierarchy) - { - scrub_view_hierarchy(item, config) - } else if item.attachment_type() == Some(&AttachmentType::Minidump) { - scrub_minidump(item, config) - } else if item.ty() == &ItemType::Attachment && has_simple_attachment_selector(config) { - // We temporarily only scrub attachments to projects that have at least one simple attachment rule, - // such as `$attachments.'foo.txt'`. - // After we have assessed the impact on performance we can relax this condition. - scrub_attachment(item, config) - } - } - } -} - -fn scrub_minidump(item: &mut crate::envelope::Item, config: &relay_pii::PiiConfig) { - debug_assert_eq!(item.attachment_type(), Some(&AttachmentType::Minidump)); - let filename = item.filename().unwrap_or_default(); - let mut payload = item.payload().to_vec(); - - let processor = PiiAttachmentsProcessor::new(config.compiled()); - - // Minidump scrubbing can fail if the minidump cannot be parsed. In this case, we - // must be conservative and treat it as a plain attachment. Under extreme - // conditions, this could destroy stack memory. - let start = Instant::now(); - match processor.scrub_minidump(filename, &mut payload) { - Ok(modified) => { - metric!( - timer(RelayTimers::MinidumpScrubbing) = start.elapsed(), - status = if modified { "ok" } else { "n/a" }, - ); - } - Err(scrub_error) => { - metric!( - timer(RelayTimers::MinidumpScrubbing) = start.elapsed(), - status = "error" - ); - relay_log::debug!( - error = &scrub_error as &dyn Error, - "failed to scrub minidump", - ); - metric!( - timer(RelayTimers::AttachmentScrubbing), - attachment_type = "minidump", - { - processor.scrub_attachment(filename, &mut payload); - } - ) - } - } - - let content_type = item - .content_type() - .unwrap_or(&ContentType::Minidump) - .clone(); - - item.set_payload(content_type, payload); -} - -fn scrub_view_hierarchy(item: &mut crate::envelope::Item, config: &relay_pii::PiiConfig) { - let processor = PiiAttachmentsProcessor::new(config.compiled()); - - let payload = item.payload(); - let start = Instant::now(); - match processor.scrub_json(&payload) { - Ok(output) => { - metric!( - timer(RelayTimers::ViewHierarchyScrubbing) = start.elapsed(), - status = "ok" - ); - let content_type = item.content_type().unwrap_or(&ContentType::Json).clone(); - item.set_payload(content_type, output); - } - Err(e) => { - relay_log::debug!(error = &e as &dyn Error, "failed to scrub view hierarchy",); - metric!( - timer(RelayTimers::ViewHierarchyScrubbing) = start.elapsed(), - status = "error" - ) - } - } -} - -fn has_simple_attachment_selector(config: &relay_pii::PiiConfig) -> bool { - for application in &config.applications { - if let SelectorSpec::Path(vec) = &application.0 { - let Some([a, b]) = vec.get(0..2) else { - continue; - }; - if matches!( - a, - SelectorPathItem::Type(relay_event_schema::processor::ValueType::Attachments) - ) && matches!(b, SelectorPathItem::Key(_)) - { - return true; - } - } - } - false -} - -fn scrub_attachment(item: &mut crate::envelope::Item, config: &relay_pii::PiiConfig) { - let filename = item.filename().unwrap_or_default(); - let mut payload =
item.payload().to_vec(); - - let processor = PiiAttachmentsProcessor::new(config.compiled()); - let attachment_type_tag = match item.attachment_type() { - Some(t) => t.to_string(), - None => "".to_owned(), - }; - metric!( - timer(RelayTimers::AttachmentScrubbing), - attachment_type = &attachment_type_tag, - { - processor.scrub_attachment(filename, &mut payload); - } - ); - - item.set_payload_without_content_type(payload); -} - -#[cfg(test)] -mod tests { - use relay_pii::PiiConfig; - - use super::*; - - #[test] - fn matches_attachment_selector() { - let config = r#"{ - "rules": {"0": {"type": "ip", "redaction": {"method": "remove"}}}, - "applications": {"$attachments.'foo.txt'": ["0"]} - }"#; - let config: PiiConfig = serde_json::from_str(config).unwrap(); - assert!(has_simple_attachment_selector(&config)); - } - - #[test] - fn does_not_match_wildcard() { - let config = r#"{ - "rules": {}, - "applications": {"$attachments.**":["0"]} - }"#; - let config: PiiConfig = serde_json::from_str(config).unwrap(); - assert!(!has_simple_attachment_selector(&config)); - } - - #[test] - fn does_not_match_empty() { - let config = r#"{ - "rules": {}, - "applications": {} - }"#; - let config: PiiConfig = serde_json::from_str(config).unwrap(); - assert!(!has_simple_attachment_selector(&config)); - } - - #[test] - fn does_not_match_something_else() { - let config = r#"{ - "rules": {}, - "applications": { - "**": ["0"] - } - }"#; - let config: PiiConfig = serde_json::from_str(config).unwrap(); - assert!(!has_simple_attachment_selector(&config)); - } -} diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 55f10643e7..8e5ec7f052 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -54,7 +54,8 @@ pub fn validate_and_set_dsc<'a, T>( // below already checks for the event type. if let Some(event) = event.value() && let Some(key_config) = project_info.get_public_key_config() - && let Some(mut dsc) = utils::dsc_from_event(key_config.public_key, event) + && let Some(mut dsc) = + crate::processing::utils::dsc::dsc_from_event(key_config.public_key, event) { // All other information in the DSC must be discarded, but the sample rate was // actually applied by the client and is therefore correct. diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index f782ad6c78..f3b0b5a7e2 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -4,12 +4,10 @@ use std::error::Error; use relay_base_schema::events::EventType; use relay_config::Config; -use relay_event_schema::processor::{self, ProcessingState}; use relay_event_schema::protocol::{ Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, SecurityReportType, Values, }; -use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Array, Empty, Object, Value}; use relay_statsd::metric; use serde_json::Value as SerdeValue; @@ -21,7 +19,6 @@ use crate::services::processor::{ EventFullyNormalized, EventMetricsExtracted, EventProcessing, ExtractedEvent, ProcessingError, SpansExtracted, event_type, }; -use crate::services::projects::project::ProjectInfo; use crate::statsd::{RelayCounters, RelayTimers}; use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter}; @@ -140,39 +137,6 @@ pub fn extract( }) } -/// Apply data privacy rules to the event payload. 
-/// -/// This uses both the general `datascrubbing_settings`, as well as the the PII rules. -pub fn scrub( - event: &mut Annotated<Event>, - project_info: &ProjectInfo, -) -> Result<(), ProcessingError> { - let config = &project_info.config; - - if config.datascrubbing_settings.scrub_data - && let Some(event) = event.value_mut() - { - relay_pii::scrub_graphql(event); - } - - metric!(timer(RelayTimers::EventProcessingPii), { - if let Some(ref config) = config.pii_config { - let mut processor = PiiProcessor::new(config.compiled()); - processor::process_value(event, &mut processor, ProcessingState::root())?; - } - let pii_config = config - .datascrubbing_settings - .pii_config() - .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?; - if let Some(config) = pii_config { - let mut processor = PiiProcessor::new(config.compiled()); - processor::process_value(event, &mut processor, ProcessingState::root())?; - } - }); - - Ok(()) -} - pub fn serialize<Group: EventProcessing>( managed_envelope: &mut TypedEnvelope<Group>, event: &mut Annotated<Event>, diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index c30e210990..157a4ce9a8 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -30,8 +30,6 @@ pub fn filter( ) -> Option<ProfileId> { let profiling_disabled = should_filter(config, project_info, Feature::Profiling); let has_transaction = event_type(event) == Some(EventType::Transaction); - let keep_unsampled_profiles = true; - let mut profile_id = None; managed_envelope.retain_items(|item| match item.ty() { // First profile found in the envelope, we'll keep it if metadata are valid. @@ -42,7 +40,7 @@ // Drop profile without a transaction in the same envelope, // except if unsampled profiles are allowed for this project. - let profile_allowed = has_transaction || (keep_unsampled_profiles && !item.sampled()); + let profile_allowed = has_transaction || !item.sampled(); if !profile_allowed { return ItemAction::DropSilently; } @@ -67,62 +65,6 @@ profile_id } -/// Transfers the profile ID from the profile item to the transaction item. -/// -/// The profile id may be `None` when the envelope does not contain a profile, -/// in that case the profile context is removed. -/// Some SDKs send transactions with profile ids but omit the profile in the envelope. -pub fn transfer_id(event: &mut Annotated<Event>, profile_id: Option<ProfileId>) { - let Some(event) = event.value_mut() else { - return; - }; - - match profile_id { - Some(profile_id) => { - let contexts = event.contexts.get_or_insert_with(Contexts::new); - contexts.add(ProfileContext { - profile_id: Annotated::new(profile_id), - ..ProfileContext::default() - }); - } - None => { - if let Some(contexts) = event.contexts.value_mut() - && let Some(profile_context) = contexts.get_mut::<ProfileContext>() - { - profile_context.profile_id = Annotated::empty(); - } - } - } -} - -/// Strip out the profiler_id from the transaction's profile context if the transaction lasts less than 20ms. -/// -/// This is necessary because if the transaction lasts less than 19.8ms, we know that the respective -/// profile data won't have enough samples to be of any use, hence we "unlink" the profile from the transaction.
-#[cfg(feature = "processing")] -pub fn scrub_profiler_id(event: &mut Annotated<Event>) { - let Some(event) = event.value_mut() else { - return; - }; - let transaction_duration = event - .get_value("event.duration") - .and_then(|duration| duration.as_f64()); - - if !transaction_duration.is_some_and(|duration| duration < 19.8) { - return; - } - if let Some(contexts) = event.contexts.value_mut().as_mut() - && let Some(profiler_id) = contexts - .get_mut::<ProfileContext>() - .map(|ctx| &mut ctx.profiler_id) - { - let id = std::mem::take(profiler_id.value_mut()); - let remark = Remark::new(RemarkType::Removed, "transaction_duration"); - profiler_id.meta_mut().add_remark(remark); - profiler_id.meta_mut().set_original_value(id); - } -} - /// Processes profiles and set the profile ID in the profile context on the transaction if successful. pub fn process( managed_envelope: &mut TypedEnvelope<TransactionGroup>, @@ -657,90 +599,4 @@ mod tests { } "###); } - - #[cfg(feature = "processing")] - #[test] - fn test_scrub_profiler_id_should_be_stripped() { - let mut contexts = Contexts::new(); - contexts.add(ProfileContext { - profiler_id: Annotated::new(EventId( - Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(), - )), - ..Default::default() - }); - let mut event: Annotated<Event> = Annotated::new(Event { - ty: Annotated::new(EventType::Transaction), - start_timestamp: Annotated::new( - Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(), - ), - timestamp: Annotated::new( - Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0) - .unwrap() - .checked_add_signed(Duration::milliseconds(15)) - .unwrap() - .into(), - ), - contexts: Annotated::new(contexts), - ..Default::default() - }); - - scrub_profiler_id(&mut event); - - let profile_context = get_value!(event.contexts) - .unwrap() - .get::<ProfileContext>() - .unwrap(); - - assert!( - profile_context - .profiler_id - .meta() - .iter_remarks() - .any(|remark| remark.rule_id == *"transaction_duration" - && remark.ty == RemarkType::Removed) - ) - } - - #[cfg(feature = "processing")] - #[test] - fn test_scrub_profiler_id_should_not_be_stripped() { - let mut contexts = Contexts::new(); - contexts.add(ProfileContext { - profiler_id: Annotated::new(EventId( - Uuid::parse_str("52df9022835246eeb317dbd739ccd059").unwrap(), - )), - ..Default::default() - }); - let mut event: Annotated<Event> = Annotated::new(Event { - ty: Annotated::new(EventType::Transaction), - start_timestamp: Annotated::new( - Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap().into(), - ), - timestamp: Annotated::new( - Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0) - .unwrap() - .checked_add_signed(Duration::milliseconds(20)) - .unwrap() - .into(), - ), - contexts: Annotated::new(contexts), - ..Default::default() - }); - - scrub_profiler_id(&mut event); - - let profile_context = get_value!(event.contexts) - .unwrap() - .get::<ProfileContext>() - .unwrap(); - - assert!( - !profile_context - .profiler_id - .meta() - .iter_remarks() - .any(|remark| remark.rule_id == *"transaction_duration" - && remark.ty == RemarkType::Removed) - ) - } } diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index c707641a14..163031505c 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -2,24 +2,19 @@ use std::error::Error; -use crate::envelope::{ContentType, Item, ItemType}; +use crate::envelope::ItemType; use crate::managed::{ItemAction, ManagedEnvelope, TypedEnvelope}; use crate::metrics_extraction::{event, generic}; +use crate::processing; +use
crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{ - EventMetricsExtracted, ProcessingError, ProcessingExtractedMetrics, SpanGroup, SpansExtracted, - TransactionGroup, event_type, -}; -use crate::services::projects::project::ProjectInfo; +use crate::services::processor::{ProcessingError, ProcessingExtractedMetrics, SpanGroup}; use crate::statsd::RelayCounters; use crate::utils::SamplingResult; -use crate::{processing, utils}; use chrono::{DateTime, Utc}; -use relay_base_schema::events::EventType; use relay_base_schema::project::ProjectId; use relay_config::Config; use relay_dynamic_config::{ - CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig, ProjectConfig, + CombinedMetricExtractionConfig, ErrorBoundary, GlobalConfig, ProjectConfig, }; use relay_event_normalization::AiOperationTypeMap; use relay_event_normalization::span::ai::enrich_ai_span_data; @@ -31,33 +26,13 @@ use relay_event_normalization::{ normalize_transaction_name, span::tag_extraction, validate_span, }; use relay_event_schema::processor::{ProcessingAction, ProcessingState, process_value}; -use relay_event_schema::protocol::{ - BrowserContext, Event, EventId, IpAddr, Measurement, Measurements, Span, SpanData, -}; +use relay_event_schema::protocol::{BrowserContext, Event, EventId, IpAddr, Span, SpanData}; use relay_log::protocol::{Attachment, AttachmentType}; -use relay_metrics::{FractionUnit, MetricNamespace, MetricUnit, UnixTimestamp}; +use relay_metrics::{MetricNamespace, UnixTimestamp}; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Empty, Value}; use relay_quotas::DataCategory; -#[derive(thiserror::Error, Debug)] -enum ValidationError { - #[error("empty span")] - EmptySpan, - #[error("span is missing `trace_id`")] - MissingTraceId, - #[error("span is missing `span_id`")] - MissingSpanId, - #[error("span is missing `timestamp`")] - MissingTimestamp, - #[error("span is missing `start_timestamp`")] - MissingStartTimestamp, - #[error("span end must be after start")] - EndBeforeStartTimestamp, - #[error("span is missing `exclusive_time`")] - MissingExclusiveTime, -} - pub async fn process( managed_envelope: &mut TypedEnvelope<SpanGroup>, event: &mut Annotated<Event>, @@ -207,7 +182,7 @@ pub async fn process( .ok(); // Validate for kafka (TODO: this should be moved to kafka producer) - match validate(&mut annotated_span) { + match processing::transactions::spans::validate(&mut annotated_span) { Ok(res) => res, Err(err) => { relay_log::with_scope( @@ -231,7 +206,10 @@ } }; - let Ok(mut new_item) = create_span_item(annotated_span, ctx.config) else { + let Ok(mut new_item) = + processing::transactions::spans::create_span_item(annotated_span, ctx.config) + // TODO: move function + else { return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; @@ -254,187 +232,6 @@ } } -fn create_span_item(span: Annotated<Span>, config: &Config) -> Result<Item, ()> { - let mut new_item = Item::new(ItemType::Span); - if cfg!(feature = "processing") && config.processing_enabled() { - let span_v2 = span.map_value(relay_spans::span_v1_to_span_v2); - let payload = match span_v2.to_json() { - Ok(payload) => payload, - Err(err) => { - relay_log::error!("failed to serialize span V2: {}", err); - return Err(()); - } - }; - if let Some(trace_id) = span_v2.value().and_then(|s| s.trace_id.value()) { - new_item.set_routing_hint(*trace_id.as_ref()); - } - - new_item.set_payload(ContentType::Json, payload); - } else { - let payload = match span.to_json() { - Ok(payload) =>
payload, - Err(err) => { - relay_log::error!("failed to serialize span: {}", err); - return Err(()); - } - }; - new_item.set_payload(ContentType::Json, payload); - } - - Ok(new_item) -} - -fn add_sample_rate(measurements: &mut Annotated<Measurements>, name: &str, value: Option<f64>) { - let value = match value { - Some(value) if value > 0.0 => value, - _ => return, - }; - - let measurement = Annotated::new(Measurement { - value: Annotated::try_from(value), - unit: MetricUnit::Fraction(FractionUnit::Ratio).into(), - }); - - measurements - .get_or_insert_with(Measurements::default) - .insert(name.to_owned(), measurement); -} - -#[allow(clippy::too_many_arguments)] -pub fn extract_from_event( - managed_envelope: &mut TypedEnvelope<TransactionGroup>, - event: &Annotated<Event>, - global_config: &GlobalConfig, - config: &Config, - server_sample_rate: Option<f64>, - event_metrics_extracted: EventMetricsExtracted, - spans_extracted: SpansExtracted, -) -> SpansExtracted { - // Only extract spans from transactions (not errors). - if event_type(event) != Some(EventType::Transaction) { - return spans_extracted; - }; - - if spans_extracted.0 { - return spans_extracted; - } - - if let Some(sample_rate) = global_config.options.span_extraction_sample_rate - && utils::sample(sample_rate).is_discard() - { - return spans_extracted; - } - - let client_sample_rate = managed_envelope - .envelope() - .dsc() - .and_then(|ctx| ctx.sample_rate); - - let mut add_span = |mut span: Span| { - add_sample_rate( - &mut span.measurements, - "client_sample_rate", - client_sample_rate, - ); - add_sample_rate( - &mut span.measurements, - "server_sample_rate", - server_sample_rate, - ); - - let mut span = Annotated::new(span); - - match validate(&mut span) { - Ok(span) => span, - Err(e) => { - relay_log::error!( - error = &e as &dyn Error, - span = ?span, - source = "event", - "invalid span" - ); - - managed_envelope.track_outcome( - Outcome::Invalid(DiscardReason::InvalidSpan), - relay_quotas::DataCategory::SpanIndexed, - 1, - ); - return; - } - }; - - let Ok(mut item) = create_span_item(span, config) else { - managed_envelope.track_outcome( - Outcome::Invalid(DiscardReason::InvalidSpan), - relay_quotas::DataCategory::SpanIndexed, - 1, - ); - return; - }; - // If metrics extraction happened for the event, it also happened for its spans: - item.set_metrics_extracted(event_metrics_extracted.0); - - relay_log::trace!("Adding span to envelope"); - managed_envelope.envelope_mut().add_item(item); - }; - - let Some(event) = event.value() else { - return spans_extracted; - }; - - let Some(transaction_span) = processing::utils::transaction::extract_segment_span( - event, - config - .aggregator_config_for(MetricNamespace::Spans) - .max_tag_value_length, - &[], - ) else { - return spans_extracted; - }; - - // Add child spans as envelope items. - if let Some(child_spans) = event.spans.value() { - for span in child_spans { - let Some(inner_span) = span.value() else { - continue; - }; - // HACK: clone the span to set the segment_id. This should happen - // as part of normalization once standalone spans reach wider adoption. - let mut new_span = inner_span.clone(); - new_span.is_segment = Annotated::new(false); - new_span.is_remote = Annotated::new(false); - new_span.received = transaction_span.received.clone(); - new_span.segment_id = transaction_span.segment_id.clone(); - new_span.platform = transaction_span.platform.clone(); - - // If a profile is associated with the transaction, also associate it with its - // child spans.
- new_span.profile_id = transaction_span.profile_id.clone(); - - add_span(new_span); - } - } - - add_span(transaction_span); - - SpansExtracted(true) -} - -/// Removes the transaction in case the project has made the transition to spans-only. -pub fn maybe_discard_transaction( - managed_envelope: &mut TypedEnvelope<TransactionGroup>, - event: Annotated<Event>, - project_info: &ProjectInfo, -) -> Annotated<Event> { - if event_type(&event) == Some(EventType::Transaction) - && project_info.has_feature(Feature::DiscardTransaction) - { - managed_envelope.update(); - return Annotated::empty(); - } - - event -} /// Config needed to normalize a standalone span. #[derive(Clone, Debug)] struct NormalizeSpanConfig<'a> { @@ -755,240 +552,16 @@ fn scrub( Ok(()) } -/// We do not extract or ingest spans with missing fields if those fields are required on the Kafka topic. -fn validate(span: &mut Annotated<Span>) -> Result<(), ValidationError> { - let inner = span - .value_mut() - .as_mut() - .ok_or(ValidationError::EmptySpan)?; - let Span { - exclusive_time, - tags, - sentry_tags, - start_timestamp, - timestamp, - span_id, - trace_id, - .. - } = inner; - - trace_id.value().ok_or(ValidationError::MissingTraceId)?; - span_id.value().ok_or(ValidationError::MissingSpanId)?; - - match (start_timestamp.value(), timestamp.value()) { - (Some(start), Some(end)) if end < start => Err(ValidationError::EndBeforeStartTimestamp), - (Some(_), Some(_)) => Ok(()), - (_, None) => Err(ValidationError::MissingTimestamp), - (None, _) => Err(ValidationError::MissingStartTimestamp), - }?; - - exclusive_time - .value() - .ok_or(ValidationError::MissingExclusiveTime)?; - - if let Some(sentry_tags) = sentry_tags.value_mut() { - if sentry_tags - .group - .value() - .is_some_and(|s| s.len() > 16 || s.chars().any(|c| !c.is_ascii_hexdigit())) - { - sentry_tags.group.set_value(None); - } - - if sentry_tags - .status_code - .value() - .is_some_and(|s| s.parse::<u16>().is_err()) - { - sentry_tags.group.set_value(None); - } - } - if let Some(tags) = tags.value_mut() { - tags.retain(|_, value| !value.value().is_empty()) - } - - Ok(()) -} - #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use std::sync::{Arc, LazyLock}; + use std::sync::LazyLock; - use bytes::Bytes; - use relay_event_schema::protocol::{Context, ContextInner, EventId, Timestamp, TraceContext}; - use relay_event_schema::protocol::{Contexts, Event, Span}; + use relay_event_schema::protocol::EventId; + use relay_event_schema::protocol::Span; use relay_protocol::get_value; - use relay_system::Addr; - - use crate::envelope::Envelope; - use crate::managed::ManagedEnvelope; - use crate::services::processor::ProcessingGroup; - use crate::services::projects::project::ProjectInfo; use super::*; - fn params() -> ( - TypedEnvelope<TransactionGroup>, - Annotated<Event>, - Arc<ProjectInfo>, - ) { - let bytes = Bytes::from( - r#"{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b","sample_rate":"0.2"}} -{"type":"transaction"} -{} -"#, - ); - - let dummy_envelope = Envelope::parse_bytes(bytes).unwrap(); - let project_info = Arc::new(ProjectInfo::default()); - - let event = Event { - ty: EventType::Transaction.into(), - start_timestamp: Timestamp(DateTime::from_timestamp(0, 0).unwrap()).into(), - timestamp: Timestamp(DateTime::from_timestamp(1, 0).unwrap()).into(), - contexts: Contexts(BTreeMap::from([( - "trace".into(), - ContextInner(Context::Trace(Box::new(TraceContext { - trace_id:
Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()), - span_id: Annotated::new("fa90fdead5f74053".parse().unwrap()), - exclusive_time: 1000.0.into(), - ..Default::default() - }))) - .into(), - )])) - .into(), - ..Default::default() - }; - - let managed_envelope = ManagedEnvelope::new(dummy_envelope, Addr::dummy()); - let managed_envelope = (managed_envelope, ProcessingGroup::Transaction) - .try_into() - .unwrap(); - - let event = Annotated::from(event); - - (managed_envelope, event, project_info) - } - - #[test] - fn extract_sampled_default() { - let global_config = GlobalConfig::default(); - assert!(global_config.options.span_extraction_sample_rate.is_none()); - let (mut managed_envelope, event, _) = params(); - extract_from_event( - &mut managed_envelope, - &event, - &global_config, - &Default::default(), - None, - EventMetricsExtracted(false), - SpansExtracted(false), - ); - assert!( - managed_envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Span), - "{:?}", - managed_envelope.envelope() - ); - } - - #[test] - fn extract_sampled_explicit() { - let mut global_config = GlobalConfig::default(); - global_config.options.span_extraction_sample_rate = Some(1.0); - let (mut managed_envelope, event, _) = params(); - extract_from_event( - &mut managed_envelope, - &event, - &global_config, - &Default::default(), - None, - EventMetricsExtracted(false), - SpansExtracted(false), - ); - assert!( - managed_envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Span), - "{:?}", - managed_envelope.envelope() - ); - } - - #[test] - fn extract_sampled_dropped() { - let mut global_config = GlobalConfig::default(); - global_config.options.span_extraction_sample_rate = Some(0.0); - let (mut managed_envelope, event, _) = params(); - extract_from_event( - &mut managed_envelope, - &event, - &global_config, - &Default::default(), - None, - EventMetricsExtracted(false), - SpansExtracted(false), - ); - assert!( - !managed_envelope - .envelope() - .items() - .any(|item| item.ty() == &ItemType::Span), - "{:?}", - managed_envelope.envelope() - ); - } - - #[test] - fn extract_sample_rates() { - let mut global_config = GlobalConfig::default(); - global_config.options.span_extraction_sample_rate = Some(1.0); // force enable - let (mut managed_envelope, event, _) = params(); // client sample rate is 0.2 - extract_from_event( - &mut managed_envelope, - &event, - &global_config, - &Default::default(), - Some(0.1), - EventMetricsExtracted(false), - SpansExtracted(false), - ); - - let span = managed_envelope - .envelope() - .items() - .find(|item| item.ty() == &ItemType::Span) - .unwrap(); - - let span = Annotated::::from_json_bytes(&span.payload()).unwrap(); - let measurements = span.value().and_then(|s| s.measurements.value()); - - insta::assert_debug_snapshot!(measurements, @r###" - Some( - Measurements( - { - "client_sample_rate": Measurement { - value: 0.2, - unit: Fraction( - Ratio, - ), - }, - "server_sample_rate": Measurement { - value: 0.1, - unit: Fraction( - Ratio, - ), - }, - }, - ), - ) - "###); - } - #[test] fn segment_no_overwrite() { let mut span: Annotated = Annotated::from_json( diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index d08a59ecf7..e266e4d08c 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -13,7 +13,6 @@ use futures::future::BoxFuture; use prost::Message as _; use sentry_protos::snuba::v1::{TraceItem, TraceItemType}; use serde::{Deserialize, Serialize}; -use 
serde_json::value::RawValue; use uuid::Uuid; use relay_base_schema::data_category::DataCategory; @@ -33,7 +32,7 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use relay_threading::AsyncPool; -use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; +use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; @@ -277,8 +276,6 @@ impl StoreService { scoping: Scoping, ) -> Result<(), StoreError> { let retention = envelope.retention(); - let downsampled_retention = envelope.downsampled_retention(); - - let event_id = envelope.event_id(); let event_item = envelope.as_mut().take_item_by(|item| { matches!( @@ -315,7 +312,6 @@ .is_keep(); for item in envelope.items() { - let content_type = item.content_type(); match item.ty() { ItemType::Attachment => { if let Some(attachment) = self.produce_attachment( @@ -382,14 +378,6 @@ let client = envelope.meta().client(); self.produce_check_in(scoping.project_id, received_at, client, retention, item)? } - ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span( - scoping, - received_at, - event_id, - retention, - downsampled_retention, - item, - )?, ty @ ItemType::Log => { debug_assert!( false, @@ -627,7 +615,7 @@ let result = message.try_accept(|span| { let item = Annotated::new(span.item); - let message = KafkaMessage::SpanV2 { + let message = KafkaMessage::Span { routing_key: span.routing_key, headers: BTreeMap::from([( "project_id".to_owned(), @@ -1081,74 +1069,6 @@ Ok(()) } - fn produce_span( - &self, - scoping: Scoping, - received_at: DateTime<Utc>, - event_id: Option<EventId>, - retention_days: u16, - downsampled_retention_days: u16, - item: &Item, - ) -> Result<(), StoreError> { - debug_assert_eq!(item.ty(), &ItemType::Span); - debug_assert_eq!(item.content_type(), Some(&ContentType::Json)); - - let Scoping { - organization_id, - project_id, - project_key: _, - key_id, - } = scoping; - - let payload = item.payload(); - let message = SpanKafkaMessageRaw { - meta: SpanMeta { - organization_id, - project_id, - key_id, - event_id, - retention_days, - downsampled_retention_days, - received: datetime_to_timestamp(received_at), - }, - span: serde_json::from_slice(&payload) - .map_err(|e| StoreError::EncodingFailed(e.into()))?, - }; - - // Verify that this is a V2 span: - debug_assert!(message.span.contains_key("attributes")); - relay_statsd::metric!( - counter(RelayCounters::SpanV2Produced) += 1, - via = "envelope" - ); - - self.produce( - KafkaTopic::Spans, - KafkaMessage::SpanRaw { - routing_key: item.routing_hint(), - headers: BTreeMap::from([( - "project_id".to_owned(), - scoping.project_id.to_string(), - )]), - message, - }, - )?; - - // XXX: Temporarily produce span outcomes. Keep in sync with either EAP - // or the segments consumer, depending on which will produce outcomes later.
- self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::SpanIndexed, - event_id: None, - outcome: Outcome::Accepted, - quantity: 1, - remote_addr: None, - scoping, - timestamp: received_at, - }); - - Ok(()) - } - fn produce_profile_chunk( &self, organization_id: OrganizationId, @@ -1444,14 +1364,6 @@ struct CheckInKafkaMessage { retention_days: u16, } -#[derive(Debug, Serialize)] -struct SpanKafkaMessageRaw<'a> { - #[serde(flatten)] - meta: SpanMeta, - #[serde(flatten)] - span: BTreeMap<&'a str, &'a RawValue>, -} - #[derive(Debug, Serialize)] struct SpanKafkaMessage<'a> { #[serde(flatten)] @@ -1508,15 +1420,7 @@ enum KafkaMessage<'a> { #[serde(skip)] message: TraceItem, }, - SpanRaw { - #[serde(skip)] - routing_key: Option<Uuid>, - #[serde(skip)] - headers: BTreeMap<String, String>, - #[serde(flatten)] - message: SpanKafkaMessageRaw<'a>, - }, - SpanV2 { + Span { #[serde(skip)] routing_key: Option<Uuid>, #[serde(skip)] headers: BTreeMap<String, String>, @@ -1548,7 +1452,7 @@ impl Message for KafkaMessage<'_> { MetricNamespace::Unsupported => "metric_unsupported", }, KafkaMessage::CheckIn(_) => "check_in", - KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span", + KafkaMessage::Span { .. } => "span", KafkaMessage::Item { item_type, .. } => item_type.as_str_name(), KafkaMessage::Attachment(_) => "attachment", @@ -1567,7 +1471,7 @@ match self { Self::Event(message) => Some(message.event_id.0), Self::UserReport(message) => Some(message.event_id.0), - Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key, + Self::Span { routing_key, .. } => *routing_key, // Monitor check-ins use the hinted UUID passed through from the Envelope. // @@ -1593,8 +1497,7 @@ fn headers(&self) -> Option<&BTreeMap<String, String>> { match &self { KafkaMessage::Metric { headers, .. } - | KafkaMessage::SpanRaw { headers, .. } - | KafkaMessage::SpanV2 { headers, .. } + | KafkaMessage::Span { headers, .. } | KafkaMessage::Item { headers, .. } | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers), @@ -1613,8 +1516,7 @@ match self { KafkaMessage::Metric { message, .. } => serialize_as_json(message), KafkaMessage::ReplayEvent(message) => serialize_as_json(message), - KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message), - KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message), + KafkaMessage::Span { message, .. } => serialize_as_json(message), KafkaMessage::Item { message, ..
} => { let mut payload = Vec::new(); match message.encode(&mut payload) { diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index af2c73b11a..faf0694cb9 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -3,10 +3,9 @@ use std::ops::ControlFlow; use chrono::Utc; use relay_base_schema::events::EventType; -use relay_base_schema::project::ProjectKey; -use relay_event_schema::protocol::{Event, TraceContext}; +use relay_event_schema::protocol::Event; use relay_sampling::config::{RuleType, SamplingConfig}; -use relay_sampling::dsc::{DynamicSamplingContext, TraceUserContext}; +use relay_sampling::dsc::DynamicSamplingContext; use relay_sampling::evaluation::{SamplingDecision, SamplingEvaluator, SamplingMatch}; use crate::services::outcome::Outcome; @@ -97,45 +96,6 @@ pub async fn is_trace_fully_sampled( Some(SamplingResult::from(evaluation).decision().is_keep()) } -/// Computes a dynamic sampling context from a transaction event. -/// -/// Returns `None` if the passed event is not a transaction event, or if it does not contain a -/// trace ID in its trace context. All optional fields in the dynamic sampling context are -/// populated with the corresponding attributes from the event payload if they are available. -/// -/// Since sampling information is not available in the event payload, the `sample_rate` field -/// cannot be set when computing the dynamic sampling context from a transaction event. -pub fn dsc_from_event(public_key: ProjectKey, event: &Event) -> Option<DynamicSamplingContext> { - if event.ty.value() != Some(&EventType::Transaction) { - return None; - } - - let trace = event.context::<TraceContext>()?; - let trace_id = *trace.trace_id.value()?; - let user = event.user.value(); - - Some(DynamicSamplingContext { - trace_id, - public_key, - release: event.release.as_str().map(str::to_owned), - environment: event.environment.value().cloned(), - transaction: event.transaction.value().cloned(), - replay_id: None, - sample_rate: None, - user: TraceUserContext { - user_segment: user - .and_then(|u| u.segment.value().cloned()) - .unwrap_or_default(), - user_id: user - .and_then(|u| u.id.as_str()) - .unwrap_or_default() - .to_owned(), - }, - sampled: None, - other: Default::default(), - }) -} - #[cfg(test)] mod tests { use relay_event_schema::protocol::{EventId, LenientString}; diff --git a/relay-server/src/utils/feature.rs b/relay-server/src/utils/feature.rs new file mode 100644 index 0000000000..631fb96864 --- /dev/null +++ b/relay-server/src/utils/feature.rs @@ -0,0 +1,15 @@ +use relay_config::{Config, RelayMode}; +use relay_dynamic_config::Feature; + +use crate::services::projects::project::ProjectInfo; + +/// Function for on-off switches that filter specific item types (profiles, spans) +/// based on a feature flag. +/// +/// If the project config did not come from the upstream, we keep the items.
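+///
+/// A minimal usage sketch (illustrative; assumes `config` and `project_info` bindings as in
+/// the callers of this helper):
+///
+/// ```ignore
+/// use relay_dynamic_config::Feature;
+///
+/// // In managed mode, profile items are dropped unless the project has the feature:
+/// if should_filter(config, project_info, Feature::Profiling) {
+///     // drop the profile items from the envelope
+/// }
+/// ```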
+pub fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool { + match config.relay_mode() { + RelayMode::Proxy => false, + RelayMode::Managed => !project_info.has_feature(feature), + } +} diff --git a/relay-server/src/utils/mod.rs b/relay-server/src/utils/mod.rs index 9e73d8b3a5..35876b9607 100644 --- a/relay-server/src/utils/mod.rs +++ b/relay-server/src/utils/mod.rs @@ -12,6 +12,7 @@ mod split_off; mod statsd; mod thread_pool; +mod feature; mod memory; #[cfg(feature = "processing")] mod native; @@ -38,3 +39,4 @@ pub use self::statsd::*; pub use self::thread_pool::*; #[cfg(feature = "processing")] pub use self::unreal::*; +pub use feature::*;
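For orientation, a minimal sketch of how the relocated DSC helpers are intended to compose during transaction processing. This is illustrative only: the `headers`, `event`, and `ctx` bindings are assumed to exist in the caller (as in `process_transactions` above), and error handling is elided.

    // Ensure a valid DSC: keep the incoming one if it can be trusted, otherwise
    // compute a replacement from the transaction event or remove it entirely.
    processing::utils::dsc::validate_and_set_dsc(&mut headers, &event, &mut ctx);

    // Compute the sampling decision from the (possibly recomputed) DSC.
    let sampling_result =
        processing::utils::dynamic_sampling::run(headers.dsc(), &event, &ctx, None).await;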