
Commit ffbb917

downloads_counter: batch download count writes to the db
1 parent: 8423bab

10 files changed (+192, −64 lines)


Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -86,6 +86,8 @@ tracing = "0.1"
 tracing-subscriber = "0.2"
 url = "2.1"
 
+dashmap = { version = "4.0.2", features = ["raw-api"] }
+
 [dev-dependencies]
 claim = "0.4.0"
 conduit-test = "0.9.0-alpha.4"

src/app.rs

Lines changed: 5 additions & 0 deletions
@@ -4,6 +4,7 @@ use crate::{db, Config, Env};
 use std::{sync::Arc, time::Duration};
 
 use crate::github::GitHubClient;
+use crate::downloads_counter::DownloadsCounter;
 use diesel::r2d2;
 use oauth2::basic::BasicClient;
 use reqwest::blocking::Client;
@@ -32,6 +33,9 @@ pub struct App {
     /// The server configuration
     pub config: Config,
 
+    /// Count downloads and periodically persist them in the database
+    pub downloads_counter: DownloadsCounter,
+
     /// A configured client for outgoing HTTP requests
     ///
     /// In production this shares a single connection pool across requests. In tests
@@ -131,6 +135,7 @@ impl App {
             github_oauth,
             session_key: config.session_key.clone(),
             config,
+            downloads_counter: DownloadsCounter::new(),
             http_client,
         }
     }

src/bin/server.rs

Lines changed: 19 additions & 2 deletions
@@ -49,9 +49,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let config = cargo_registry::Config::default();
     let client = Client::new();
+    let app = Arc::new(App::new(config.clone(), Some(client)));
 
-    let app = App::new(config.clone(), Some(client));
-    let app = cargo_registry::build_handler(Arc::new(app));
+    // Start the background thread periodically persisting download counts to the database.
+    downloads_counter_thread(app.clone());
+
+    let app = cargo_registry::build_handler(app);
 
     // On every server restart, ensure the categories available in the database match
     // the information in *src/categories.toml*.
@@ -184,3 +187,17 @@
     })
     .unwrap();
 }
+
+fn downloads_counter_thread(app: Arc<App>) {
+    let interval = Duration::from_millis(
+        (app.config.downloads_persist_interval_ms / app.downloads_counter.shards_count()) as u64,
+    );
+
+    std::thread::spawn(move || loop {
+        std::thread::sleep(interval);
+
+        if let Err(err) = app.downloads_counter.persist_next_shard(&app) {
+            println!("downloads_counter error: {}", err);
+        }
+    });
+}
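
Note how the sleep interval is the configured persist interval divided by the number of DashMap shards: each wakeup of the background thread flushes a single shard, so a full pass over the whole map still takes roughly DOWNLOADS_PERSIST_INTERVAL_MS. A minimal sketch of that arithmetic, assuming the default 60_000 ms interval and a hypothetical 32-shard map (the function and the concrete shard count are illustrative, not part of this commit; the commit's own doc comment below says the map uses num_cpus()*4 shards):

    // Illustration only; the real values come from app.config and
    // DownloadsCounter::shards_count().
    fn per_shard_interval_ms(persist_interval_ms: usize, shards: usize) -> u64 {
        (persist_interval_ms / shards) as u64
    }

    fn main() {
        // 60_000 ms / 32 shards = 1_875 ms between wakeups, so every shard is
        // still persisted roughly once per minute.
        assert_eq!(per_shard_interval_ms(60_000, 32), 1_875);
    }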

src/config.rs

Lines changed: 9 additions & 0 deletions
@@ -19,6 +19,7 @@ pub struct Config {
     pub blocked_traffic: Vec<(String, Vec<String>)>,
     pub domain_name: String,
     pub allowed_origins: Vec<String>,
+    pub downloads_persist_interval_ms: usize,
 }
 
 impl Default for Config {
@@ -45,6 +46,7 @@ impl Default for Config {
     /// - `READ_ONLY_REPLICA_URL`: The URL of an optional postgres read-only replica database.
     /// - `BLOCKED_TRAFFIC`: A list of headers and environment variables to use for blocking
     ///   traffic. See the `block_traffic` module for more documentation.
+    /// - `DOWNLOADS_PERSIST_INTERVAL_MS`: how frequent to persist download counts (in ms).
     fn default() -> Config {
         let api_protocol = String::from("https");
         let mirror = if dotenv::var("MIRROR").is_ok() {
@@ -144,6 +146,13 @@ impl Default for Config {
             blocked_traffic: blocked_traffic(),
             domain_name: domain_name(),
             allowed_origins,
+            downloads_persist_interval_ms: dotenv::var("DOWNLOADS_PERSIST_INTERVAL_MS")
+                .map(|interval| {
+                    interval
+                        .parse()
+                        .expect("invalid DOWNLOADS_PERSIST_INTERVAL_MS")
+                })
+                .unwrap_or(60_000), // 1 minute
         }
     }
 }

src/controllers/version/downloads.rs

Lines changed: 20 additions & 43 deletions
@@ -20,20 +20,33 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult {
     let crate_name = &req.params()["crate_id"];
     let version = &req.params()["version"];
 
-    let (crate_name, was_counted) = increment_download_counts(req, recorder, crate_name, version)?;
+    let (version_id, crate_name): (_, String) = {
+        use self::versions::dsl::*;
+
+        let conn = recorder.record("get_conn", || req.db_conn())?;
+
+        // Returns the crate name as stored in the database, or an error if we could
+        // not load the version ID from the database.
+        recorder.record("get_version", || {
+            versions
+                .inner_join(crates::table)
+                .select((id, crates::name))
+                .filter(Crate::with_name(crate_name))
+                .filter(num.eq(version))
+                .first(&*conn)
+        })?
+    };
+
+    // The increment does not happen instantly, but it's deferred to be executed in a batch
+    // along with other downloads. See crate::downloads_counter for the implementation.
+    req.app().downloads_counter.increment(version_id);
 
     let redirect_url = req
         .app()
        .config
        .uploader
        .crate_location(&crate_name, version);
 
-    // Adding log metadata requires &mut access, so we have to defer this step until
-    // after the (immutable) query parameters are no longer used.
-    if !was_counted {
-        req.log_metadata("uncounted_dl", "true");
-    }
-
     if req.wants_json() {
         #[derive(Serialize)]
         struct R {
@@ -45,42 +58,6 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult {
     }
 }
 
-/// Increment the download counts for a given crate version.
-///
-/// Returns the crate name as stored in the database, or an error if we could
-/// not load the version ID from the database.
-///
-/// This ignores any errors that occur updating the download count. Failure is
-/// expected if the application is in read only mode, or for API-only mirrors.
-/// Even if failure occurs for unexpected reasons, we would rather have `cargo
-/// build` succeed and not count the download than break people's builds.
-fn increment_download_counts(
-    req: &dyn RequestExt,
-    recorder: TimingRecorder,
-    crate_name: &str,
-    version: &str,
-) -> AppResult<(String, bool)> {
-    use self::versions::dsl::*;
-
-    let conn = recorder.record("get_conn", || req.db_conn())?;
-
-    let (version_id, crate_name) = recorder.record("get_version", || {
-        versions
-            .inner_join(crates::table)
-            .select((id, crates::name))
-            .filter(Crate::with_name(crate_name))
-            .filter(num.eq(version))
-            .first(&*conn)
-    })?;
-
-    // Wrap in a transaction so we don't poison the outer transaction if this
-    // fails
-    let res = recorder.record("update_count", || {
-        conn.transaction(|| VersionDownload::create_or_increment(version_id, &conn))
-    });
-    Ok((crate_name, res.is_ok()))
-}
-
 /// Handles the `GET /crates/:crate_id/:version/downloads` route.
 pub fn downloads(req: &mut dyn RequestExt) -> EndpointResult {
     let (crate_name, semver) = extract_crate_name_and_semver(req)?;

src/downloads_counter.rs

Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
+use crate::App;
+use anyhow::Error;
+use dashmap::{DashMap, SharedValue};
+use diesel::{pg::upsert::excluded, prelude::*};
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// crates.io receives a lot of download requests, and we can't execute a write query to the
+/// database during each connection for performance reasons. To reduce the write load, this struct
+/// collects the pending updates from the current process and writes in batch.
+///
+/// To avoid locking the whole data structure behind a RwLock, which could potentially delay
+/// requests, this uses the dashmap crate. A DashMap has the same public API as an HashMap, but
+/// stores the items into `num_cpus()*4` individually locked shards. This approach reduces the
+/// likelyhood of a request encountering a locked shard.
+///
+/// Persisting the download counts in the database also takes advantage of the inner sharding of
+/// DashMaps: to avoid locking all the download requests at the same time each iteration only
+/// persists a single shard at the time.
+///
+/// The disadvantage of this approach is that download counts are stored in memory until they're
+/// persisted, so it's possible to lose some of them if the process exits ungracefully. While
+/// that's far from ideal, the advantage of batching database updates far outweights potentially
+/// losing some download counts.
+#[derive(Debug)]
+pub struct DownloadsCounter {
+    inner: DashMap<i32, AtomicUsize>,
+    shard_idx: AtomicUsize,
+    pending_count: AtomicUsize,
+}
+
+impl DownloadsCounter {
+    pub(crate) fn new() -> Self {
+        Self {
+            inner: DashMap::new(),
+            shard_idx: AtomicUsize::new(0),
+            pending_count: AtomicUsize::new(0),
+        }
+    }
+
+    pub(crate) fn increment(&self, version_id: i32) {
+        self.pending_count.fetch_add(1, Ordering::SeqCst);
+
+        if let Some(counter) = self.inner.get(&version_id) {
+            counter.value().fetch_add(1, Ordering::SeqCst);
+        } else {
+            self.inner
+                .entry(version_id)
+                .and_modify(|counter| {
+                    counter.fetch_add(1, Ordering::SeqCst);
+                })
+                .or_insert_with(|| AtomicUsize::new(1));
+        }
+    }
+
+    pub fn persist_next_shard(&self, app: &App) -> Result<(), Error> {
+        let conn = app.primary_database.get()?;
+
+        // Replace the next shard in the ring with an empty HashMap (clearing it), and return the
+        // previous contents for processing. The fetch_add method wraps around on overflow, so it's
+        // fine to keep incrementing it without resetting.
+        let shards = self.inner.shards();
+        let idx = self.shard_idx.fetch_add(1, Ordering::SeqCst) % shards.len();
+        let shard = std::mem::take(&mut *shards[idx].write());
+
+        let stats = self.persist_shard(&conn, shard)?;
+        println!(
+            "download_counter shard={} counted_versions={} counted_downloads={} pending_downloads={}",
+            idx,
+            stats.counted_versions,
+            stats.counted_downloads,
+            stats.pending_downloads,
+        );
+
+        Ok(())
+    }
+
+    fn persist_shard(
+        &self,
+        conn: &PgConnection,
+        shard: HashMap<i32, SharedValue<AtomicUsize>>,
+    ) -> Result<PersistStats, Error> {
+        use crate::schema::version_downloads::dsl::*;
+
+        let mut counted_downloads = 0;
+        let mut counted_versions = 0;
+        let mut to_insert = Vec::new();
+        for (key, atomic) in shard.iter() {
+            let count = atomic.get().load(Ordering::SeqCst);
+            counted_downloads += count;
+            counted_versions += 1;
+
+            to_insert.push((version_id.eq(*key), downloads.eq(count as i32)));
+        }
+
+        if !to_insert.is_empty() {
+            diesel::insert_into(version_downloads)
+                .values(&to_insert)
+                .on_conflict((version_id, date))
+                .do_update()
+                .set(downloads.eq(downloads + excluded(downloads)))
+                .execute(conn)?;
+        }
+
+        let old_pending = self
+            .pending_count
+            .fetch_sub(counted_downloads, Ordering::SeqCst);
+        Ok(PersistStats {
+            counted_downloads,
+            counted_versions,
+            pending_downloads: old_pending - counted_downloads,
+        })
+    }
+
+    pub fn shards_count(&self) -> usize {
+        self.inner.shards().len()
+    }
+}
+
+struct PersistStats {
+    counted_downloads: usize,
+    counted_versions: usize,
+    pending_downloads: usize,
+}
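
The core trick in persist_next_shard is dashmap's raw shard API (the "raw-api" feature enabled in Cargo.toml above): rather than iterating the whole map, it swaps one shard's inner HashMap out with std::mem::take, so concurrent increment calls only contend on that single shard while the swap happens. A stripped-down sketch of just that rotation, using only the dashmap calls that appear above and hypothetical function and argument names:

    use dashmap::DashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Drain the next shard in round-robin order and return its contents as plain
    // (version_id, count) pairs. Illustration only; the real code feeds the drained
    // counts into a single batched upsert instead of returning them.
    fn drain_next_shard(map: &DashMap<i32, AtomicUsize>, next: &AtomicUsize) -> Vec<(i32, usize)> {
        let shards = map.shards(); // requires dashmap's "raw-api" feature
        let idx = next.fetch_add(1, Ordering::SeqCst) % shards.len();
        let drained = std::mem::take(&mut *shards[idx].write());
        drained
            .iter()
            .map(|(id, counter)| (*id, counter.get().load(Ordering::SeqCst)))
            .collect()
    }

The drained (version_id, count) pairs are then written in one statement whose conflict clause adds the new counts to the existing daily row (downloads = downloads + excluded.downloads), so repeated flushes of the same version on the same day accumulate rather than overwrite.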

src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ pub mod background_jobs;
 pub mod boot;
 mod config;
 pub mod db;
+mod downloads_counter;
 pub mod email;
 pub mod git;
 pub mod github;

src/models/download.rs

Lines changed: 0 additions & 19 deletions
@@ -1,6 +1,4 @@
 use chrono::NaiveDate;
-use diesel::prelude::*;
-
 use crate::models::Version;
 use crate::schema::version_downloads;
 
@@ -14,20 +12,3 @@ pub struct VersionDownload {
     pub date: NaiveDate,
     pub processed: bool,
 }
-
-impl VersionDownload {
-    pub fn create_or_increment(version: i32, conn: &PgConnection) -> QueryResult<()> {
-        use self::version_downloads::dsl::*;
-
-        // We only update the counter for *today* (the default date),
-        // nothing else. We have lots of other counters, but they're
-        // all updated later on via the update-downloads script.
-        diesel::insert_into(version_downloads)
-            .values(version_id.eq(version))
-            .on_conflict((version_id, date))
-            .do_update()
-            .set(downloads.eq(downloads + 1))
-            .execute(conn)?;
-        Ok(())
-    }
-}

src/tests/util/test_app.rs

Lines changed: 1 addition & 0 deletions
@@ -315,6 +315,7 @@ fn simple_config() -> Config {
         blocked_traffic: Default::default(),
         domain_name: "crates.io".into(),
         allowed_origins: Vec::new(),
+        downloads_persist_interval_ms: 1000,
     }
 }
 