diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c1aff49045..20a59d90db7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - Extract crashpad annotations into contexts. ([#892](https://github.com/getsentry/relay/pull/892)) - Normalize user reports during ingestion and create empty fields. ([#903](https://github.com/getsentry/relay/pull/903)) +- Ingest and normalize sample rates from envelope item headers. ([#910](https://github.com/getsentry/relay/pull/910)) ## 20.12.1 diff --git a/relay-general/src/protocol/metrics.rs b/relay-general/src/protocol/metrics.rs index 44c62b43971..e61e3310bee 100644 --- a/relay-general/src/protocol/metrics.rs +++ b/relay-general/src/protocol/metrics.rs @@ -1,5 +1,26 @@ use crate::processor::ProcessValue; -use crate::types::Annotated; +use crate::types::{Annotated, Array}; + +#[derive(Clone, Debug, Default, Empty, PartialEq, FromValue, ToValue)] +#[cfg_attr(feature = "jsonschema", derive(JsonSchema))] +pub struct SampleRate { + /// The unique identifier of the sampling rule or mechanism. + /// + /// For client-side sampling, this identifies the sampling mechanism: + /// - `client_rate`: Default base sample rate configured in client options. Only reported in + /// the absence of the traces sampler callback. + /// - `client_sampler`: Return value from the traces sampler callback during runtime. Always + /// overrides the `client_rate`. + /// + /// For server-side sampling, this identifies the dynamic sampling rule. + id: Annotated, + + /// The effective sample rate in the range `(0..1]`. + /// + /// While allowed in the protocol, a value of `0` can never occur in practice since such events + /// would never be reported to Sentry and thus never generate this metric. + rate: Annotated, +} /// Metrics captured during event ingestion and processing. /// @@ -130,6 +151,12 @@ pub struct Metrics { /// This metric is measured in Sentry and should be reported in all processing tasks. #[metastructure(field = "flag.processing.fatal")] pub flag_processing_fatal: Annotated, + + /// A list of cumulative sample rates applied to this event. + /// + /// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The + /// effective sample rate is multiplied. + pub sample_rates: Annotated>, } // Do not process Metrics diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs index e28d45b4f1c..b1dc00711e2 100644 --- a/relay-general/src/protocol/mod.rs +++ b/relay-general/src/protocol/mod.rs @@ -49,7 +49,7 @@ pub use self::fingerprint::Fingerprint; pub use self::logentry::{LogEntry, Message}; pub use self::measurements::Measurements; pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, PosixSignal}; -pub use self::metrics::Metrics; +pub use self::metrics::{Metrics, SampleRate}; pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request}; #[cfg(feature = "jsonschema")] pub use self::schema::event_json_schema; diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index cd2953537d5..4b73dbc2919 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -20,7 +20,7 @@ use relay_general::protocol::{ LenientString, Metrics, SecurityReportType, SessionUpdate, Timestamp, UserReport, Values, }; use relay_general::store::ClockDriftProcessor; -use relay_general::types::{Annotated, Array, Object, ProcessingAction, Value}; +use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value}; use relay_log::LogError; use relay_quotas::RateLimits; use relay_redis::RedisPool; @@ -202,6 +202,12 @@ struct ProcessEnvelopeState { /// persisted into the Event. All modifications afterwards will have no effect. metrics: Metrics, + /// A list of cumulative sample rates applied to this event. + /// + /// This element is obtained from the event or transaction item and re-serialized into the + /// resulting item. + sample_rates: Option, + /// Rate limits returned in processing mode. /// /// The rate limiter is invoked in processing mode, after which the resulting limits are stored @@ -463,6 +469,7 @@ impl EventProcessor { envelope, event: Annotated::empty(), metrics: Metrics::default(), + sample_rates: None, rate_limits: RateLimits::new(), project_state, project_id, @@ -749,22 +756,25 @@ impl EventProcessor { return Err(ProcessingError::DuplicateItem(duplicate.ty())); } - let (event, event_len) = if let Some(item) = event_item.or(security_item) { + let (event, event_len) = if let Some(mut item) = event_item.or(security_item) { relay_log::trace!("processing json event"); + state.sample_rates = item.take_sample_rates(); metric!(timer(RelayTimers::EventProcessingDeserialize), { // Event items can never include transactions, so retain the event type and let // inference deal with this during store normalization. self.event_from_json_payload(item, None)? }) - } else if let Some(item) = transaction_item { + } else if let Some(mut item) = transaction_item { relay_log::trace!("processing json transaction"); + state.sample_rates = item.take_sample_rates(); metric!(timer(RelayTimers::EventProcessingDeserialize), { // Transaction items can only contain transaction events. Force the event type to // hint to normalization that we're dealing with a transaction now. self.event_from_json_payload(item, Some(EventType::Transaction))? }) - } else if let Some(item) = raw_security_item { + } else if let Some(mut item) = raw_security_item { relay_log::trace!("processing security report"); + state.sample_rates = item.take_sample_rates(); self.event_from_security_report(item)? } else if attachment_item.is_some() || breadcrumbs1.is_some() || breadcrumbs2.is_some() { relay_log::trace!("extracting attached event data"); @@ -847,6 +857,8 @@ impl EventProcessor { // In processing mode, also write metrics into the event. Most metrics have already been // collected at this state, except for the combined size of all attachments. if self.config.processing_enabled() { + let mut metrics = std::mem::take(&mut state.metrics); + let attachment_size = envelope .items() .filter(|item| item.attachment_type() == Some(AttachmentType::Attachment)) @@ -854,10 +866,22 @@ impl EventProcessor { .sum::(); if attachment_size > 0 { - state.metrics.bytes_ingested_event_attachment = Annotated::new(attachment_size); + metrics.bytes_ingested_event_attachment = Annotated::new(attachment_size); + } + + let sample_rates = state + .sample_rates + .take() + .and_then(|value| Array::from_value(Annotated::new(value)).into_value()); + + if let Some(rates) = sample_rates { + metrics + .sample_rates + .get_or_insert_with(Array::new) + .extend(rates) } - event._metrics = Annotated::new(std::mem::take(&mut state.metrics)); + event._metrics = Annotated::new(metrics); } // TODO: Temporary workaround before processing. Experimental SDKs relied on a buggy @@ -1076,6 +1100,13 @@ impl EventProcessor { let event_type = state.event_type().unwrap_or_default(); let mut event_item = Item::new(ItemType::from_event_type(event_type)); event_item.set_payload(ContentType::Json, data); + + // If there are sample rates, write them back to the envelope. In processing mode, sample + // rates have been removed from the state and burnt into the event via `finalize_event`. + if let Some(sample_rates) = state.sample_rates.take() { + event_item.set_sample_rates(sample_rates); + } + state.envelope.add_item(event_item); Ok(()) @@ -1127,7 +1158,6 @@ impl EventProcessor { }); self.finalize_event(&mut state)?; - self.sample_event(&mut state)?; if_processing!({ diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index d38eefbc2cf..b206bbdba2b 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -349,6 +349,13 @@ pub struct ItemHeaders { #[serde(default, skip)] rate_limited: bool, + /// A list of cumulative sample rates applied to this event. + /// + /// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The + /// effective sample rate is multiplied. + #[serde(default, skip_serializing_if = "Option::is_none")] + sample_rates: Option, + /// Other attributes for forward compatibility. #[serde(flatten)] other: BTreeMap, @@ -371,6 +378,7 @@ impl Item { content_type: None, filename: None, rate_limited: false, + sample_rates: None, other: BTreeMap::new(), }, payload: Bytes::new(), @@ -456,6 +464,18 @@ impl Item { self.headers.rate_limited = rate_limited; } + /// Removes sample rates from the headers, if any. + pub fn take_sample_rates(&mut self) -> Option { + self.headers.sample_rates.take() + } + + /// Sets sample rates for this item. + pub fn set_sample_rates(&mut self, sample_rates: Value) { + if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) { + self.headers.sample_rates = Some(sample_rates); + } + } + /// Returns the specified header value, if present. pub fn get_header(&self, name: &K) -> Option<&Value> where diff --git a/tests/integration/test_envelope.py b/tests/integration/test_envelope.py index 1fef8f97087..e51932818f8 100644 --- a/tests/integration/test_envelope.py +++ b/tests/integration/test_envelope.py @@ -51,18 +51,12 @@ def generate_transaction_item(): def test_normalize_measurement_interface( mini_sentry, relay_with_processing, transactions_consumer ): - - # set up relay - relay = relay_with_processing() mini_sentry.add_basic_project_config(42) events_consumer = transactions_consumer() - # construct envelope - transaction_item = generate_transaction_item() - transaction_item.update( { "measurements": { @@ -79,15 +73,9 @@ def test_normalize_measurement_interface( envelope = Envelope() envelope.add_transaction(transaction_item) - - # ingest envelope - relay.send_envelope(42, envelope) event, _ = events_consumer.try_get_event() - - # test actual output - assert event["transaction"] == "/organizations/:orgId/performance/:eventSlug/" assert "trace" in event["contexts"] assert "measurements" in event, event @@ -102,31 +90,19 @@ def test_normalize_measurement_interface( def test_empty_measurement_interface(mini_sentry, relay_chain): - - # set up relay - relay = relay_chain() mini_sentry.add_basic_project_config(42) - # construct envelope - transaction_item = generate_transaction_item() - transaction_item.update({"measurements": {}}) envelope = Envelope() envelope.add_transaction(transaction_item) - - # ingest envelope - relay.send_envelope(42, envelope) envelope = mini_sentry.captured_events.get(timeout=1) - event = envelope.get_transaction_event() - # test actual output - assert event["transaction"] == "/organizations/:orgId/performance/:eventSlug/" assert "measurements" not in event, event @@ -134,16 +110,9 @@ def test_empty_measurement_interface(mini_sentry, relay_chain): def test_strip_measurement_interface( mini_sentry, relay_with_processing, events_consumer ): - - # set up relay - relay = relay_with_processing() mini_sentry.add_basic_project_config(42) - events_consumer = events_consumer() - - # construct envelope - envelope = Envelope() envelope.add_event( { @@ -155,16 +124,49 @@ def test_strip_measurement_interface( }, } ) - - # ingest envelope - relay.send_envelope(42, envelope) + events_consumer = events_consumer() event, _ = events_consumer.try_get_event() - # test actual output - assert event["logentry"] == {"formatted": "Hello, World!"} - # expect measurements interface object to be stripped out since it's attached to a non-transaction event assert "measurements" not in event, event + + +def test_sample_rates(mini_sentry, relay_chain): + relay = relay_chain() + mini_sentry.add_basic_project_config(42) + + sample_rates = [ + {"id": "client_sampler", "rate": 0.01}, + {"id": "dyanmic_user", "rate": 0.5}, + ] + + envelope = Envelope() + envelope.add_event({"message": "hello, world!"}) + envelope.items[0].headers["sample_rates"] = sample_rates + relay.send_envelope(42, envelope) + + envelope = mini_sentry.captured_events.get(timeout=1) + assert envelope.items[0].headers["sample_rates"] == sample_rates + + +def test_sample_rates_metrics(mini_sentry, relay_with_processing, events_consumer): + relay = relay_with_processing() + mini_sentry.add_basic_project_config(42) + + sample_rates = [ + {"id": "client_sampler", "rate": 0.01}, + {"id": "dyanmic_user", "rate": 0.5}, + ] + + envelope = Envelope() + envelope.add_event({"message": "hello, world!"}) + envelope.items[0].headers["sample_rates"] = sample_rates + relay.send_envelope(42, envelope) + + events_consumer = events_consumer() + event, _ = events_consumer.try_get_event() + + assert event["_metrics"]["sample_rates"] == sample_rates