Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
 **Internal**:
 
 - Refactor tokio-based Addr from healthcheck to be generic. ([#1405](https://github.com/getsentry/relay/pull/1405))
+- Defer dropping of projects to a background thread to speed up project cache eviction. ([#1410](https://github.com/getsentry/relay/pull/1410))
 
 **Features**:
 
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions relay-server/Cargo.toml
@@ -38,6 +38,7 @@ flate2 = "1.0.19"
 fragile = { version = "1.2.1", features = ["slab"] } # used for vendoring sentry-actix
 futures01 = { version = "0.1.28", package = "futures" }
 futures = { version = "0.3", package = "futures", features = ["compat"] }
+hashbrown = "0.12.3"
 itertools = "0.8.2"
 json-forensics = { version = "*", git = "https://github.com/getsentry/rust-json-forensics" }
 lazy_static = "1.4.0"
25 changes: 18 additions & 7 deletions relay-server/src/actors/project_cache.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -20,8 +19,8 @@ use crate::actors::project::{Project, ProjectState};
 use crate::actors::project_local::LocalProjectSource;
 use crate::actors::project_upstream::UpstreamProjectSource;
 use crate::envelope::Envelope;
-use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
-use crate::utils::{ActorResponse, EnvelopeContext, Response};
+use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
+use crate::utils::{ActorResponse, EnvelopeContext, GarbageDisposal, Response};
 
 use super::project::ExpiryState;
 
@@ -41,11 +40,12 @@ impl ResponseError for ProjectError {}
 
 pub struct ProjectCache {
     config: Arc<Config>,
-    projects: HashMap<ProjectKey, Project>,
+    projects: hashbrown::HashMap<ProjectKey, Project>, // need hashbrown because drain_filter is not stable in std yet
     local_source: Addr<LocalProjectSource>,
     upstream_source: Addr<UpstreamProjectSource>,
     #[cfg(feature = "processing")]
     redis_source: Option<Addr<RedisProjectSource>>,
+    garbage_disposal: GarbageDisposal<Project>,
 }
 
 impl ProjectCache {
@@ -66,11 +66,12 @@ impl ProjectCache {
 
         ProjectCache {
             config,
-            projects: HashMap::new(),
+            projects: hashbrown::HashMap::new(),
             local_source,
             upstream_source,
             #[cfg(feature = "processing")]
             redis_source,
+            garbage_disposal: GarbageDisposal::new(),
         }
     }
 
@@ -85,8 +86,18 @@
         let eviction_start = Instant::now();
         let delta = 2 * self.config.project_cache_expiry() + self.config.project_grace_period();
 
-        self.projects
-            .retain(|_, entry| entry.last_updated_at() + delta > eviction_start);
+        let expired = self
+            .projects
+            .drain_filter(|_, entry| entry.last_updated_at() + delta <= eviction_start);
+
+        // Defer dropping the projects to a dedicated thread:
+        for (_, project) in expired {
+            self.garbage_disposal.dispose(project);
+        }
+
+        // Log garbage queue size:
+        let queue_size = self.garbage_disposal.queue_size() as f64;

Review thread on this line:

Contributor: This alone was enough reason for me to report queue size as u64 rather than usize. :)

Member: Can you clarify? You could equally have the queue_size as usize here and then cast it to an f64.

Contributor: Wait, why is this f64 and not u64? Only just noticed that and now I'm confused.

  • Metrics are reported as u64.
  • We don't really know what sizes a channel can get up to, as this is not described AFAIK, but we can probably assume we'd crash before we reach the max size.
  • That means deciding to expose it as a usize is as arbitrary as exposing it as a u64.
  • But here you are led to believe that the true value is represented by a usize and you're supposedly casting here. This is misleading.
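
For illustration only (not part of the PR): the alternative the reviewer describes would expose the counter as `u64`, so the `usize` never leaks out of the utility; the call site would still cast to whatever the gauge macro expects, as the PR does with `as f64`.

```rust
    /// Get current queue size.
    ///
    /// Sketch of the reviewer's suggestion: report the size as u64 so callers
    /// are not led to believe that usize is the "true" type of the value.
    pub fn queue_size(&self) -> u64 {
        self.queue_size.load(Ordering::Relaxed) as u64
    }
```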

+        relay_statsd::metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size);
 
         metric!(timer(RelayTimers::ProjectStateEvictionDuration) = eviction_start.elapsed());
     }
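
For reference, a condensed, self-contained sketch of the eviction pattern introduced in this hunk. It assumes hashbrown's `HashMap::drain_filter` and the `GarbageDisposal` type added by this PR; the `Project` stand-in, the `u64` key, and the function name are simplified placeholders, not the real types.

```rust
use std::time::{Duration, Instant};

use crate::utils::GarbageDisposal; // added by this PR

// Simplified stand-in for the real Project actor.
struct Project {
    last_updated_at: Instant,
}

fn evict_stale(
    projects: &mut hashbrown::HashMap<u64, Project>,
    delta: Duration,
    garbage_disposal: &GarbageDisposal<Project>,
) {
    let eviction_start = Instant::now();

    // drain_filter removes and yields every entry for which the closure
    // returns true, leaving the rest of the map untouched.
    let expired =
        projects.drain_filter(|_, entry| entry.last_updated_at + delta <= eviction_start);

    // Instead of dropping the expired projects inline (which is what the old
    // `retain` call did implicitly), hand them to the background thread.
    for (_, project) in expired {
        garbage_disposal.dispose(project);
    }
}
```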
4 changes: 4 additions & 0 deletions relay-server/src/statsd.rs
@@ -5,12 +5,16 @@ pub enum RelayGauges {
     /// The state of Relay with respect to the upstream connection.
     /// Possible values are `0` for normal operations and `1` for a network outage.
     NetworkOutage,
+
+    /// The number of items currently in the garbage disposal queue.
+    ProjectCacheGarbageQueueSize,
 }
 
 impl GaugeMetric for RelayGauges {
     fn name(&self) -> &'static str {
         match self {
             RelayGauges::NetworkOutage => "upstream.network_outage",
+            RelayGauges::ProjectCacheGarbageQueueSize => "project_cache.garbage.queue_size",

Review thread on this line:

Contributor: If all this was to get rid of the nonsensical tag, I'd have rather gone with passing a string for the tag into the constructor. But whatever, either will do.

Member Author: The main motivation was to get rid of the `i % 100` condition for logging. The utility class now does not concern itself with statsd, but simply offers a method to query its queue size, and it's up to the caller when and how to log that information, which I think is a better separation of concerns.

Contributor: (Note: I approved, so I'm not asking for changes.) The metrics are either needed or not; it shouldn't be up to the caller to think of hooking up the metrics. We're not building a library but an application (and even then, but that's a larger topic).

         }
     }
 }
105 changes: 105 additions & 0 deletions relay-server/src/utils/garbage.rs
@@ -0,0 +1,105 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread::JoinHandle;

/// Garbage disposal agent.
///
/// Spawns a background thread which drops items sent to it via [`GarbageDisposal::dispose`].
pub struct GarbageDisposal<T> {
    tx: mpsc::Sender<T>,
    queue_size: Arc<AtomicUsize>,
}

impl<T: Send + 'static> GarbageDisposal<T> {
    /// Returns a new instance plus a handle to join on the background thread.
    /// Currently only used in tests.
    fn new_joinable() -> (Self, JoinHandle<()>) {
        let (tx, rx) = mpsc::channel();

        let queue_size = Arc::new(AtomicUsize::new(0));
        let queue_size_clone = queue_size.clone();
        let join_handle = std::thread::spawn(move || {
            relay_log::debug!("Start garbage collection thread");
            while let Ok(object) = rx.recv() {
                queue_size_clone.fetch_sub(1, Ordering::Relaxed); // Wraps around on overflow
                drop(object);
            }
            relay_log::debug!("Stop garbage collection thread");
        });

        (Self { tx, queue_size }, join_handle)
    }

    /// Spawns a new garbage disposal instance.
    /// Every instance has its own background thread that receives items to be dropped via
    /// [`Self::dispose`].
    /// When the instance is dropped, the background thread stops automatically.
    pub fn new() -> Self {
        let (instance, _) = Self::new_joinable();
        instance
    }

    /// Defers dropping an object by sending it to the background thread.
    pub fn dispose(&self, object: T) {
        self.queue_size.fetch_add(1, Ordering::Relaxed);
        self.tx
            .send(object)
            .map_err(|e| {
                relay_log::error!("Failed to send object to garbage disposal thread, drop here");
                drop(e.0);
            })
            .ok();
    }

    /// Get current queue size.
    pub fn queue_size(&self) -> usize {
        self.queue_size.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread::ThreadId,
    };

    use super::GarbageDisposal;

    struct SomeStruct {
        thread_ids: Arc<Mutex<Vec<ThreadId>>>,
    }

    impl Drop for SomeStruct {
        fn drop(&mut self) {
            self.thread_ids
                .lock()
                .unwrap()
                .push(std::thread::current().id())
        }
    }

    #[test]
    fn test_garbage_disposal() {
        let thread_ids = Arc::new(Mutex::new(Vec::<ThreadId>::new()));

        let x1 = SomeStruct {
            thread_ids: thread_ids.clone(),
        };
        drop(x1);

        let x2 = SomeStruct {
            thread_ids: thread_ids.clone(),
        };

        let (garbage, join_handle) = GarbageDisposal::new_joinable();
        garbage.dispose(x2);
        drop(garbage); // dropping the sender (tx) ends the background thread's while loop
        join_handle.join().ok(); // wait for the thread to finish its work

        let thread_ids = thread_ids.lock().unwrap();
        assert_eq!(thread_ids.len(), 2);
        assert_eq!(thread_ids[0], std::thread::current().id());
        assert!(thread_ids[0] != thread_ids[1]);
    }
}
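
A minimal usage sketch of this utility from a hypothetical caller; the `ExpensiveState` type and `expire` function are illustrative, not from the PR. It also shows the point made in the review thread above: the utility only exposes `queue_size()`, and the caller decides if and when to report it.

```rust
use crate::utils::GarbageDisposal;

// Illustrative payload type; dropping it inline would be comparatively expensive.
struct ExpensiveState {
    payload: Vec<u8>,
}

fn expire(garbage_disposal: &GarbageDisposal<ExpensiveState>, state: ExpensiveState) {
    // Hand the value to the background thread instead of dropping it here,
    // keeping the calling thread's latency low.
    garbage_disposal.dispose(state);

    // The utility does not report metrics itself; the caller decides when and
    // how to surface the backlog.
    relay_log::debug!("garbage backlog: {}", garbage_disposal.queue_size());
}
```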
2 changes: 2 additions & 0 deletions relay-server/src/utils/mod.rs
@@ -3,6 +3,7 @@ mod api;
 mod dynamic_sampling;
 mod envelope_context;
 mod error_boundary;
+mod garbage;
 mod multipart;
 mod param_parser;
 mod rate_limits;
@@ -25,6 +26,7 @@ pub use self::api::*;
 pub use self::dynamic_sampling::*;
 pub use self::envelope_context::*;
 pub use self::error_boundary::*;
+pub use self::garbage::*;
 pub use self::multipart::*;
 pub use self::param_parser::*;
 pub use self::rate_limits::*;