@@ -23,7 +23,7 @@ use super::ingest::create_stream_if_not_exists;
2323use super :: modal:: utils:: logstream_utils:: create_update_stream;
2424use crate :: alerts:: Alerts ;
2525use crate :: handlers:: STREAM_TYPE_KEY ;
26- use crate :: hottier:: HotTierManager ;
26+ use crate :: hottier:: { HotTierManager , StreamHotTier , CURRENT_HOT_TIER_VERSION } ;
2727use crate :: metadata:: STREAM_INFO ;
2828use crate :: metrics:: { EVENTS_INGESTED_DATE , EVENTS_INGESTED_SIZE_DATE , EVENTS_STORAGE_SIZE_DATE } ;
2929use crate :: option:: CONFIG ;
@@ -512,6 +512,110 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
512512 Ok ( ( web:: Json ( stream_info) , StatusCode :: OK ) )
513513}
514514
515+ pub async fn put_stream_hot_tier (
516+ req : HttpRequest ,
517+ body : web:: Json < serde_json:: Value > ,
518+ ) -> Result < impl Responder , StreamError > {
519+ let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
520+ if !metadata:: STREAM_INFO . stream_exists ( & stream_name) {
521+ return Err ( StreamError :: StreamNotFound ( stream_name) ) ;
522+ }
523+
524+ if STREAM_INFO . stream_type ( & stream_name) . unwrap ( ) == Some ( StreamType :: Internal . to_string ( ) ) {
525+ return Err ( StreamError :: Custom {
526+ msg : "Hot tier can not be updated for internal stream" . to_string ( ) ,
527+ status : StatusCode :: BAD_REQUEST ,
528+ } ) ;
529+ }
530+ if CONFIG . parseable . hot_tier_storage_path . is_none ( ) {
531+ return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
532+ }
533+
534+ let body = body. into_inner ( ) ;
535+ let mut hottier: StreamHotTier = match serde_json:: from_value ( body) {
536+ Ok ( hottier) => hottier,
537+ Err ( err) => return Err ( StreamError :: InvalidHotTierConfig ( err) ) ,
538+ } ;
539+
540+ validator:: hot_tier ( & hottier. size . to_string ( ) ) ?;
541+
542+ STREAM_INFO . set_hot_tier ( & stream_name, true ) ?;
543+ if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
544+ let existing_hot_tier_used_size = hot_tier_manager
545+ . validate_hot_tier_size ( & stream_name, & hottier. size )
546+ . await ?;
547+ hottier. used_size = Some ( existing_hot_tier_used_size. to_string ( ) ) ;
548+ hottier. available_size = Some ( hottier. size . clone ( ) ) ;
549+ hottier. version = Some ( CURRENT_HOT_TIER_VERSION . to_string ( ) ) ;
550+ hot_tier_manager
551+ . put_hot_tier ( & stream_name, & mut hottier)
552+ . await ?;
553+ let storage = CONFIG . storage ( ) . get_object_store ( ) ;
554+ let mut stream_metadata = storage. get_object_store_format ( & stream_name) . await ?;
555+ stream_metadata. hot_tier_enabled = Some ( true ) ;
556+ storage
557+ . put_stream_manifest ( & stream_name, & stream_metadata)
558+ . await ?;
559+ }
560+
561+ Ok ( (
562+ format ! ( "hot tier set for stream {stream_name}" ) ,
563+ StatusCode :: OK ,
564+ ) )
565+ }
566+
567+ pub async fn get_stream_hot_tier ( req : HttpRequest ) -> Result < impl Responder , StreamError > {
568+ let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
569+
570+ if !metadata:: STREAM_INFO . stream_exists ( & stream_name) {
571+ return Err ( StreamError :: StreamNotFound ( stream_name) ) ;
572+ }
573+
574+ if CONFIG . parseable . hot_tier_storage_path . is_none ( ) {
575+ return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
576+ }
577+
578+ if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
579+ let mut hot_tier = hot_tier_manager. get_hot_tier ( & stream_name) . await ?;
580+ hot_tier. size = format ! ( "{} {}" , hot_tier. size, "Bytes" ) ;
581+ hot_tier. used_size = Some ( format ! ( "{} {}" , hot_tier. used_size. unwrap( ) , "Bytes" ) ) ;
582+ hot_tier. available_size = Some ( format ! ( "{} {}" , hot_tier. available_size. unwrap( ) , "Bytes" ) ) ;
583+ Ok ( ( web:: Json ( hot_tier) , StatusCode :: OK ) )
584+ } else {
585+ Err ( StreamError :: Custom {
586+ msg : format ! ( "hot tier not initialised for stream {}" , stream_name) ,
587+ status : ( StatusCode :: BAD_REQUEST ) ,
588+ } )
589+ }
590+ }
591+
592+ pub async fn delete_stream_hot_tier ( req : HttpRequest ) -> Result < impl Responder , StreamError > {
593+ let stream_name: String = req. match_info ( ) . get ( "logstream" ) . unwrap ( ) . parse ( ) . unwrap ( ) ;
594+
595+ if !metadata:: STREAM_INFO . stream_exists ( & stream_name) {
596+ return Err ( StreamError :: StreamNotFound ( stream_name) ) ;
597+ }
598+
599+ if CONFIG . parseable . hot_tier_storage_path . is_none ( ) {
600+ return Err ( StreamError :: HotTierNotEnabled ( stream_name) ) ;
601+ }
602+
603+ if STREAM_INFO . stream_type ( & stream_name) . unwrap ( ) == Some ( StreamType :: Internal . to_string ( ) ) {
604+ return Err ( StreamError :: Custom {
605+ msg : "Hot tier can not be deleted for internal stream" . to_string ( ) ,
606+ status : StatusCode :: BAD_REQUEST ,
607+ } ) ;
608+ }
609+
610+ if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
611+ hot_tier_manager. delete_hot_tier ( & stream_name) . await ?;
612+ }
613+ Ok ( (
614+ format ! ( "hot tier deleted for stream {stream_name}" ) ,
615+ StatusCode :: OK ,
616+ ) )
617+ }
618+
515619pub async fn create_internal_stream_if_not_exists ( ) -> Result < ( ) , StreamError > {
516620 if let Ok ( stream_exists) =
517621 create_stream_if_not_exists ( INTERNAL_STREAM_NAME , & StreamType :: Internal . to_string ( ) ) . await
0 commit comments