diff --git a/clippy.toml b/clippy.toml index 00aa4ed8e..c5f0ebf94 100644 --- a/clippy.toml +++ b/clippy.toml @@ -14,3 +14,8 @@ reason = """ [[disallowed-types]] path = "semver::Version" reason = "use our own custom db::types::version::Version so you can use it with sqlx" + +[[disallowed-types]] +path = "axum_extra::headers::IfNoneMatch" +reason = "use our own custom web::headers::IfNoneMatch for sane behaviour with missing headers" + diff --git a/src/storage/database.rs b/src/storage/database.rs index 4c29d1715..c7d8a7b49 100644 --- a/src/storage/database.rs +++ b/src/storage/database.rs @@ -1,5 +1,5 @@ use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob}; -use crate::{InstanceMetrics, db::Pool, error::Result}; +use crate::{InstanceMetrics, db::Pool, error::Result, web::headers::compute_etag}; use chrono::{DateTime, Utc}; use futures_util::stream::{Stream, TryStreamExt}; use sqlx::Acquire; @@ -123,6 +123,8 @@ impl DatabaseBackend { }); let content = result.content.unwrap_or_default(); let content_len = content.len(); + + let etag = compute_etag(&content); Ok(StreamingBlob { path: result.path, mime: result @@ -130,6 +132,7 @@ impl DatabaseBackend { .parse() .unwrap_or(mime::APPLICATION_OCTET_STREAM), date_updated: result.date_updated, + etag: Some(etag), content: Box::new(io::Cursor::new(content)), content_length: content_len, compression, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a316d63a5..ee0ea99b3 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -22,6 +22,7 @@ use crate::{ utils::spawn_blocking, }; use anyhow::{anyhow, bail}; +use axum_extra::headers; use chrono::{DateTime, Utc}; use dashmap::DashMap; use fn_error_context::context; @@ -61,7 +62,7 @@ type FileRange = RangeInclusive; pub(crate) struct PathNotFoundError; /// represents a blob to be uploaded to storage. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct BlobUpload { pub(crate) path: String, pub(crate) mime: Mime, @@ -80,11 +81,12 @@ impl From for BlobUpload { } } -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct Blob { pub(crate) path: String, pub(crate) mime: Mime, pub(crate) date_updated: DateTime, + pub(crate) etag: Option, pub(crate) content: Vec, pub(crate) compression: Option, } @@ -99,6 +101,7 @@ pub(crate) struct StreamingBlob { pub(crate) path: String, pub(crate) mime: Mime, pub(crate) date_updated: DateTime, + pub(crate) etag: Option, pub(crate) compression: Option, pub(crate) content_length: usize, pub(crate) content: Box, @@ -110,6 +113,7 @@ impl std::fmt::Debug for StreamingBlob { .field("path", &self.path) .field("mime", &self.mime) .field("date_updated", &self.date_updated) + .field("etag", &self.etag) .field("compression", &self.compression) .finish() } @@ -144,6 +148,7 @@ impl StreamingBlob { ); self.compression = None; + // not touching the etag, it should represent the original content Ok(self) } @@ -158,12 +163,27 @@ impl StreamingBlob { path: self.path, mime: self.mime, date_updated: self.date_updated, + etag: self.etag, // downloading doesn't change the etag content: content.into_inner(), compression: self.compression, }) } } +impl From for StreamingBlob { + fn from(value: Blob) -> Self { + Self { + path: value.path, + mime: value.mime, + date_updated: value.date_updated, + etag: value.etag, + compression: value.compression, + content_length: value.content.len(), + content: Box::new(io::Cursor::new(value.content)), + } + } +} + pub fn get_file_list>(path: P) -> Box>> { let path = path.as_ref().to_path_buf(); if path.is_file() { @@ -590,6 +610,7 @@ impl AsyncStorage { path: format!("{archive_path}/{path}"), mime: detect_mime(path), date_updated: stream.date_updated, + etag: stream.etag, content: stream.content, content_length: stream.content_length, compression: None, @@ -763,7 +784,6 @@ impl AsyncStorage { mime, content, compression: Some(alg), - // this field is ignored by the backend }); } Ok((blobs, file_paths)) @@ -1275,7 +1295,7 @@ pub(crate) fn source_archive_path(name: &str, version: &Version) -> String { #[cfg(test)] mod test { use super::*; - use crate::test::TestEnvironment; + use crate::{test::TestEnvironment, web::headers::compute_etag}; use std::env; use test_case::test_case; @@ -1291,6 +1311,7 @@ mod test { mime: mime::APPLICATION_OCTET_STREAM, date_updated: Utc::now(), compression: alg, + etag: Some(compute_etag(&content)), content_length: content.len(), content: Box::new(io::Cursor::new(content)), } @@ -1432,6 +1453,7 @@ mod test { path: "some_path.db".into(), mime: mime::APPLICATION_OCTET_STREAM, date_updated: Utc::now(), + etag: None, compression: Some(alg), content_length: compressed_index_content.len(), content: Box::new(io::Cursor::new(compressed_index_content)), diff --git a/src/storage/s3.rs b/src/storage/s3.rs index c67a8ee01..b83727ed4 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -10,6 +10,7 @@ use aws_sdk_s3::{ types::{Delete, ObjectIdentifier, Tag, Tagging}, }; use aws_smithy_types_convert::date_time::DateTimeExt; +use axum_extra::headers; use chrono::Utc; use futures_util::{ future::TryFutureExt, @@ -17,7 +18,7 @@ use futures_util::{ stream::{FuturesUnordered, Stream, StreamExt}, }; use std::sync::Arc; -use tracing::{error, warn}; +use tracing::{error, instrument, warn}; const PUBLIC_ACCESS_TAG: &str = "static-cloudfront-access"; const PUBLIC_ACCESS_VALUE: &str = "allow"; @@ -180,6 +181,7 @@ impl S3Backend { .map(|_| ()) } + #[instrument(skip(self))] pub(super) async fn get_stream( &self, path: &str, @@ -190,7 +192,11 @@ impl S3Backend { .get_object() .bucket(&self.bucket) .key(path) - .set_range(range.map(|r| format!("bytes={}-{}", r.start(), r.end()))) + .set_range( + range + .as_ref() + .map(|r| format!("bytes={}-{}", r.start(), r.end())), + ) .send() .await .convert_errors()?; @@ -204,6 +210,30 @@ impl S3Backend { let compression = res.content_encoding.as_ref().and_then(|s| s.parse().ok()); + let etag = if let Some(mut etag) = res.e_tag { + if let Some(range) = range { + // we can generate a strong, unique etag for a range of the remote object too, + // by just concatenating the original etag with the range start and end. + // + // About edge cases: + // When the etag of the full archive changes after a rebuild, + // all derived etags for files inside the archive will also change. + // + // This could lead to _changed_ ETags, where the single file inside the archive + // is actually the same. + etag = format!("\"{}:{}-{}\"", etag, range.start(), range.end()) + } + match etag.parse::() { + Ok(etag) => Some(etag), + Err(err) => { + warn!(?err, etag, "Failed to parse ETag from S3 response"); + None + } + } + } else { + None + }; + Ok(StreamingBlob { path: path.into(), mime: res @@ -213,6 +243,7 @@ impl S3Backend { .parse() .unwrap_or(mime::APPLICATION_OCTET_STREAM), date_updated, + etag, content_length: res .content_length .and_then(|length| length.try_into().ok()) diff --git a/src/web/file.rs b/src/web/file.rs index a99997c99..c0ef1cd9f 100644 --- a/src/web/file.rs +++ b/src/web/file.rs @@ -1,6 +1,6 @@ //! Database based file handler -use super::cache::CachePolicy; +use super::{cache::CachePolicy, headers::IfNoneMatch}; use crate::{ Config, error::Result, @@ -9,12 +9,14 @@ use crate::{ use axum::{ body::Body, extract::Extension, - http::{ - StatusCode, - header::{CONTENT_TYPE, LAST_MODIFIED}, - }, + http::StatusCode, response::{IntoResponse, Response as AxumResponse}, }; +use axum_extra::{ + TypedHeader, + headers::{ContentType, LastModified}, +}; +use std::time::SystemTime; use tokio_util::io::ReaderStream; #[derive(Debug)] @@ -37,21 +39,10 @@ impl File { } } -impl IntoResponse for File { - fn into_response(self) -> AxumResponse { - ( - StatusCode::OK, - [ - (CONTENT_TYPE, self.0.mime.as_ref()), - ( - LAST_MODIFIED, - &self.0.date_updated.format("%a, %d %b %Y %T %Z").to_string(), - ), - ], - Extension(CachePolicy::ForeverInCdnAndBrowser), - self.0.content, - ) - .into_response() +impl File { + pub fn into_response(self, if_none_match: Option) -> AxumResponse { + let streaming_blob: StreamingBlob = self.0.into(); + StreamingFile(streaming_blob).into_response(if_none_match) } } @@ -63,36 +54,80 @@ impl StreamingFile { pub(super) async fn from_path(storage: &AsyncStorage, path: &str) -> Result { Ok(StreamingFile(storage.get_stream(path).await?)) } -} -impl IntoResponse for StreamingFile { - fn into_response(self) -> AxumResponse { - // Convert the AsyncBufRead into a Stream of Bytes - let stream = ReaderStream::new(self.0.content); - let body = Body::from_stream(stream); - ( - StatusCode::OK, - [ - (CONTENT_TYPE, self.0.mime.as_ref()), - ( - LAST_MODIFIED, - &self.0.date_updated.format("%a, %d %b %Y %T %Z").to_string(), - ), - ], - Extension(CachePolicy::ForeverInCdnAndBrowser), - body, - ) - .into_response() + pub fn into_response(self, if_none_match: Option) -> AxumResponse { + const CACHE_POLICY: CachePolicy = CachePolicy::ForeverInCdnAndBrowser; + + if let Some(ref if_none_match) = if_none_match + && let Some(ref etag) = self.0.etag + && !if_none_match.precondition_passes(etag) + { + ( + StatusCode::NOT_MODIFIED, + TypedHeader(etag.clone()), + // it's generally a good idea to repeat caching headers on 304 responses + Extension(CACHE_POLICY), + ) + .into_response() + } else { + // Convert the AsyncBufRead into a Stream of Bytes + let stream = ReaderStream::new(self.0.content); + + let last_modified: SystemTime = self.0.date_updated.into(); + ( + StatusCode::OK, + TypedHeader(ContentType::from(self.0.mime)), + TypedHeader(LastModified::from(last_modified)), + self.0.etag.map(TypedHeader), + Extension(CACHE_POLICY), + Body::from_stream(stream), + ) + .into_response() + } } } #[cfg(test)] mod tests { use super::*; - use crate::test::TestEnvironment; + use crate::{storage::CompressionAlgorithm, test::TestEnvironment, web::headers::compute_etag}; use chrono::Utc; - use http::header::CACHE_CONTROL; - use std::rc::Rc; + use http::header::{CACHE_CONTROL, LAST_MODIFIED}; + use std::{io, rc::Rc}; + + fn streaming_blob( + content: impl Into>, + alg: Option, + ) -> StreamingBlob { + let content = content.into(); + StreamingBlob { + path: "some_path.db".into(), + mime: mime::APPLICATION_OCTET_STREAM, + date_updated: Utc::now(), + compression: alg, + etag: Some(compute_etag(&content)), + content_length: content.len(), + content: Box::new(io::Cursor::new(content)), + } + } + + // FIXME: add tests for conditional get in `StreamingFile::into_response` + + #[tokio::test] + async fn test_stream_into_response() -> Result<()> { + let stream = StreamingFile(streaming_blob(b"123", None)); + let resp = stream.into_response(None); + assert!(resp.status().is_success()); + assert!(resp.headers().get(CACHE_CONTROL).is_none()); + let cache = resp + .extensions() + .get::() + .expect("missing cache response extension"); + assert!(matches!(cache, CachePolicy::ForeverInCdnAndBrowser)); + assert!(resp.headers().get(LAST_MODIFIED).is_some()); + + Ok(()) + } #[tokio::test(flavor = "multi_thread")] async fn file_roundtrip_axum() -> Result<()> { @@ -111,7 +146,8 @@ mod tests { file.0.date_updated = now; - let resp = file.into_response(); + let resp = file.into_response(None); + assert!(resp.status().is_success()); assert!(resp.headers().get(CACHE_CONTROL).is_none()); let cache = resp .extensions() diff --git a/src/web/headers/mod.rs b/src/web/headers/mod.rs index 7c1eef31b..00790953d 100644 --- a/src/web/headers/mod.rs +++ b/src/web/headers/mod.rs @@ -14,7 +14,6 @@ pub static SURROGATE_CONTROL: HeaderName = HeaderName::from_static("surrogate-co /// compute our etag header value from some content /// /// Has to match the implementation in our build-script. -#[cfg(test)] pub fn compute_etag>(content: T) -> axum_extra::headers::ETag { let digest = md5::compute(&content); format!("\"{:x}\"", digest).parse().unwrap() diff --git a/src/web/metrics.rs b/src/web/metrics.rs index 0bc44576f..bd560036b 100644 --- a/src/web/metrics.rs +++ b/src/web/metrics.rs @@ -143,7 +143,20 @@ pub(crate) async fn request_recorder( let result = next.run(request).await; let resp_time = start.elapsed().as_secs_f64(); - let attrs = [KeyValue::new("route", route_name.to_string())]; + let status_kind = match result.status() { + StatusCode::NOT_MODIFIED => "not_modified", + s if s.is_informational() => "informational", + s if s.is_success() => "success", + s if s.is_redirection() => "redirection", + s if s.is_client_error() => "client_error", + s if s.is_server_error() => "server_error", + _ => "other", + }; + + let attrs = [ + KeyValue::new("route", route_name.to_string()), + KeyValue::new("status_kind", status_kind), + ]; metrics .routes_visited diff --git a/src/web/rustdoc.rs b/src/web/rustdoc.rs index 32d2969ab..b226e0463 100644 --- a/src/web/rustdoc.rs +++ b/src/web/rustdoc.rs @@ -19,6 +19,7 @@ use crate::{ rustdoc::{PageKind, RustdocParams}, }, file::StreamingFile, + headers::IfNoneMatch, match_version, metrics::WebMetrics, page::{ @@ -35,7 +36,8 @@ use axum::{ http::StatusCode, response::{IntoResponse, Response as AxumResponse}, }; -use http::{HeaderValue, Uri, header, uri::Authority}; +use axum_extra::{headers::ContentType, typed_header::TypedHeader}; +use http::{Uri, uri::Authority}; use serde::Deserialize; use std::{ collections::HashMap, @@ -193,6 +195,7 @@ pub(crate) static DOC_RUST_LANG_ORG_REDIRECTS: LazyLock, path: impl AsRef, + if_none_match: Option, ) -> AxumResult { let path = path.as_ref().to_owned(); // FIXME: this could be optimized: when a path doesn't exist @@ -204,8 +207,8 @@ async fn try_serve_legacy_toolchain_asset( // toolchain specific resources into the new folder, // which is reached via the new handler. Ok(StreamingFile::from_path(&storage, &path) - .await - .map(IntoResponse::into_response)?) + .await? + .into_response(if_none_match)) } /// Handler called for `/:crate` and `/:crate/:version` URLs. Automatically redirects to the docs @@ -215,9 +218,11 @@ pub(crate) async fn rustdoc_redirector_handler( params: RustdocParams, Extension(storage): Extension>, mut conn: DbConnection, + if_none_match: Option>, RawQuery(original_query): RawQuery, ) -> AxumResult { let params = params.with_page_kind(PageKind::Rustdoc); + let if_none_match = if_none_match.map(|th| th.0); fn redirect_to_doc( original_uri: Option<&Uri>, @@ -255,7 +260,7 @@ pub(crate) async fn rustdoc_redirector_handler( .binary_search(&extension) .is_ok() { - return try_serve_legacy_toolchain_asset(storage, params.name()) + return try_serve_legacy_toolchain_asset(storage, params.name(), if_none_match) .instrument(info_span!("serve static asset")) .await; } @@ -319,7 +324,7 @@ pub(crate) async fn rustdoc_redirector_handler( ) .await { - Ok(blob) => Ok(StreamingFile(blob).into_response()), + Ok(blob) => Ok(StreamingFile(blob).into_response(if_none_match)), Err(err) => { if !matches!(err.downcast_ref(), Some(AxumNope::ResourceNotFound)) && !matches!(err.downcast_ref(), Some(crate::storage::PathNotFoundError)) @@ -332,7 +337,7 @@ pub(crate) async fn rustdoc_redirector_handler( // docs that were affected by this bug. // https://github.com/rust-lang/docs.rs/issues/1979 if inner_path.starts_with("search-") || inner_path.starts_with("settings-") { - try_serve_legacy_toolchain_asset(storage, inner_path).await + try_serve_legacy_toolchain_asset(storage, inner_path, if_none_match).await } else { Err(err.into()) } @@ -400,10 +405,7 @@ impl RustdocPage { } else { CachePolicy::ForeverInCdnAndStaleInBrowser }), - [( - header::CONTENT_TYPE, - HeaderValue::from_static(mime::TEXT_HTML_UTF_8.as_ref()), - )], + TypedHeader(ContentType::from(mime::TEXT_HTML_UTF_8)), Body::from_stream(utils::rewrite_rustdoc_html_stream( template_data, rustdoc_html.content, @@ -436,9 +438,11 @@ pub(crate) async fn rustdoc_html_server_handler( Extension(config): Extension>, Extension(csp): Extension>, RawQuery(original_query): RawQuery, + if_none_match: Option>, mut conn: DbConnection, ) -> AxumResult { let params = params.with_page_kind(PageKind::Rustdoc); + let if_none_match = if_none_match.map(|th| th.0); trace!(?params, ?original_query, "original params"); // Pages generated by Rustdoc are not ready to be served with a CSP yet. @@ -598,7 +602,7 @@ pub(crate) async fn rustdoc_html_server_handler( // default asset caching behaviour is `Cache::ForeverInCdnAndBrowser`. // This is an edge-case when we serve invocation specific static assets under `/latest/`: // https://github.com/rust-lang/docs.rs/issues/1593 - return Ok(StreamingFile(blob).into_response()); + return Ok(StreamingFile(blob).into_response(if_none_match)); } let latest_release = krate.latest_release()?; @@ -910,10 +914,14 @@ pub(crate) async fn download_handler( pub(crate) async fn static_asset_handler( Path(path): Path, Extension(storage): Extension>, + if_none_match: Option>, ) -> AxumResult { + let if_none_match = if_none_match.map(|th| th.0); let storage_path = format!("{RUSTDOC_STATIC_STORAGE_PREFIX}{path}"); - Ok(StreamingFile::from_path(&storage, &storage_path).await?) + Ok(StreamingFile::from_path(&storage, &storage_path) + .await? + .into_response(if_none_match)) } #[cfg(test)] @@ -931,6 +939,7 @@ mod test { }; use anyhow::{Context, Result}; use chrono::{NaiveDate, Utc}; + use http::header::ETAG; use kuchikiki::traits::TendrilSink; use pretty_assertions::assert_eq; use reqwest::StatusCode; @@ -938,6 +947,10 @@ mod test { use test_case::test_case; use tracing::info; + // FIXME: more tests for static asset serving that now + // * gets etags + // * does conditional get / 304 + async fn try_latest_version_redirect( path: &str, web: &axum::Router, @@ -2988,6 +3001,7 @@ mod test { let web = env.web_app().await; let response = web.get(&format!("/dummy/0.1.0/{name}")).await?; assert!(response.status().is_success()); + assert!(response.headers().get(ETAG).is_some()); assert_eq!(response.text().await?, "content"); Ok(()) @@ -3020,7 +3034,9 @@ mod test { "{:?}", response.headers().get("Location"), ); - assert!(web.get("/asset.js").await?.status().is_success()); + let response = web.get("/asset.js").await?; + assert!(response.status().is_success()); + assert!(response.headers().get(ETAG).is_some()); assert!(web.get(&format!("/{path}")).await?.status().is_success()); let response = web.get(&format!("/dummy/0.1.0/{path}")).await?; diff --git a/src/web/source.rs b/src/web/source.rs index d1b2bb9fa..0bfff71a7 100644 --- a/src/web/source.rs +++ b/src/web/source.rs @@ -13,6 +13,7 @@ use crate::{ }, file::File as DbFile, headers::CanonicalUrl, + headers::IfNoneMatch, match_version, page::templates::{RenderBrands, RenderRegular, RenderSolid, filters}, }, @@ -20,7 +21,7 @@ use crate::{ use anyhow::{Context as _, Result}; use askama::Template; use axum::{Extension, response::IntoResponse}; -use axum_extra::headers::HeaderMapExt; +use axum_extra::{TypedHeader, headers::HeaderMapExt}; use mime::Mime; use std::{cmp::Ordering, sync::Arc}; use tracing::instrument; @@ -189,7 +190,9 @@ pub(crate) async fn source_browser_handler( params: RustdocParams, Extension(storage): Extension>, mut conn: DbConnection, + if_none_match: Option>, ) -> AxumResult { + let if_none_match = if_none_match.map(|th| th.0); let params = params.with_page_kind(PageKind::Source); let matched_release = match_version(&mut conn, params.name(), params.req_version()) .await? @@ -281,7 +284,7 @@ pub(crate) async fn source_browser_handler( let is_text = blob.mime.type_() == mime::TEXT || blob.mime == mime::APPLICATION_JSON; // serve the file with DatabaseFileHandler if file isn't text and not empty if !is_text && !blob.is_empty() { - let mut response = DbFile(blob).into_response(); + let mut response = DbFile(blob).into_response(if_none_match); response.headers_mut().typed_insert(canonical_url); response .extensions_mut() @@ -348,6 +351,8 @@ mod tests { use reqwest::StatusCode; use test_case::test_case; + // FIXME: add tests for the etag & conditional get for raw source assets returned + fn get_file_list_links(body: &str) -> Vec { let dom = kuchikiki::parse_html().one(body);