From 82ab1cf8c0eb4198bc9d5b30976e5638fddd2b9a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 27 Mar 2024 16:00:55 +0530 Subject: [PATCH 1/2] feat: add a new stream info API API - GET /logstream/{logstream}/info Response - { "created-at": "2024-03-27T15:58:28.418792+05:30", "first-event-at": "2024-03-27T15:59:08.980+05:30", "cache_enabled": false, "time_partition": "source_time" } --- server/src/handlers/http.rs | 8 +++++ server/src/handlers/http/logstream.rs | 44 ++++++++++++++++++++++----- server/src/rbac/role.rs | 5 +++ server/src/storage.rs | 13 ++++++++ 4 files changed, 62 insertions(+), 8 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index e30e3d77a..771eaac7a 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -158,6 +158,14 @@ pub fn configure_routes( ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) + .service( + // GET "/logstream/{logstream}/info" ==> Get info for given log stream + web::resource("/info").route( + web::get() + .to(logstream::get_stream_info) + .authorize_for_stream(Action::GetStream), + ), + ) .service( web::resource("/alert") // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e2bca0193..314de6386 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -23,7 +23,7 @@ use crate::alerts::Alerts; use crate::handlers::TIME_PARTITION_KEY; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; -use crate::storage::{retention::Retention, LogStream, StorageDir}; +use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; use actix_web::http::StatusCode; @@ -288,17 +288,10 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let hash_map = STREAM_INFO.read().unwrap(); - let stream_meta = &hash_map - .get(&stream_name) - .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let time = Utc::now(); let stats = serde_json::json!({ "stream": stream_name, - "creation_time": &stream_meta.created_at, - "first_event_at": Some(&stream_meta.first_event_at), "time": time, "ingestion": { "count": stats.events, @@ -366,6 +359,41 @@ pub async fn create_stream( Ok(()) } +pub async fn get_stream_info(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if first_event_at_empty(&stream_name) { + let store = CONFIG.storage().get_object_store(); + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } + } + + let hash_map = STREAM_INFO.read().unwrap(); + let stream_meta = &hash_map + .get(&stream_name) + .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + + let stream_info: StreamInfo = StreamInfo { + created_at: stream_meta.created_at.clone(), + first_event_at: stream_meta.first_event_at.clone(), + time_partition: stream_meta.time_partition.clone(), + cache_enabled: stream_meta.cache_enabled, + }; + + Ok((web::Json(stream_info), StatusCode::OK)) +} + pub mod error { use actix_web::http::header::ContentType; diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 43b5160cf..b53e0cde8 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -24,6 +24,7 @@ pub enum Action { Query, CreateStream, ListStream, + GetStream, GetSchema, GetStats, DeleteStream, @@ -97,6 +98,7 @@ impl RoleBuilder { | Action::ListRole | Action::CreateStream | Action::DeleteStream + | Action::GetStream | Action::ListStream => Permission::Unit(action), Action::Ingest | Action::GetSchema @@ -169,6 +171,7 @@ pub mod model { Action::Query, Action::CreateStream, Action::ListStream, + Action::GetStream, Action::GetSchema, Action::GetStats, Action::GetRetention, @@ -191,6 +194,7 @@ pub mod model { Action::Ingest, Action::Query, Action::ListStream, + Action::GetStream, Action::GetSchema, Action::GetStats, Action::GetRetention, @@ -209,6 +213,7 @@ pub mod model { actions: vec![ Action::Query, Action::ListStream, + Action::GetStream, Action::GetSchema, Action::GetStats, Action::GetRetention, diff --git a/server/src/storage.rs b/server/src/storage.rs index 5602b1984..f14c9a046 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -86,6 +86,19 @@ pub struct ObjectStoreFormat { pub time_partition: Option, } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct StreamInfo { + #[serde(rename = "created-at")] + pub created_at: String, + #[serde(rename = "first-event-at")] + #[serde(skip_serializing_if = "Option::is_none")] + pub first_event_at: Option, + #[serde(default)] + pub cache_enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Owner { pub id: String, From 54ddccd0b24a3818acd1f7256e1d2e375401ea2c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 27 Mar 2024 16:19:45 +0530 Subject: [PATCH 2/2] removed logic of first-event-at from stats api call --- server/src/handlers/http/logstream.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 314de6386..2f7fe39d8 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -271,20 +271,6 @@ pub async fn get_stats(req: HttpRequest) -> Result return Err(StreamError::StreamNotFound(stream_name)); } - if first_event_at_empty(&stream_name) { - let store = CONFIG.storage().get_object_store(); - if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { - if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) - { - log::error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); - } - } - } - let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;