
Commit bc58bb7

Auto merge of #3413 - pietroalbini:downloads-counter, r=jtgeibel
Reduce database writes for the download endpoint

crates.io receives a lot of download requests, and we're reaching a point where executing a write query to PostgreSQL for each request is causing operational issues. Over the past few weeks we've been getting regular pages because download requests were slow to execute or outright failed for a couple of seconds at a time, due to the database being overwhelmed. This PR avoids that problem by collecting download counts in memory and periodically persisting them to the database through the new `downloads_counter` module, hopefully reducing the write load on our primary database.

The implementation is designed to minimize locking so that requests are not unnecessarily delayed. Instead of storing the counters in a `RwLock<HashMap<i32, AtomicUsize>>`, which would require locking the whole map whenever a new version needs to be tracked, it uses the [`dashmap`](https://crates.io/crates/dashmap) crate. `DashMap` offers an API similar to the standard library's `HashMap`, but stores the items in `num_cpus()*4` individually locked `RwLock<HashMap<K, V>>` instances (called "shards"), chosen by the key's hash. Splitting the items into shards reduces how much of the `DashMap` is locked at any one time: concurrent download requests only block each other within a single shard.

Persisting download counts also takes advantage of sharding: to avoid "stopping the world" when the background thread persists the counters, only one shard is persisted at a time. The current configuration persists all shards over the span of one minute, and that interval is configurable at runtime with the `DOWNLOADS_PERSIST_INTERVAL_MS` environment variable. To avoid losing downloads, the code also persists all shards at once when the server shuts down gracefully.
There is still the possibility of losing download counts if the server is killed without a chance to shut down gracefully (for example due to platform issues), but losing less than a minute's worth of download counts matters far less than reducing database load.

r? `@jtgeibel`
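The fast-path/slow-path increment scheme described above can be sketched with the standard library alone. This is a simplified stand-in for `dashmap`, not its actual implementation: the modulo "hash" and the fixed shard count are illustrative assumptions.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;

/// Miniature model of a sharded counter map: keys are spread across several
/// independently locked maps, so concurrent increments only contend when two
/// keys land in the same shard.
struct ShardedCounters {
    shards: Vec<RwLock<HashMap<i32, AtomicUsize>>>,
}

impl ShardedCounters {
    fn new(num_shards: usize) -> Self {
        Self {
            shards: (0..num_shards)
                .map(|_| RwLock::new(HashMap::new()))
                .collect(),
        }
    }

    fn shard_for(&self, key: i32) -> &RwLock<HashMap<i32, AtomicUsize>> {
        // Trivial modulo "hash"; dashmap picks the shard from the key's real hash.
        &self.shards[key as usize % self.shards.len()]
    }

    fn increment(&self, key: i32) {
        let shard = self.shard_for(key);
        {
            // Fast path: the key already exists, so a read lock plus an atomic
            // add suffices; other readers of this shard are not blocked.
            let guard = shard.read().unwrap();
            if let Some(counter) = guard.get(&key) {
                counter.fetch_add(1, Ordering::SeqCst);
                return;
            }
        } // the read lock is released here, before the write lock is taken

        // Slow path: lock the whole shard in write mode and insert the key,
        // or increment it if another thread inserted it while we waited.
        shard
            .write()
            .unwrap()
            .entry(key)
            .and_modify(|c| {
                c.fetch_add(1, Ordering::SeqCst);
            })
            .or_insert_with(|| AtomicUsize::new(1));
    }

    /// Drain one shard, returning its counts (mirrors `persist_next_shard`).
    fn drain_shard(&self, idx: usize) -> HashMap<i32, AtomicUsize> {
        std::mem::take(&mut *self.shards[idx].write().unwrap())
    }
}

fn main() {
    let counters = ShardedCounters::new(4);
    counters.increment(7);
    counters.increment(7);
    counters.increment(8);

    // Key 7 maps to shard 3; draining it leaves key 8 (shard 0) untouched.
    let drained = counters.drain_shard(3);
    assert_eq!(drained.get(&7).unwrap().load(Ordering::SeqCst), 2);
    let shard = counters.shard_for(8).read().unwrap();
    assert_eq!(shard.get(&8).unwrap().load(Ordering::SeqCst), 1);
    println!("ok");
}
```

Note the explicit scope around the read guard: dropping it before taking the write lock is what keeps the slow path from deadlocking against our own read lock.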
2 parents daa2f4a + f646e45 commit bc58bb7

File tree

11 files changed: +249 −67 lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -49,6 +49,7 @@ conduit-static = "0.9.0-alpha.3"
 cookie = { version = "0.15", features = ["secure"] }
 ctrlc = { version = "3.0", features = ["termination"] }
+dashmap = { version = "4.0.2", features = ["raw-api"] }
 derive_deref = "1.1.1"
 dialoguer = "0.7.1"
 diesel = { version = "1.4.0", features = ["postgres", "serde_json", "chrono", "r2d2"] }
```

src/app.rs

Lines changed: 5 additions & 0 deletions
```diff
@@ -3,6 +3,7 @@
 use crate::{db, Config, Env};
 use std::{sync::Arc, time::Duration};
 
+use crate::downloads_counter::DownloadsCounter;
 use crate::github::GitHubClient;
 use diesel::r2d2;
 use oauth2::basic::BasicClient;
@@ -32,6 +33,9 @@ pub struct App {
     /// The server configuration
     pub config: Config,
 
+    /// Count downloads and periodically persist them in the database
+    pub downloads_counter: DownloadsCounter,
+
     /// A configured client for outgoing HTTP requests
     ///
     /// In production this shares a single connection pool across requests. In tests
@@ -131,6 +135,7 @@ impl App {
             github_oauth,
             session_key: config.session_key.clone(),
             config,
+            downloads_counter: DownloadsCounter::new(),
             http_client,
         }
     }
```

src/bin/server.rs

Lines changed: 26 additions & 4 deletions
```diff
@@ -49,9 +49,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let config = cargo_registry::Config::default();
     let client = Client::new();
+    let app = Arc::new(App::new(config.clone(), Some(client)));
 
-    let app = App::new(config.clone(), Some(client));
-    let app = cargo_registry::build_handler(Arc::new(app));
+    // Start the background thread periodically persisting download counts to the database.
+    downloads_counter_thread(app.clone());
+
+    let handler = cargo_registry::build_handler(app.clone());
 
     // On every server restart, ensure the categories available in the database match
     // the information in *src/categories.toml*.
@@ -100,7 +103,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         .build()
         .unwrap();
 
-    let handler = Arc::new(conduit_hyper::BlockingHandler::new(app));
+    let handler = Arc::new(conduit_hyper::BlockingHandler::new(handler));
     let make_service =
         hyper::service::make_service_fn(move |socket: &hyper::server::conn::AddrStream| {
             let addr = socket.remote_addr();
@@ -131,7 +134,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         println!("Booting with a civet based server");
         let mut cfg = civet::Config::new();
         cfg.port(port).threads(threads).keep_alive(true);
-        Civet(CivetServer::start(cfg, app).unwrap())
+        Civet(CivetServer::start(cfg, handler).unwrap())
     };
 
     println!("listening on port {}", port);
@@ -164,6 +167,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     }
 
+    println!("Persisting remaining downloads counters");
+    if let Err(err) = app.downloads_counter.persist_all_shards(&app) {
+        println!("downloads_counter error: {}", err);
+    }
+
     println!("Server has gracefully shutdown!");
     Ok(())
 }
@@ -184,3 +192,17 @@ where
     })
     .unwrap();
 }
+
+fn downloads_counter_thread(app: Arc<App>) {
+    let interval = Duration::from_millis(
+        (app.config.downloads_persist_interval_ms / app.downloads_counter.shards_count()) as u64,
+    );
+
+    std::thread::spawn(move || loop {
+        std::thread::sleep(interval);
+
+        if let Err(err) = app.downloads_counter.persist_next_shard(&app) {
+            println!("downloads_counter error: {}", err);
+        }
+    });
+}
```
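The background thread above spaces shard persists evenly across the configured interval: each sleep is the total interval divided by the shard count, so one full pass over all shards takes one interval. A small sketch of that arithmetic (the 16-shard figure is an assumed example, matching `num_cpus()*4` on a 4-core machine):

```rust
use std::time::Duration;

/// Per-shard sleep, computed as in `downloads_counter_thread`: the configured
/// interval covers one complete pass over every shard.
fn per_shard_interval(total_interval_ms: usize, shards: usize) -> Duration {
    Duration::from_millis((total_interval_ms / shards) as u64)
}

fn main() {
    // With the default 60s interval and 16 shards, one shard is persisted
    // every 3.75 seconds.
    assert_eq!(per_shard_interval(60_000, 16), Duration::from_millis(3_750));
    println!("ok");
}
```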

src/config.rs

Lines changed: 9 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ pub struct Config {
     pub blocked_traffic: Vec<(String, Vec<String>)>,
     pub domain_name: String,
     pub allowed_origins: Vec<String>,
+    pub downloads_persist_interval_ms: usize,
 }
 
 impl Default for Config {
@@ -45,6 +46,7 @@ impl Default for Config {
     /// - `READ_ONLY_REPLICA_URL`: The URL of an optional postgres read-only replica database.
     /// - `BLOCKED_TRAFFIC`: A list of headers and environment variables to use for blocking
     ///   traffic. See the `block_traffic` module for more documentation.
+    /// - `DOWNLOADS_PERSIST_INTERVAL_MS`: how frequently to persist download counts (in ms).
     fn default() -> Config {
         let api_protocol = String::from("https");
         let mirror = if dotenv::var("MIRROR").is_ok() {
@@ -144,6 +146,13 @@ impl Default for Config {
             blocked_traffic: blocked_traffic(),
             domain_name: domain_name(),
             allowed_origins,
+            downloads_persist_interval_ms: dotenv::var("DOWNLOADS_PERSIST_INTERVAL_MS")
+                .map(|interval| {
+                    interval
+                        .parse()
+                        .expect("invalid DOWNLOADS_PERSIST_INTERVAL_MS")
+                })
+                .unwrap_or(60_000), // 1 minute
         }
     }
 }
```

src/controllers/version/downloads.rs

Lines changed: 20 additions & 43 deletions
```diff
@@ -20,20 +20,33 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult {
     let crate_name = &req.params()["crate_id"];
     let version = &req.params()["version"];
 
-    let (crate_name, was_counted) = increment_download_counts(req, recorder, crate_name, version)?;
+    let (version_id, crate_name): (_, String) = {
+        use self::versions::dsl::*;
+
+        let conn = recorder.record("get_conn", || req.db_conn())?;
+
+        // Returns the crate name as stored in the database, or an error if we could
+        // not load the version ID from the database.
+        recorder.record("get_version", || {
+            versions
+                .inner_join(crates::table)
+                .select((id, crates::name))
+                .filter(Crate::with_name(crate_name))
+                .filter(num.eq(version))
+                .first(&*conn)
+        })?
+    };
+
+    // The increment does not happen instantly, but it's deferred to be executed in a batch
+    // along with other downloads. See crate::downloads_counter for the implementation.
+    req.app().downloads_counter.increment(version_id);
 
     let redirect_url = req
         .app()
         .config
         .uploader
         .crate_location(&crate_name, version);
 
-    // Adding log metadata requires &mut access, so we have to defer this step until
-    // after the (immutable) query parameters are no longer used.
-    if !was_counted {
-        req.log_metadata("uncounted_dl", "true");
-    }
-
     if req.wants_json() {
         #[derive(Serialize)]
         struct R {
@@ -45,42 +58,6 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult {
     }
 }
 
-/// Increment the download counts for a given crate version.
-///
-/// Returns the crate name as stored in the database, or an error if we could
-/// not load the version ID from the database.
-///
-/// This ignores any errors that occur updating the download count. Failure is
-/// expected if the application is in read only mode, or for API-only mirrors.
-/// Even if failure occurs for unexpected reasons, we would rather have `cargo
-/// build` succeed and not count the download than break people's builds.
-fn increment_download_counts(
-    req: &dyn RequestExt,
-    recorder: TimingRecorder,
-    crate_name: &str,
-    version: &str,
-) -> AppResult<(String, bool)> {
-    use self::versions::dsl::*;
-
-    let conn = recorder.record("get_conn", || req.db_conn())?;
-
-    let (version_id, crate_name) = recorder.record("get_version", || {
-        versions
-            .inner_join(crates::table)
-            .select((id, crates::name))
-            .filter(Crate::with_name(crate_name))
-            .filter(num.eq(version))
-            .first(&*conn)
-    })?;
-
-    // Wrap in a transaction so we don't poison the outer transaction if this
-    // fails
-    let res = recorder.record("update_count", || {
-        conn.transaction(|| VersionDownload::create_or_increment(version_id, &conn))
-    });
-    Ok((crate_name, res.is_ok()))
-}
-
 /// Handles the `GET /crates/:crate_id/:version/downloads` route.
 pub fn downloads(req: &mut dyn RequestExt) -> EndpointResult {
     let (crate_name, semver) = extract_crate_name_and_semver(req)?;
```

src/downloads_counter.rs

Lines changed: 161 additions & 0 deletions
```rust
use crate::App;
use anyhow::Error;
use dashmap::{DashMap, SharedValue};
use diesel::{pg::upsert::excluded, prelude::*};
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

/// crates.io receives a lot of download requests, and we can't execute a write query to the
/// database during each connection for performance reasons. To reduce the write load, this struct
/// collects the pending updates from the current process and writes them in batches.
///
/// To avoid locking the whole data structure behind a RwLock, which could potentially delay
/// requests, this uses the dashmap crate. A DashMap has the same public API as a HashMap, but
/// stores the items in `num_cpus()*4` individually locked shards. This approach reduces the
/// likelihood of a request encountering a locked shard.
///
/// Persisting the download counts in the database also takes advantage of the inner sharding of
/// DashMaps: to avoid locking all the download requests at the same time, each iteration only
/// persists a single shard at a time.
///
/// The disadvantage of this approach is that download counts are stored in memory until they're
/// persisted, so it's possible to lose some of them if the process exits ungracefully. While
/// that's far from ideal, the advantage of batching database updates far outweighs potentially
/// losing some download counts.
#[derive(Debug)]
pub struct DownloadsCounter {
    /// Inner storage for the download counts.
    inner: DashMap<i32, AtomicUsize>,
    /// Index of the next shard that should be persisted by `persist_next_shard`.
    shard_idx: AtomicUsize,
    /// Number of downloads that are not yet persisted on the database. This is just used as a
    /// metric included in log lines, and it's not guaranteed to be accurate.
    pending_count: AtomicI64,
}

impl DownloadsCounter {
    pub(crate) fn new() -> Self {
        Self {
            inner: DashMap::new(),
            shard_idx: AtomicUsize::new(0),
            pending_count: AtomicI64::new(0),
        }
    }

    pub(crate) fn increment(&self, version_id: i32) {
        self.pending_count.fetch_add(1, Ordering::SeqCst);

        if let Some(counter) = self.inner.get(&version_id) {
            // The version is already recorded in the DashMap, so we don't need to lock the whole
            // shard in write mode. The shard is instead locked in read mode, which allows an
            // unbounded number of readers as long as there are no write locks.
            counter.value().fetch_add(1, Ordering::SeqCst);
        } else {
            // The version is not in the DashMap, so we need to lock the whole shard in write mode
            // and insert the version into it. This has worse performance than the above case.
            self.inner
                .entry(version_id)
                .and_modify(|counter| {
                    // Handle the version being inserted by another thread while we were waiting
                    // for the write lock on the shard.
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .or_insert_with(|| AtomicUsize::new(1));
        }
    }

    pub fn persist_all_shards(&self, app: &App) -> Result<(), Error> {
        let conn = app.primary_database.get()?;

        let mut counted_downloads = 0;
        let mut counted_versions = 0;
        let mut pending_downloads = 0;
        for shard in self.inner.shards() {
            let shard = std::mem::take(&mut *shard.write());
            let stats = self.persist_shard(&conn, shard)?;

            counted_downloads += stats.counted_downloads;
            counted_versions += stats.counted_versions;
            pending_downloads = stats.pending_downloads;
        }

        println!(
            "download_counter all_shards counted_versions={} counted_downloads={} pending_downloads={}",
            counted_versions, counted_downloads, pending_downloads,
        );

        Ok(())
    }

    pub fn persist_next_shard(&self, app: &App) -> Result<(), Error> {
        let conn = app.primary_database.get()?;

        // Replace the next shard in the ring with an empty HashMap (clearing it), and return the
        // previous contents for processing. The fetch_add method wraps around on overflow, so it's
        // fine to keep incrementing it without resetting.
        let shards = self.inner.shards();
        let idx = self.shard_idx.fetch_add(1, Ordering::SeqCst) % shards.len();
        let shard = std::mem::take(&mut *shards[idx].write());

        let stats = self.persist_shard(&conn, shard)?;
        println!(
            "download_counter shard={} counted_versions={} counted_downloads={} pending_downloads={}",
            idx, stats.counted_versions, stats.counted_downloads, stats.pending_downloads,
        );

        Ok(())
    }

    fn persist_shard(
        &self,
        conn: &PgConnection,
        shard: HashMap<i32, SharedValue<AtomicUsize>>,
    ) -> Result<PersistStats, Error> {
        use crate::schema::version_downloads::dsl::*;

        let mut counted_downloads = 0;
        let mut counted_versions = 0;
        let mut to_insert = Vec::new();
        for (key, atomic) in shard.iter() {
            let count = atomic.get().load(Ordering::SeqCst);
            counted_downloads += count;
            counted_versions += 1;

            to_insert.push((version_id.eq(*key), downloads.eq(count as i32)));
        }

        if !to_insert.is_empty() {
            diesel::insert_into(version_downloads)
                .values(&to_insert)
                .on_conflict((version_id, date))
                .do_update()
                .set(downloads.eq(downloads + excluded(downloads)))
                .execute(conn)?;
        }

        let old_pending = self
            .pending_count
            .fetch_sub(counted_downloads as i64, Ordering::SeqCst);

        Ok(PersistStats {
            counted_downloads,
            counted_versions,
            pending_downloads: old_pending - counted_downloads as i64,
        })
    }

    pub fn shards_count(&self) -> usize {
        self.inner.shards().len()
    }
}

struct PersistStats {
    counted_downloads: usize,
    counted_versions: usize,
    pending_downloads: i64,
}
```
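The batched write in `persist_shard` leans on PostgreSQL's upsert: `ON CONFLICT (version_id, date) DO UPDATE SET downloads = downloads + excluded.downloads` adds each in-memory count to whatever the row already holds, so successive persist passes accumulate rather than overwrite. A hypothetical in-memory model of that merge rule (the table and function names are illustrative, not part of the codebase):

```rust
use std::collections::HashMap;

/// In-memory model of the upsert: for each (version_id, count) pair, add the
/// count to the existing row, or insert a new row if none exists. This mirrors
/// `ON CONFLICT ... DO UPDATE SET downloads = downloads + excluded.downloads`.
fn upsert_batch(table: &mut HashMap<i32, i32>, batch: &[(i32, i32)]) {
    for &(version, count) in batch {
        *table.entry(version).or_insert(0) += count;
    }
}

fn main() {
    let mut table = HashMap::new();
    upsert_batch(&mut table, &[(1, 5), (2, 3)]); // first persist pass
    upsert_batch(&mut table, &[(1, 2)]); // a later pass adds to the existing row
    assert_eq!(table[&1], 7);
    assert_eq!(table[&2], 3);
    println!("ok");
}
```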
