diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b72d7af66e..571d808839e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,11 @@ # Changelog +## Unreleased + **Internal**: - Use a Lua script and in-memory cache for the cardinality limiting to reduce load on Redis. ([#2849](https://github.com/getsentry/relay/pull/2849)) +- Emit a `processor.message.duration` metric to assess the throughput of the internal CPU pool. ([#2877](https://github.com/getsentry/relay/pull/2877)) ## 23.12.0 diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 77d60f962f2..aeb833f5709 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -497,6 +497,22 @@ pub enum EnvelopeProcessor { RateLimitBuckets(RateLimitBuckets), } +impl EnvelopeProcessor { + /// Returns the name of the message variant. + pub fn variant(&self) -> &'static str { + match self { + EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope", + EnvelopeProcessor::ProcessMetrics(_) => "ProcessMetrics", + EnvelopeProcessor::ProcessMetricMeta(_) => "ProcessMetricMeta", + EnvelopeProcessor::EncodeEnvelope(_) => "EncodeEnvelope", + EnvelopeProcessor::EncodeMetrics(_) => "EncodeMetrics", + EnvelopeProcessor::EncodeMetricMeta(_) => "EncodeMetricMeta", + #[cfg(feature = "processing")] + EnvelopeProcessor::RateLimitBuckets(_) => "RateLimitBuckets", + } + } +} + impl relay_system::Interface for EnvelopeProcessor {} impl FromMessage for EnvelopeProcessor { @@ -1587,22 +1603,19 @@ impl EnvelopeProcessorService { } fn handle_message(&self, message: EnvelopeProcessor) { - match message { - EnvelopeProcessor::ProcessEnvelope(message) => self.handle_process_envelope(*message), - EnvelopeProcessor::ProcessMetrics(message) => self.handle_process_metrics(*message), - EnvelopeProcessor::ProcessMetricMeta(message) => { - self.handle_process_metric_meta(*message) + let ty = message.variant(); + metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, { + match message { + EnvelopeProcessor::ProcessEnvelope(m) => self.handle_process_envelope(*m), + EnvelopeProcessor::ProcessMetrics(m) => self.handle_process_metrics(*m), + EnvelopeProcessor::ProcessMetricMeta(m) => self.handle_process_metric_meta(*m), + EnvelopeProcessor::EncodeEnvelope(m) => self.handle_encode_envelope(*m), + EnvelopeProcessor::EncodeMetrics(m) => self.handle_encode_metrics(*m), + EnvelopeProcessor::EncodeMetricMeta(m) => self.handle_encode_metric_meta(*m), + #[cfg(feature = "processing")] + EnvelopeProcessor::RateLimitBuckets(m) => self.handle_rate_limit_buckets(m), } - EnvelopeProcessor::EncodeEnvelope(message) => self.handle_encode_envelope(*message), - EnvelopeProcessor::EncodeMetrics(message) => self.handle_encode_metrics(*message), - EnvelopeProcessor::EncodeMetricMeta(message) => { - self.handle_encode_metric_meta(*message) - } - #[cfg(feature = "processing")] - EnvelopeProcessor::RateLimitBuckets(message) => { - self.handle_rate_limit_buckets(message); - } - } + }); } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 4160dc255f1..391ea47acc6 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -331,6 +331,12 @@ pub enum RelayTimers { ReplayRecordingProcessing, /// Total time spent to send a request and receive the response from upstream. GlobalConfigRequestDuration, + /// Timing in milliseconds for processing a message in the internal CPU pool. + /// + /// This metric is tagged with: + /// + /// - `message`: The type of message that was processed. + ProcessMessageDuration, } impl TimerMetric for RelayTimers { @@ -366,6 +372,7 @@ impl TimerMetric for RelayTimers { RelayTimers::OutcomeAggregatorFlushTime => "outcomes.aggregator.flush_time", RelayTimers::ReplayRecordingProcessing => "replay.recording.process", RelayTimers::GlobalConfigRequestDuration => "global_config.requests.duration", + RelayTimers::ProcessMessageDuration => "processor.message.duration", } } }