diff --git a/Cargo.lock b/Cargo.lock index c7ce57c..48b50ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -445,9 +445,9 @@ checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" [[package]] name = "chrono" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "num-traits", "serde", @@ -1805,6 +1805,7 @@ dependencies = [ "reqwest", "same-file", "sarlacc", + "seize", "serde", "serde_json", "tempfile", @@ -2234,12 +2235,12 @@ dependencies = [ [[package]] name = "seize" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7" +checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 576258e..3b4acd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ missing-docs-in-private-items = "warn" [dependencies] axum = { version = "0.8.4", default-features = false, features = [ "http1", "http2", "tokio", "tracing", "query" ] } -chrono = { version = "0.4.41", default-features = false, features = [ "serde", "now" ] } +chrono = { version = "0.4.42", default-features = false, features = [ "serde", "now" ] } clap = { version = "4.5.41", features = ["derive"] } eyre = "0.6.12" futures = { version = "0.3.31", default-features = false, features = [ "alloc" ] } @@ -34,6 +34,7 @@ rand = { version = "0.9.2", default-features = false, features = [ "thread_rng" reqwest = { version = "0.12.22", features = ["stream", "json"] } same-file = "1.0.6" sarlacc = "0.1.4" +seize = "0.5.1" # sarlacc = { path = "../sarlacc" } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.141" diff --git a/src/routes.rs b/src/routes.rs index 9e888f4..7fec43d 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -489,7 +489,7 @@ mod tests { http::{Request, Uri, header}, response::IntoResponse, }; - use chrono::Utc; + use eyre::eyre; use http_body_util::BodyExt; use indoc::indoc; @@ -500,10 +500,7 @@ mod tests { use tower::{Service, ServiceExt}; use tower_http::catch_panic::ResponseForPanic; - use crate::{ - stats::{TIMEZONE, UNKNOWN_ORIGIN}, - webring::Webring, - }; + use crate::{stats::UNKNOWN_ORIGIN, webring::Webring}; use super::{OriginUriLocation, PanicResponse, RouteError, create_router}; @@ -559,8 +556,6 @@ mod tests { async fn index() { let (router, webring, tmpfiles) = app().await; - let today = Utc::now().with_timezone(&TIMEZONE).date_naive(); - // Request `/` let res = router .oneshot( @@ -578,10 +573,7 @@ mod tests { .unwrap(); assert_eq!("Hello homepage!", text); assert_eq!(status, StatusCode::OK); - webring.assert_stat_entry( - (today, "kasad.com", "ring.purduehackers.com", "kasad.com"), - 1, - ); + webring.assert_stat_entry(("kasad.com", "ring.purduehackers.com", "kasad.com"), 1); drop(tmpfiles); } @@ -590,8 +582,6 @@ mod tests { async fn index_unknown_referer() { let (router, webring, tmpfiles) = app().await; - let today = Utc::now().with_timezone(&TIMEZONE).date_naive(); - let res = router .oneshot( Request::builder() @@ -610,7 +600,6 @@ mod tests { assert_eq!(status, StatusCode::OK); webring.assert_stat_entry( ( - today, UNKNOWN_ORIGIN.as_str(), "ring.purduehackers.com", UNKNOWN_ORIGIN.as_str(), @@ -625,8 +614,6 @@ mod tests { async fn visit() { let (router, webring, tmpfiles) = app().await; - let today = Utc::now().with_timezone(&TIMEZONE).date_naive(); - let res = router .oneshot( Request::builder() @@ -644,7 +631,6 @@ mod tests { assert_eq!(res.status(), StatusCode::SEE_OTHER); webring.assert_stat_entry( ( - today, "ring.purduehackers.com", "clementine.viridian.page", "ring.purduehackers.com", diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 2489653..3c7d11b 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -4,19 +4,22 @@ #![allow(dead_code)] #![allow(unused_variables)] +use core::fmt::Debug; use std::{ + io::Write, net::IpAddr, sync::{ - LazyLock, - atomic::{AtomicU64, Ordering}, + Arc, LazyLock, Mutex, + atomic::{AtomicI32, AtomicPtr, AtomicU64, Ordering}, }, }; use axum::http::uri::Authority; -use chrono::{DateTime, Duration, FixedOffset, NaiveDate, Utc}; -use papaya::HashMap; +use chrono::{DateTime, Duration, FixedOffset, Utc}; +use papaya::{Guard, HashMap}; use sarlacc::Intern; -use tracing::{info, instrument}; +use seize::Collector; +use tracing::{error, info, instrument}; /// The TTL for IP tracking entries, after which they are considered stale and removed. const IP_TRACKING_TTL: chrono::TimeDelta = Duration::days(1); @@ -36,34 +39,136 @@ struct IpInfo { started_from: Intern, } -#[derive(Debug, Default)] -struct AggregatedStats { - /// (Date (with timezone `TIMEZONE`), From, To, Started From) → Count - #[expect(clippy::type_complexity)] - counters: HashMap< - ( - NaiveDate, - Intern, - Intern, - Intern, - ), - AtomicU64, - >, +type Counts = HashMap<(Intern, Intern, Intern), AtomicU64>; + +/// The counts for all of the possible webring redirects +struct AggregatedStats { + /// The collector for the atomic data that we're handling + collector: Arc, + /// The last date that a redirect was tracked (hopefully today) + today: AtomicI32, + /// (From, To, Started From) → Count + /// Invariant: This MUST ALWAYS be a valid pointer + counter: AtomicPtr, + /// The writer for the statistics output file + output: Arc>, } -#[derive(Debug, Default)] -pub struct Stats { +impl Debug for AggregatedStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AggregatedStats") + .field("today", &self.today.load(Ordering::Relaxed)) + .finish_non_exhaustive() + } +} + +/// Convert the current time into the days since the epoch +fn mk_num(time: DateTime) -> i32 { + time.date_naive().to_epoch_days() +} + +impl AggregatedStats { + /// Create a new `AggregatedStats` + fn new(now: DateTime, writer: W) -> Self { + let counter: Counts = HashMap::default(); + + AggregatedStats { + today: AtomicI32::new(mk_num(now)), + counter: AtomicPtr::new(Box::into_raw(Box::new(counter))), + collector: Arc::new(Collector::new()), + output: Arc::new(Mutex::new(writer)), + } + } + + /// Retrieve the current counter from a guard + fn counter<'a>(&'a self, guard: &'a impl Guard) -> &'a Counts { + // SAFETY: The counter is guaranteed to be a valid pointer and we are using Acquire ordering to synchronize-with its initialization + unsafe { &*guard.protect(&self.counter, Ordering::Acquire) } + } + + /// Retrieve the current counter from a guard while updating it if the current time is a new calendar date + fn maybe_update_counter<'a>(&'a self, now: DateTime, guard: &'a impl Guard) -> &'a Counts { + let now = mk_num(now); + + let mut prev_day = self.today.load(Ordering::Relaxed); + + // If our "now" time is in the past (perhaps tasks got out of order or something), we want to count this redirect towards the most recent day rather than getting rid of the newest day and replacing it with data intended for the oldest day. + + while prev_day < now { + match self + .today + .compare_exchange(prev_day, now, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => break, + Err(new_prev_day) => { + prev_day = new_prev_day; + } + } + } + + if prev_day < now { + let new_counter: *mut Counts = Box::into_raw(Box::new(HashMap::new())); + + // We need this guard to go into our task so it needs to be owned + let guard_owned = self.collector.enter(); + + // Release to synchronize-with `counter` and Acquire to ensure that we can see the initialization of the previous one so that we can properly access and write it. + let prev_ptr = guard_owned.swap(&self.counter, new_counter, Ordering::AcqRel); + + let output = Arc::clone(&self.output); + + // Allow it to be moved to our task + let prev_ptr = prev_ptr as usize; + + let this_collector = Arc::clone(&self.collector); + + tokio::task::spawn_blocking(move || { + let mut output = output.lock().unwrap(); + + let prev_ptr = prev_ptr as *mut Counts; + // SAFETY: Since this pointer hasn't been retired yet, we have access to it until we do retire it. + let prev = unsafe { &*prev_ptr }.pin(); + + for ((from, to, started_from), count) in &prev { + let count = count.load(Ordering::Relaxed); + if let Err(e) = output.write_fmt(format_args!( + "{prev_day},{from},{to},{started_from},{count}\n" + )) { + error!("Error writing statistics: {e}"); + } + } + + // SAFETY: The pointer can no longer be accessed from a new location since we previously overwrote the atomic pointer, and `Box::from_raw` is the correct way to drop the pointer. This task is also finished with its access to it. + unsafe { this_collector.retire(prev_ptr, |ptr, _| drop(Box::from_raw(ptr))) } + }); + } + + self.counter(guard) + } +} + +/// Statistics tracking for the webring +pub struct Stats { /// Aggregated statistics - aggregated: AggregatedStats, + aggregated: AggregatedStats, /// Map of IP information keyed by IP address ip_tracking: HashMap, } -impl Stats { +impl Debug for Stats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Stats") + .field("aggregated", &self.aggregated) + .field("ip_tracking", &self.ip_tracking) + .finish() + } +} + +impl Stats { /// Creates a new instance of `Stats`. - pub fn new() -> Stats { + pub fn new(now: DateTime, writer: W) -> Stats { Stats { - aggregated: AggregatedStats::default(), + aggregated: AggregatedStats::new(now, writer), ip_tracking: HashMap::new(), } } @@ -96,11 +201,9 @@ impl Stats { }, ); - let date = now.with_timezone(&TIMEZONE).date_naive(); - - let pinned_map = self.aggregated.counters.pin(); - let counter = - pinned_map.get_or_insert((date, from, to, ip_info.started_from), AtomicU64::new(0)); + let guard = self.aggregated.collector.enter(); + let pinned_map = self.aggregated.maybe_update_counter(now, &guard).pin(); + let counter = pinned_map.get_or_insert((from, to, ip_info.started_from), AtomicU64::new(0)); counter.fetch_add(1, Ordering::Relaxed); } @@ -125,17 +228,17 @@ impl Stats { } #[cfg(test)] - pub fn assert_stat_entry(&self, entry: (NaiveDate, &str, &str, &str), count: u64) { + pub fn assert_stat_entry(&self, entry: (&str, &str, &str), count: u64) { + let guard = self.aggregated.collector.enter(); assert_eq!( self.aggregated - .counters + .counter(&guard) .pin() .get(&( - entry.0, + Intern::new(entry.0.parse::().unwrap()), Intern::new(entry.1.parse::().unwrap()), Intern::new(entry.2.parse::().unwrap()), - Intern::new(entry.3.parse::().unwrap()), - )) + ),) .map_or(0, |v| v.load(Ordering::Relaxed)), count, "{self:#?}\n{entry:?}" @@ -145,11 +248,13 @@ impl Stats { #[cfg(test)] mod tests { - use std::net::IpAddr; + use std::{collections::HashSet, net::IpAddr, str::from_utf8}; use axum::http::uri::Authority; use chrono::{DateTime, Duration, NaiveDate, Utc}; + use indoc::indoc; use sarlacc::Intern; + use tokio::sync::mpsc::{self, UnboundedReceiver}; use crate::stats::IP_TRACKING_TTL; @@ -171,9 +276,37 @@ mod tests { t(timestamp).with_timezone(&TIMEZONE).date_naive() } + struct TestWriter(mpsc::UnboundedSender>); + + impl std::io::Write for TestWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.send(buf.to_owned()).unwrap(); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + async fn assert_same_data(rx: &mut UnboundedReceiver>, expected: &str) { + let mut data = Vec::new(); + while data.len() != expected.len() { + data.extend(rx.recv().await.unwrap()); + } + assert_eq!( + from_utf8(&data) + .unwrap() + .split('\n') + .collect::>(), + expected.split('\n').collect::>() + ); + } + #[tokio::test] async fn test_stat_tracking() { - let stats = Stats::new(); + let (tx, mut rx) = mpsc::unbounded_channel(); + let stats = Stats::new(t(0), TestWriter(tx)); stats.redirected_impl(a("0.0.0.0"), i("a.com"), i("b.com"), t(0)); stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(1)); @@ -182,11 +315,12 @@ mod tests { stats.redirected_impl(a("1.0.0.0"), i("b.com"), i("homepage.com"), t(2)); stats.redirected_impl(a("1.0.0.0"), i("homepage.com"), i("c.com"), t(3)); - assert_eq!(stats.aggregated.counters.len(), 4); - stats.assert_stat_entry((d(0), "a.com", "b.com", "a.com"), 2); - stats.assert_stat_entry((d(0), "b.com", "c.com", "a.com"), 1); - stats.assert_stat_entry((d(0), "b.com", "homepage.com", "a.com"), 1); - stats.assert_stat_entry((d(0), "homepage.com", "c.com", "a.com"), 1); + let guard = stats.aggregated.collector.enter(); + assert_eq!(stats.aggregated.counter(&guard).len(), 4); + stats.assert_stat_entry(("a.com", "b.com", "a.com"), 2); + stats.assert_stat_entry(("b.com", "c.com", "a.com"), 1); + stats.assert_stat_entry(("b.com", "homepage.com", "a.com"), 1); + stats.assert_stat_entry(("homepage.com", "c.com", "a.com"), 1); let tracking = stats.ip_tracking.pin(); assert_eq!(tracking.len(), 2); @@ -221,16 +355,36 @@ mod tests { let day = Duration::days(1); + assert!(rx.is_empty()); stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(1) + day); + assert_same_data( + &mut rx, + indoc! {" + 0,a.com,b.com,a.com,2 + 0,b.com,c.com,a.com,1 + 0,b.com,homepage.com,a.com,1 + 0,homepage.com,c.com,a.com,1 + "}, + ) + .await; + stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(4)); stats.redirected_impl(a("0.0.0.0"), i("c.com"), i("a.com"), t(2) + day); - assert_eq!(stats.aggregated.counters.len(), 6); - stats.assert_stat_entry((d(0), "a.com", "b.com", "a.com"), 2); - stats.assert_stat_entry((d(0), "b.com", "c.com", "a.com"), 1); - stats.assert_stat_entry((d(0), "b.com", "homepage.com", "a.com"), 1); - stats.assert_stat_entry((d(0), "homepage.com", "c.com", "a.com"), 1); - stats.assert_stat_entry((d(day.num_seconds()), "b.com", "c.com", "b.com"), 1); - stats.assert_stat_entry((d(day.num_seconds()), "b.com", "c.com", "b.com"), 1); - stats.assert_stat_entry((d(day.num_seconds()), "c.com", "a.com", "b.com"), 1); + assert_eq!(stats.aggregated.counter(&guard).len(), 2); + stats.assert_stat_entry(("b.com", "c.com", "b.com"), 2); + stats.assert_stat_entry(("c.com", "a.com", "b.com"), 1); + + stats.redirected_impl(a("0.0.0.0"), i("c.com"), i("a.com"), t(2) + day + day); + assert_same_data( + &mut rx, + indoc! {" + 1,b.com,c.com,b.com,2 + 1,c.com,a.com,b.com,1 + "}, + ) + .await; + + assert_eq!(stats.aggregated.counter(&guard).len(), 1); + stats.assert_stat_entry(("c.com", "a.com", "b.com"), 1); } } diff --git a/src/webring.rs b/src/webring.rs index e106cd2..809a864 100644 --- a/src/webring.rs +++ b/src/webring.rs @@ -1,6 +1,7 @@ //! Ring behavior and data structures use std::{ + io::{Empty, empty}, net::IpAddr, path::{Path, PathBuf}, sync::{ @@ -10,7 +11,7 @@ use std::{ }; use axum::http::{Uri, uri::Authority}; -use chrono::TimeDelta; +use chrono::{TimeDelta, Utc}; use futures::{StreamExt, future::join, stream::FuturesUnordered}; use indexmap::IndexMap; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher as _}; @@ -184,7 +185,7 @@ pub struct Webring { /// Discord notifier for notifying members of issues with their sites notifier: Option>, /// Statistics collected about the ring - stats: Arc, + stats: Arc>, /// Current configuration of the webring, used for detecting changes when reloading config: Arc>>, } @@ -199,7 +200,7 @@ impl Webring { members: RwLock::new(member_map_from_config_table(&config.members)), static_dir_path: config.webring.static_dir.clone(), homepage: AsyncRwLock::new(None), - stats: Arc::new(Stats::new()), + stats: Arc::new(Stats::new(Utc::now(), empty())), file_watcher: OnceLock::default(), base_address: config.webring.base_url(), notifier: config @@ -619,7 +620,7 @@ impl Webring { } #[cfg(test)] - pub fn assert_stat_entry(&self, entry: (chrono::NaiveDate, &str, &str, &str), count: u64) { + pub fn assert_stat_entry(&self, entry: (&str, &str, &str), count: u64) { self.stats.assert_stat_entry(entry, count); } } @@ -644,7 +645,7 @@ mod tests { use std::{ collections::HashSet, fs::{File, OpenOptions}, - io::Write as _, + io::{Write as _, empty}, path::PathBuf, sync::{ Arc, OnceLock, RwLock, @@ -670,7 +671,7 @@ mod tests { use crate::{ config::{Config, MemberSpec}, discord::{DiscordNotifier, NOTIFICATION_DEBOUNCE_PERIOD, Snowflake}, - stats::{TIMEZONE, UNKNOWN_ORIGIN}, + stats::{Stats, UNKNOWN_ORIGIN}, webring::{CheckLevel, Webring}, }; @@ -687,7 +688,7 @@ mod tests { base_address: Intern::default(), base_authority: Intern::new("ring.purduehackers.com".parse().unwrap()), notifier: None, - stats: Arc::default(), + stats: Arc::new(Stats::new(Utc::now(), empty())), config: Arc::new(AsyncRwLock::new(None)), } } @@ -818,21 +819,11 @@ mod tests { assert_eq!(*inner, expected); } - let today = Utc::now().with_timezone(&TIMEZONE).date_naive(); - webring.assert_next( "https://hrovnyak.gitlab.io/bruh/bruh/bruh?bruh=bruh", Ok("kasad.com"), ); - webring.assert_stat_entry( - ( - today, - "hrovnyak.gitlab.io", - "kasad.com", - "hrovnyak.gitlab.io", - ), - 1, - ); + webring.assert_stat_entry(("hrovnyak.gitlab.io", "kasad.com", "hrovnyak.gitlab.io"), 1); webring.assert_prev( "https://hrovnyak.gitlab.io/bruh/bruh/bruh?bruh=bruh", @@ -840,7 +831,6 @@ mod tests { ); webring.assert_stat_entry( ( - today, "hrovnyak.gitlab.io", "refuse-the-r.ing", "hrovnyak.gitlab.io", @@ -1018,15 +1008,12 @@ mod tests { .unwrap(); let webring = Webring::new(&config); - let today = Utc::now().with_timezone(&TIMEZONE).date_naive(); - let uri = webring .random_page(None, "0.0.0.0".parse().unwrap()) .unwrap(); assert_eq!(uri, Intern::new("kasad.com".parse().unwrap())); webring.assert_stat_entry( ( - today, UNKNOWN_ORIGIN.as_str(), "kasad.com", UNKNOWN_ORIGIN.as_str(), @@ -1044,8 +1031,6 @@ mod tests { "# }).unwrap(); let webring = Webring::new(&config); - let today = Utc::now().with_timezone(&TIMEZONE).date_naive(); - let uri = webring .random_page( Some(&"clementine.viridian.page".parse().unwrap()), @@ -1055,7 +1040,6 @@ mod tests { assert_eq!(uri, Intern::new("kasad.com".parse().unwrap())); webring.assert_stat_entry( ( - today, "clementine.viridian.page", "kasad.com", "clementine.viridian.page",