From 6e71d8044ad7b49a15d6194b7daa161247249abe Mon Sep 17 00:00:00 2001
From: Tobias Bieniek
Date: Fri, 29 Nov 2024 11:05:03 +0100
Subject: [PATCH] Reapply "Merge pull request #10069 from Turbo87/publish-stream"

This reverts commit 4f55cbedd87b9178ef77c03eab6c21c7700c48f5.
---
 src/controllers/krate/publish.rs   | 114 ++++++++++++++++++-----------
 src/tests/krate/publish/tarball.rs |  26 +++++--
 2 files changed, 90 insertions(+), 50 deletions(-)

diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs
index a419e7aec75..fe938360d23 100644
--- a/src/controllers/krate/publish.rs
+++ b/src/controllers/krate/publish.rs
@@ -5,7 +5,7 @@ use crate::auth::AuthCheck;
 use crate::worker::jobs::{
     self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion,
 };
-use axum::body::Bytes;
+use axum::body::{Body, Bytes};
 use axum::Json;
 use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
 use chrono::{DateTime, SecondsFormat, Utc};
@@ -18,10 +18,12 @@ use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
 use futures_util::TryFutureExt;
 use futures_util::TryStreamExt;
 use hex::ToHex;
+use http::request::Parts;
 use http::StatusCode;
-use hyper::body::Buf;
 use sha2::{Digest, Sha256};
 use std::collections::HashMap;
+use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio_util::io::StreamReader;
 use url::Url;
 
 use crate::models::{
@@ -36,7 +38,6 @@ use crate::rate_limiter::LimitedAction;
 use crate::schema::*;
 use crate::sql::canon_crate_name;
 use crate::util::errors::{bad_request, custom, internal, AppResult, BoxedAppError};
-use crate::util::BytesRequest;
 use crate::views::{
     EncodableCrate, EncodableCrateDependency, GoodCrate, PublishMetadata, PublishWarnings,
 };
@@ -51,12 +52,20 @@ const MAX_DESCRIPTION_LENGTH: usize = 1000;
 /// Handles the `PUT /crates/new` route.
 /// Used by `cargo publish` to publish a new crate or to publish a new version of an
 /// existing crate.
-pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCrate>> {
-    let (req, bytes) = req.0.into_parts();
-    let (json_bytes, tarball_bytes) = split_body(bytes)?;
+pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult<Json<GoodCrate>> {
+    let stream = body.into_data_stream();
+    let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
+    let mut reader = StreamReader::new(stream);
 
-    let metadata: PublishMetadata = serde_json::from_slice(&json_bytes)
-        .map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?;
+    // The format of the req.body() of a publish request is as follows:
+    //
+    // metadata length
+    // metadata in JSON about the crate being published
+    // .crate tarball length
+    // .crate tarball file
+
+    const MAX_JSON_LENGTH: u32 = 1024 * 1024; // 1 MB
+    let metadata = read_json_metadata(&mut reader, MAX_JSON_LENGTH).await?;
 
     Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?;
 
@@ -133,19 +142,13 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCrate>> {
-    let content_length = tarball_bytes.len() as u64;
-    if content_length > max_upload_size as u64 {
-        return Err(custom(
-            StatusCode::PAYLOAD_TOO_LARGE,
-            format!("max upload size is: {max_upload_size}"),
-        ));
-    }
+    let tarball_bytes = read_tarball_bytes(&mut reader, max_upload_size).await?;
+    let content_length = tarball_bytes.len() as u64;
 
     let pkg_name = format!("{}-{}", &*metadata.name, &version_string);
     let max_unpack_size = std::cmp::max(app.config.max_unpack_size, max_upload_size as u64);
@@ -584,43 +587,66 @@ async fn count_versions_published_today(
 }
 
 #[instrument(skip_all)]
-fn split_body(mut bytes: Bytes) -> AppResult<(Bytes, Bytes)> {
-    // The format of the req.body() of a publish request is as follows:
-    //
-    // metadata length
-    // metadata in JSON about the crate being published
-    // .crate tarball length
-    // .crate tarball file
+async fn read_json_metadata<R: AsyncRead + Unpin>(
+    reader: &mut R,
+    max_length: u32,
+) -> Result<PublishMetadata, BoxedAppError> {
+    let json_len = reader.read_u32_le().await.map_err(|e| {
+        if e.kind() == std::io::ErrorKind::UnexpectedEof {
+            bad_request("invalid metadata length")
+        } else {
+            e.into()
+        }
+    })?;
 
-    if bytes.len() < 4 {
-        // Avoid panic in `get_u32_le()` if there is not enough remaining data
-        return Err(bad_request("invalid metadata length"));
+    if json_len > max_length {
+        let message = "JSON metadata blob too large";
+        return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
     }
 
-    let json_len = bytes.get_u32_le() as usize;
-    if json_len > bytes.len() {
-        return Err(bad_request(format!(
-            "invalid metadata length for remaining payload: {json_len}"
-        )));
-    }
+    let mut json_bytes = vec![0; json_len as usize];
+    reader.read_exact(&mut json_bytes).await.map_err(|e| {
+        if e.kind() == std::io::ErrorKind::UnexpectedEof {
+            let message = format!("invalid metadata length for remaining payload: {json_len}");
+            bad_request(message)
+        } else {
+            e.into()
+        }
+    })?;
 
-    let json_bytes = bytes.split_to(json_len);
+    serde_json::from_slice(&json_bytes)
+        .map_err(|e| bad_request(format_args!("invalid upload request: {e}")))
+}
 
-    if bytes.len() < 4 {
-        // Avoid panic in `get_u32_le()` if there is not enough remaining data
-        return Err(bad_request("invalid tarball length"));
-    }
+#[instrument(skip_all)]
+async fn read_tarball_bytes<R: AsyncRead + Unpin>(
+    reader: &mut R,
+    max_length: u32,
+) -> Result<Bytes, BoxedAppError> {
+    let tarball_len = reader.read_u32_le().await.map_err(|e| {
+        if e.kind() == std::io::ErrorKind::UnexpectedEof {
+            bad_request("invalid tarball length")
+        } else {
+            e.into()
+        }
+    })?;
 
-    let tarball_len = bytes.get_u32_le() as usize;
-    if tarball_len > bytes.len() {
-        return Err(bad_request(format!(
-            "invalid tarball length for remaining payload: {tarball_len}"
-        )));
+    if tarball_len > max_length {
+        let message = format!("max upload size is: {}", max_length);
+        return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
     }
 
-    let tarball_bytes = bytes.split_to(tarball_len);
+    let mut tarball_bytes = vec![0; tarball_len as usize];
+    reader.read_exact(&mut tarball_bytes).await.map_err(|e| {
+        if e.kind() == std::io::ErrorKind::UnexpectedEof {
+            let message = format!("invalid tarball length for remaining payload: {tarball_len}");
+            bad_request(message)
+        } else {
+            e.into()
+        }
+    })?;
 
-    Ok((json_bytes, tarball_bytes))
+    Ok(Bytes::from(tarball_bytes))
 }
 
 #[instrument(skip_all)]
diff --git a/src/tests/krate/publish/tarball.rs b/src/tests/krate/publish/tarball.rs
index 95cf9a57f41..43ce7010b81 100644
--- a/src/tests/krate/publish/tarball.rs
+++ b/src/tests/krate/publish/tarball.rs
@@ -1,5 +1,6 @@
 use crate::tests::builders::PublishBuilder;
 use crate::tests::util::{RequestHelper, TestApp};
+use bytes::{BufMut, BytesMut};
 use crates_io_tarball::TarballBuilder;
 use googletest::prelude::*;
 use http::StatusCode;
@@ -80,9 +81,16 @@ async fn json_bytes_truncated() {
 async fn tarball_len_truncated() {
     let (app, _, _, token) = TestApp::full().with_token().await;
 
-    let response = token
-        .publish_crate(&[2, 0, 0, 0, b'{', b'}', 0, 0] as &[u8])
-        .await;
+    let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;
+
+    let mut bytes = BytesMut::new();
+    bytes.put_u32_le(json.len() as u32);
+    bytes.put_slice(json);
+    bytes.put_u8(0);
+    bytes.put_u8(0);
+
+    let response = token.publish_crate(bytes.freeze()).await;
+
     assert_eq!(response.status(), StatusCode::BAD_REQUEST);
     assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length"}]}"#);
     assert_that!(app.stored_files().await, empty());
@@ -92,9 +100,15 @@ async fn tarball_bytes_truncated() {
     let (app, _, _, token) = TestApp::full().with_token().await;
 
-    let response = token
-        .publish_crate(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0] as &[u8])
-        .await;
+    let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;
+
+    let mut bytes = BytesMut::new();
+    bytes.put_u32_le(json.len() as u32);
+    bytes.put_slice(json);
+    bytes.put_u32_le(100);
+    bytes.put_u8(0);
+
+    let response = token.publish_crate(bytes.freeze()).await;
     assert_eq!(response.status(), StatusCode::BAD_REQUEST);
     assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length for remaining payload: 100"}]}"#);
     assert_that!(app.stored_files().await, empty());
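
Note for reviewers: the body framing parsed above is two length-prefixed segments: a little-endian u32 metadata length, the JSON metadata, a little-endian u32 tarball length, and then the `.crate` tarball. As a minimal sketch (not part of this patch; the helper name is illustrative), a client can assemble such a body with the `bytes` crate, mirroring the tests above:

    use bytes::{BufMut, Bytes, BytesMut};

    /// Builds a publish request body in the framing consumed by
    /// `read_json_metadata` and `read_tarball_bytes`:
    /// [json len: u32 LE][json][tarball len: u32 LE][tarball]
    fn build_publish_body(json: &[u8], tarball: &[u8]) -> Bytes {
        let mut body = BytesMut::with_capacity(8 + json.len() + tarball.len());
        body.put_u32_le(json.len() as u32);    // metadata length prefix
        body.put_slice(json);                  // JSON metadata
        body.put_u32_le(tarball.len() as u32); // tarball length prefix
        body.put_slice(tarball);               // .crate tarball
        body.freeze()
    }

With this framing, `read_json_metadata` can reject a JSON segment larger than 1 MB before buffering it, and `read_tarball_bytes` enforces the per-crate upload limit the same way, without ever holding the full request body in memory.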