@@ -21,7 +21,7 @@ use super::base_path_without_preceding_slash;
2121use super :: cluster:: fetch_stats_from_ingestors;
2222use super :: cluster:: utils:: { merge_quried_stats, IngestionStats , QueriedStats , StorageStats } ;
2323use crate :: alerts:: Alerts ;
24- use crate :: handlers:: { STATIC_SCHEMA_FLAG , TIME_PARTITION_KEY } ;
24+ use crate :: handlers:: { STATIC_SCHEMA_FLAG , TIME_PARTITION_KEY , TIME_PARTITION_LIMIT_KEY } ;
2525use crate :: metadata:: STREAM_INFO ;
2626use crate :: option:: { Mode , CONFIG } ;
2727use crate :: static_schema:: { convert_static_schema_to_arrow_schema, StaticSchema } ;
@@ -40,6 +40,7 @@ use itertools::Itertools;
4040use serde_json:: Value ;
4141use std:: collections:: HashMap ;
4242use std:: fs;
43+ use std:: num:: NonZeroU32 ;
4344use std:: sync:: Arc ;
4445
4546pub async fn delete ( req : HttpRequest ) -> Result < impl Responder , StreamError > {
@@ -191,6 +192,29 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
191192 } else {
192193 ""
193194 } ;
195+ let mut time_partition_in_days: & str = "" ;
196+ if let Some ( ( _, time_partition_limit_name) ) = req
197+ . headers ( )
198+ . iter ( )
199+ . find ( |& ( key, _) | key == TIME_PARTITION_LIMIT_KEY )
200+ {
201+ let time_partition_limit = time_partition_limit_name. to_str ( ) . unwrap ( ) ;
202+ if !time_partition_limit. ends_with ( 'd' ) {
203+ return Err ( StreamError :: Custom {
204+ msg : "missing 'd' suffix for duration value" . to_string ( ) ,
205+ status : StatusCode :: BAD_REQUEST ,
206+ } ) ;
207+ }
208+ let days = & time_partition_limit[ 0 ..time_partition_limit. len ( ) - 1 ] ;
209+ if days. parse :: < NonZeroU32 > ( ) . is_err ( ) {
210+ return Err ( StreamError :: Custom {
211+ msg : "could not convert duration to an unsigned number" . to_string ( ) ,
212+ status : StatusCode :: BAD_REQUEST ,
213+ } ) ;
214+ } else {
215+ time_partition_in_days = days;
216+ }
217+ }
194218 let static_schema_flag = if let Some ( ( _, static_schema_flag) ) = req
195219 . headers ( )
196220 . iter ( )
@@ -235,7 +259,14 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
235259 } ) ;
236260 }
237261
238- create_stream ( stream_name, time_partition, static_schema_flag, schema) . await ?;
262+ create_stream (
263+ stream_name,
264+ time_partition,
265+ time_partition_in_days,
266+ static_schema_flag,
267+ schema,
268+ )
269+ . await ?;
239270
240271 Ok ( ( "log stream created" , StatusCode :: OK ) )
241272}
@@ -516,6 +547,7 @@ fn remove_id_from_alerts(value: &mut Value) {
516547pub async fn create_stream (
517548 stream_name : String ,
518549 time_partition : & str ,
550+ time_partition_limit : & str ,
519551 static_schema_flag : & str ,
520552 schema : Arc < Schema > ,
521553) -> Result < ( ) , CreateStreamError > {
@@ -528,6 +560,7 @@ pub async fn create_stream(
528560 . create_stream (
529561 & stream_name,
530562 time_partition,
563+ time_partition_limit,
531564 static_schema_flag,
532565 schema. clone ( ) ,
533566 )
@@ -557,6 +590,7 @@ pub async fn create_stream(
557590 stream_name. to_string ( ) ,
558591 created_at,
559592 time_partition. to_string ( ) ,
593+ time_partition_limit. to_string ( ) ,
560594 static_schema_flag. to_string ( ) ,
561595 static_schema,
562596 ) ;
@@ -595,6 +629,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
595629 created_at : stream_meta. created_at . clone ( ) ,
596630 first_event_at : stream_meta. first_event_at . clone ( ) ,
597631 time_partition : stream_meta. time_partition . clone ( ) ,
632+ time_partition_limit : stream_meta. time_partition_limit . clone ( ) ,
598633 cache_enabled : stream_meta. cache_enabled ,
599634 static_schema_flag : stream_meta. static_schema_flag . clone ( ) ,
600635 } ;
0 commit comments