From 0905dfbd3749bde9bc77c83b3dce8b2b4d457cb0 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 16:11:57 +0530
Subject: [PATCH 01/13] refactor: `DiskWriter`

---
 src/parseable/staging/writer.rs | 49 +++++++++++++++++++++++++++++++--
 src/parseable/streams.rs        | 10 ++-----
 2 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index c43252f14..1e299068a 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -19,7 +19,9 @@
 use std::{
     collections::{HashMap, HashSet},
-    fs::File,
+    fs::{File, OpenOptions},
+    io::BufWriter,
+    path::PathBuf,
     sync::Arc,
 };
@@ -28,13 +30,54 @@
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
 use itertools::Itertools;
+use tracing::error;
 
-use crate::utils::arrow::adapt_batch;
+use crate::{parseable::ARROW_FILE_EXTENSION, utils::arrow::adapt_batch};
+
+use super::StagingError;
 
 #[derive(Default)]
 pub struct Writer {
     pub mem: MemWriter<16384>,
-    pub disk: HashMap<String, StreamWriter<File>>,
+    pub disk: HashMap<String, DiskWriter>,
 }
+
+pub struct DiskWriter {
+    pub inner: StreamWriter<BufWriter<File>>,
+    pub path: PathBuf,
+}
+
+impl DiskWriter {
+    pub fn new(path: PathBuf, schema: &Schema) -> Result<Self, StagingError> {
+        let file = OpenOptions::new().create(true).append(true).open(&path)?;
+
+        let inner = StreamWriter::try_new_buffered(file, schema)?;
+
+        Ok(Self { inner, path })
+    }
+
+    pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
+        self.inner.write(rb).map_err(StagingError::Arrow)
+    }
+
+    pub fn finish(&mut self) {
+        if let Err(err) = self.inner.finish() {
+            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+            return;
+        }
+
+        let mut arrow_path = self.path.to_owned();
+        arrow_path.set_extension(ARROW_FILE_EXTENSION);
+        if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
+            error!("Couldn't rename file {:?}, error = {err}", self.path);
+        }
+    }
+}
+
+impl Drop for DiskWriter {
+    fn drop(&mut self) {
+        self.finish();
+    }
+}
 
 /// Structure to keep recordbatches in memory.
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 0a1b0ba96..d978d3514 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -28,7 +28,6 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
 use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
@@ -59,7 +58,7 @@ use crate::{
 use super::{
     staging::{
         reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
+        writer::{DiskWriter, Writer},
         StagingError,
     },
     LogStream, ARROW_FILE_EXTENSION,
@@ -133,12 +132,7 @@ impl Stream {
                     );
                     std::fs::create_dir_all(&self.data_path)?;
 
-                    let file = OpenOptions::new()
-                        .create(true)
-                        .append(true)
-                        .open(&file_path)?;
-
-                    let mut writer = StreamWriter::try_new(file, &record.schema())
+                    let mut writer = DiskWriter::new(file_path, &record.schema())
                         .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;

From cd0e1dd76938bcb453c49de11c88750b5755c4f9 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 16:22:40 +0530
Subject: [PATCH 02/13] fix: don't keep the mapping after finish

---
 src/parseable/streams.rs | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index d978d3514..81eb9b805 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -358,18 +358,11 @@ impl Stream {
     }
 
     pub fn flush(&self) {
-        let mut disk_writers = {
-            let mut writer = self.writer.lock().unwrap();
-            // Flush memory
-            writer.mem.clear();
-            // Take schema -> disk writer mapping
-            std::mem::take(&mut writer.disk)
-        };
-
-        // Flush disk
-        for writer in disk_writers.values_mut() {
-            _ = writer.finish();
-        }
+        let mut writer = self.writer.lock().unwrap();
+        // Flush memory
+        writer.mem.clear();
+        // Drop schema -> disk writer mapping, triggers flush to disk
+        writer.disk.drain();
     }
 
     fn parquet_writer_props(

From 2a1ac090f2139838fe1074c5bbc311677a06afc8 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 17:38:45 +0530
Subject: [PATCH 03/13] doc: working

---
 src/parseable/staging/writer.rs | 7 +++++--
 src/parseable/streams.rs        | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index 1e299068a..efe4db571 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -48,18 +48,21 @@ pub struct DiskWriter {
 }
 
 impl DiskWriter {
-    pub fn new(path: PathBuf, schema: &Schema) -> Result<Self, StagingError> {
+    /// Try to create a file to stream arrows into
+    pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
+        let path = path.into();
         let file = OpenOptions::new().create(true).append(true).open(&path)?;
-
         let inner = StreamWriter::try_new_buffered(file, schema)?;
 
         Ok(Self { inner, path })
     }
 
+    /// Write a single recordbatch into file
     pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
         self.inner.write(rb).map_err(StagingError::Arrow)
     }
 
+    /// Write the continuation bytes and mark the file as done, rename to `.data.arrows`
     pub fn finish(&mut self) {
         if let Err(err) = self.inner.finish() {
             error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
             return;
         }
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 81eb9b805..6edce6709 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -132,7 +132,7 @@ impl Stream {
                     );
                     std::fs::create_dir_all(&self.data_path)?;
 
-                    let mut writer = DiskWriter::new(file_path, &record.schema())
+                    let mut writer = DiskWriter::try_new(file_path, &record.schema())
                         .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;

From fd6213554581b12e027f5b49cb5d44cfa82c2ede Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 18:14:01 +0530
Subject: [PATCH 04/13] test: use `DiskWriter`

---
 src/parseable/mod.rs            |  3 +++
 src/parseable/staging/reader.rs | 15 +++++++--------
 src/parseable/staging/writer.rs | 10 +++++++---
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 25558a46d..195724973 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -61,6 +61,9 @@ mod streams;
 /// File extension for arrow files in staging
 const ARROW_FILE_EXTENSION: &str = "arrows";
 
+/// File extension for incomplete arrow files
+const PART_FILE_EXTENSION: &str = "part";
+
 /// Name of a Stream
 /// NOTE: this used to be a struct, flattened out for simplicity
 pub type LogStream = String;
diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 5c9b6b5be..2435d0f28 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -322,7 +322,6 @@ fn find_limit_and_type(
 #[cfg(test)]
 mod tests {
     use std::{
-        fs::File,
         io::{self, Cursor, Read},
         path::Path,
         sync::Arc,
     };
@@ -338,7 +337,10 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema};
     use temp_dir::TempDir;
 
-    use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
+    use crate::parseable::staging::{
+        reader::{MergedReverseRecordReader, OffsetReader},
+        writer::DiskWriter,
+    };
 
     use super::get_reverse_reader;
@@ -482,15 +484,12 @@ mod tests {
         schema: &Arc<Schema>,
         batches: &[RecordBatch],
     ) -> io::Result<()> {
-        let file = File::create(path)?;
-        let mut writer =
-            StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
+        let mut writer = DiskWriter::try_new(path, schema).expect("Failed to create StreamWriter");
 
         for batch in batches {
             writer.write(batch).expect("Failed to write batch");
         }
 
-        writer.finish().expect("Failed to finalize writer");
 
         Ok(())
     }
@@ -524,7 +523,7 @@ mod tests {
     #[test]
     fn test_merged_reverse_record_reader() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test.arrow");
+        let file_path = dir.path().join("test.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![
@@ -627,7 +626,7 @@ mod tests {
     #[test]
     fn test_get_reverse_reader_single_message() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test_single.arrow");
+        let file_path = dir.path().join("test_single.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index efe4db571..c4c7236d2 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -32,7 +32,10 @@
 use arrow_select::concat::concat_batches;
 use itertools::Itertools;
 use tracing::error;
 
-use crate::{parseable::ARROW_FILE_EXTENSION, utils::arrow::adapt_batch};
+use crate::{
+    parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
+    utils::arrow::adapt_batch,
+};
 
 use super::StagingError;
@@ -50,8 +53,9 @@ impl DiskWriter {
     /// Try to create a file to stream arrows into
     pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
-        let path = path.into();
-        let file = OpenOptions::new().create(true).append(true).open(&path)?;
+        let mut path = path.into();
+        path.set_extension(PART_FILE_EXTENSION);
+        let file = OpenOptions::new().write(true).create(true).open(&path)?;
         let inner = StreamWriter::try_new_buffered(file, schema)?;
 
         Ok(Self { inner, path })

From 35682079ef5c2e393381f70e9ed5ee8efb057a75 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 18:20:54 +0530
Subject: [PATCH 05/13] fix: never append

---
 src/parseable/staging/writer.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index c4c7236d2..745a385ae 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -55,7 +55,11 @@ impl DiskWriter {
     pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
         let mut path = path.into();
         path.set_extension(PART_FILE_EXTENSION);
-        let file = OpenOptions::new().write(true).create(true).open(&path)?;
+        let file = OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .create(true)
+            .open(&path)?;
         let inner = StreamWriter::try_new_buffered(file, schema)?;
 
         Ok(Self { inner, path })

From fd82d8c73e85381041733019e1017c1945bc2b13 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 18:21:03 +0530
Subject: [PATCH 06/13] refactor: don't expose

---
 src/parseable/staging/writer.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index 745a385ae..83dedeb95 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -46,8 +46,8 @@ pub struct Writer {
 }
 
 pub struct DiskWriter {
-    pub inner: StreamWriter<BufWriter<File>>,
-    pub path: PathBuf,
+    inner: StreamWriter<BufWriter<File>>,
+    path: PathBuf,
 }
 
 impl DiskWriter {

From 8706176e04286b8c86a559ccf1ad56b6dcc76ef6 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 23:28:08 +0530
Subject: [PATCH 07/13] refactor: retire `DiskWriter::finish`

---
 src/parseable/staging/writer.rs | 10 +++------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index 83dedeb95..8ec25f755 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -69,9 +69,11 @@ impl DiskWriter {
     pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
         self.inner.write(rb).map_err(StagingError::Arrow)
     }
+}
 
+impl Drop for DiskWriter {
     /// Write the continuation bytes and mark the file as done, rename to `.data.arrows`
-    pub fn finish(&mut self) {
+    fn drop(&mut self) {
         if let Err(err) = self.inner.finish() {
             error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
             return;
         }
@@ -85,12 +87,6 @@ impl DiskWriter {
         arrow_path.set_extension(ARROW_FILE_EXTENSION);
         if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
             error!("Couldn't rename file {:?}, error = {err}", self.path);
         }
     }
 }
-
-impl Drop for DiskWriter {
-    fn drop(&mut self) {
-        self.finish();
-    }
-}
 
 /// Structure to keep recordbatches in memory.
 ///
 /// Any new schema is updated in the schema map.
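
For orientation between these refactors and the next patch, here is a rough usage sketch of the `DiskWriter` API as it stands after patch 07 (illustrative only — the schema, record batch and staging path are assumed, and the surrounding error handling is elided):

    // Batches are streamed into "<name>.part"; the finished ".data.arrows" file
    // only appears once the writer is dropped, which runs finish() and the rename.
    let mut writer = DiskWriter::try_new(path, &schema)?;
    writer.write(&record_batch)?;
    drop(writer); // end-of-stream bytes are written, then ".part" -> ".data.arrows"

Tying the rename to `Drop` means a crash mid-write leaves only a stray `.part` file behind, so downstream readers never pick up a truncated arrows file.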
From cce0cb61a30556bea4811fcd2106e5ed4513c04c Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 15 Mar 2025 00:05:35 +0530
Subject: [PATCH 08/13] log: overwritten arrows file

---
 src/parseable/staging/writer.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index 8ec25f755..d2b50f51b 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -30,7 +30,7 @@
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
 use itertools::Itertools;
-use tracing::error;
+use tracing::{error, warn};
 
 use crate::{
     parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
@@ -81,6 +81,11 @@ impl Drop for DiskWriter {
         let mut arrow_path = self.path.to_owned();
         arrow_path.set_extension(ARROW_FILE_EXTENSION);
+
+        if arrow_path.exists() {
+            warn!("File {arrow_path:?} exists and will be overwritten");
+        }
+
         if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
             error!("Couldn't rename file {:?}, error = {err}", self.path);
         }
     }

From 86d236bd41928c7093ecbbc7c85ed5772b2fea04 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 23:54:57 +0530
Subject: [PATCH 09/13] feat: construct `TimeRange` given granularity

---
 src/utils/time.rs | 98 ++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 97 insertions(+), 1 deletion(-)

diff --git a/src/utils/time.rs b/src/utils/time.rs
index c153999f6..103a0a956 100644
--- a/src/utils/time.rs
+++ b/src/utils/time.rs
@@ -262,6 +262,24 @@ impl TimeRange {
         }
         time_bounds
     }
+
+    /// Returns a time range of `data_granularity` length which incorporates provided timestamp
+    pub fn granularity_range(timestamp: DateTime<Utc>, data_granularity: u32) -> Self {
+        let time = timestamp
+            .time()
+            .with_second(0)
+            .and_then(|time| time.with_nanosecond(0))
+            .expect("Within expected time range");
+        let timestamp = timestamp.with_time(time).unwrap();
+        let block_n = timestamp.minute() / data_granularity;
+        let block_start = block_n * data_granularity;
+        let start = timestamp
+            .with_minute(block_start)
+            .expect("Within minute range");
+        let end = start + TimeDelta::minutes(data_granularity as i64);
+
+        Self { start, end }
+    }
 }
 
 /// Represents a minute value (0-59) and provides methods for converting it to a slot range.
@@ -323,7 +341,7 @@ impl Minute {
 mod tests {
     use super::*;
 
-    use chrono::{Duration, SecondsFormat, Utc};
+    use chrono::{Duration, SecondsFormat, TimeZone, Utc};
     use rstest::*;
 
     #[test]
@@ -513,4 +531,82 @@ mod tests {
     fn illegal_slot_granularity() {
         Minute::try_from(0).unwrap().to_slot(40);
     }
+
+    #[test]
+    fn test_granularity_one_minute() {
+        // Test with 1-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 30, 45).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 1);
+
+        assert_eq!(range.start.minute(), 30);
+        assert_eq!(range.end.minute(), 31);
+        assert_eq!(range.start.hour(), 12);
+        assert_eq!(range.end.hour(), 12);
+    }
+
+    #[test]
+    fn test_granularity_five_minutes() {
+        // Test with 5-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 17, 45).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 5);
+
+        // For minute 17, with granularity 5, block_n = 17 / 5 = 3
+        // block_start = 3 * 5 = 15
+        // block_end = (3 + 1) * 5 = 20
+        assert_eq!(range.start.minute(), 15);
+        assert_eq!(range.end.minute(), 20);
+    }
+
+    #[test]
+    fn test_granularity_fifteen_minutes() {
+        // Test with 15-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 29, 0).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 15);
+
+        // For minute 29, with granularity 15, block_n = 29 / 15 = 1
+        // block_start = 1 * 15 = 15
+        // block_end = (1 + 1) * 15 = 30
+        assert_eq!(range.start.minute(), 15);
+        assert_eq!(range.end.minute(), 30);
+    }
+
+    #[test]
+    fn test_granularity_thirty_minutes() {
+        // Test with 30-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 31, 0).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 30);
+
+        // For minute 31, with granularity 30, block_n = 31 / 30 = 1
+        // block_start = 1 * 30 = 30
+        // block_end = (1 + 1) * 30 = 60, which should wrap to 0 in the next hour
+        assert_eq!(range.start.minute(), 30);
+        assert_eq!(range.end.minute(), 0);
+        assert_eq!(range.start.hour(), 12);
+        assert_eq!(range.end.hour(), 13); // Should be next hour
+    }
+
+    #[test]
+    fn test_granularity_edge_case() {
+        // Test edge case where minute is exactly at granularity boundary
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 15, 0).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 15);
+
+        assert_eq!(range.start.minute(), 15);
+        assert_eq!(range.end.minute(), 30);
+    }
+
+    #[test]
+    fn test_granularity_hour_boundary() {
+        // Test case where end would exceed hour boundary
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 59, 59).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 20);
+
+        // For minute 59, block_n = 59 / 20 = 2
+        // block_start = 2 * 20 = 40
+        // block_end = (2 + 1) * 20 = 60, which should wrap to 0 in the next hour
+        assert_eq!(range.start.minute(), 40);
+        assert_eq!(range.end.minute(), 0);
+        assert_eq!(range.start.hour(), 12);
+        assert_eq!(range.end.hour(), 13);
+    }
 }
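
As a quick cross-check of what these tests exercise, `granularity_range` only does integer arithmetic on the minute component; a minimal worked example (the values mirror `test_granularity_five_minutes`):

    // 12:17:45 with a 5 minute granularity:
    //   block_n     = 17 / 5 = 3
    //   block_start = 3 * 5  = 15
    //   end         = block_start + 5 = 20
    // giving the window [12:15:00, 12:20:00)
    let ts = Utc.with_ymd_and_hms(2023, 1, 1, 12, 17, 45).unwrap();
    let range = TimeRange::granularity_range(ts, 5);
    assert_eq!((range.start.minute(), range.end.minute()), (15, 20));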
From dfa9f1c2f6a8d2f181985e3789a2f8a28f7d2c57 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 23:55:52 +0530
Subject: [PATCH 10/13] feat: `TimeRange::contains` to check presence

---
 src/utils/time.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/utils/time.rs b/src/utils/time.rs
index 103a0a956..5b60b0bde 100644
--- a/src/utils/time.rs
+++ b/src/utils/time.rs
@@ -280,6 +280,11 @@ impl TimeRange {
 
         Self { start, end }
     }
+
+    /// Returns true if the provided timestamp is within the timerange
+    pub fn contains(&self, time: DateTime<Utc>) -> bool {
+        self.start <= time && self.end > time
+    }
 }
 
 /// Represents a minute value (0-59) and provides methods for converting it to a slot range.

From 1ee5dd01437263049758b0fe2946440971c7ca6f Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 14 Mar 2025 23:57:38 +0530
Subject: [PATCH 11/13] fix: don't flush current data

---
 src/parseable/staging/reader.rs | 15 +++++++++++----
 src/parseable/staging/writer.rs | 16 +++++++++++++---
 src/parseable/streams.rs        | 18 +++++++++++-------
 3 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 2435d0f28..372bfa885 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -335,11 +335,16 @@ mod tests {
         write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
     };
     use arrow_schema::{DataType, Field, Schema};
+    use chrono::Utc;
     use temp_dir::TempDir;
 
-    use crate::parseable::staging::{
-        reader::{MergedReverseRecordReader, OffsetReader},
-        writer::DiskWriter,
-    };
+    use crate::{
+        parseable::staging::{
+            reader::{MergedReverseRecordReader, OffsetReader},
+            writer::DiskWriter,
+        },
+        utils::time::TimeRange,
+        OBJECT_STORE_DATA_GRANULARITY,
+    };
 
     use super::get_reverse_reader;
@@ -484,7 +489,9 @@ mod tests {
         schema: &Arc<Schema>,
         batches: &[RecordBatch],
     ) -> io::Result<()> {
-        let mut writer = DiskWriter::try_new(path, schema).expect("Failed to create StreamWriter");
+        let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
+        let mut writer =
+            DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");
 
         for batch in batches {
             writer.write(batch).expect("Failed to write batch");
diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index d2b50f51b..6397e13e9 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -29,12 +29,13 @@
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
+use chrono::Utc;
 use itertools::Itertools;
 use tracing::{error, warn};
 
 use crate::{
     parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
-    utils::arrow::adapt_batch,
+    utils::{arrow::adapt_batch, time::TimeRange},
 };
 
 use super::StagingError;
@@ -48,11 +49,16 @@ pub struct Writer {
 pub struct DiskWriter {
     inner: StreamWriter<BufWriter<File>>,
     path: PathBuf,
+    range: TimeRange,
 }
 
 impl DiskWriter {
     /// Try to create a file to stream arrows into
-    pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
+    pub fn try_new(
+        path: impl Into<PathBuf>,
+        schema: &Schema,
+        range: TimeRange,
+    ) -> Result<Self, StagingError> {
         let mut path = path.into();
         path.set_extension(PART_FILE_EXTENSION);
         let file = OpenOptions::new()
@@ -62,7 +68,11 @@ impl DiskWriter {
             .open(&path)?;
         let inner = StreamWriter::try_new_buffered(file, schema)?;
 
-        Ok(Self { inner, path })
+        Ok(Self { inner, path, range })
+    }
+
+    pub fn is_current(&self) -> bool {
+        self.range.contains(Utc::now())
     }
 
     /// Write a single recordbatch into file
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 6edce6709..4872d1d71 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -29,7 +29,7 @@ use std::{
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike};
+use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -51,7 +51,7 @@ use crate::{
     metrics,
     option::Mode,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
-    utils::time::Minute,
+    utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
@@ -132,7 +132,11 @@ impl Stream {
                     );
                     std::fs::create_dir_all(&self.data_path)?;
 
-                    let mut writer = DiskWriter::try_new(file_path, &record.schema())
+                    let range = TimeRange::granularity_range(
+                        parsed_timestamp.and_local_timezone(Utc).unwrap(),
+                        OBJECT_STORE_DATA_GRANULARITY,
+                    );
+                    let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                         .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;
@@ -357,12 +361,12 @@ impl Stream {
         self.writer.lock().unwrap().mem.clear();
     }
 
-    pub fn flush(&self) {
+    pub fn flush(&self, forced: bool) {
         let mut writer = self.writer.lock().unwrap();
         // Flush memory
         writer.mem.clear();
         // Drop schema -> disk writer mapping, triggers flush to disk
-        writer.disk.drain();
+        writer.disk.retain(|_, w| !forced && w.is_current());
     }
 
     fn parquet_writer_props(
@@ -662,7 +666,7 @@ impl Stream {
 
     /// First flushes arrows onto disk and then converts the arrow into parquet
     pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
-        self.flush();
+        self.flush(shutdown_signal);
 
         self.prepare_parquet(shutdown_signal)
     }
@@ -974,7 +978,7 @@ mod tests {
             StreamType::UserDefined,
         )
         .unwrap();
-        staging.flush();
+        staging.flush(true);
     }
 
     #[test]

From 58b4200713052b5d40b61dc8c8a84cfb9d29e24d Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 15 Mar 2025 10:47:03 +0530
Subject: [PATCH 12/13] feat: construct path for each request as suggested by
 @nikhilsinhaparseable

---
 src/parseable/streams.rs | 37 +++++++++++++++++--------------------
 1 file changed, 17 insertions(+), 20 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 4872d1d71..3ab0d94de 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -119,23 +119,21 @@ impl Stream {
     ) -> Result<(), StagingError> {
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
+            let filename =
+                self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
             match guard.disk.get_mut(schema_key) {
                 Some(writer) => {
                     writer.write(record)?;
                 }
                 None => {
                     // entry is not present thus we create it
-                    let file_path = self.path_by_current_time(
-                        schema_key,
-                        parsed_timestamp,
-                        custom_partition_values,
-                    );
                     std::fs::create_dir_all(&self.data_path)?;
 
                     let range = TimeRange::granularity_range(
                         parsed_timestamp.and_local_timezone(Utc).unwrap(),
                         OBJECT_STORE_DATA_GRANULARITY,
                     );
+                    let file_path = self.data_path.join(filename);
                     let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                         .expect("File and RecordBatch both are checked");
@@ -150,17 +148,17 @@ impl Stream {
         Ok(())
     }
 
-    pub fn path_by_current_time(
+    pub fn filename_by_partition(
        &self,
        stream_hash: &str,
        parsed_timestamp: NaiveDateTime,
        custom_partition_values: &HashMap<String, String>,
-    ) -> PathBuf {
+    ) -> String {
        let mut hostname = hostname::get().unwrap().into_string().unwrap();
        if let Some(id) = &self.ingestor_id {
            hostname.push_str(id);
        }
-        let filename = format!(
+        format!(
            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
@@ -170,8 +168,7 @@ impl Stream {
                .sorted_by_key(|v| v.0)
                .map(|(key, value)| format!("{key}={value}."))
                .join("")
-        );
-        self.data_path.join(filename)
+        )
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -877,18 +874,18 @@ mod tests {
            None,
        );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
            "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
            Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
            hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]
@@ -911,18 +908,18 @@ mod tests {
            None,
        );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
            Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
            hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]

From 2c7467174337dd2ae155c20b10eda1c90c45983b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 15 Mar 2025 13:49:31 +0530
Subject: [PATCH 13/13] fix: construct filename for each request

---
 src/parseable/streams.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 7f7e97402..784658c1f 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -124,7 +124,7 @@ impl Stream {
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
             let filename =
                 self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
-            match guard.disk.get_mut(schema_key) {
+            match guard.disk.get_mut(&filename) {
                 Some(writer) => {
                     writer.write(record)?;
                 }
@@ -136,12 +136,12 @@ impl Stream {
                         parsed_timestamp.and_local_timezone(Utc).unwrap(),
                         OBJECT_STORE_DATA_GRANULARITY,
                     );
-                    let file_path = self.data_path.join(filename);
+                    let file_path = self.data_path.join(&filename);
                     let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                         .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;
-                    guard.disk.insert(schema_key.to_owned(), writer);
+                    guard.disk.insert(filename, writer);
                 }
             };
         }
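
Taken together, the behaviour the series converges on is easiest to see in the `flush(forced)` semantics from patch 11. Below is a self-contained sketch using a stand-in writer type (`FakeWriter` is hypothetical; only the `retain` call mirrors the real code):

    use std::collections::HashMap;

    struct FakeWriter {
        current: bool,
    }

    impl FakeWriter {
        fn is_current(&self) -> bool {
            self.current
        }
    }

    // forced == true drops every writer (shutdown path); otherwise only writers whose
    // time window has already passed are dropped. Dropping is what triggers finish()
    // and the ".part" -> ".data.arrows" rename in the real DiskWriter, so the file
    // currently being written stays open.
    fn flush(writers: &mut HashMap<String, FakeWriter>, forced: bool) {
        writers.retain(|_, w| !forced && w.is_current());
    }

    fn main() {
        let mut writers = HashMap::from([
            ("old-block".to_owned(), FakeWriter { current: false }),
            ("current-block".to_owned(), FakeWriter { current: true }),
        ]);

        flush(&mut writers, false);
        assert!(writers.contains_key("current-block"));
        assert!(!writers.contains_key("old-block"));

        flush(&mut writers, true);
        assert!(writers.is_empty());
    }

Because patches 12 and 13 key the disk-writer map by partition filename rather than schema key, a periodic flush can retire writers for past minute blocks while leaving the block currently receiving data untouched.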