Commit 5209694

feat: allow historical ingestion with custom date column in log (#716)
This PR allows historical ingestion only when the date column named in the x-p-time-partition header and the server time are within the same minute; default ingestion is unchanged.
1 parent 72502ad commit 5209694
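
The same-minute rule reads compactly in code. This is a hedged sketch of the described behaviour, not code from the commit; the function name and the exact minute truncation are illustrative assumptions:

use chrono::{DateTime, Timelike, Utc};

// Sketch only: accept a historical event when the timestamp taken from the
// stream's time-partition column lands in the same minute as the server clock.
fn within_same_minute(event_time: DateTime<Utc>) -> bool {
    let now = Utc::now();
    event_time.with_second(0).and_then(|t| t.with_nanosecond(0))
        == now.with_second(0).and_then(|t| t.with_nanosecond(0))
}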

File tree

12 files changed: +254 -78 lines changed

server/src/event/format.rs

Lines changed: 3 additions & 1 deletion
@@ -41,13 +41,15 @@ pub trait EventFormat: Sized {
     fn to_data(
         self,
         schema: HashMap<String, Arc<Field>>,
+        time_partition: Option<String>,
     ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
     fn into_recordbatch(
         self,
         schema: HashMap<String, Arc<Field>>,
+        time_partition: Option<String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;
+        let (data, mut schema, is_first, tags, metadata) = self.to_data(schema, time_partition)?;

         if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
             return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));

server/src/event/format/json.rs

Lines changed: 2 additions & 1 deletion
@@ -45,8 +45,9 @@ impl EventFormat for Event {
     fn to_data(
         self,
         schema: HashMap<String, Arc<Field>>,
+        time_partition: Option<String>,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
-        let data = flatten_json_body(self.data)?;
+        let data = flatten_json_body(self.data, time_partition)?;
         let stream_schema = schema;

         // incoming event may be a single json or a json array
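
The body of flatten_json_body is outside this diff, but it now receives the optional time-partition column, and per the commit description it is where that column would be checked. A rough sketch of that shape; every name except the parameter itself is an assumption:

use anyhow::anyhow;
use chrono::{DateTime, Utc};
use serde_json::Value;

// Hypothetical helper, not the commit's implementation: reject events whose
// time-partition column is missing or does not parse as a timestamp.
fn validate_time_partition(body: &Value, time_partition: &str) -> Result<(), anyhow::Error> {
    let value = body
        .get(time_partition)
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("column {} missing or not a string", time_partition))?;
    let _: DateTime<Utc> = value.parse()?; // must be a valid timestamp
    Ok(())
}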

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ const PREFIX_TAGS: &str = "x-p-tag-";
 const PREFIX_META: &str = "x-p-meta-";
 const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
+const TIME_PARTITION_KEY: &str = "x-p-time-partition";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';

server/src/handlers/http/ingest.rs

Lines changed: 53 additions & 17 deletions
@@ -99,7 +99,14 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
             .ok_or(PostError::StreamNotFound(stream_name.clone()))?
             .schema
             .clone();
-        into_event_batch(req, body, schema)?
+
+        let time_partition = hash_map
+            .get(&stream_name)
+            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
+            .time_partition
+            .clone();
+
+        into_event_batch(req, body, schema, time_partition)?
     };

     event::Event {
@@ -119,6 +126,7 @@ fn into_event_batch(
     req: HttpRequest,
     body: Bytes,
     schema: HashMap<String, Arc<Field>>,
+    time_partition: Option<String>,
 ) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
@@ -129,7 +137,7 @@ fn into_event_batch(
         tags,
         metadata,
     };
-    let (rb, is_first) = event.into_recordbatch(schema)?;
+    let (rb, is_first) = event.into_recordbatch(schema, time_partition)?;
     Ok((size, rb, is_first))
 }

@@ -138,7 +146,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
     }
-    super::logstream::create_stream(stream_name.to_string()).await?;
+    super::logstream::create_stream(stream_name.to_string(), "").await?;
     Ok(())
 }

@@ -241,6 +249,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
         )
         .unwrap();

@@ -287,6 +296,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
         )
         .unwrap();

@@ -320,8 +330,13 @@ mod tests {

         let req = TestRequest::default().to_http_request();

-        let (_, rb, _) =
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
+        let (_, rb, _) = into_event_batch(
+            req,
+            Bytes::from(serde_json::to_vec(&json).unwrap()),
+            schema,
+            None,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -353,10 +368,13 @@ mod tests {

         let req = TestRequest::default().to_http_request();

-        assert!(
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
-                .is_err()
-        );
+        assert!(into_event_batch(
+            req,
+            Bytes::from(serde_json::to_vec(&json).unwrap()),
+            schema,
+            None
+        )
+        .is_err());
     }

     #[test]
@@ -374,8 +392,13 @@ mod tests {

         let req = TestRequest::default().to_http_request();

-        let (_, rb, _) =
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
+        let (_, rb, _) = into_event_batch(
+            req,
+            Bytes::from(serde_json::to_vec(&json).unwrap()),
+            schema,
+            None,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -391,6 +414,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None
         )
         .is_err())
     }
@@ -419,6 +443,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
         )
         .unwrap();

@@ -472,6 +497,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
         )
         .unwrap();

@@ -521,8 +547,13 @@ mod tests {
         );
         let req = TestRequest::default().to_http_request();

-        let (_, rb, _) =
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
+        let (_, rb, _) = into_event_batch(
+            req,
+            Bytes::from(serde_json::to_vec(&json).unwrap()),
+            schema,
+            None,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -566,6 +597,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
         )
         .unwrap();

@@ -612,10 +644,13 @@ mod tests {
         .into_iter(),
         );

-        assert!(
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
-                .is_err()
-        );
+        assert!(into_event_batch(
+            req,
+            Bytes::from(serde_json::to_vec(&json).unwrap()),
+            schema,
+            None
+        )
+        .is_err());
     }

     #[test]
@@ -647,6 +682,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
         )
         .unwrap();
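Most of the churn above is mechanical: every call site of into_event_batch gains a fourth argument. Streams without a configured partition column pass None, as the updated tests do; a stream created with the new header would pass its column name, for example (the column name "event_time" is illustrative):

let (size, rb, is_first) =
    into_event_batch(req, body, schema, Some("event_time".to_string()))?;
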
server/src/handlers/http/logstream.rs

Lines changed: 26 additions & 9 deletions
@@ -18,18 +18,18 @@

 use std::fs;

-use actix_web::http::StatusCode;
-use actix_web::{web, HttpRequest, Responder};
-use chrono::Utc;
-use serde_json::Value;
-
 use self::error::{CreateStreamError, StreamError};
 use crate::alerts::Alerts;
+use crate::handlers::TIME_PARTITION_KEY;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::storage::{retention::Retention, LogStream, StorageDir};
 use crate::{catalog, event, stats};
 use crate::{metadata, validator};
+use actix_web::http::StatusCode;
+use actix_web::{web, HttpRequest, Responder};
+use chrono::Utc;
+use serde_json::Value;

 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -108,6 +108,16 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
 }

 pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
+    let time_partition = if let Some((_, time_partition_name)) = req
+        .headers()
+        .iter()
+        .find(|&(key, _)| key == TIME_PARTITION_KEY)
+    {
+        time_partition_name.to_str().unwrap()
+    } else {
+        ""
+    };
+
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

     if metadata::STREAM_INFO.stream_exists(&stream_name) {
@@ -119,7 +129,7 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
             status: StatusCode::BAD_REQUEST,
         });
     } else {
-        create_stream(stream_name).await?;
+        create_stream(stream_name, time_partition).await?;
     }

     Ok(("log stream created", StatusCode::OK))
@@ -326,13 +336,16 @@ fn remove_id_from_alerts(value: &mut Value) {
     }
 }

-pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> {
+pub async fn create_stream(
+    stream_name: String,
+    time_partition: &str,
+) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
     validator::stream_name(&stream_name)?;

     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
-    if let Err(err) = storage.create_stream(&stream_name).await {
+    if let Err(err) = storage.create_stream(&stream_name, time_partition).await {
         return Err(CreateStreamError::Storage { stream_name, err });
     }

@@ -344,7 +357,11 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
     let stream_meta = stream_meta.unwrap();
     let created_at = stream_meta.created_at;

-    metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
+    metadata::STREAM_INFO.add_stream(
+        stream_name.to_string(),
+        created_at,
+        time_partition.to_string(),
+    );

     Ok(())
 }
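
On the wire, the partition column is chosen once, at stream creation. A hypothetical client call against a local server; the address, credentials, and column name "event_time" are illustrative:

use reqwest::Client;

// Assumed usage sketch: PUT the logstream with the x-p-time-partition header.
async fn create_partitioned_stream() -> Result<(), reqwest::Error> {
    Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .header("x-p-time-partition", "event_time")
        .basic_auth("admin", Some("admin"))
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}

Note that put_stream reads the header value with to_str().unwrap(), so a non-ASCII header value would panic rather than return an error.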

server/src/metadata.rs

Lines changed: 13 additions & 1 deletion
@@ -92,6 +92,13 @@ impl StreamInfo {
             .map(|metadata| metadata.cache_enabled)
     }

+    pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.time_partition.clone())
+    }
+
     pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         let stream = map
@@ -143,14 +150,19 @@ impl StreamInfo {
         })
     }

-    pub fn add_stream(&self, stream_name: String, created_at: String) {
+    pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) {
         let mut map = self.write().expect(LOCK_EXPECT);
         let metadata = LogStreamMetadata {
             created_at: if created_at.is_empty() {
                 Local::now().to_rfc3339()
             } else {
                 created_at
             },
+            time_partition: if time_partition.is_empty() {
+                None
+            } else {
+                Some(time_partition)
+            },
             ..Default::default()
         };
         map.insert(stream_name, metadata);
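
The new accessor mirrors cache_enabled: Ok(None) means the stream was created without a partition column. An illustrative lookup ("demo" is a placeholder stream name):

match STREAM_INFO.get_time_partition("demo") {
    Ok(Some(column)) => println!("stream partitions on {column}"),
    Ok(None) => println!("stream uses default server-time partitioning"),
    Err(err) => eprintln!("stream metadata not found: {err}"),
}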

server/src/query/filter_optimizer.rs

Lines changed: 1 addition & 3 deletions
@@ -117,9 +117,7 @@ impl FilterOptimizerRule {
             Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
         });

-        let Some(mut filter_expr) = patterns.next() else {
-            return None;
-        };
+        let mut filter_expr = patterns.next()?;
         for expr in patterns {
             filter_expr = or(filter_expr, expr)
         }
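
This change and the rbac one below are the same contraction: in a function returning Option, the ? operator on an Option value is exactly a let-else that returns None. The idiom in miniature:

// `?` on an Option returns None early, replacing the let-else block.
fn first_upper(words: &[&str]) -> Option<String> {
    let first = words.first()?; // was: let Some(first) = ... else { return None; };
    Some(first.to_uppercase())
}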

server/src/rbac/map.rs

Lines changed: 1 addition & 3 deletions
@@ -159,9 +159,7 @@ impl Sessions {

     // remove a specific session
     pub fn remove_session(&mut self, key: &SessionKey) -> Option<String> {
-        let Some((user, _)) = self.active_sessions.remove(key) else {
-            return None;
-        };
+        let (user, _) = self.active_sessions.remove(key)?;

         if let Some(items) = self.user_sessions.get_mut(&user) {
             items.retain(|(session, _)| session != key);

server/src/storage/object_storage.rs

Lines changed: 14 additions & 2 deletions
@@ -106,11 +106,20 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(())
     }

-    async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
+    async fn create_stream(
+        &self,
+        stream_name: &str,
+        time_partition: &str,
+    ) -> Result<(), ObjectStorageError> {
         let mut format = ObjectStoreFormat::default();
         format.set_id(CONFIG.parseable.username.clone());
         let permission = Permisssion::new(CONFIG.parseable.username.clone());
         format.permissions = vec![permission];
+        if time_partition.is_empty() {
+            format.time_partition = None;
+        } else {
+            format.time_partition = Some(time_partition.to_string());
+        }
         let format_json = to_bytes(&format);
         self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty()))
             .await?;
@@ -325,8 +334,11 @@ pub trait ObjectStorage: Sync + 'static {
         let cache_enabled = STREAM_INFO
             .cache_enabled(stream)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
+        let time_partition = STREAM_INFO
+            .get_time_partition(stream)
+            .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
         let dir = StorageDir::new(stream);
-        let schema = convert_disk_files_to_parquet(stream, &dir)
+        let schema = convert_disk_files_to_parquet(stream, &dir, time_partition)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

         if let Some(schema) = schema {
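
convert_disk_files_to_parquet, defined in a file not shown in this excerpt, now receives the stream's optional partition column. Judging from the commit description, the date prefix for the written parquet would come from that column instead of server time when it is set. A guess at the shape of that decision, with all names assumed:

use chrono::Utc;
use serde_json::Value;

// Sketch only, not this commit's code: choose the date used to partition
// a record, preferring the event's own time-partition column when present.
fn partition_date(time_partition: Option<&str>, event: &Value) -> String {
    match time_partition.and_then(|col| event.get(col)).and_then(Value::as_str) {
        // Custom column: keep the date part (YYYY-MM-DD) of the event timestamp.
        Some(ts) => ts.chars().take(10).collect(),
        // Default: partition on server ingestion time, as before this commit.
        None => Utc::now().format("%Y-%m-%d").to_string(),
    }
}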
