Skip to content

Commit dc9e0c5

Browse files
feat: add a new stream info API (#720)
Add API - GET /logstream/{logstream}/info Response - { "created-at": "2024-03-27T15:58:28.418792+05:30", "first-event-at": "2024-03-27T15:59:08.980+05:30", "cache_enabled": false, "time_partition": "source_time" } Also removed logic of first-event-at and created-at from stats API response
1 parent 5209694 commit dc9e0c5

File tree

4 files changed

+62
-22
lines changed

4 files changed

+62
-22
lines changed

server/src/handlers/http.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,14 @@ pub fn configure_routes(
158158
)
159159
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
160160
)
161+
.service(
162+
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
163+
web::resource("/info").route(
164+
web::get()
165+
.to(logstream::get_stream_info)
166+
.authorize_for_stream(Action::GetStream),
167+
),
168+
)
161169
.service(
162170
web::resource("/alert")
163171
// PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream

server/src/handlers/http/logstream.rs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::alerts::Alerts;
2323
use crate::handlers::TIME_PARTITION_KEY;
2424
use crate::metadata::STREAM_INFO;
2525
use crate::option::CONFIG;
26-
use crate::storage::{retention::Retention, LogStream, StorageDir};
26+
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
2727
use crate::{catalog, event, stats};
2828
use crate::{metadata, validator};
2929
use actix_web::http::StatusCode;
@@ -271,34 +271,13 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
271271
return Err(StreamError::StreamNotFound(stream_name));
272272
}
273273

274-
if first_event_at_empty(&stream_name) {
275-
let store = CONFIG.storage().get_object_store();
276-
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
277-
if let Err(err) =
278-
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
279-
{
280-
log::error!(
281-
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
282-
stream_name
283-
);
284-
}
285-
}
286-
}
287-
288274
let stats = stats::get_current_stats(&stream_name, "json")
289275
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
290276

291-
let hash_map = STREAM_INFO.read().unwrap();
292-
let stream_meta = &hash_map
293-
.get(&stream_name)
294-
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
295-
296277
let time = Utc::now();
297278

298279
let stats = serde_json::json!({
299280
"stream": stream_name,
300-
"creation_time": &stream_meta.created_at,
301-
"first_event_at": Some(&stream_meta.first_event_at),
302281
"time": time,
303282
"ingestion": {
304283
"count": stats.events,
@@ -366,6 +345,41 @@ pub async fn create_stream(
366345
Ok(())
367346
}
368347

348+
pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
349+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
350+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
351+
return Err(StreamError::StreamNotFound(stream_name));
352+
}
353+
354+
if first_event_at_empty(&stream_name) {
355+
let store = CONFIG.storage().get_object_store();
356+
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
357+
if let Err(err) =
358+
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
359+
{
360+
log::error!(
361+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
362+
stream_name
363+
);
364+
}
365+
}
366+
}
367+
368+
let hash_map = STREAM_INFO.read().unwrap();
369+
let stream_meta = &hash_map
370+
.get(&stream_name)
371+
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
372+
373+
let stream_info: StreamInfo = StreamInfo {
374+
created_at: stream_meta.created_at.clone(),
375+
first_event_at: stream_meta.first_event_at.clone(),
376+
time_partition: stream_meta.time_partition.clone(),
377+
cache_enabled: stream_meta.cache_enabled,
378+
};
379+
380+
Ok((web::Json(stream_info), StatusCode::OK))
381+
}
382+
369383
pub mod error {
370384

371385
use actix_web::http::header::ContentType;

server/src/rbac/role.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub enum Action {
2424
Query,
2525
CreateStream,
2626
ListStream,
27+
GetStream,
2728
GetSchema,
2829
GetStats,
2930
DeleteStream,
@@ -97,6 +98,7 @@ impl RoleBuilder {
9798
| Action::ListRole
9899
| Action::CreateStream
99100
| Action::DeleteStream
101+
| Action::GetStream
100102
| Action::ListStream => Permission::Unit(action),
101103
Action::Ingest
102104
| Action::GetSchema
@@ -169,6 +171,7 @@ pub mod model {
169171
Action::Query,
170172
Action::CreateStream,
171173
Action::ListStream,
174+
Action::GetStream,
172175
Action::GetSchema,
173176
Action::GetStats,
174177
Action::GetRetention,
@@ -191,6 +194,7 @@ pub mod model {
191194
Action::Ingest,
192195
Action::Query,
193196
Action::ListStream,
197+
Action::GetStream,
194198
Action::GetSchema,
195199
Action::GetStats,
196200
Action::GetRetention,
@@ -209,6 +213,7 @@ pub mod model {
209213
actions: vec![
210214
Action::Query,
211215
Action::ListStream,
216+
Action::GetStream,
212217
Action::GetSchema,
213218
Action::GetStats,
214219
Action::GetRetention,

server/src/storage.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,19 @@ pub struct ObjectStoreFormat {
8686
pub time_partition: Option<String>,
8787
}
8888

89+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
90+
pub struct StreamInfo {
91+
#[serde(rename = "created-at")]
92+
pub created_at: String,
93+
#[serde(rename = "first-event-at")]
94+
#[serde(skip_serializing_if = "Option::is_none")]
95+
pub first_event_at: Option<String>,
96+
#[serde(default)]
97+
pub cache_enabled: bool,
98+
#[serde(skip_serializing_if = "Option::is_none")]
99+
pub time_partition: Option<String>,
100+
}
101+
89102
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
90103
pub struct Owner {
91104
pub id: String,

0 commit comments

Comments
 (0)