Skip to content

Commit 6f81f3d

Browse files
authored
Reapply "Merge pull request #10069 from Turbo87/publish-stream" (#10166)
This reverts commit 4f55cbe.
1 parent c93ef34 commit 6f81f3d

File tree

2 files changed

+90
-50
lines changed

2 files changed

+90
-50
lines changed

src/controllers/krate/publish.rs

Lines changed: 70 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::auth::AuthCheck;
55
use crate::worker::jobs::{
66
self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion,
77
};
8-
use axum::body::Bytes;
8+
use axum::body::{Body, Bytes};
99
use axum::Json;
1010
use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
1111
use chrono::{DateTime, SecondsFormat, Utc};
@@ -18,10 +18,12 @@ use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
1818
use futures_util::TryFutureExt;
1919
use futures_util::TryStreamExt;
2020
use hex::ToHex;
21+
use http::request::Parts;
2122
use http::StatusCode;
22-
use hyper::body::Buf;
2323
use sha2::{Digest, Sha256};
2424
use std::collections::HashMap;
25+
use tokio::io::{AsyncRead, AsyncReadExt};
26+
use tokio_util::io::StreamReader;
2527
use url::Url;
2628

2729
use crate::models::{
@@ -36,7 +38,6 @@ use crate::rate_limiter::LimitedAction;
3638
use crate::schema::*;
3739
use crate::sql::canon_crate_name;
3840
use crate::util::errors::{bad_request, custom, internal, AppResult, BoxedAppError};
39-
use crate::util::BytesRequest;
4041
use crate::views::{
4142
EncodableCrate, EncodableCrateDependency, GoodCrate, PublishMetadata, PublishWarnings,
4243
};
@@ -51,12 +52,20 @@ const MAX_DESCRIPTION_LENGTH: usize = 1000;
5152
/// Handles the `PUT /crates/new` route.
5253
/// Used by `cargo publish` to publish a new crate or to publish a new version of an
5354
/// existing crate.
54-
pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCrate>> {
55-
let (req, bytes) = req.0.into_parts();
56-
let (json_bytes, tarball_bytes) = split_body(bytes)?;
55+
pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult<Json<GoodCrate>> {
56+
let stream = body.into_data_stream();
57+
let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
58+
let mut reader = StreamReader::new(stream);
5759

58-
let metadata: PublishMetadata = serde_json::from_slice(&json_bytes)
59-
.map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?;
60+
// The format of the req.body() of a publish request is as follows:
61+
//
62+
// metadata length
63+
// metadata in JSON about the crate being published
64+
// .crate tarball length
65+
// .crate tarball file
66+
67+
const MAX_JSON_LENGTH: u32 = 1024 * 1024; // 1 MB
68+
let metadata = read_json_metadata(&mut reader, MAX_JSON_LENGTH).await?;
6069

6170
Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?;
6271

@@ -133,19 +142,13 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
133142
.check_rate_limit(auth.user().id, rate_limit_action, &mut conn)
134143
.await?;
135144

136-
let content_length = tarball_bytes.len() as u64;
137-
138145
let max_upload_size = existing_crate
139146
.as_ref()
140147
.and_then(|c| c.max_upload_size())
141148
.unwrap_or(app.config.max_upload_size);
142149

143-
if content_length > max_upload_size as u64 {
144-
return Err(custom(
145-
StatusCode::PAYLOAD_TOO_LARGE,
146-
format!("max upload size is: {max_upload_size}"),
147-
));
148-
}
150+
let tarball_bytes = read_tarball_bytes(&mut reader, max_upload_size).await?;
151+
let content_length = tarball_bytes.len() as u64;
149152

150153
let pkg_name = format!("{}-{}", &*metadata.name, &version_string);
151154
let max_unpack_size = std::cmp::max(app.config.max_unpack_size, max_upload_size as u64);
@@ -584,43 +587,66 @@ async fn count_versions_published_today(
584587
}
585588

586589
#[instrument(skip_all)]
587-
fn split_body(mut bytes: Bytes) -> AppResult<(Bytes, Bytes)> {
588-
// The format of the req.body() of a publish request is as follows:
589-
//
590-
// metadata length
591-
// metadata in JSON about the crate being published
592-
// .crate tarball length
593-
// .crate tarball file
590+
async fn read_json_metadata<R: AsyncRead + Unpin>(
591+
reader: &mut R,
592+
max_length: u32,
593+
) -> Result<PublishMetadata, BoxedAppError> {
594+
let json_len = reader.read_u32_le().await.map_err(|e| {
595+
if e.kind() == std::io::ErrorKind::UnexpectedEof {
596+
bad_request("invalid metadata length")
597+
} else {
598+
e.into()
599+
}
600+
})?;
594601

595-
if bytes.len() < 4 {
596-
// Avoid panic in `get_u32_le()` if there is not enough remaining data
597-
return Err(bad_request("invalid metadata length"));
602+
if json_len > max_length {
603+
let message = "JSON metadata blob too large";
604+
return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
598605
}
599606

600-
let json_len = bytes.get_u32_le() as usize;
601-
if json_len > bytes.len() {
602-
return Err(bad_request(format!(
603-
"invalid metadata length for remaining payload: {json_len}"
604-
)));
605-
}
607+
let mut json_bytes = vec![0; json_len as usize];
608+
reader.read_exact(&mut json_bytes).await.map_err(|e| {
609+
if e.kind() == std::io::ErrorKind::UnexpectedEof {
610+
let message = format!("invalid metadata length for remaining payload: {json_len}");
611+
bad_request(message)
612+
} else {
613+
e.into()
614+
}
615+
})?;
606616

607-
let json_bytes = bytes.split_to(json_len);
617+
serde_json::from_slice(&json_bytes)
618+
.map_err(|e| bad_request(format_args!("invalid upload request: {e}")))
619+
}
608620

609-
if bytes.len() < 4 {
610-
// Avoid panic in `get_u32_le()` if there is not enough remaining data
611-
return Err(bad_request("invalid tarball length"));
612-
}
621+
#[instrument(skip_all)]
622+
async fn read_tarball_bytes<R: AsyncRead + Unpin>(
623+
reader: &mut R,
624+
max_length: u32,
625+
) -> Result<Bytes, BoxedAppError> {
626+
let tarball_len = reader.read_u32_le().await.map_err(|e| {
627+
if e.kind() == std::io::ErrorKind::UnexpectedEof {
628+
bad_request("invalid tarball length")
629+
} else {
630+
e.into()
631+
}
632+
})?;
613633

614-
let tarball_len = bytes.get_u32_le() as usize;
615-
if tarball_len > bytes.len() {
616-
return Err(bad_request(format!(
617-
"invalid tarball length for remaining payload: {tarball_len}"
618-
)));
634+
if tarball_len > max_length {
635+
let message = format!("max upload size is: {}", max_length);
636+
return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
619637
}
620638

621-
let tarball_bytes = bytes.split_to(tarball_len);
639+
let mut tarball_bytes = vec![0; tarball_len as usize];
640+
reader.read_exact(&mut tarball_bytes).await.map_err(|e| {
641+
if e.kind() == std::io::ErrorKind::UnexpectedEof {
642+
let message = format!("invalid tarball length for remaining payload: {tarball_len}");
643+
bad_request(message)
644+
} else {
645+
e.into()
646+
}
647+
})?;
622648

623-
Ok((json_bytes, tarball_bytes))
649+
Ok(Bytes::from(tarball_bytes))
624650
}
625651

626652
#[instrument(skip_all)]

src/tests/krate/publish/tarball.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::tests::builders::PublishBuilder;
22
use crate::tests::util::{RequestHelper, TestApp};
3+
use bytes::{BufMut, BytesMut};
34
use crates_io_tarball::TarballBuilder;
45
use googletest::prelude::*;
56
use http::StatusCode;
@@ -80,9 +81,16 @@ async fn json_bytes_truncated() {
8081
async fn tarball_len_truncated() {
8182
let (app, _, _, token) = TestApp::full().with_token().await;
8283

83-
let response = token
84-
.publish_crate(&[2, 0, 0, 0, b'{', b'}', 0, 0] as &[u8])
85-
.await;
84+
let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;
85+
86+
let mut bytes = BytesMut::new();
87+
bytes.put_u32_le(json.len() as u32);
88+
bytes.put_slice(json);
89+
bytes.put_u8(0);
90+
bytes.put_u8(0);
91+
92+
let response = token.publish_crate(bytes.freeze()).await;
93+
8694
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
8795
assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length"}]}"#);
8896
assert_that!(app.stored_files().await, empty());
@@ -92,9 +100,15 @@ async fn tarball_len_truncated() {
92100
async fn tarball_bytes_truncated() {
93101
let (app, _, _, token) = TestApp::full().with_token().await;
94102

95-
let response = token
96-
.publish_crate(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0] as &[u8])
97-
.await;
103+
let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;
104+
105+
let mut bytes = BytesMut::new();
106+
bytes.put_u32_le(json.len() as u32);
107+
bytes.put_slice(json);
108+
bytes.put_u32_le(100);
109+
bytes.put_u8(0);
110+
111+
let response = token.publish_crate(bytes.freeze()).await;
98112
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
99113
assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length for remaining payload: 100"}]}"#);
100114
assert_that!(app.stored_files().await, empty());

0 commit comments

Comments
 (0)