@@ -187,15 +187,9 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
187187
188188pub async fn put_stream ( req : HttpRequest , body : Bytes ) -> Result < impl Responder , StreamError > {
189189 let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
190- let update_stream = if let Some ( ( _, update_stream) ) = req
191- . headers ( )
192- . iter ( )
193- . find ( |& ( key, _) | key == UPDATE_STREAM_KEY )
194- {
195- update_stream. to_str ( ) . unwrap ( )
196- } else {
197- ""
198- } ;
190+ let ( time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
191+ fetch_headers_from_put_stream_request ( & req) ;
192+
199193 if metadata:: STREAM_INFO . stream_exists ( & stream_name) && update_stream != "true" {
200194 // Error if the log stream already exists
201195 return Err ( StreamError :: Custom {
@@ -206,43 +200,19 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
206200 } ) ;
207201 }
208202
209- let time_partition = if let Some ( ( _, time_partition_name) ) = req
210- . headers ( )
211- . iter ( )
212- . find ( |& ( key, _) | key == TIME_PARTITION_KEY )
213- {
214- time_partition_name. to_str ( ) . unwrap ( )
215- } else {
216- ""
217- } ;
218-
219203 if !time_partition. is_empty ( ) && update_stream == "true" {
220204 return Err ( StreamError :: Custom {
221205 msg : "Altering the time partition of an existing stream is restricted." . to_string ( ) ,
222206 status : StatusCode :: BAD_REQUEST ,
223207 } ) ;
224208 }
225209 let mut time_partition_in_days: & str = "" ;
226- if let Some ( ( _, time_partition_limit_name) ) = req
227- . headers ( )
228- . iter ( )
229- . find ( |& ( key, _) | key == TIME_PARTITION_LIMIT_KEY )
230- {
231- let time_partition_limit = time_partition_limit_name. to_str ( ) . unwrap ( ) ;
232- if !time_partition_limit. ends_with ( 'd' ) {
233- return Err ( StreamError :: Custom {
234- msg : "Missing 'd' suffix for duration value" . to_string ( ) ,
235- status : StatusCode :: BAD_REQUEST ,
236- } ) ;
237- }
238- let days = & time_partition_limit[ 0 ..time_partition_limit. len ( ) - 1 ] ;
239- if days. parse :: < NonZeroU32 > ( ) . is_err ( ) {
240- return Err ( StreamError :: Custom {
241- msg : "Could not convert duration to an unsigned number" . to_string ( ) ,
242- status : StatusCode :: BAD_REQUEST ,
243- } ) ;
210+ if !time_partition_limit. is_empty ( ) {
211+ let time_partition_days = validate_time_partition_limit ( & time_partition_limit) ;
212+ if let Err ( err) = time_partition_days {
213+ return Err ( StreamError :: CreateStream ( err) ) ;
244214 } else {
245- time_partition_in_days = days ;
215+ time_partition_in_days = time_partition_days . unwrap ( ) ;
246216 if update_stream == "true" {
247217 if let Err ( err) = update_time_partition_limit_in_stream (
248218 stream_name. clone ( ) ,
@@ -256,15 +226,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
256226 }
257227 }
258228 }
259- let static_schema_flag = if let Some ( ( _, static_schema_flag) ) = req
260- . headers ( )
261- . iter ( )
262- . find ( |& ( key, _) | key == STATIC_SCHEMA_FLAG )
263- {
264- static_schema_flag. to_str ( ) . unwrap ( )
265- } else {
266- ""
267- } ;
268229
269230 if !static_schema_flag. is_empty ( ) && update_stream == "true" {
270231 return Err ( StreamError :: Custom {
@@ -273,34 +234,118 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
273234 } ) ;
274235 }
275236
276- let mut custom_partition: & str = "" ;
277- if let Some ( ( _, custom_partition_key) ) = req
278- . headers ( )
279- . iter ( )
280- . find ( |& ( key, _) | key == CUSTOM_PARTITION_KEY )
281- {
282- custom_partition = custom_partition_key. to_str ( ) . unwrap ( ) ;
283- let custom_partition_list = custom_partition. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
284- if custom_partition_list. len ( ) > 3 {
285- return Err ( StreamError :: Custom {
286- msg : "Maximum 3 custom partition keys are supported" . to_string ( ) ,
287- status : StatusCode :: BAD_REQUEST ,
288- } ) ;
237+ if !custom_partition. is_empty ( ) {
238+ if let Err ( err) = validate_custom_partition ( & custom_partition) {
239+ return Err ( StreamError :: CreateStream ( err) ) ;
289240 }
290241 if update_stream == "true" {
291242 if let Err ( err) =
292- update_custom_partition_in_stream ( stream_name. clone ( ) , custom_partition) . await
243+ update_custom_partition_in_stream ( stream_name. clone ( ) , & custom_partition) . await
293244 {
294245 return Err ( StreamError :: CreateStream ( err) ) ;
295246 }
296247 return Ok ( ( "Log stream updated" , StatusCode :: OK ) ) ;
297248 }
298249 }
299250
300- let mut schema = Arc :: new ( Schema :: empty ( ) ) ;
251+ let schema = validate_static_schema (
252+ & body,
253+ & stream_name,
254+ & time_partition,
255+ & custom_partition,
256+ & static_schema_flag,
257+ ) ;
258+ if let Err ( err) = schema {
259+ return Err ( StreamError :: CreateStream ( err) ) ;
260+ }
261+
262+ create_stream (
263+ stream_name,
264+ & time_partition,
265+ time_partition_in_days,
266+ & custom_partition,
267+ & static_schema_flag,
268+ schema. unwrap ( ) ,
269+ )
270+ . await ?;
271+
272+ Ok ( ( "Log stream created" , StatusCode :: OK ) )
273+ }
274+
275+ fn fetch_headers_from_put_stream_request (
276+ req : & HttpRequest ,
277+ ) -> ( String , String , String , String , String ) {
278+ let mut time_partition = String :: default ( ) ;
279+ let mut time_partition_limit = String :: default ( ) ;
280+ let mut custom_partition = String :: default ( ) ;
281+ let mut static_schema_flag = String :: default ( ) ;
282+ let mut update_stream = String :: default ( ) ;
283+ req. headers ( ) . iter ( ) . for_each ( |( key, value) | {
284+ if key == TIME_PARTITION_KEY {
285+ time_partition = value. to_str ( ) . unwrap ( ) . to_string ( ) ;
286+ }
287+ if key == TIME_PARTITION_LIMIT_KEY {
288+ time_partition_limit = value. to_str ( ) . unwrap ( ) . to_string ( ) ;
289+ }
290+ if key == CUSTOM_PARTITION_KEY {
291+ custom_partition = value. to_str ( ) . unwrap ( ) . to_string ( ) ;
292+ }
293+ if key == STATIC_SCHEMA_FLAG {
294+ static_schema_flag = value. to_str ( ) . unwrap ( ) . to_string ( ) ;
295+ }
296+ if key == UPDATE_STREAM_KEY {
297+ update_stream = value. to_str ( ) . unwrap ( ) . to_string ( ) ;
298+ }
299+ } ) ;
301300
301+ (
302+ time_partition,
303+ time_partition_limit,
304+ custom_partition,
305+ static_schema_flag,
306+ update_stream,
307+ )
308+ }
309+
310+ fn validate_time_partition_limit ( time_partition_limit : & str ) -> Result < & str , CreateStreamError > {
311+ if !time_partition_limit. ends_with ( 'd' ) {
312+ return Err ( CreateStreamError :: Custom {
313+ msg : "Missing 'd' suffix for duration value" . to_string ( ) ,
314+ status : StatusCode :: BAD_REQUEST ,
315+ } ) ;
316+ }
317+ let days = & time_partition_limit[ 0 ..time_partition_limit. len ( ) - 1 ] ;
318+ if days. parse :: < NonZeroU32 > ( ) . is_err ( ) {
319+ return Err ( CreateStreamError :: Custom {
320+ msg : "Could not convert duration to an unsigned number" . to_string ( ) ,
321+ status : StatusCode :: BAD_REQUEST ,
322+ } ) ;
323+ }
324+
325+ Ok ( days)
326+ }
327+
328+ fn validate_custom_partition ( custom_partition : & str ) -> Result < ( ) , CreateStreamError > {
329+ let custom_partition_list = custom_partition. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
330+ if custom_partition_list. len ( ) > 3 {
331+ return Err ( CreateStreamError :: Custom {
332+ msg : "Maximum 3 custom partition keys are supported" . to_string ( ) ,
333+ status : StatusCode :: BAD_REQUEST ,
334+ } ) ;
335+ }
336+ Ok ( ( ) )
337+ }
338+
339+ fn validate_static_schema (
340+ body : & Bytes ,
341+ stream_name : & str ,
342+ time_partition : & str ,
343+ custom_partition : & str ,
344+ static_schema_flag : & str ,
345+ ) -> Result < Arc < Schema > , CreateStreamError > {
346+ let mut schema = Arc :: new ( Schema :: empty ( ) ) ;
302347 if !body. is_empty ( ) && static_schema_flag == "true" {
303- let static_schema: StaticSchema = serde_json:: from_slice ( & body) ?;
348+ let static_schema: StaticSchema = serde_json:: from_slice ( body) ?;
304349
305350 let parsed_schema = convert_static_schema_to_arrow_schema (
306351 static_schema. clone ( ) ,
@@ -310,31 +355,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
310355 if let Ok ( parsed_schema) = parsed_schema {
311356 schema = parsed_schema;
312357 } else {
313- return Err ( StreamError :: Custom {
358+ return Err ( CreateStreamError :: Custom {
314359 msg : format ! ( "Unable to commit static schema, logstream {stream_name} not created" ) ,
315360 status : StatusCode :: BAD_REQUEST ,
316361 } ) ;
317362 }
318363 } else if body. is_empty ( ) && static_schema_flag == "true" {
319- return Err ( StreamError :: Custom {
364+ return Err ( CreateStreamError :: Custom {
320365 msg : format ! (
321366 "Please provide schema in the request body for static schema logstream {stream_name}"
322367 ) ,
323368 status : StatusCode :: BAD_REQUEST ,
324369 } ) ;
325370 }
326371
327- create_stream (
328- stream_name,
329- time_partition,
330- time_partition_in_days,
331- custom_partition,
332- static_schema_flag,
333- schema,
334- )
335- . await ?;
336-
337- Ok ( ( "Log stream created" , StatusCode :: OK ) )
372+ Ok ( schema)
338373}
339374
340375pub async fn put_alert (
@@ -810,6 +845,8 @@ pub mod error {
810845 } ,
811846 #[ error( "{msg}" ) ]
812847 Custom { msg : String , status : StatusCode } ,
848+ #[ error( "Could not deserialize into JSON object, {0}" ) ]
849+ SerdeError ( #[ from] serde_json:: Error ) ,
813850 }
814851
815852 #[ derive( Debug , thiserror:: Error ) ]
@@ -865,6 +902,9 @@ pub mod error {
865902 StreamError :: CreateStream ( CreateStreamError :: Custom { .. } ) => {
866903 StatusCode :: INTERNAL_SERVER_ERROR
867904 }
905+ StreamError :: CreateStream ( CreateStreamError :: SerdeError ( _) ) => {
906+ StatusCode :: BAD_REQUEST
907+ }
868908 StreamError :: CacheNotEnabled ( _) => StatusCode :: BAD_REQUEST ,
869909 StreamError :: StreamNotFound ( _) => StatusCode :: NOT_FOUND ,
870910 StreamError :: Custom { status, .. } => * status,
0 commit comments