Skip to content

Commit dee2494

Browse files
authored
feat(server): Garbage collector thread for project cache eviction (#1410)
Spawn a background thread in ProjectCache that takes care of dropping Project objects, to shorten cache eviction times. Switch from std::collections::HashMap to hashbrown::HashMap to take advantage of drain_filter.
1 parent d3b9778 commit dee2494

File tree

7 files changed

+143
-9
lines changed

7 files changed

+143
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
**Internal**:
1010

1111
- Refactor tokio-based Addr from healthcheck to be generic. ([#1405](https://github.com/relay/pull/1405))
12+
- Defer dropping of projects to a background thread to speed up project cache eviction. ([#1410](https://github.com/getsentry/relay/pull/1410))
1213

1314
**Features**:
1415

Cargo.lock

Lines changed: 12 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

relay-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ flate2 = "1.0.19"
3838
fragile = { version = "1.2.1", features = ["slab"] } # used for vendoring sentry-actix
3939
futures01 = { version = "0.1.28", package = "futures" }
4040
futures = { version = "0.3", package = "futures", features = ["compat"] }
41+
hashbrown = "0.12.3"
4142
itertools = "0.8.2"
4243
json-forensics = { version = "*", git = "https://github.com/getsentry/rust-json-forensics" }
4344
lazy_static = "1.4.0"

relay-server/src/actors/project_cache.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::HashMap;
21
use std::sync::Arc;
32
use std::time::Instant;
43

@@ -20,8 +19,8 @@ use crate::actors::project::{Project, ProjectState};
2019
use crate::actors::project_local::LocalProjectSource;
2120
use crate::actors::project_upstream::UpstreamProjectSource;
2221
use crate::envelope::Envelope;
23-
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
24-
use crate::utils::{ActorResponse, EnvelopeContext, Response};
22+
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
23+
use crate::utils::{ActorResponse, EnvelopeContext, GarbageDisposal, Response};
2524

2625
use super::project::ExpiryState;
2726

@@ -41,11 +40,12 @@ impl ResponseError for ProjectError {}
4140

4241
pub struct ProjectCache {
4342
config: Arc<Config>,
44-
projects: HashMap<ProjectKey, Project>,
43+
projects: hashbrown::HashMap<ProjectKey, Project>, // need hashbrown because drain_filter is not stable in std yet
4544
local_source: Addr<LocalProjectSource>,
4645
upstream_source: Addr<UpstreamProjectSource>,
4746
#[cfg(feature = "processing")]
4847
redis_source: Option<Addr<RedisProjectSource>>,
48+
garbage_disposal: GarbageDisposal<Project>,
4949
}
5050

5151
impl ProjectCache {
@@ -66,11 +66,12 @@ impl ProjectCache {
6666

6767
ProjectCache {
6868
config,
69-
projects: HashMap::new(),
69+
projects: hashbrown::HashMap::new(),
7070
local_source,
7171
upstream_source,
7272
#[cfg(feature = "processing")]
7373
redis_source,
74+
garbage_disposal: GarbageDisposal::new(),
7475
}
7576
}
7677

@@ -85,8 +86,18 @@ impl ProjectCache {
8586
let eviction_start = Instant::now();
8687
let delta = 2 * self.config.project_cache_expiry() + self.config.project_grace_period();
8788

88-
self.projects
89-
.retain(|_, entry| entry.last_updated_at() + delta > eviction_start);
89+
let expired = self
90+
.projects
91+
.drain_filter(|_, entry| entry.last_updated_at() + delta <= eviction_start);
92+
93+
// Defer dropping the projects to a dedicated thread:
94+
for (_, project) in expired {
95+
self.garbage_disposal.dispose(project);
96+
}
97+
98+
// Log garbage queue size:
99+
let queue_size = self.garbage_disposal.queue_size() as f64;
100+
relay_statsd::metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size);
90101

91102
metric!(timer(RelayTimers::ProjectStateEvictionDuration) = eviction_start.elapsed());
92103
}

relay-server/src/statsd.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ pub enum RelayGauges {
55
/// The state of Relay with respect to the upstream connection.
66
/// Possible values are `0` for normal operations and `1` for a network outage.
77
NetworkOutage,
8+
9+
/// The number of items currently in the garbage disposal queue.
10+
ProjectCacheGarbageQueueSize,
811
}
912

1013
impl GaugeMetric for RelayGauges {
1114
fn name(&self) -> &'static str {
1215
match self {
1316
RelayGauges::NetworkOutage => "upstream.network_outage",
17+
RelayGauges::ProjectCacheGarbageQueueSize => "project_cache.garbage.queue_size",
1418
}
1519
}
1620
}

relay-server/src/utils/garbage.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::sync::atomic::{AtomicUsize, Ordering};
2+
use std::sync::{mpsc, Arc};
3+
use std::thread::JoinHandle;
4+
5+
/// Garbage disposal agent.
6+
///
7+
/// Spawns a background thread which drops items sent to it via [`GarbageDisposal::dispose`].
8+
pub struct GarbageDisposal<T> {
9+
tx: mpsc::Sender<T>,
10+
queue_size: Arc<AtomicUsize>,
11+
}
12+
13+
impl<T: Send + 'static> GarbageDisposal<T> {
14+
/// Returns a new instance plus a handle to join on the background thread.
15+
/// Currently only used in tests.
16+
fn new_joinable() -> (Self, JoinHandle<()>) {
17+
let (tx, rx) = mpsc::channel();
18+
19+
let queue_size = Arc::new(AtomicUsize::new(0));
20+
let queue_size_clone = queue_size.clone();
21+
let join_handle = std::thread::spawn(move || {
22+
relay_log::debug!("Start garbage collection thread");
23+
while let Ok(object) = rx.recv() {
24+
queue_size_clone.fetch_sub(1, Ordering::Relaxed); // Wraps around on overflow
25+
drop(object);
26+
}
27+
relay_log::debug!("Stop garbage collection thread");
28+
});
29+
30+
(Self { tx, queue_size }, join_handle)
31+
}
32+
33+
/// Spawns a new garbage disposal instance.
34+
/// Every instance has its own background thread that received items to be dropped via
35+
/// [`Self::dispose`].
36+
/// When the instance is dropped, the background thread stops automatically.
37+
pub fn new() -> Self {
38+
let (instance, _) = Self::new_joinable();
39+
instance
40+
}
41+
42+
/// Defers dropping an object by sending it to the background thread.
43+
pub fn dispose(&self, object: T) {
44+
self.queue_size.fetch_add(1, Ordering::Relaxed);
45+
self.tx
46+
.send(object)
47+
.map_err(|e| {
48+
relay_log::error!("Failed to send object to garbage disposal thread, drop here");
49+
drop(e.0);
50+
})
51+
.ok();
52+
}
53+
54+
/// Get current queue size.
55+
pub fn queue_size(&self) -> usize {
56+
self.queue_size.load(Ordering::Relaxed)
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use std::{
63+
sync::{Arc, Mutex},
64+
thread::ThreadId,
65+
};
66+
67+
use super::GarbageDisposal;
68+
69+
struct SomeStruct {
70+
thread_ids: Arc<Mutex<Vec<ThreadId>>>,
71+
}
72+
73+
impl Drop for SomeStruct {
74+
fn drop(&mut self) {
75+
self.thread_ids
76+
.lock()
77+
.unwrap()
78+
.push(std::thread::current().id())
79+
}
80+
}
81+
82+
#[test]
83+
fn test_garbage_disposal() {
84+
let thread_ids = Arc::new(Mutex::new(Vec::<ThreadId>::new()));
85+
86+
let x1 = SomeStruct {
87+
thread_ids: thread_ids.clone(),
88+
};
89+
drop(x1);
90+
91+
let x2 = SomeStruct {
92+
thread_ids: thread_ids.clone(),
93+
};
94+
95+
let (garbage, join_handle) = GarbageDisposal::new_joinable();
96+
garbage.dispose(x2);
97+
drop(garbage); // breaks the while loop by dropping rx
98+
join_handle.join().ok(); // wait for thread to finish its work
99+
100+
let thread_ids = thread_ids.lock().unwrap();
101+
assert_eq!(thread_ids.len(), 2);
102+
assert_eq!(thread_ids[0], std::thread::current().id());
103+
assert!(thread_ids[0] != thread_ids[1]);
104+
}
105+
}

relay-server/src/utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod api;
33
mod dynamic_sampling;
44
mod envelope_context;
55
mod error_boundary;
6+
mod garbage;
67
mod multipart;
78
mod param_parser;
89
mod rate_limits;
@@ -25,6 +26,7 @@ pub use self::api::*;
2526
pub use self::dynamic_sampling::*;
2627
pub use self::envelope_context::*;
2728
pub use self::error_boundary::*;
29+
pub use self::garbage::*;
2830
pub use self::multipart::*;
2931
pub use self::param_parser::*;
3032
pub use self::rate_limits::*;

0 commit comments

Comments
 (0)