diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 080761b7d..711a88aa2 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -957,6 +957,13 @@ pub async fn put_stream_hot_tier( if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } + + if STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::Internal.to_string() { + return Err(StreamError::Custom { + msg: "Hot tier can not be updated for internal stream".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } if CONFIG.parseable.hot_tier_storage_path.is_none() { return Err(StreamError::HotTierNotEnabled(stream_name)); } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index c731ea5ca..9d6f3fbdf 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -193,6 +193,7 @@ impl QueryServer { log::info!("Cluster metrics scheduler started successfully"); } if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.put_internal_stream_hot_tier().await?; hot_tier_manager.download_from_s3()?; }; let (localsync_handler, mut localsync_outbox, localsync_inbox) = diff --git a/server/src/hottier.rs b/server/src/hottier.rs index 71bb3cdbd..8501ba5c2 100644 --- a/server/src/hottier.rs +++ b/server/src/hottier.rs @@ -25,6 +25,7 @@ use std::{ use crate::{ catalog::manifest::{File, Manifest}, + handlers::http::cluster::INTERNAL_STREAM_NAME, metadata::{error::stream_info::MetadataError, STREAM_INFO}, option::{ validation::{bytes_to_human_size, human_size_to_bytes}, @@ -51,6 +52,7 @@ use tokio_stream::wrappers::ReadDirStream; pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); +pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct StreamHotTier { @@ -685,6 +687,22 @@ impl HotTierManager { Ok(None) } + + pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> { + if CONFIG.parseable.hot_tier_storage_path.is_some() + && !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) + { + let mut stream_hot_tier = StreamHotTier { + size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(), + used_size: Some("0".to_string()), + available_size: Some(INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string()), + oldest_date_time_entry: None, + }; + self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier) + .await?; + } + Ok(()) + } } /// get the hot tier file path for the stream