Skip to content

Commit 122dd43

Browse files
fix for staging size metrics
- to subtract the size of the arrow file removed from staging fix for GET /cache - to return error in case global cache is not set
1 parent b251e05 commit 122dd43

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,17 @@ pub async fn put_retention(
338338

339339
pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
340340
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
341+
342+
match CONFIG.parseable.mode {
343+
Mode::Ingest | Mode::All => {
344+
if CONFIG.parseable.local_cache_path.is_none() {
345+
return Err(StreamError::CacheNotEnabled(stream_name));
346+
}
347+
}
348+
_ => {}
349+
}
350+
351+
341352
let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
342353
Ok((web::Json(cache_enabled), StatusCode::OK))
343354
}

server/src/storage/staging.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,16 @@ pub fn convert_disk_files_to_parquet(
247247
writer.close()?;
248248

249249
for file in files {
250-
if fs::remove_file(file).is_err() {
250+
let file_size = file.metadata().unwrap().len();
251+
let file_type = file.extension().unwrap().to_str().unwrap();
252+
253+
if fs::remove_file(file.clone()).is_err() {
251254
log::error!("Failed to delete file. Unstable state");
252255
process::abort()
253256
}
257+
metrics::STORAGE_SIZE
258+
.with_label_values(&["staging", stream, file_type])
259+
.sub(file_size as i64);
254260
}
255261
}
256262

0 commit comments

Comments
 (0)