diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a777e650ab..fe5443b3c0a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+**Features**:
+
+- By adding `.no-cache` to the DSN key, Relay refreshes project configuration caches immediately. This allows changed settings, such as updates to data scrubbing or inbound filter rules, to be applied instantly. ([#911](https://github.com/getsentry/relay/pull/911))
+
 **Internal**:
 
 - Compatibility mode for pre-aggregated sessions was removed. The feature is now enabled by default in full fidelity. ([#913](https://github.com/getsentry/relay/pull/913))
diff --git a/relay-common/src/project.rs b/relay-common/src/project.rs
index 0e91766787a..300b691d121 100644
--- a/relay-common/src/project.rs
+++ b/relay-common/src/project.rs
@@ -56,6 +56,13 @@ impl ProjectKey {
         Ok(project_key)
     }
 
+    /// Parses a `ProjectKey` from a string with flags.
+    pub fn parse_with_flags(key: &str) -> Result<(Self, Vec<&str>), ParseProjectKeyError> {
+        let mut iter = key.split('.');
+        let key = ProjectKey::parse(iter.next().ok_or(ParseProjectKeyError)?)?;
+        Ok((key, iter.collect()))
+    }
+
     /// Returns the string representation of the project key.
     #[inline]
     pub fn as_str(&self) -> &str {
diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs
index 4b73dbc2919..e9e58db116d 100644
--- a/relay-server/src/actors/events.rs
+++ b/relay-server/src/actors/events.rs
@@ -1489,9 +1489,11 @@ impl Handler<HandleEnvelope> for EventManager {
                     }
                 })
                 .and_then(clone!(project, |envelope| {
-                    // get the state for the current project
+                    // get the state for the current project. we can always fetch the cached version
+                    // even if the no_cache flag was passed, as the cache was already updated in
+                    // `CheckEnvelope`.
                     project
-                        .send(GetProjectState)
+                        .send(GetProjectState::new())
                         .map_err(ProcessingError::ScheduleFailed)
                         .and_then(|result| result.map_err(ProcessingError::ProjectFailed))
                         .map(|state| (envelope, state))
diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs
index c5f1df27c3f..b0294c0d47c 100644
--- a/relay-server/src/actors/project.rs
+++ b/relay-server/src/actors/project.rs
@@ -1,5 +1,5 @@
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use actix::prelude::*;
 use chrono::{DateTime, Utc};
@@ -372,6 +372,36 @@ pub struct PublicKeyConfig {
     pub numeric_id: Option<u64>,
 }
 
+struct StateChannel {
+    sender: oneshot::Sender<Arc<ProjectState>>,
+    receiver: Shared<oneshot::Receiver<Arc<ProjectState>>>,
+    no_cache: bool,
+}
+
+impl StateChannel {
+    pub fn new() -> Self {
+        let (sender, receiver) = oneshot::channel();
+        Self {
+            sender,
+            receiver: receiver.shared(),
+            no_cache: false,
+        }
+    }
+
+    pub fn no_cache(&mut self, no_cache: bool) -> &mut Self {
+        self.no_cache = no_cache;
+        self
+    }
+
+    pub fn receiver(&self) -> Shared<oneshot::Receiver<Arc<ProjectState>>> {
+        self.receiver.clone()
+    }
+
+    pub fn send(self, state: Arc<ProjectState>) {
+        self.sender.send(state).ok();
+    }
+}
+
 /// Actor representing organization and project configuration for a project key.
 ///
 /// This actor no longer uniquely identifies a project. Instead, it identifies a project key.
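Aside, not part of the patch: a minimal sketch of how the `.no-cache` DSN flag from the changelog entry surfaces through the `parse_with_flags` helper added in `relay-common` above. The hex key is a made-up placeholder, and the sketch assumes that `ProjectKey` is re-exported from the `relay_common` crate root and that the error type derives `Debug` (needed for `expect`).

```rust
use relay_common::ProjectKey;

fn main() {
    // A plain public key parses with no flags.
    let (key, flags) = ProjectKey::parse_with_flags("a94ae32be2584e0bbd7a4cbb95971fee")
        .expect("valid project key");
    assert!(flags.is_empty());

    // Appending `.no-cache` to the key in the DSN yields the same key plus the flag,
    // which the `RequestMeta` extractor later in this diff maps to `no_cache: true`.
    let (flagged, flags) = ProjectKey::parse_with_flags("a94ae32be2584e0bbd7a4cbb95971fee.no-cache")
        .expect("valid project key with flag");
    assert_eq!(flagged.as_str(), key.as_str());
    assert_eq!(flags, vec!["no-cache"]);
}
```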
@@ -381,8 +411,9 @@ pub struct Project {
     config: Arc<Config>,
     manager: Addr<ProjectCache>,
     state: Option<Arc<ProjectState>>,
-    state_channel: Option<Shared<oneshot::Receiver<Arc<ProjectState>>>>,
+    state_channel: Option<StateChannel>,
     rate_limits: RateLimits,
+    last_no_cache: Instant,
 }
 
 impl Project {
@@ -394,6 +425,7 @@ impl Project {
             state: None,
             state_channel: None,
             rate_limits: RateLimits::new(),
+            last_no_cache: Instant::now(),
         }
     }
 
@@ -403,17 +435,31 @@ impl Project {
     fn get_or_fetch_state(
         &mut self,
+        mut no_cache: bool,
         context: &mut Context<Self>,
     ) -> Response<Arc<ProjectState>, ProjectError> {
         // count number of times we are looking for the project state
         metric!(counter(RelayCounters::ProjectStateGet) += 1);
 
+        // Allow at most 1 no_cache request per second. Gracefully degrade to cached requests.
+        if no_cache {
+            if self.last_no_cache.elapsed() < Duration::from_secs(1) {
+                no_cache = false;
+            } else {
+                metric!(counter(RelayCounters::ProjectStateNoCache) += 1);
+                self.last_no_cache = Instant::now();
+            }
+        }
+
         let state = self.state.as_ref();
         let outdated = state
             .map(|s| s.outdated(&self.config))
             .unwrap_or(Outdated::HardOutdated);
 
         let cached_state = match (state, outdated) {
+            // Never use the cached state if `no_cache` is set.
+            _ if no_cache => None,
+
             // There is no project state that can be used, fetch a state and return it.
             (None, _) | (_, Outdated::HardOutdated) => None,
 
@@ -424,16 +470,26 @@ impl Project {
             (Some(state), Outdated::Updated) => return Response::ok(state.clone()),
         };
 
-        let channel = match self.state_channel {
-            Some(ref channel) => {
+        let receiver = match self.state_channel {
+            Some(ref channel) if channel.no_cache || !no_cache => {
                 relay_log::debug!("project {} state request amended", self.public_key);
-                channel.clone()
+                channel.receiver()
             }
-            None => {
+            _ => {
                 relay_log::debug!("project {} state requested", self.public_key);
-                let channel = self.fetch_state(context);
-                self.state_channel = Some(channel.clone());
-                channel
+
+                let receiver = self
+                    .state_channel
+                    .get_or_insert_with(StateChannel::new)
+                    .no_cache(no_cache)
+                    .receiver();
+
+                // Either there is no running request, or the current request does not have
+                // `no_cache` set. In both cases, start a new request. All in-flight receivers will
+                // get the latest state.
+                self.fetch_state(no_cache, context);
+
+                receiver
             }
         };
 
@@ -441,36 +497,46 @@ impl Project {
         if let Some(rv) = cached_state {
             return Response::ok(rv);
         }
 
-        let future = channel
+        let future = receiver
             .map(|shared| (*shared).clone())
             .map_err(|_| ProjectError::FetchFailed);
 
         Response::future(future)
     }
 
-    fn fetch_state(
-        &mut self,
-        context: &mut Context<Self>,
-    ) -> Shared<oneshot::Receiver<Arc<ProjectState>>> {
-        let (sender, receiver) = oneshot::channel();
+    fn fetch_state(&mut self, no_cache: bool, context: &mut Context<Self>) {
+        debug_assert!(self.state_channel.is_some());
         let public_key = self.public_key;
 
         self.manager
-            .send(FetchProjectState { public_key })
+            .send(FetchProjectState {
+                public_key,
+                no_cache,
+            })
             .into_actor(self)
             .map(move |state_result, slf, _ctx| {
+                let channel = match slf.state_channel.take() {
+                    Some(channel) => channel,
+                    None => return,
+                };
+
+                // If the channel has `no_cache` set but we are not a `no_cache` request, we have
+                // been superseded. Put it back and let the other request take precedence.
+                if channel.no_cache && !no_cache {
+                    slf.state_channel = Some(channel);
+                    return;
+                }
+
                 slf.state_channel = None;
                 slf.state = state_result.map(|resp| resp.state).ok();
 
                 if let Some(ref state) = slf.state {
                     relay_log::debug!("project state {} updated", public_key);
-                    sender.send(state.clone()).ok();
+                    channel.send(state.clone());
                 }
             })
             .drop_err()
             .spawn(context);
-
-        receiver.shared()
     }
 
     fn get_scoping(&mut self, meta: &RequestMeta) -> Scoping {
@@ -532,7 +598,7 @@ impl Actor for Project {
 ///
 /// This is used for cases when we only want to perform operations that do
 /// not require waiting for network requests.
-///
+#[derive(Debug)]
 pub struct GetCachedProjectState;
 
 impl Message for GetCachedProjectState {
@@ -553,8 +619,24 @@ impl Handler<GetCachedProjectState> for Project {
 
 /// Returns the project state.
 ///
-/// The project state is fetched if it is missing or outdated.
-pub struct GetProjectState;
+/// The project state is fetched if it is missing or outdated. If `no_cache` is specified, then the
+/// state is always refreshed.
+#[derive(Debug)]
+pub struct GetProjectState {
+    no_cache: bool,
+}
+
+impl GetProjectState {
+    /// Fetches the project state and uses the cached version if up-to-date.
+    pub fn new() -> Self {
+        Self { no_cache: false }
+    }
+
+    /// Fetches the project state and conditionally skips the cache.
+    pub fn no_cache(no_cache: bool) -> Self {
+        Self { no_cache }
+    }
+}
 
 impl Message for GetProjectState {
     type Result = Result<Arc<ProjectState>, ProjectError>;
@@ -563,8 +645,8 @@ impl Message for GetProjectState {
 impl Handler<GetProjectState> for Project {
     type Result = Response<Arc<ProjectState>, ProjectError>;
 
-    fn handle(&mut self, _message: GetProjectState, context: &mut Context<Self>) -> Self::Result {
-        self.get_or_fetch_state(context)
+    fn handle(&mut self, message: GetProjectState, context: &mut Context<Self>) -> Self::Result {
+        self.get_or_fetch_state(message.no_cache, context)
     }
 }
 
@@ -629,13 +711,17 @@ impl Handler<CheckEnvelope> for Project {
         if message.fetch {
             // Project state fetching is allowed, so ensure the state is fetched and up-to-date.
             // This will return synchronously if the state is still cached.
-            self.get_or_fetch_state(context)
+            self.get_or_fetch_state(message.envelope.meta().no_cache(), context)
                 .into_actor()
                 .map(self, context, move |_, slf, _ctx| {
                     slf.check_envelope_scoped(message)
                 })
         } else {
-            self.get_or_fetch_state(context);
+            // Preload the project cache so that it arrives a little earlier in processing. However,
+            // do not pass `no_cache`. If the project is rate limited, we do not want to force
+            // a full reload.
+            self.get_or_fetch_state(false, context);
+
             // message.fetch == false: Fetching must not block the store request. The EventManager
             // will later fetch the project state.
             ActorResponse::ok(self.check_envelope_scoped(message))
diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs
index a9c3c6bba21..80cee42ca5d 100644
--- a/relay-server/src/actors/project_cache.rs
+++ b/relay-server/src/actors/project_cache.rs
@@ -170,7 +170,11 @@ impl Handler for ProjectCache {
 /// individual requests.
 #[derive(Clone)]
 pub struct FetchProjectState {
+    /// The public key to fetch the project by.
     pub public_key: ProjectKey,
+
+    /// If true, all caches should be skipped and a fresh state should be computed.
+    pub no_cache: bool,
 }
 
 #[derive(Debug)]
@@ -201,7 +205,8 @@ impl Handler<FetchProjectState> for ProjectCache {
     type Result = Response<ProjectStateResponse, ()>;
 
     fn handle(&mut self, message: FetchProjectState, _context: &mut Self::Context) -> Self::Result {
-        let FetchProjectState { public_key } = message;
+        let public_key = message.public_key;
+
         if let Some(mut entry) = self.projects.get_mut(&public_key) {
             // Bump the update time of the project in our hashmap to evade eviction. Eviction is a
             // sequential scan over self.projects, so this needs to be as fast as possible and
diff --git a/relay-server/src/actors/project_upstream.rs b/relay-server/src/actors/project_upstream.rs
index 1dc09cf09b0..dc677ea7320 100644
--- a/relay-server/src/actors/project_upstream.rs
+++ b/relay-server/src/actors/project_upstream.rs
@@ -40,6 +40,8 @@ pub struct GetProjectStates {
     pub public_keys: Vec<ProjectKey>,
     #[serde(default)]
     pub full_config: bool,
+    #[serde(default)]
+    pub no_cache: bool,
 }
 
 #[derive(Debug, Deserialize, Serialize)]
@@ -77,6 +79,7 @@ struct ProjectStateChannel {
     sender: oneshot::Sender<Arc<ProjectState>>,
     receiver: Shared<oneshot::Receiver<Arc<ProjectState>>>,
     deadline: Instant,
+    no_cache: bool,
 }
 
 impl ProjectStateChannel {
@@ -87,9 +90,14 @@ impl ProjectStateChannel {
             sender,
             receiver: receiver.shared(),
             deadline: Instant::now() + timeout,
+            no_cache: false,
         }
     }
 
+    pub fn no_cache(&mut self) {
+        self.no_cache = true;
+    }
+
     pub fn send(self, state: ProjectState) {
         self.sender.send(Arc::new(state)).ok();
     }
@@ -183,6 +191,8 @@ impl UpstreamProjectSource {
         // num_batches. Worst case, we're left with one project per request, but that's fine.
         let actual_batch_size = (total_count + (total_count % num_batches)) / num_batches;
 
+        // TODO(ja): This mixes requests with no_cache. Separate out channels with no_cache: true?
+
         let requests: Vec<_> = channels
             .into_iter()
             .chunks(actual_batch_size)
@@ -198,6 +208,7 @@ impl UpstreamProjectSource {
             let query = GetProjectStates {
                 public_keys: channels_batch.keys().copied().collect(),
                 full_config: self.config.processing_enabled(),
+                no_cache: channels_batch.values().any(|c| c.no_cache),
             };
 
             // count number of http requests for project states
@@ -324,6 +335,10 @@ impl Handler<FetchProjectState> for UpstreamProjectSource {
         }
 
         let query_timeout = self.config.query_timeout();
+        let FetchProjectState {
+            public_key,
+            no_cache,
+        } = message;
 
         // There's an edge case where a project is represented by two Project actors. This can
         // happen if our project eviction logic removes an actor from `project_cache.projects`
         // channel for our current `message.id`.
         let channel = self
             .state_channels
-            .entry(message.public_key)
+            .entry(public_key)
             .or_insert_with(|| ProjectStateChannel::new(query_timeout));
 
+        // Ensure upstream skips caches if one of the recipients requests an uncached response. This
+        // operation is additive across requests.
+        if no_cache {
+            channel.no_cache();
+        }
+
         Box::new(
             channel
                 .receiver()
diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs
index 0ae4b9ac2e0..93867728ed4 100644
--- a/relay-server/src/endpoints/project_configs.rs
+++ b/relay-server/src/endpoints/project_configs.rs
@@ -66,6 +66,7 @@ fn get_project_configs(
 ) -> ResponseFuture, Error> {
     let relay = body.relay;
     let full = relay.internal && body.inner.full_config;
+    let no_cache = body.inner.no_cache;
 
     let futures = body.inner.public_keys.into_iter().map(move |public_key| {
         let relay = relay.clone();
             .project_cache()
             .send(GetProject { public_key })
             .map_err(Error::from)
-            .and_then(|project| project.send(GetProjectState).map_err(Error::from))
+            .and_then(move |project| {
+                project
+                    .send(GetProjectState::no_cache(no_cache))
+                    .map_err(Error::from)
+            })
             .map(move |project_state| {
                 let project_state = project_state.ok()?;
                 // If public key is known (even if rate-limited, which is Some(false)), it has
diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs
index b02cb4d550b..3030b3bcf84 100644
--- a/relay-server/src/extractors/request_meta.rs
+++ b/relay-server/src/extractors/request_meta.rs
@@ -153,6 +153,14 @@ const fn default_version() -> u16 {
     relay_common::PROTOCOL_VERSION
 }
 
+fn is_false(value: &bool) -> bool {
+    !*value
+}
+
+fn make_false() -> bool {
+    false
+}
+
 /// Request information for sentry ingest data, such as events, envelopes or metrics.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RequestMeta {
@@ -183,6 +191,10 @@ pub struct RequestMeta {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     user_agent: Option<String>,
 
+    /// A flag that indicates that project options caching should be bypassed.
+    #[serde(default = "make_false", skip_serializing_if = "is_false")]
+    no_cache: bool,
+
     /// The time at which the request started.
     //
     // NOTE: This is internal-only and not exposed to Envelope headers.
@@ -234,6 +246,11 @@ impl RequestMeta {
         self.user_agent.as_deref()
     }
 
+    /// Indicates that caches should be bypassed.
+    pub fn no_cache(&self) -> bool {
+        self.no_cache
+    }
+
     /// The time at which the request started.
     pub fn start_time(&self) -> Instant {
         self.start_time
@@ -252,6 +269,7 @@ impl RequestMeta {
             remote_addr: Some("192.168.0.1".parse().unwrap()),
             forwarded_for: String::new(),
             user_agent: Some("sentry/agent".to_string()),
+            no_cache: false,
             start_time: Instant::now(),
         }
     }
@@ -350,6 +368,9 @@ impl PartialMeta {
         if self.user_agent.is_some() {
            complete.user_agent = self.user_agent;
         }
+        if self.no_cache {
+            complete.no_cache = true;
+        }
 
         complete
     }
@@ -436,9 +457,12 @@ impl FromRequest for RequestMeta {
         let config = request.state().config();
         let upstream = config.upstream_descriptor();
 
+        let (public_key, key_flags) =
+            ProjectKey::parse_with_flags(auth.public_key()).map_err(BadEventMeta::BadPublicKey)?;
+
         let dsn = PartialDsn {
             scheme: upstream.scheme(),
-            public_key: ProjectKey::parse(auth.public_key()).map_err(BadEventMeta::BadPublicKey)?,
+            public_key,
             host: upstream.host().to_owned(),
             port: upstream.port(),
             path: String::new(),
@@ -458,6 +482,7 @@ impl FromRequest for RequestMeta {
                 .get(header::USER_AGENT)
                 .and_then(|h| h.to_str().ok())
                 .map(str::to_owned),
+            no_cache: key_flags.contains(&"no-cache"),
             start_time,
         })
     }
diff --git a/relay-server/src/metrics.rs b/relay-server/src/metrics.rs
index cd1cd2f430e..017fecb3775 100644
--- a/relay-server/src/metrics.rs
+++ b/relay-server/src/metrics.rs
@@ -335,6 +335,15 @@ pub enum RelayCounters {
     /// Note that after an update loop has completed, there may be more projects pending updates.
     /// This is indicated by `project_state.pending`.
     ProjectStateRequest,
+    /// Number of times a project config was requested with `.no-cache`.
+    ///
+    /// This effectively counts the number of envelopes or events that have been sent with a
+    /// corresponding DSN. Actual queries to the upstream may still be deduplicated for these
+    /// project state requests.
+    ///
+    /// At most one such request per second is allowed per project key. This metric counts only
+    /// permitted requests.
+    ProjectStateNoCache,
     /// Number of times a project is looked up from the cache.
     ///
     /// The cache may contain and outdated or expired project state. In that case, the project state
@@ -433,6 +442,7 @@ impl CounterMetric for RelayCounters {
             RelayCounters::Outcomes => "events.outcomes",
             RelayCounters::ProjectStateGet => "project_state.get",
             RelayCounters::ProjectStateRequest => "project_state.request",
+            RelayCounters::ProjectStateNoCache => "project_state.no_cache",
             RelayCounters::ProjectCacheHit => "project_cache.hit",
             RelayCounters::ProjectCacheMiss => "project_cache.miss",
             RelayCounters::ServerStarting => "server.starting",
diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs
index 1144ec4be71..5a42cd7c89c 100644
--- a/relay-server/src/utils/dynamic_sampling.rs
+++ b/relay-server/src/utils/dynamic_sampling.rs
@@ -340,7 +340,7 @@ pub fn sample_transaction(
         });
         Box::new(fut) as ResponseFuture<_, _>
     } else {
-        let fut = project.send(GetProjectState).then(|project_state| {
+        let fut = project.send(GetProjectState::new()).then(|project_state| {
             let project_state = match project_state {
                 // error getting the project, give up and return envelope unchanged
                 Err(_) => return Ok(envelope),
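Aside, not part of the patch: the throttle added in `Project::get_or_fetch_state`, at most one uncached fetch per second per project key with graceful fallback to the cached state, reduces to the standalone sketch below. It uses only `std` types; the `NoCacheThrottle` name is made up for illustration, and the metric counter is omitted.

```rust
use std::time::{Duration, Instant};

/// Illustrative stand-in for the `last_no_cache` bookkeeping on the `Project` actor.
struct NoCacheThrottle {
    last_no_cache: Instant,
}

impl NoCacheThrottle {
    fn new() -> Self {
        // Mirrors `Project::new`, which initializes `last_no_cache` to the creation time,
        // so the very first `no_cache` request after startup degrades to a cached fetch.
        Self {
            last_no_cache: Instant::now(),
        }
    }

    /// Returns the effective `no_cache` flag for one request: at most one uncached
    /// fetch per second is allowed; further requests gracefully use the cache.
    fn effective_no_cache(&mut self, requested: bool) -> bool {
        if requested && self.last_no_cache.elapsed() >= Duration::from_secs(1) {
            self.last_no_cache = Instant::now();
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut throttle = NoCacheThrottle::new();
    // Immediately after startup the uncached request degrades to a cached one.
    assert!(!throttle.effective_no_cache(true));
    std::thread::sleep(Duration::from_secs(1));
    // After a second, one uncached request is allowed again, and only one.
    assert!(throttle.effective_no_cache(true));
    assert!(!throttle.effective_no_cache(true));
}
```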