Skip to content

Commit 150f6ab

Browse files
committed
fix: add first event time for queryable data
1 parent 44fec77 commit 150f6ab

File tree

7 files changed

+166
-7
lines changed

7 files changed

+166
-7
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
2020
use arrow_schema::Field;
2121
use bytes::Bytes;
22+
use chrono::Local;
2223
use http::StatusCode;
2324
use serde_json::Value;
2425
use std::collections::{BTreeMap, HashMap};
@@ -31,7 +32,8 @@ use crate::handlers::{
3132
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
3233
STREAM_NAME_HEADER_KEY,
3334
};
34-
use crate::metadata::STREAM_INFO;
35+
use crate::metadata::{self, STREAM_INFO};
36+
use crate::option::CONFIG;
3537
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3638

3739
use super::kinesis;
@@ -47,8 +49,37 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4749
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
4850
{
4951
let stream_name = stream_name.to_str().unwrap().to_owned();
52+
53+
let mut is_new_stream = true;
54+
if STREAM_INFO.stream_exists(&stream_name) {
55+
is_new_stream = false;
56+
}
57+
5058
create_stream_if_not_exists(&stream_name).await?;
5159

60+
if is_new_stream {
61+
let first_event_at = Local::now().to_rfc3339();
62+
if let Err(err) = CONFIG
63+
.storage()
64+
.get_object_store()
65+
.put_first_event_at(&stream_name, first_event_at.as_str())
66+
.await
67+
{
68+
log::error!(
69+
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
70+
stream_name
71+
);
72+
}
73+
74+
if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
75+
{
76+
log::error!(
77+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
78+
stream_name
79+
);
80+
}
81+
}
82+
5283
flatten_and_push_logs(req, body, stream_name).await?;
5384
Ok(HttpResponse::Ok().finish())
5485
} else {

server/src/handlers/http/logstream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,16 +269,16 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
269269
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
270270

271271
let hash_map = STREAM_INFO.read().unwrap();
272-
let stream_creation_time = &hash_map
272+
let stream_meta = &hash_map
273273
.get(&stream_name)
274-
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?
275-
.created_at;
274+
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
276275

277276
let time = Utc::now();
278277

279278
let stats = serde_json::json!({
280279
"stream": stream_name,
281-
"creation_time": stream_creation_time,
280+
"creation_time": stream_meta.created_at,
281+
"first_event_at": stream_meta.first_event_at,
282282
"time": time,
283283
"ingestion": {
284284
"count": stats.events,

server/src/metadata.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub struct LogStreamMetadata {
4545
pub alerts: Alerts,
4646
pub cache_enabled: bool,
4747
pub created_at: String,
48+
pub first_event_at: String,
4849
}
4950

5051
// It is very unlikely that panic will occur when dealing with metadata.
@@ -128,6 +129,19 @@ impl StreamInfo {
128129
})
129130
}
130131

132+
pub fn set_first_event_at(
133+
&self,
134+
stream_name: &str,
135+
first_event_at: String,
136+
) -> Result<(), MetadataError> {
137+
let mut map = self.write().expect(LOCK_EXPECT);
138+
map.get_mut(stream_name)
139+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
140+
.map(|metadata| {
141+
metadata.first_event_at = first_event_at;
142+
})
143+
}
144+
131145
pub fn add_stream(&self, stream_name: String, created_at: String) {
132146
let mut map = self.write().expect(LOCK_EXPECT);
133147
let metadata = LogStreamMetadata {
@@ -170,6 +184,7 @@ impl StreamInfo {
170184
alerts,
171185
cache_enabled: meta.cache_enabled,
172186
created_at: meta.created_at,
187+
first_event_at: meta.first_event_at,
173188
};
174189

175190
let mut map = self.write().expect(LOCK_EXPECT);

server/src/stats.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,14 @@
1616
*
1717
*/
1818

19+
use chrono::{DateTime, TimeZone, Utc};
20+
use datafusion::arrow::json::writer::record_batches_to_json_rows;
21+
use datafusion::arrow::record_batch::RecordBatch;
22+
use datafusion::error::DataFusionError;
23+
1924
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE, STORAGE_SIZE};
25+
use crate::query;
26+
use crate::query::error::ExecuteError;
2027

2128
/// Helper struct type created by copying stats values from metadata
2229
#[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -71,3 +78,49 @@ fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2]
7178
fn storage_size_labels(stream_name: &str) -> [&str; 3] {
7279
["data", stream_name, "parquet"]
7380
}
81+
82+
pub async fn get_first_event_at(stream_name: &str) -> Result<Option<String>, QueryError> {
83+
let query_string = format!(
84+
"SELECT p_timestamp FROM {} ORDER BY p_timestamp LIMIT 1",
85+
stream_name
86+
);
87+
let first_time: DateTime<Utc> = Utc
88+
.timestamp_opt(0, 0)
89+
.single()
90+
.expect("Failed to get the first UTC time");
91+
let now_time: DateTime<Utc> = Utc::now();
92+
let session_state = query::QUERY_SESSION.state();
93+
let logical_plan = session_state.create_logical_plan(&query_string).await?;
94+
95+
let query = query::Query {
96+
raw_logical_plan: logical_plan,
97+
start: first_time,
98+
end: now_time,
99+
filter_tag: Some(Vec::new()),
100+
};
101+
102+
let (records, _fields) = query.execute().await?;
103+
let records_itr: Vec<&RecordBatch> = records.iter().collect();
104+
let json_records = record_batches_to_json_rows(&records_itr).unwrap();
105+
106+
if let Some(single_record) = json_records.first() {
107+
if let Some(p_timestamp_value) = single_record.get("p_timestamp") {
108+
let p_timestamp_str = p_timestamp_value.as_str().unwrap_or_default();
109+
return Ok(Some(p_timestamp_str.to_string()));
110+
}
111+
}
112+
113+
Ok(None)
114+
}
115+
116+
/// Errors that can arise while looking up a stream's first event time
/// via the internal "first event" query.
// NOTE(review): the two duration variants are not constructed anywhere in
// this diff — confirm they are still needed before removing.
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
    /// A human-readable duration string could not be parsed.
    #[error("While generating times for 'now' failed to parse duration")]
    NotValidDuration(#[from] humantime::DurationError),
    /// The parsed duration does not fit the supported range.
    #[error("Parsed duration out of range")]
    OutOfRange(#[from] chrono::OutOfRangeError),
    /// DataFusion failed while planning or running the query.
    #[error("Datafusion Error: {0}")]
    Datafusion(#[from] DataFusionError),
    /// The query pipeline failed during execution.
    #[error("Query execution failed due to {0}")]
    Execute(#[from] ExecuteError),
}

server/src/storage.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pub struct ObjectStoreFormat {
6969
pub objectstore_format: String,
7070
#[serde(rename = "created-at")]
7171
pub created_at: String,
72+
#[serde(rename = "first-event-at")]
73+
pub first_event_at: String,
7274
pub owner: Owner,
7375
pub permissions: Vec<Permisssion>,
7476
pub stats: Stats,
@@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat {
113115
version: CURRENT_SCHEMA_VERSION.to_string(),
114116
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
115117
created_at: Local::now().to_rfc3339(),
118+
first_event_at: Default::default(),
116119
owner: Owner::new("".to_string(), "".to_string()),
117120
permissions: vec![Permisssion::new("parseable".to_string())],
118121
stats: Stats::default(),

server/src/storage/object_storage.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync + 'static {
161161
self.put_object(&path, to_bytes(&stream_metadata)).await
162162
}
163163

164+
async fn put_first_event_at(
165+
&self,
166+
stream_name: &str,
167+
first_event_at: &str,
168+
) -> Result<(), ObjectStorageError> {
169+
let path = stream_json_path(stream_name);
170+
let stream_metadata = self.get_object(&path).await?;
171+
let first_event_ts =
172+
serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable");
173+
let mut stream_metadata: serde_json::Value =
174+
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
175+
176+
stream_metadata["first-event-at"] = first_event_ts;
177+
178+
self.put_object(&path, to_bytes(&stream_metadata)).await
179+
}
180+
164181
async fn put_metadata(
165182
&self,
166183
parseable_metadata: &StorageMetadata,

server/src/storage/retention.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,12 @@ impl From<Retention> for Vec<TaskView> {
175175
}
176176

177177
mod action {
178-
use chrono::{Days, NaiveDate, Utc};
178+
use chrono::{Days, Local, NaiveDate, NaiveDateTime, TimeZone, Utc};
179179
use futures::{stream::FuturesUnordered, StreamExt};
180180
use itertools::Itertools;
181181
use relative_path::RelativePathBuf;
182182

183-
use crate::option::CONFIG;
183+
use crate::{metadata, option::CONFIG, stats};
184184

185185
pub(super) async fn delete(stream_name: String, days: u32) {
186186
log::info!("running retention task - delete");
@@ -219,6 +219,46 @@ mod action {
219219
log::error!("Failed to run delete task {err:?}")
220220
}
221221
}
222+
223+
// update first-event-at after the cleanup
224+
let first_event = stats::get_first_event_at(&stream_name).await;
225+
let first_event_at = match first_event {
226+
Ok(Some(value)) => Some(value.clone()),
227+
Ok(None) => None,
228+
Err(_err) => None,
229+
};
230+
if let Some(first_event_str) = first_event_at {
231+
let naive_datetime =
232+
NaiveDateTime::parse_from_str(&first_event_str, "%Y-%m-%dT%H:%M:%S%.3f")
233+
.expect("Failed to parse timestamp first_event_at");
234+
235+
let parsed_timestamp = Utc
236+
.from_utc_datetime(&naive_datetime)
237+
.with_timezone(&Local)
238+
.format("%Y-%m-%dT%H:%M:%S%.9f%:z")
239+
.to_string();
240+
241+
if let Err(err) = CONFIG
242+
.storage()
243+
.get_object_store()
244+
.put_first_event_at(&stream_name, parsed_timestamp.as_str())
245+
.await
246+
{
247+
log::error!(
248+
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
249+
stream_name
250+
);
251+
}
252+
253+
if let Err(err) =
254+
metadata::STREAM_INFO.set_first_event_at(&stream_name, parsed_timestamp)
255+
{
256+
log::error!(
257+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
258+
stream_name
259+
);
260+
}
261+
}
222262
}
223263

224264
fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate {

0 commit comments

Comments
 (0)