diff --git a/Cargo.lock b/Cargo.lock index 8a4dcd993f4..746641a3aa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1114,6 +1114,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tokio-postgres", + "tokio-util", "toml", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index fbc6baca651..79de19769c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,7 @@ tempfile = "=3.14.0" thiserror = "=2.0.3" tokio = { version = "=1.41.1", features = ["net", "signal", "io-std", "io-util", "rt-multi-thread", "macros", "process"]} tokio-postgres = "=0.7.12" +tokio-util = "=0.7.12" toml = "=0.8.19" tower = "=0.5.1" tower-http = { version = "=0.6.2", features = ["add-extension", "fs", "catch-panic", "timeout", "compression-full"] } diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 819771cbc94..57e0320f765 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -5,7 +5,7 @@ use crate::auth::AuthCheck; use crate::worker::jobs::{ self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion, }; -use axum::body::Bytes; +use axum::body::{Body, Bytes}; use axum::Json; use cargo_manifest::{Dependency, DepsSet, TargetDepsSet}; use chrono::{DateTime, SecondsFormat, Utc}; @@ -17,10 +17,12 @@ use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; use futures_util::TryStreamExt; use hex::ToHex; +use http::request::Parts; use http::StatusCode; -use hyper::body::Buf; use sha2::{Digest, Sha256}; use std::collections::HashMap; +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_util::io::StreamReader; use url::Url; use crate::models::{ @@ -35,7 +37,7 @@ use crate::rate_limiter::LimitedAction; use crate::schema::*; use crate::sql::canon_crate_name; use crate::util::errors::{bad_request, custom, internal, AppResult, BoxedAppError}; -use crate::util::{BytesRequest, Maximums}; +use crate::util::Maximums; use crate::views::{ EncodableCrate, EncodableCrateDependency, GoodCrate, PublishMetadata, PublishWarnings, }; @@ -54,12 +56,20 @@ const MAX_DESCRIPTION_LENGTH: usize = 1000; /// Currently blocks the HTTP thread, perhaps some function calls can spawn new /// threads and return completion or error through other methods a `cargo publish /// --status` command, via crates.io's front end, or email. -pub async fn publish(app: AppState, req: BytesRequest) -> AppResult> { - let (req, bytes) = req.0.into_parts(); - let (json_bytes, tarball_bytes) = split_body(bytes)?; +pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult> { + let stream = body.into_data_stream(); + let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)); + let mut reader = StreamReader::new(stream); - let metadata: PublishMetadata = serde_json::from_slice(&json_bytes) - .map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?; + // The format of the req.body() of a publish request is as follows: + // + // metadata length + // metadata in JSON about the crate being published + // .crate tarball length + // .crate tarball file + + const MAX_JSON_LENGTH: u32 = 1024 * 1024; // 1 MB + let metadata = read_json_metadata(&mut reader, MAX_JSON_LENGTH).await?; Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?; @@ -136,20 +146,14 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult maximums.max_upload_size { - return Err(custom( - StatusCode::PAYLOAD_TOO_LARGE, - format!("max upload size is: {}", maximums.max_upload_size), - )); - } + let tarball_bytes = read_tarball_bytes(&mut reader, maximums.max_upload_size as u32).await?; + let content_length = tarball_bytes.len() as u64; let pkg_name = format!("{}-{}", &*metadata.name, &version_string); let tarball_info = @@ -573,44 +577,65 @@ async fn count_versions_published_today( .await } -#[instrument(skip_all)] -fn split_body(mut bytes: Bytes) -> AppResult<(Bytes, Bytes)> { - // The format of the req.body() of a publish request is as follows: - // - // metadata length - // metadata in JSON about the crate being published - // .crate tarball length - // .crate tarball file +async fn read_json_metadata( + reader: &mut R, + max_length: u32, +) -> Result { + let json_len = reader.read_u32_le().await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + bad_request("invalid metadata length") + } else { + e.into() + } + })?; - if bytes.len() < 4 { - // Avoid panic in `get_u32_le()` if there is not enough remaining data - return Err(bad_request("invalid metadata length")); + if json_len > max_length { + let message = "JSON metadata blob too large"; + return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message)); } - let json_len = bytes.get_u32_le() as usize; - if json_len > bytes.len() { - return Err(bad_request(format!( - "invalid metadata length for remaining payload: {json_len}" - ))); - } + let mut json_bytes = vec![0; json_len as usize]; + reader.read_exact(&mut json_bytes).await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + let message = format!("invalid metadata length for remaining payload: {json_len}"); + bad_request(message) + } else { + e.into() + } + })?; - let json_bytes = bytes.split_to(json_len); + serde_json::from_slice(&json_bytes) + .map_err(|e| bad_request(format_args!("invalid upload request: {e}"))) +} - if bytes.len() < 4 { - // Avoid panic in `get_u32_le()` if there is not enough remaining data - return Err(bad_request("invalid tarball length")); - } +async fn read_tarball_bytes( + reader: &mut R, + max_length: u32, +) -> Result { + let tarball_len = reader.read_u32_le().await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + bad_request("invalid tarball length") + } else { + e.into() + } + })?; - let tarball_len = bytes.get_u32_le() as usize; - if tarball_len > bytes.len() { - return Err(bad_request(format!( - "invalid tarball length for remaining payload: {tarball_len}" - ))); + if tarball_len > max_length { + let message = format!("max upload size is: {}", max_length); + return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message)); } - let tarball_bytes = bytes.split_to(tarball_len); + let mut tarball_bytes = vec![0; tarball_len as usize]; + reader.read_exact(&mut tarball_bytes).await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + let message = format!("invalid tarball length for remaining payload: {tarball_len}"); + bad_request(message) + } else { + e.into() + } + })?; - Ok((json_bytes, tarball_bytes)) + Ok(Bytes::from(tarball_bytes)) } async fn is_reserved_name(name: &str, conn: &mut AsyncPgConnection) -> QueryResult { diff --git a/src/tests/krate/publish/tarball.rs b/src/tests/krate/publish/tarball.rs index 95cf9a57f41..43ce7010b81 100644 --- a/src/tests/krate/publish/tarball.rs +++ b/src/tests/krate/publish/tarball.rs @@ -1,5 +1,6 @@ use crate::tests::builders::PublishBuilder; use crate::tests::util::{RequestHelper, TestApp}; +use bytes::{BufMut, BytesMut}; use crates_io_tarball::TarballBuilder; use googletest::prelude::*; use http::StatusCode; @@ -80,9 +81,16 @@ async fn json_bytes_truncated() { async fn tarball_len_truncated() { let (app, _, _, token) = TestApp::full().with_token().await; - let response = token - .publish_crate(&[2, 0, 0, 0, b'{', b'}', 0, 0] as &[u8]) - .await; + let json = br#"{ "name": "foo", "vers": "1.0.0" }"#; + + let mut bytes = BytesMut::new(); + bytes.put_u32_le(json.len() as u32); + bytes.put_slice(json); + bytes.put_u8(0); + bytes.put_u8(0); + + let response = token.publish_crate(bytes.freeze()).await; + assert_eq!(response.status(), StatusCode::BAD_REQUEST); assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length"}]}"#); assert_that!(app.stored_files().await, empty()); @@ -92,9 +100,15 @@ async fn tarball_len_truncated() { async fn tarball_bytes_truncated() { let (app, _, _, token) = TestApp::full().with_token().await; - let response = token - .publish_crate(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0] as &[u8]) - .await; + let json = br#"{ "name": "foo", "vers": "1.0.0" }"#; + + let mut bytes = BytesMut::new(); + bytes.put_u32_le(json.len() as u32); + bytes.put_slice(json); + bytes.put_u32_le(100); + bytes.put_u8(0); + + let response = token.publish_crate(bytes.freeze()).await; assert_eq!(response.status(), StatusCode::BAD_REQUEST); assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length for remaining payload: 100"}]}"#); assert_that!(app.stored_files().await, empty());