Skip to content

Commit c22ba7c

Browse files
committed
downloads_counter: handle versions being deleted before persisting
1 parent b082f00 commit c22ba7c

File tree

1 file changed

+90
-19
lines changed

1 file changed

+90
-19
lines changed

src/downloads_counter.rs

Lines changed: 90 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::App;
22
use anyhow::Error;
33
use dashmap::{DashMap, SharedValue};
44
use diesel::{pg::upsert::excluded, prelude::*};
5-
use std::collections::HashMap;
5+
use std::collections::{HashMap, HashSet};
66
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
77

88
/// crates.io receives a lot of download requests, and we can't execute a write query to the
@@ -102,18 +102,16 @@ impl DownloadsCounter {
102102
conn: &PgConnection,
103103
shard: HashMap<i32, SharedValue<AtomicUsize>>,
104104
) -> Result<PersistStats, Error> {
105-
use crate::schema::version_downloads::dsl::*;
105+
use crate::schema::{version_downloads, versions};
106106

107+
let mut discarded_downloads = 0;
107108
let mut counted_downloads = 0;
108109
let mut counted_versions = 0;
109-
let mut to_insert = Vec::new();
110-
for (key, atomic) in shard.iter() {
111-
let count = atomic.get().load(Ordering::SeqCst);
112-
counted_downloads += count;
113-
counted_versions += 1;
114110

115-
to_insert.push((*key, count));
116-
}
111+
let mut to_insert = shard
112+
.iter()
113+
.map(|(id, atomic)| (*id, atomic.get().load(Ordering::SeqCst)))
114+
.collect::<Vec<_>>();
117115

118116
if !to_insert.is_empty() {
119117
// The rows we're about to insert need to be sorted to avoid deadlocks when multiple
@@ -132,22 +130,63 @@ impl DownloadsCounter {
132130
//
133131
to_insert.sort_by_key(|(key, _)| *key);
134132

135-
let to_insert = to_insert
133+
// Our database schema enforces that every row in the `version_downloads` table points
134+
// to a valid version in the `versions` table with a foreign key. This doesn't cause
135+
// problems most of the times, as the rest of the application checks whether the
136+
// version exists before calling the `increment` method.
137+
//
138+
// On rare occasions crates are deleted from crates.io though, and that would break the
139+
// invariant if the crate is deleted after the `increment` method is called but before
140+
// the downloads are persisted in the database.
141+
//
142+
// That happening would cause the whole `INSERT` to fail, also losing the downloads in
143+
// the shard we were about to persist. To avoid that from happening this snippet does a
144+
// `SELECT` query on the version table before persisting to check whether every version
145+
// still exists in the database. Missing versions are removed from the following query.
146+
let version_ids = to_insert.iter().map(|(id, _)| *id).collect::<Vec<_>>();
147+
let existing_version_ids: HashSet<i32> = versions::table
148+
.select(versions::id)
149+
// `FOR SHARE` prevents updates or deletions on the selected rows in the `versions`
150+
// table until this transaction commits. That prevents a version from being deleted
151+
// between this query and the next one.
152+
//
153+
// `FOR SHARE` is used instead of `FOR UPDATE` to allow rows to be locked by
154+
// multiple `SELECT` transactions, to allow for concurrent downloads persisting.
155+
.for_share()
156+
.filter(versions::id.eq_any(version_ids))
157+
.load(conn)?
136158
.into_iter()
137-
.map(|(key, count)| (version_id.eq(key), downloads.eq(count as i32)))
138-
.collect::<Vec<_>>();
159+
.collect();
160+
161+
let mut values = Vec::new();
162+
for (id, count) in &to_insert {
163+
if !existing_version_ids.contains(id) {
164+
discarded_downloads += *count;
165+
continue;
166+
}
167+
counted_versions += 1;
168+
counted_downloads += *count;
169+
values.push((
170+
version_downloads::version_id.eq(*id),
171+
version_downloads::downloads.eq(*count as i32),
172+
));
173+
}
139174

140-
diesel::insert_into(version_downloads)
141-
.values(&to_insert)
142-
.on_conflict((version_id, date))
175+
diesel::insert_into(version_downloads::table)
176+
.values(&values)
177+
.on_conflict((version_downloads::version_id, version_downloads::date))
143178
.do_update()
144-
.set(downloads.eq(downloads + excluded(downloads)))
179+
.set(
180+
version_downloads::downloads
181+
.eq(version_downloads::downloads + excluded(version_downloads::downloads)),
182+
)
145183
.execute(conn)?;
146184
}
147185

148-
let old_pending = self
149-
.pending_count
150-
.fetch_sub(counted_downloads as i64, Ordering::SeqCst);
186+
let old_pending = self.pending_count.fetch_sub(
187+
(counted_downloads + discarded_downloads) as i64,
188+
Ordering::SeqCst,
189+
);
151190

152191
Ok(PersistStats {
153192
shard: None,
@@ -318,6 +357,38 @@ mod tests {
318357
state.assert_downloads_count(&conn, v2, 5);
319358
}
320359

360+
#[test]
361+
fn test_increment_missing_version() {
362+
let counter = DownloadsCounter::new();
363+
let conn = crate::db::test_conn();
364+
let mut state = State::new(&conn);
365+
366+
let v1 = state.new_version(&conn);
367+
let v2 = v1 + 1; // Should not exist in the database!
368+
369+
// No error should happen when calling the increment method on a missing version.
370+
counter.increment(v1);
371+
counter.increment(v2);
372+
373+
// No error should happen when persisting. The missing versions should be ignored.
374+
let stats = counter
375+
.persist_all_shards_with_conn(&conn)
376+
.expect("failed to persist download counts");
377+
378+
// The download should not be counted for version 2.
379+
assert_eq!(
380+
stats,
381+
PersistStats {
382+
shard: None,
383+
counted_downloads: 1,
384+
counted_versions: 1,
385+
pending_downloads: 0,
386+
}
387+
);
388+
state.assert_downloads_count(&conn, v1, 1);
389+
state.assert_downloads_count(&conn, v2, 0);
390+
}
391+
321392
struct State {
322393
user: User,
323394
krate: Crate,

0 commit comments

Comments
 (0)