Revert "Merge pull request #10069 from Turbo87/publish-stream" #10102


Merged
merged 1 commit on Nov 28, 2024
114 changes: 44 additions & 70 deletions src/controllers/krate/publish.rs
@@ -5,7 +5,7 @@ use crate::auth::AuthCheck;
use crate::worker::jobs::{
self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion,
};
use axum::body::{Body, Bytes};
use axum::body::Bytes;
use axum::Json;
use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
use chrono::{DateTime, SecondsFormat, Utc};
@@ -17,12 +17,10 @@ use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use futures_util::TryStreamExt;
use hex::ToHex;
use http::request::Parts;
use http::StatusCode;
use hyper::body::Buf;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::io::StreamReader;
use url::Url;

use crate::models::{
@@ -37,6 +35,7 @@ use crate::rate_limiter::LimitedAction;
use crate::schema::*;
use crate::sql::canon_crate_name;
use crate::util::errors::{bad_request, custom, internal, AppResult, BoxedAppError};
use crate::util::BytesRequest;
use crate::views::{
EncodableCrate, EncodableCrateDependency, GoodCrate, PublishMetadata, PublishWarnings,
};
@@ -51,20 +50,12 @@ const MAX_DESCRIPTION_LENGTH: usize = 1000;
/// Handles the `PUT /crates/new` route.
/// Used by `cargo publish` to publish a new crate or to publish a new version of an
/// existing crate.
pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult<Json<GoodCrate>> {
let stream = body.into_data_stream();
let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
let mut reader = StreamReader::new(stream);
pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCrate>> {
let (req, bytes) = req.0.into_parts();
let (json_bytes, tarball_bytes) = split_body(bytes)?;

// The format of the req.body() of a publish request is as follows:
//
// metadata length
// metadata in JSON about the crate being published
// .crate tarball length
// .crate tarball file

const MAX_JSON_LENGTH: u32 = 1024 * 1024; // 1 MB
let metadata = read_json_metadata(&mut reader, MAX_JSON_LENGTH).await?;
let metadata: PublishMetadata = serde_json::from_slice(&json_bytes)
.map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?;

Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?;

@@ -141,13 +132,19 @@ pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult<Json<Go
.check_rate_limit(auth.user().id, rate_limit_action, &mut conn)
.await?;

let content_length = tarball_bytes.len() as u64;

let max_upload_size = existing_crate
.as_ref()
.and_then(|c| c.max_upload_size())
.unwrap_or(app.config.max_upload_size);

let tarball_bytes = read_tarball_bytes(&mut reader, max_upload_size).await?;
let content_length = tarball_bytes.len() as u64;
if content_length > max_upload_size as u64 {
return Err(custom(
StatusCode::PAYLOAD_TOO_LARGE,
format!("max upload size is: {max_upload_size}"),
));
}

let pkg_name = format!("{}-{}", &*metadata.name, &version_string);
let max_unpack_size = std::cmp::max(app.config.max_unpack_size, max_upload_size as u64);
@@ -572,66 +569,43 @@ async fn count_versions_published_today(
}

#[instrument(skip_all)]
async fn read_json_metadata<R: AsyncRead + Unpin>(
reader: &mut R,
max_length: u32,
) -> Result<PublishMetadata, BoxedAppError> {
let json_len = reader.read_u32_le().await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
bad_request("invalid metadata length")
} else {
e.into()
}
})?;
fn split_body(mut bytes: Bytes) -> AppResult<(Bytes, Bytes)> {
// The format of the req.body() of a publish request is as follows:
//
// metadata length
// metadata in JSON about the crate being published
// .crate tarball length
// .crate tarball file

if json_len > max_length {
let message = "JSON metadata blob too large";
return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
if bytes.len() < 4 {
// Avoid panic in `get_u32_le()` if there is not enough remaining data
return Err(bad_request("invalid metadata length"));
}

let mut json_bytes = vec![0; json_len as usize];
reader.read_exact(&mut json_bytes).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
let message = format!("invalid metadata length for remaining payload: {json_len}");
bad_request(message)
} else {
e.into()
}
})?;
let json_len = bytes.get_u32_le() as usize;
if json_len > bytes.len() {
return Err(bad_request(format!(
"invalid metadata length for remaining payload: {json_len}"
)));
}

serde_json::from_slice(&json_bytes)
.map_err(|e| bad_request(format_args!("invalid upload request: {e}")))
}
let json_bytes = bytes.split_to(json_len);

#[instrument(skip_all)]
async fn read_tarball_bytes<R: AsyncRead + Unpin>(
reader: &mut R,
max_length: u32,
) -> Result<Bytes, BoxedAppError> {
let tarball_len = reader.read_u32_le().await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
bad_request("invalid tarball length")
} else {
e.into()
}
})?;
if bytes.len() < 4 {
// Avoid panic in `get_u32_le()` if there is not enough remaining data
return Err(bad_request("invalid tarball length"));
}

if tarball_len > max_length {
let message = format!("max upload size is: {}", max_length);
return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message));
let tarball_len = bytes.get_u32_le() as usize;
if tarball_len > bytes.len() {
return Err(bad_request(format!(
"invalid tarball length for remaining payload: {tarball_len}"
)));
}

let mut tarball_bytes = vec![0; tarball_len as usize];
reader.read_exact(&mut tarball_bytes).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
let message = format!("invalid tarball length for remaining payload: {tarball_len}");
bad_request(message)
} else {
e.into()
}
})?;
let tarball_bytes = bytes.split_to(tarball_len);

Ok(Bytes::from(tarball_bytes))
Ok((json_bytes, tarball_bytes))
}

#[instrument(skip_all)]
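For context (not part of this diff): the request body that `split_body` parses is a little-endian u32 length prefix followed by the JSON metadata, then a second u32 length prefix followed by the `.crate` tarball. A minimal sketch of building such a body with the `bytes` crate — `build_publish_body` is a hypothetical helper used only for illustration:

use bytes::{BufMut, Bytes, BytesMut};

// Assemble a publish body: [json len][json][tarball len][tarball].
fn build_publish_body(json: &[u8], tarball: &[u8]) -> Bytes {
    let mut body = BytesMut::new();
    body.put_u32_le(json.len() as u32); // metadata length
    body.put_slice(json); // metadata in JSON about the crate being published
    body.put_u32_le(tarball.len() as u32); // .crate tarball length
    body.put_slice(tarball); // .crate tarball file
    body.freeze()
}

// Example: minimal JSON metadata with an empty tarball.
// let body = build_publish_body(br#"{ "name": "foo", "vers": "1.0.0" }"#, &[]);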
26 changes: 6 additions & 20 deletions src/tests/krate/publish/tarball.rs
@@ -1,6 +1,5 @@
use crate::tests::builders::PublishBuilder;
use crate::tests::util::{RequestHelper, TestApp};
use bytes::{BufMut, BytesMut};
use crates_io_tarball::TarballBuilder;
use googletest::prelude::*;
use http::StatusCode;
@@ -81,16 +80,9 @@ async fn json_bytes_truncated() {
async fn tarball_len_truncated() {
let (app, _, _, token) = TestApp::full().with_token().await;

let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;

let mut bytes = BytesMut::new();
bytes.put_u32_le(json.len() as u32);
bytes.put_slice(json);
bytes.put_u8(0);
bytes.put_u8(0);

let response = token.publish_crate(bytes.freeze()).await;

let response = token
.publish_crate(&[2, 0, 0, 0, b'{', b'}', 0, 0] as &[u8])
.await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length"}]}"#);
assert_that!(app.stored_files().await, empty());
@@ -100,15 +92,9 @@ async fn tarball_len_truncated() {
async fn tarball_bytes_truncated() {
let (app, _, _, token) = TestApp::full().with_token().await;

let json = br#"{ "name": "foo", "vers": "1.0.0" }"#;

let mut bytes = BytesMut::new();
bytes.put_u32_le(json.len() as u32);
bytes.put_slice(json);
bytes.put_u32_le(100);
bytes.put_u8(0);

let response = token.publish_crate(bytes.freeze()).await;
let response = token
.publish_crate(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0] as &[u8])
.await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length for remaining payload: 100"}]}"#);
assert_that!(app.stored_files().await, empty());
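For context (not part of this diff), the raw test payloads above decode as follows under the length-prefixed format — a standalone sketch using the `bytes` crate, mirroring what `split_body` does:

use bytes::{Buf, Bytes};

fn main() {
    // `tarball_len_truncated`: a 4-byte JSON length (2), the JSON `{}`,
    // and then only 2 bytes left -- fewer than the 4 needed for the
    // tarball length prefix, hence "invalid tarball length".
    let mut body = Bytes::copy_from_slice(&[2, 0, 0, 0, b'{', b'}', 0, 0]);
    let json_len = body.get_u32_le() as usize; // 2
    let _json = body.split_to(json_len); // b"{}"
    assert!(body.len() < 4);

    // `tarball_bytes_truncated`: the tarball length claims 100 bytes,
    // but only 1 byte of payload remains, hence
    // "invalid tarball length for remaining payload: 100".
    let mut body = Bytes::copy_from_slice(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0]);
    let json_len = body.get_u32_le() as usize; // 2
    let _json = body.split_to(json_len); // b"{}"
    let tarball_len = body.get_u32_le() as usize; // 100
    assert!(tarball_len > body.len()); // 100 > 1
}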