Skip to content

Commit 82ab1cf

Browse files
feat: add a new stream info API
API - GET /logstream/{logstream}/info Response - { "created-at": "2024-03-27T15:58:28.418792+05:30", "first-event-at": "2024-03-27T15:59:08.980+05:30", "cache_enabled": false, "time_partition": "source_time" }
1 parent 5209694 commit 82ab1cf

File tree

4 files changed

+62
-8
lines changed

4 files changed

+62
-8
lines changed

server/src/handlers/http.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,14 @@ pub fn configure_routes(
158158
)
159159
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
160160
)
161+
.service(
162+
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
163+
web::resource("/info").route(
164+
web::get()
165+
.to(logstream::get_stream_info)
166+
.authorize_for_stream(Action::GetStream),
167+
),
168+
)
161169
.service(
162170
web::resource("/alert")
163171
// PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream

server/src/handlers/http/logstream.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::alerts::Alerts;
2323
use crate::handlers::TIME_PARTITION_KEY;
2424
use crate::metadata::STREAM_INFO;
2525
use crate::option::CONFIG;
26-
use crate::storage::{retention::Retention, LogStream, StorageDir};
26+
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
2727
use crate::{catalog, event, stats};
2828
use crate::{metadata, validator};
2929
use actix_web::http::StatusCode;
@@ -288,17 +288,10 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
288288
let stats = stats::get_current_stats(&stream_name, "json")
289289
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
290290

291-
let hash_map = STREAM_INFO.read().unwrap();
292-
let stream_meta = &hash_map
293-
.get(&stream_name)
294-
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
295-
296291
let time = Utc::now();
297292

298293
let stats = serde_json::json!({
299294
"stream": stream_name,
300-
"creation_time": &stream_meta.created_at,
301-
"first_event_at": Some(&stream_meta.first_event_at),
302295
"time": time,
303296
"ingestion": {
304297
"count": stats.events,
@@ -366,6 +359,41 @@ pub async fn create_stream(
366359
Ok(())
367360
}
368361

362+
pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
363+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
364+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
365+
return Err(StreamError::StreamNotFound(stream_name));
366+
}
367+
368+
if first_event_at_empty(&stream_name) {
369+
let store = CONFIG.storage().get_object_store();
370+
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
371+
if let Err(err) =
372+
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
373+
{
374+
log::error!(
375+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
376+
stream_name
377+
);
378+
}
379+
}
380+
}
381+
382+
let hash_map = STREAM_INFO.read().unwrap();
383+
let stream_meta = &hash_map
384+
.get(&stream_name)
385+
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
386+
387+
let stream_info: StreamInfo = StreamInfo {
388+
created_at: stream_meta.created_at.clone(),
389+
first_event_at: stream_meta.first_event_at.clone(),
390+
time_partition: stream_meta.time_partition.clone(),
391+
cache_enabled: stream_meta.cache_enabled,
392+
};
393+
394+
Ok((web::Json(stream_info), StatusCode::OK))
395+
}
396+
369397
pub mod error {
370398

371399
use actix_web::http::header::ContentType;

server/src/rbac/role.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub enum Action {
2424
Query,
2525
CreateStream,
2626
ListStream,
27+
GetStream,
2728
GetSchema,
2829
GetStats,
2930
DeleteStream,
@@ -97,6 +98,7 @@ impl RoleBuilder {
9798
| Action::ListRole
9899
| Action::CreateStream
99100
| Action::DeleteStream
101+
| Action::GetStream
100102
| Action::ListStream => Permission::Unit(action),
101103
Action::Ingest
102104
| Action::GetSchema
@@ -169,6 +171,7 @@ pub mod model {
169171
Action::Query,
170172
Action::CreateStream,
171173
Action::ListStream,
174+
Action::GetStream,
172175
Action::GetSchema,
173176
Action::GetStats,
174177
Action::GetRetention,
@@ -191,6 +194,7 @@ pub mod model {
191194
Action::Ingest,
192195
Action::Query,
193196
Action::ListStream,
197+
Action::GetStream,
194198
Action::GetSchema,
195199
Action::GetStats,
196200
Action::GetRetention,
@@ -209,6 +213,7 @@ pub mod model {
209213
actions: vec![
210214
Action::Query,
211215
Action::ListStream,
216+
Action::GetStream,
212217
Action::GetSchema,
213218
Action::GetStats,
214219
Action::GetRetention,

server/src/storage.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,19 @@ pub struct ObjectStoreFormat {
8686
pub time_partition: Option<String>,
8787
}
8888

89+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
90+
pub struct StreamInfo {
91+
#[serde(rename = "created-at")]
92+
pub created_at: String,
93+
#[serde(rename = "first-event-at")]
94+
#[serde(skip_serializing_if = "Option::is_none")]
95+
pub first_event_at: Option<String>,
96+
#[serde(default)]
97+
pub cache_enabled: bool,
98+
#[serde(skip_serializing_if = "Option::is_none")]
99+
pub time_partition: Option<String>,
100+
}
101+
89102
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
90103
pub struct Owner {
91104
pub id: String,

0 commit comments

Comments
 (0)