Skip to content

Reapply "Merge pull request #10069 from Turbo87/publish-stream" #10166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 70 additions & 44 deletions src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::worker::jobs::{
self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion,
};
use axum::body::Bytes;
use axum::body::{Body, Bytes};
use axum::Json;
use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
use chrono::{DateTime, SecondsFormat, Utc};
Expand All @@ -18,10 +18,12 @@
use futures_util::TryFutureExt;
use futures_util::TryStreamExt;
use hex::ToHex;
use http::request::Parts;
use http::StatusCode;
use hyper::body::Buf;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::io::StreamReader;
use url::Url;

use crate::models::{
Expand All @@ -36,7 +38,6 @@
use crate::schema::*;
use crate::sql::canon_crate_name;
use crate::util::errors::{bad_request, custom, internal, AppResult, BoxedAppError};
use crate::util::BytesRequest;
use crate::views::{
EncodableCrate, EncodableCrateDependency, GoodCrate, PublishMetadata, PublishWarnings,
};
Expand All @@ -51,12 +52,20 @@
/// Handles the `PUT /crates/new` route.
/// Used by `cargo publish` to publish a new crate or to publish a new version of an
/// existing crate.
pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCrate>> {
let (req, bytes) = req.0.into_parts();
let (json_bytes, tarball_bytes) = split_body(bytes)?;
pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult<Json<GoodCrate>> {
let stream = body.into_data_stream();
let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
let mut reader = StreamReader::new(stream);

let metadata: PublishMetadata = serde_json::from_slice(&json_bytes)
.map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?;
// The format of the req.body() of a publish request is as follows:
//
// metadata length
// metadata in JSON about the crate being published
// .crate tarball length
// .crate tarball file

const MAX_JSON_LENGTH: u32 = 1024 * 1024; // 1 MB
let metadata = read_json_metadata(&mut reader, MAX_JSON_LENGTH).await?;

Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?;

Expand Down Expand Up @@ -133,19 +142,13 @@
.check_rate_limit(auth.user().id, rate_limit_action, &mut conn)
.await?;

let content_length = tarball_bytes.len() as u64;

let max_upload_size = existing_crate
.as_ref()
.and_then(|c| c.max_upload_size())
.unwrap_or(app.config.max_upload_size);

if content_length > max_upload_size as u64 {
return Err(custom(
StatusCode::PAYLOAD_TOO_LARGE,
format!("max upload size is: {max_upload_size}"),
));
}
let tarball_bytes = read_tarball_bytes(&mut reader, max_upload_size).await?;
let content_length = tarball_bytes.len() as u64;

let pkg_name = format!("{}-{}", &*metadata.name, &version_string);
let max_unpack_size = std::cmp::max(app.config.max_unpack_size, max_upload_size as u64);
Expand Down Expand Up @@ -584,43 +587,66 @@
}

#[instrument(skip_all)]
fn split_body(mut bytes: Bytes) -> AppResult<(Bytes, Bytes)> {
// The format of the req.body() of a publish request is as follows:
//
// metadata length
// metadata in JSON about the crate being published
// .crate tarball length
// .crate tarball file
async fn read_json_metadata<R: AsyncRead + Unpin>(
reader: &mut R,
max_length: u32,
) -> Result<PublishMetadata, BoxedAppError> {
let json_len = reader.read_u32_le().await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
bad_request("invalid metadata length")
} else {
e.into()

Check warning on line 598 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L598

Added line #L598 was not covered by tests
}
})?;

if bytes.len() < 4 {
// Avoid panic in `get_u32_le()` if there is not enough remaining data
return Err(bad_request("invalid metadata length"));
if json_len > max_length {
let message = "JSON metadata blob too large";
return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
}

let json_len = bytes.get_u32_le() as usize;
if json_len > bytes.len() {
return Err(bad_request(format!(
"invalid metadata length for remaining payload: {json_len}"
)));
}
let mut json_bytes = vec![0; json_len as usize];
reader.read_exact(&mut json_bytes).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
let message = format!("invalid metadata length for remaining payload: {json_len}");
bad_request(message)
} else {
e.into()

Check warning on line 613 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L613

Added line #L613 was not covered by tests
}
})?;

let json_bytes = bytes.split_to(json_len);
serde_json::from_slice(&json_bytes)
.map_err(|e| bad_request(format_args!("invalid upload request: {e}")))
}

if bytes.len() < 4 {
// Avoid panic in `get_u32_le()` if there is not enough remaining data
return Err(bad_request("invalid tarball length"));
}
#[instrument(skip_all)]
async fn read_tarball_bytes<R: AsyncRead + Unpin>(
reader: &mut R,
max_length: u32,
) -> Result<Bytes, BoxedAppError> {
let tarball_len = reader.read_u32_le().await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
bad_request("invalid tarball length")
} else {
e.into()

Check warning on line 630 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L630

Added line #L630 was not covered by tests
}
})?;

let tarball_len = bytes.get_u32_le() as usize;
if tarball_len > bytes.len() {
return Err(bad_request(format!(
"invalid tarball length for remaining payload: {tarball_len}"
)));
if tarball_len > max_length {
let message = format!("max upload size is: {}", max_length);
return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
}

let tarball_bytes = bytes.split_to(tarball_len);
let mut tarball_bytes = vec![0; tarball_len as usize];
reader.read_exact(&mut tarball_bytes).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
let message = format!("invalid tarball length for remaining payload: {tarball_len}");
bad_request(message)
} else {
e.into()

Check warning on line 645 in src/controllers/krate/publish.rs

View check run for this annotation

Codecov / codecov/patch

src/controllers/krate/publish.rs#L645

Added line #L645 was not covered by tests
}
})?;

Ok((json_bytes, tarball_bytes))
Ok(Bytes::from(tarball_bytes))
}

#[instrument(skip_all)]
Expand Down
26 changes: 20 additions & 6 deletions src/tests/krate/publish/tarball.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::tests::builders::PublishBuilder;
use crate::tests::util::{RequestHelper, TestApp};
use bytes::{BufMut, BytesMut};
use crates_io_tarball::TarballBuilder;
use googletest::prelude::*;
use http::StatusCode;
Expand Down Expand Up @@ -80,9 +81,16 @@ async fn json_bytes_truncated() {
async fn tarball_len_truncated() {
let (app, _, _, token) = TestApp::full().with_token().await;

let response = token
.publish_crate(&[2, 0, 0, 0, b'{', b'}', 0, 0] as &[u8])
.await;
let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;

let mut bytes = BytesMut::new();
bytes.put_u32_le(json.len() as u32);
bytes.put_slice(json);
bytes.put_u8(0);
bytes.put_u8(0);

let response = token.publish_crate(bytes.freeze()).await;

assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length"}]}"#);
assert_that!(app.stored_files().await, empty());
Expand All @@ -92,9 +100,15 @@ async fn tarball_len_truncated() {
async fn tarball_bytes_truncated() {
let (app, _, _, token) = TestApp::full().with_token().await;

let response = token
.publish_crate(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0] as &[u8])
.await;
let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;

let mut bytes = BytesMut::new();
bytes.put_u32_le(json.len() as u32);
bytes.put_slice(json);
bytes.put_u32_le(100);
bytes.put_u8(0);

let response = token.publish_crate(bytes.freeze()).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length for remaining payload: 100"}]}"#);
assert_that!(app.stored_files().await, empty());
Expand Down