@@ -5,7 +5,7 @@ use crate::auth::AuthCheck;
5
5
use crate :: worker:: jobs:: {
6
6
self , CheckTyposquat , SendPublishNotificationsJob , UpdateDefaultVersion ,
7
7
} ;
8
- use axum:: body:: Bytes ;
8
+ use axum:: body:: { Body , Bytes } ;
9
9
use axum:: Json ;
10
10
use cargo_manifest:: { Dependency , DepsSet , TargetDepsSet } ;
11
11
use chrono:: { DateTime , SecondsFormat , Utc } ;
@@ -17,10 +17,12 @@ use diesel_async::scoped_futures::ScopedFutureExt;
17
17
use diesel_async:: { AsyncConnection , AsyncPgConnection , RunQueryDsl } ;
18
18
use futures_util:: TryStreamExt ;
19
19
use hex:: ToHex ;
20
+ use http:: request:: Parts ;
20
21
use http:: StatusCode ;
21
- use hyper:: body:: Buf ;
22
22
use sha2:: { Digest , Sha256 } ;
23
23
use std:: collections:: HashMap ;
24
+ use tokio:: io:: { AsyncRead , AsyncReadExt } ;
25
+ use tokio_util:: io:: StreamReader ;
24
26
use url:: Url ;
25
27
26
28
use crate :: models:: {
@@ -35,7 +37,7 @@ use crate::rate_limiter::LimitedAction;
35
37
use crate :: schema:: * ;
36
38
use crate :: sql:: canon_crate_name;
37
39
use crate :: util:: errors:: { bad_request, custom, internal, AppResult , BoxedAppError } ;
38
- use crate :: util:: { BytesRequest , Maximums } ;
40
+ use crate :: util:: Maximums ;
39
41
use crate :: views:: {
40
42
EncodableCrate , EncodableCrateDependency , GoodCrate , PublishMetadata , PublishWarnings ,
41
43
} ;
@@ -54,12 +56,20 @@ const MAX_DESCRIPTION_LENGTH: usize = 1000;
54
56
/// Currently blocks the HTTP thread, perhaps some function calls can spawn new
55
57
/// threads and return completion or error through other methods a `cargo publish
56
58
/// --status` command, via crates.io's front end, or email.
57
- pub async fn publish ( app : AppState , req : BytesRequest ) -> AppResult < Json < GoodCrate > > {
58
- let ( req, bytes) = req. 0 . into_parts ( ) ;
59
- let ( json_bytes, tarball_bytes) = split_body ( bytes) ?;
59
+ pub async fn publish ( app : AppState , req : Parts , body : Body ) -> AppResult < Json < GoodCrate > > {
60
+ let stream = body. into_data_stream ( ) ;
61
+ let stream = stream. map_err ( |err| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , err) ) ;
62
+ let mut reader = StreamReader :: new ( stream) ;
60
63
61
- let metadata: PublishMetadata = serde_json:: from_slice ( & json_bytes)
62
- . map_err ( |e| bad_request ( format_args ! ( "invalid upload request: {e}" ) ) ) ?;
64
+ // The format of the req.body() of a publish request is as follows:
65
+ //
66
+ // metadata length
67
+ // metadata in JSON about the crate being published
68
+ // .crate tarball length
69
+ // .crate tarball file
70
+
71
+ const MAX_JSON_LENGTH : u32 = 1024 * 1024 ; // 1 MB
72
+ let metadata = read_json_metadata ( & mut reader, MAX_JSON_LENGTH ) . await ?;
63
73
64
74
Crate :: validate_crate_name ( "crate" , & metadata. name ) . map_err ( bad_request) ?;
65
75
@@ -136,20 +146,14 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
136
146
. check_rate_limit ( auth. user ( ) . id , rate_limit_action, & mut conn)
137
147
. await ?;
138
148
139
- let content_length = tarball_bytes. len ( ) as u64 ;
140
-
141
149
let maximums = Maximums :: new (
142
150
existing_crate. as_ref ( ) . and_then ( |c| c. max_upload_size ) ,
143
151
app. config . max_upload_size ,
144
152
app. config . max_unpack_size ,
145
153
) ;
146
154
147
- if content_length > maximums. max_upload_size {
148
- return Err ( custom (
149
- StatusCode :: PAYLOAD_TOO_LARGE ,
150
- format ! ( "max upload size is: {}" , maximums. max_upload_size) ,
151
- ) ) ;
152
- }
155
+ let tarball_bytes = read_tarball_bytes ( & mut reader, maximums. max_upload_size as u32 ) . await ?;
156
+ let content_length = tarball_bytes. len ( ) as u64 ;
153
157
154
158
let pkg_name = format ! ( "{}-{}" , & * metadata. name, & version_string) ;
155
159
let tarball_info =
@@ -573,44 +577,65 @@ async fn count_versions_published_today(
573
577
. await
574
578
}
575
579
576
- #[ instrument( skip_all) ]
577
- fn split_body ( mut bytes : Bytes ) -> AppResult < ( Bytes , Bytes ) > {
578
- // The format of the req.body() of a publish request is as follows:
579
- //
580
- // metadata length
581
- // metadata in JSON about the crate being published
582
- // .crate tarball length
583
- // .crate tarball file
580
+ async fn read_json_metadata < R : AsyncRead + Unpin > (
581
+ reader : & mut R ,
582
+ max_length : u32 ,
583
+ ) -> Result < PublishMetadata , BoxedAppError > {
584
+ let json_len = reader. read_u32_le ( ) . await . map_err ( |e| {
585
+ if e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof {
586
+ bad_request ( "invalid metadata length" )
587
+ } else {
588
+ e. into ( )
589
+ }
590
+ } ) ?;
584
591
585
- if bytes . len ( ) < 4 {
586
- // Avoid panic in `get_u32_le()` if there is not enough remaining data
587
- return Err ( bad_request ( "invalid metadata length" ) ) ;
592
+ if json_len > max_length {
593
+ let message = "JSON metadata blob too large" ;
594
+ return Err ( custom ( StatusCode :: PAYLOAD_TOO_LARGE , message ) ) ;
588
595
}
589
596
590
- let json_len = bytes. get_u32_le ( ) as usize ;
591
- if json_len > bytes. len ( ) {
592
- return Err ( bad_request ( format ! (
593
- "invalid metadata length for remaining payload: {json_len}"
594
- ) ) ) ;
595
- }
597
+ let mut json_bytes = vec ! [ 0 ; json_len as usize ] ;
598
+ reader. read_exact ( & mut json_bytes) . await . map_err ( |e| {
599
+ if e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof {
600
+ let message = format ! ( "invalid metadata length for remaining payload: {json_len}" ) ;
601
+ bad_request ( message)
602
+ } else {
603
+ e. into ( )
604
+ }
605
+ } ) ?;
596
606
597
- let json_bytes = bytes. split_to ( json_len) ;
607
+ serde_json:: from_slice ( & json_bytes)
608
+ . map_err ( |e| bad_request ( format_args ! ( "invalid upload request: {e}" ) ) )
609
+ }
598
610
599
- if bytes. len ( ) < 4 {
600
- // Avoid panic in `get_u32_le()` if there is not enough remaining data
601
- return Err ( bad_request ( "invalid tarball length" ) ) ;
602
- }
611
+ async fn read_tarball_bytes < R : AsyncRead + Unpin > (
612
+ reader : & mut R ,
613
+ max_length : u32 ,
614
+ ) -> Result < Bytes , BoxedAppError > {
615
+ let tarball_len = reader. read_u32_le ( ) . await . map_err ( |e| {
616
+ if e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof {
617
+ bad_request ( "invalid tarball length" )
618
+ } else {
619
+ e. into ( )
620
+ }
621
+ } ) ?;
603
622
604
- let tarball_len = bytes. get_u32_le ( ) as usize ;
605
- if tarball_len > bytes. len ( ) {
606
- return Err ( bad_request ( format ! (
607
- "invalid tarball length for remaining payload: {tarball_len}"
608
- ) ) ) ;
623
+ if tarball_len > max_length {
624
+ let message = format ! ( "max upload size is: {}" , max_length) ;
625
+ return Err ( custom ( StatusCode :: PAYLOAD_TOO_LARGE , message) ) ;
609
626
}
610
627
611
- let tarball_bytes = bytes. split_to ( tarball_len) ;
628
+ let mut tarball_bytes = vec ! [ 0 ; tarball_len as usize ] ;
629
+ reader. read_exact ( & mut tarball_bytes) . await . map_err ( |e| {
630
+ if e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof {
631
+ let message = format ! ( "invalid tarball length for remaining payload: {tarball_len}" ) ;
632
+ bad_request ( message)
633
+ } else {
634
+ e. into ( )
635
+ }
636
+ } ) ?;
612
637
613
- Ok ( ( json_bytes , tarball_bytes) )
638
+ Ok ( Bytes :: from ( tarball_bytes) )
614
639
}
615
640
616
641
async fn is_reserved_name ( name : & str , conn : & mut AsyncPgConnection ) -> QueryResult < bool > {
0 commit comments