Skip to content

Commit bc730ee

Browse files
feat: allow historical data ingestion based on user defined time (#683)
This PR adds an enhancement that uses a user-provided timestamp for partitioning ingested logs instead of the server time. The user adds an optional custom header, X-P-Time-Partition, at the stream-creation API to enable ingestion and querying using a timestamp column from the log data instead of the server-time column p_timestamp. This time_partition field name is stored in stream.json and in the in-memory STREAM_INFO consulted by the ingest API. The server checks that the named timestamp column exists in each log event and throws an exception if it does not; it also checks that the timestamp value can be parsed into a datetime, and throws an exception otherwise. The arrow file name takes its date, hour, and minute from the timestamp field (if one is defined for the stream); otherwise it takes them from the server time. The parquet file name gets a random number appended, because many log records can share the same date, hour, and minute in the timestamp field, and the random suffix prevents one parquet file from overwriting another. In the console, the query's from and to dates are matched against the value of the log data's timestamp column (if defined for the stream); otherwise they are matched against the p_timestamp column. Fixes #671 Fixes #685
1 parent 3f9a2c5 commit bc730ee

File tree

17 files changed

+436
-257
lines changed

17 files changed

+436
-257
lines changed

server/src/catalog.rs

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use relative_path::RelativePathBuf;
2323

2424
use crate::{
2525
catalog::manifest::Manifest,
26+
event::DEFAULT_TIMESTAMP_KEY,
2627
query::PartialTimeFilter,
2728
storage::{ObjectStorage, ObjectStorageError},
2829
};
@@ -69,25 +70,46 @@ impl ManifestFile for manifest::File {
6970
}
7071
}
7172

72-
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
73-
match file
74-
.columns()
75-
.iter()
76-
.find(|col| col.name == "p_timestamp")
77-
.unwrap()
78-
.stats
79-
.clone()
80-
.unwrap()
81-
{
82-
column::TypedStatistics::Int(stats) => (
83-
NaiveDateTime::from_timestamp_millis(stats.min)
84-
.unwrap()
85-
.and_utc(),
86-
NaiveDateTime::from_timestamp_millis(stats.max)
87-
.unwrap()
88-
.and_utc(),
89-
),
90-
_ => unreachable!(),
73+
fn get_file_bounds(
74+
file: &manifest::File,
75+
partition_column: String,
76+
) -> (DateTime<Utc>, DateTime<Utc>) {
77+
if partition_column == DEFAULT_TIMESTAMP_KEY {
78+
match file
79+
.columns()
80+
.iter()
81+
.find(|col| col.name == partition_column)
82+
.unwrap()
83+
.stats
84+
.as_ref()
85+
.unwrap()
86+
{
87+
column::TypedStatistics::Int(stats) => (
88+
NaiveDateTime::from_timestamp_millis(stats.min)
89+
.unwrap()
90+
.and_utc(),
91+
NaiveDateTime::from_timestamp_millis(stats.max)
92+
.unwrap()
93+
.and_utc(),
94+
),
95+
_ => unreachable!(),
96+
}
97+
} else {
98+
match file
99+
.columns()
100+
.iter()
101+
.find(|col| col.name == partition_column)
102+
.unwrap()
103+
.stats
104+
.as_ref()
105+
.unwrap()
106+
{
107+
column::TypedStatistics::String(stats) => (
108+
stats.min.parse::<DateTime<Utc>>().unwrap(),
109+
stats.max.parse::<DateTime<Utc>>().unwrap(),
110+
),
111+
_ => unreachable!(),
112+
}
91113
}
92114
}
93115

@@ -97,10 +119,19 @@ pub async fn update_snapshot(
97119
change: manifest::File,
98120
) -> Result<(), ObjectStorageError> {
99121
// get current snapshot
100-
let mut meta = storage.get_snapshot(stream_name).await?;
101-
let manifests = &mut meta.manifest_list;
102-
103-
let (lower_bound, _) = get_file_bounds(&change);
122+
let mut meta = storage.get_object_store_format(stream_name).await?;
123+
let manifests = &mut meta.snapshot.manifest_list;
124+
let time_partition = meta.time_partition;
125+
let lower_bound = match time_partition {
126+
Some(time_partition) => {
127+
let (lower_bound, _) = get_file_bounds(&change, time_partition);
128+
lower_bound
129+
}
130+
None => {
131+
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
132+
lower_bound
133+
}
134+
};
104135
let pos = manifests.iter().position(|item| {
105136
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
106137
});
@@ -109,16 +140,18 @@ pub async fn update_snapshot(
109140
// This updates an existing file so there is no need to create a snapshot entry.
110141
if let Some(pos) = pos {
111142
let info = &mut manifests[pos];
112-
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
113-
let Some(mut manifest) = storage.get_manifest(&path).await? else {
143+
let manifest_path =
144+
partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
145+
146+
let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else {
114147
return Err(ObjectStorageError::UnhandledError(
115148
"Manifest found in snapshot but not in object-storage"
116149
.to_string()
117150
.into(),
118151
));
119152
};
120153
manifest.apply_change(change);
121-
storage.put_manifest(&path, manifest).await?;
154+
storage.put_manifest(&manifest_path, manifest).await?;
122155
} else {
123156
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
124157
let upper_bound = lower_bound
@@ -148,7 +181,7 @@ pub async fn update_snapshot(
148181
time_upper_bound: upper_bound,
149182
};
150183
manifests.push(new_snapshot_entriy);
151-
storage.put_snapshot(stream_name, meta).await?;
184+
storage.put_snapshot(stream_name, meta.snapshot).await?;
152185
}
153186

154187
Ok(())
@@ -160,13 +193,13 @@ pub async fn remove_manifest_from_snapshot(
160193
dates: Vec<String>,
161194
) -> Result<(), ObjectStorageError> {
162195
// get current snapshot
163-
let mut meta = storage.get_snapshot(stream_name).await?;
164-
let manifests = &mut meta.manifest_list;
196+
let mut meta = storage.get_object_store_format(stream_name).await?;
197+
let manifests = &mut meta.snapshot.manifest_list;
165198

166199
// Filter out items whose manifest_path contains any of the dates_to_delete
167200
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
168201

169-
storage.put_snapshot(stream_name, meta).await?;
202+
storage.put_snapshot(stream_name, meta.snapshot).await?;
170203
Ok(())
171204
}
172205

@@ -175,8 +208,8 @@ pub async fn get_first_event(
175208
stream_name: &str,
176209
) -> Result<Option<String>, ObjectStorageError> {
177210
// get current snapshot
178-
let mut meta = storage.get_snapshot(stream_name).await?;
179-
let manifests = &mut meta.manifest_list;
211+
let mut meta = storage.get_object_store_format(stream_name).await?;
212+
let manifests = &mut meta.snapshot.manifest_list;
180213

181214
if manifests.is_empty() {
182215
log::info!("No manifest found for stream {stream_name}");
@@ -199,7 +232,7 @@ pub async fn get_first_event(
199232
};
200233

201234
if let Some(first_event) = manifest.files.first() {
202-
let (lower_bound, _) = get_file_bounds(first_event);
235+
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
203236
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
204237
return Ok(Some(first_event_at));
205238
}

server/src/catalog/manifest.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ pub fn create_from_parquet_file(
112112
let columns = column_statistics(row_groups);
113113
manifest_file.columns = columns.into_values().collect();
114114
let mut sort_orders = sort_order(row_groups);
115-
116115
if let Some(last_sort_order) = sort_orders.pop() {
117116
if sort_orders
118117
.into_iter()
@@ -155,7 +154,7 @@ fn sort_order(
155154
})
156155
.collect_vec();
157156

158-
sort_orders.push(sort_order)
157+
sort_orders.push(sort_order);
159158
}
160159
sort_orders
161160
}

server/src/event.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ use itertools::Itertools;
2626

2727
use std::sync::Arc;
2828

29-
use crate::metadata;
30-
3129
use self::error::EventError;
3230
pub use self::writer::STREAM_WRITERS;
31+
use crate::metadata;
32+
use chrono::NaiveDateTime;
3333

3434
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3535
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -42,6 +42,7 @@ pub struct Event {
4242
pub origin_format: &'static str,
4343
pub origin_size: u64,
4444
pub is_first_event: bool,
45+
pub parsed_timestamp: NaiveDateTime,
4546
}
4647

4748
// Events holds the schema related to a each event for a single log stream
@@ -54,7 +55,12 @@ impl Event {
5455
commit_schema(&self.stream_name, self.rb.schema())?;
5556
}
5657

57-
Self::process_event(&self.stream_name, &key, self.rb.clone())?;
58+
Self::process_event(
59+
&self.stream_name,
60+
&key,
61+
self.rb.clone(),
62+
self.parsed_timestamp,
63+
)?;
5864

5965
metadata::STREAM_INFO.update_stats(
6066
&self.stream_name,
@@ -81,8 +87,9 @@ impl Event {
8187
stream_name: &str,
8288
schema_key: &str,
8389
rb: RecordBatch,
90+
parsed_timestamp: NaiveDateTime,
8491
) -> Result<(), EventError> {
85-
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
92+
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
8693
Ok(())
8794
}
8895
}

server/src/event/format.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@ pub trait EventFormat: Sized {
5353
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
5454
};
5555

56-
if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
56+
if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
5757
return Err(anyhow!(
5858
"field {} is a reserved field",
5959
DEFAULT_METADATA_KEY
6060
));
6161
};
6262

63-
if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
63+
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
6464
return Err(anyhow!(
6565
"field {} is a reserved field",
6666
DEFAULT_TIMESTAMP_KEY

server/src/event/writer.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::utils;
3030
use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
3131
use arrow_array::{RecordBatch, TimestampMillisecondArray};
3232
use arrow_schema::Schema;
33-
use chrono::Utc;
33+
use chrono::{NaiveDateTime, Utc};
3434
use derive_more::{Deref, DerefMut};
3535
use once_cell::sync::Lazy;
3636

@@ -48,6 +48,7 @@ impl Writer {
4848
stream_name: &str,
4949
schema_key: &str,
5050
rb: RecordBatch,
51+
parsed_timestamp: NaiveDateTime,
5152
) -> Result<(), StreamWriterError> {
5253
let rb = utils::arrow::replace_columns(
5354
rb.schema(),
@@ -56,7 +57,8 @@ impl Writer {
5657
&[Arc::new(get_timestamp_array(rb.num_rows()))],
5758
);
5859

59-
self.disk.push(stream_name, schema_key, &rb)?;
60+
self.disk
61+
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
6062
self.mem.push(schema_key, rb);
6163
Ok(())
6264
}
@@ -72,29 +74,34 @@ impl WriterTable {
7274
stream_name: &str,
7375
schema_key: &str,
7476
record: RecordBatch,
77+
parsed_timestamp: NaiveDateTime,
7578
) -> Result<(), StreamWriterError> {
7679
let hashmap_guard = self.read().unwrap();
7780

7881
match hashmap_guard.get(stream_name) {
7982
Some(stream_writer) => {
80-
stream_writer
81-
.lock()
82-
.unwrap()
83-
.push(stream_name, schema_key, record)?;
83+
stream_writer.lock().unwrap().push(
84+
stream_name,
85+
schema_key,
86+
record,
87+
parsed_timestamp,
88+
)?;
8489
}
8590
None => {
8691
drop(hashmap_guard);
8792
let mut map = self.write().unwrap();
8893
// check for race condition
8994
// if map contains entry then just
9095
if let Some(writer) = map.get(stream_name) {
91-
writer
92-
.lock()
93-
.unwrap()
94-
.push(stream_name, schema_key, record)?;
96+
writer.lock().unwrap().push(
97+
stream_name,
98+
schema_key,
99+
record,
100+
parsed_timestamp,
101+
)?;
95102
} else {
96103
let mut writer = Writer::default();
97-
writer.push(stream_name, schema_key, record)?;
104+
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
98105
map.insert(stream_name.to_owned(), Mutex::new(writer));
99106
}
100107
}

server/src/event/writer/file_writer.rs

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
*
1818
*/
1919

20-
use std::collections::HashMap;
21-
use std::fs::{File, OpenOptions};
22-
use std::path::PathBuf;
23-
2420
use arrow_array::RecordBatch;
2521
use arrow_ipc::writer::StreamWriter;
22+
use chrono::NaiveDateTime;
2623
use derive_more::{Deref, DerefMut};
24+
use std::collections::HashMap;
25+
use std::fs::{File, OpenOptions};
26+
use std::path::PathBuf;
2727

2828
use crate::storage::staging::StorageDir;
2929

@@ -44,27 +44,17 @@ impl FileWriter {
4444
stream_name: &str,
4545
schema_key: &str,
4646
record: &RecordBatch,
47+
parsed_timestamp: NaiveDateTime,
4748
) -> Result<(), StreamWriterError> {
48-
match self.get_mut(schema_key) {
49-
Some(writer) => {
50-
writer
51-
.writer
52-
.write(record)
53-
.map_err(StreamWriterError::Writer)?;
54-
}
55-
// entry is not present thus we create it
56-
None => {
57-
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
58-
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
59-
self.insert(
60-
schema_key.to_owned(),
61-
ArrowWriter {
62-
file_path: path,
63-
writer,
64-
},
65-
);
66-
}
67-
};
49+
let (path, writer) =
50+
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
51+
self.insert(
52+
schema_key.to_owned(),
53+
ArrowWriter {
54+
file_path: path,
55+
writer,
56+
},
57+
);
6858

6959
Ok(())
7060
}
@@ -80,10 +70,10 @@ fn init_new_stream_writer_file(
8070
stream_name: &str,
8171
schema_key: &str,
8272
record: &RecordBatch,
73+
parsed_timestamp: NaiveDateTime,
8374
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
8475
let dir = StorageDir::new(stream_name);
85-
let path = dir.path_by_current_time(schema_key);
86-
76+
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
8777
std::fs::create_dir_all(dir.data_path)?;
8878

8979
let file = OpenOptions::new().create(true).append(true).open(&path)?;
@@ -94,6 +84,5 @@ fn init_new_stream_writer_file(
9484
stream_writer
9585
.write(record)
9686
.map_err(StreamWriterError::Writer)?;
97-
9887
Ok((path, stream_writer))
9988
}

0 commit comments

Comments
 (0)