Skip to content

Commit 007325c

Browse files
enhancement of hot tier implementation
- check for disk availability, - delete older files from hot tier if disk usage exceeds threshold - download the latest entries from S3 to hot tier
1 parent 02b12a9 commit 007325c

File tree

6 files changed

+444
-107
lines changed

6 files changed

+444
-107
lines changed

server/src/cli.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ pub struct Cli {
104104

105105
/// The local hot_tier path is used for optimising the query performance in the distributed systems
106106
pub hot_tier_storage_path: Option<PathBuf>,
107+
108+
/// Size for local cache
109+
pub min_hot_tier_size: u64,
110+
111+
///maximum disk usage for hot tier
112+
pub max_disk_usage: f64,
107113
}
108114

109115
impl Cli {
@@ -138,6 +144,8 @@ impl Cli {
138144
pub const FLIGHT_PORT: &'static str = "flight-port";
139145
pub const CORS: &'static str = "cors";
140146
pub const HOT_TIER_PATH: &'static str = "hot-tier-path";
147+
pub const MIN_HOT_TIER_SIZE: &'static str = "hot-tier-size";
148+
pub const MAX_DISK_USAGE: &'static str = "max-disk-usage";
141149

142150
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
143151
self.local_staging_path.join(stream_name)
@@ -408,7 +416,27 @@ impl Cli {
408416
.value_parser(validation::canonicalize_path)
409417
.help("Local path on this device to be used for hot tier data")
410418
.next_line_help(true),
411-
).group(
419+
)
420+
.arg(
421+
Arg::new(Self::MIN_HOT_TIER_SIZE)
422+
.long(Self::MIN_HOT_TIER_SIZE)
423+
.env("P_MIN_HOT_TIER_SIZE")
424+
.value_name("size")
425+
.default_value("100GiB")
426+
.value_parser(validation::hot_tier_size)
427+
.help("Minimum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
428+
.next_line_help(true),
429+
)
430+
.arg(
431+
Arg::new(Self::MAX_DISK_USAGE)
432+
.long(Self::MAX_DISK_USAGE)
433+
.env("P_MAX_DISK_USAGE")
434+
.value_name("size")
435+
.default_value("80.0")
436+
.value_parser(validation::disk_usage)
437+
.help("Maximum disk usage for hot tier"),
438+
)
439+
.group(
412440
ArgGroup::new("oidc")
413441
.args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
414442
.requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
@@ -546,7 +574,15 @@ impl FromArgMatches for Cli {
546574
};
547575

548576
self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER_PATH).cloned();
577+
self.min_hot_tier_size = m
578+
.get_one::<u64>(Self::MIN_HOT_TIER_SIZE)
579+
.cloned()
580+
.expect("default value for cache size");
549581

582+
self.max_disk_usage = m
583+
.get_one::<f64>(Self::MAX_DISK_USAGE)
584+
.cloned()
585+
.expect("default value for max disk usage");
550586
Ok(())
551587
}
552588
}

server/src/handlers/http/logstream.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -951,7 +951,7 @@ pub async fn put_stream_hot_tier(
951951
}
952952

953953
let body = body.into_inner();
954-
let hottier: StreamHotTier = match serde_json::from_value(body) {
954+
let mut hottier: StreamHotTier = match serde_json::from_value(body) {
955955
Ok(hottier) => hottier,
956956
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
957957
};
@@ -964,7 +964,7 @@ pub async fn put_stream_hot_tier(
964964

965965
STREAM_INFO.set_hot_tier(&stream_name, true)?;
966966
if let Some(hot_tier_manager) = HotTierManager::global() {
967-
let mut hottier = StreamHotTier {
967+
hottier = StreamHotTier {
968968
start_date: hottier.start_date,
969969
end_date: hottier.end_date,
970970
size: hottier.size.clone(),
@@ -973,8 +973,9 @@ pub async fn put_stream_hot_tier(
973973
updated_date_range: None,
974974
};
975975

976-
hot_tier_manager.validate(&stream_name, &hottier).await?;
977-
976+
hot_tier_manager
977+
.validate(&stream_name, &mut hottier)
978+
.await?;
978979
hot_tier_manager
979980
.put_hot_tier(&stream_name, &mut hottier)
980981
.await?;
@@ -986,10 +987,7 @@ pub async fn put_stream_hot_tier(
986987
.await?;
987988
}
988989

989-
Ok((
990-
format!("hot tier set for log stream {stream_name}"),
991-
StatusCode::OK,
992-
))
990+
Ok((web::Json(hottier), StatusCode::OK))
993991
}
994992

995993
pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {

0 commit comments

Comments
 (0)