diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 443445474..f8ffa1ca9 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -67,6 +67,9 @@ mod streams;
 /// File extension for arrow files in staging
 const ARROW_FILE_EXTENSION: &str = "arrows";
 
+/// File extension for incomplete arrow files
+const PART_FILE_EXTENSION: &str = "part";
+
 /// Name of a Stream
 /// NOTE: this used to be a struct, flattened out for simplicity
 pub type LogStream = String;
diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 5c9b6b5be..372bfa885 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -322,7 +322,6 @@ fn find_limit_and_type(
 #[cfg(test)]
 mod tests {
     use std::{
-        fs::File,
         io::{self, Cursor, Read},
         path::Path,
         sync::Arc,
@@ -336,9 +335,17 @@ mod tests {
         write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
     };
     use arrow_schema::{DataType, Field, Schema};
+    use chrono::Utc;
     use temp_dir::TempDir;
 
-    use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
+    use crate::{
+        parseable::staging::{
+            reader::{MergedReverseRecordReader, OffsetReader},
+            writer::DiskWriter,
+        },
+        utils::time::TimeRange,
+        OBJECT_STORE_DATA_GRANULARITY,
+    };
 
     use super::get_reverse_reader;
 
@@ -482,15 +489,14 @@ mod tests {
         schema: &Arc<Schema>,
         batches: &[RecordBatch],
     ) -> io::Result<()> {
-        let file = File::create(path)?;
+        let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
         let mut writer =
-            StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
+            DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");
 
         for batch in batches {
             writer.write(batch).expect("Failed to write batch");
         }
 
-        writer.finish().expect("Failed to finalize writer");
 
         Ok(())
     }
@@ -524,7 +530,7 @@ mod tests {
     #[test]
     fn test_merged_reverse_record_reader() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test.arrow");
+        let file_path = dir.path().join("test.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![
@@ -627,7 +633,7 @@ mod tests {
     #[test]
     fn test_get_reverse_reader_single_message() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test_single.arrow");
+        let file_path = dir.path().join("test_single.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs
index c43252f14..6397e13e9 100644
--- a/src/parseable/staging/writer.rs
+++ b/src/parseable/staging/writer.rs
@@ -19,7 +19,9 @@
 use std::{
     collections::{HashMap, HashSet},
-    fs::File,
+    fs::{File, OpenOptions},
+    io::BufWriter,
+    path::PathBuf,
     sync::Arc,
 };
@@ -27,14 +29,77 @@
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
+use chrono::Utc;
 use itertools::Itertools;
+use tracing::{error, warn};
 
-use crate::utils::arrow::adapt_batch;
+use crate::{
+    parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
+    utils::{arrow::adapt_batch, time::TimeRange},
+};
+
+use super::StagingError;
 
 #[derive(Default)]
 pub struct Writer {
     pub mem: MemWriter<16384>,
-    pub disk: HashMap<String, StreamWriter<File>>,
+    pub disk: HashMap<String, DiskWriter>,
+}
+
+pub struct DiskWriter {
+    inner: StreamWriter<BufWriter<File>>,
+    path: PathBuf,
+    range: TimeRange,
+}
+
+impl DiskWriter {
+    /// Try to create a file to stream arrows into
+    pub fn try_new(
+        path: impl Into<PathBuf>,
+        schema: &Schema,
+        range: TimeRange,
+    ) -> Result<Self, StagingError> {
+        let mut path = path.into();
+        path.set_extension(PART_FILE_EXTENSION);
+        let file = OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .create(true)
+            .open(&path)?;
+        let inner = StreamWriter::try_new_buffered(file, schema)?;
+
+        Ok(Self { inner, path, range })
+    }
+
+    pub fn is_current(&self) -> bool {
+        self.range.contains(Utc::now())
+    }
+
+    /// Write a single recordbatch into file
+    pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
+        self.inner.write(rb).map_err(StagingError::Arrow)
+    }
+}
+
+impl Drop for DiskWriter {
+    /// Write the continuation bytes and mark the file as done, rename to `.data.arrows`
+    fn drop(&mut self) {
+        if let Err(err) = self.inner.finish() {
+            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+            return;
+        }
+
+        let mut arrow_path = self.path.to_owned();
+        arrow_path.set_extension(ARROW_FILE_EXTENSION);
+
+        if arrow_path.exists() {
+            warn!("File {arrow_path:?} exists and will be overwritten");
+        }
+
+        if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
+            error!("Couldn't rename file {:?}, error = {err}", self.path);
+        }
+    }
 }
 
 /// Structure to keep recordbatches in memory.
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index b21f1ee20..784658c1f 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -28,9 +28,8 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike};
+use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -55,14 +54,14 @@ use crate::{
     metrics,
     option::Mode,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
-    utils::time::Minute,
+    utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{
     staging::{
         reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
+        writer::{DiskWriter, Writer},
         StagingError,
     },
     LogStream, ARROW_FILE_EXTENSION,
@@ -123,29 +122,26 @@ impl Stream {
     ) -> Result<(), StagingError> {
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
-            match guard.disk.get_mut(schema_key) {
+            let filename =
+                self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
+            match guard.disk.get_mut(&filename) {
                 Some(writer) => {
                     writer.write(record)?;
                 }
                 None => {
                     // entry is not present thus we create it
-                    let file_path = self.path_by_current_time(
-                        schema_key,
-                        parsed_timestamp,
-                        custom_partition_values,
-                    );
                     std::fs::create_dir_all(&self.data_path)?;
 
-                    let file = OpenOptions::new()
-                        .create(true)
-                        .append(true)
-                        .open(&file_path)?;
-
-                    let mut writer = StreamWriter::try_new(file, &record.schema())
+                    let range = TimeRange::granularity_range(
+                        parsed_timestamp.and_local_timezone(Utc).unwrap(),
+                        OBJECT_STORE_DATA_GRANULARITY,
+                    );
+                    let file_path = self.data_path.join(&filename);
+                    let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                        .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;
-                    guard.disk.insert(schema_key.to_owned(), writer);
+                    guard.disk.insert(filename, writer);
                 }
             };
         }
@@ -155,17 +151,17 @@
         Ok(())
     }
 
-    pub fn path_by_current_time(
+    pub fn filename_by_partition(
         &self,
         stream_hash: &str,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
-    ) -> PathBuf {
+    ) -> String {
         let mut hostname = hostname::get().unwrap().into_string().unwrap();
         if let Some(id) = &self.ingestor_id {
             hostname.push_str(id);
         }
-        let filename = format!(
+        format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             custom_partition_values
                 .iter()
                 .sorted_by_key(|v| v.0)
                 .map(|(key, value)| format!("{key}={value}."))
                 .join("")
-        );
-        self.data_path.join(filename)
+        )
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -366,19 +361,12 @@ impl Stream {
         self.writer.lock().unwrap().mem.clear();
     }
 
-    pub fn flush(&self) {
-        let mut disk_writers = {
-            let mut writer = self.writer.lock().unwrap();
-            // Flush memory
-            writer.mem.clear();
-            // Take schema -> disk writer mapping
-            std::mem::take(&mut writer.disk)
-        };
-
-        // Flush disk
-        for writer in disk_writers.values_mut() {
-            _ = writer.finish();
-        }
+    pub fn flush(&self, forced: bool) {
+        let mut writer = self.writer.lock().unwrap();
+        // Flush memory
+        writer.mem.clear();
+        // Drop schema -> disk writer mapping, triggers flush to disk
+        writer.disk.retain(|_, w| !forced && w.is_current());
     }
 
     fn parquet_writer_props(
@@ -733,7 +721,7 @@ impl Stream {
 
     /// First flushes arrows onto disk and then converts the arrow into parquet
     pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
-        self.flush();
+        self.flush(shutdown_signal);
 
         self.prepare_parquet(shutdown_signal)
     }
@@ -944,18 +932,18 @@ mod tests {
             None,
         );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]
@@ -978,18 +966,18 @@ mod tests {
             None,
         );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]
@@ -1045,7 +1033,7 @@ mod tests {
             StreamType::UserDefined,
         )
         .unwrap();
-        staging.flush();
+        staging.flush(true);
     }
 
     #[test]
diff --git a/src/utils/time.rs b/src/utils/time.rs
index cc602ed82..622f28dc1 100644
--- a/src/utils/time.rs
+++ b/src/utils/time.rs
@@ -262,6 +262,29 @@ impl TimeRange {
         }
         time_bounds
     }
+
+    /// Returns a time range of `data_granularity` length which incorporates provided timestamp
+    pub fn granularity_range(timestamp: DateTime<Utc>, data_granularity: u32) -> Self {
+        let time = timestamp
+            .time()
+            .with_second(0)
+            .and_then(|time| time.with_nanosecond(0))
+            .expect("Within expected time range");
+        let timestamp = timestamp.with_time(time).unwrap();
+        let block_n = timestamp.minute() / data_granularity;
+        let block_start = block_n * data_granularity;
+        let start = timestamp
+            .with_minute(block_start)
+            .expect("Within minute range");
+        let end = start + TimeDelta::minutes(data_granularity as i64);
+
+        Self { start, end }
+    }
+
+    /// Returns true if the provided timestamp is within the timerange
+    pub fn contains(&self, time: DateTime<Utc>) -> bool {
+        self.start <= time && self.end > time
+    }
 }
 
 /// Represents a minute value (0-59) and provides methods for converting it to a slot range.
@@ -323,7 +346,7 @@
 mod tests {
     use super::*;
 
-    use chrono::{Duration, SecondsFormat, Utc};
+    use chrono::{Duration, SecondsFormat, TimeZone, Utc};
     use rstest::*;
 
     #[test]
@@ -513,4 +536,82 @@
     fn illegal_slot_granularity() {
         Minute::try_from(0).unwrap().to_slot(40);
     }
+
+    #[test]
+    fn test_granularity_one_minute() {
+        // Test with 1-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 30, 45).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 1);
+
+        assert_eq!(range.start.minute(), 30);
+        assert_eq!(range.end.minute(), 31);
+        assert_eq!(range.start.hour(), 12);
+        assert_eq!(range.end.hour(), 12);
+    }
+
+    #[test]
+    fn test_granularity_five_minutes() {
+        // Test with 5-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 17, 45).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 5);
+
+        // For minute 17, with granularity 5, block_n = 17 / 5 = 3
+        // block_start = 3 * 5 = 15
+        // block_end = (3 + 1) * 5 = 20
+        assert_eq!(range.start.minute(), 15);
+        assert_eq!(range.end.minute(), 20);
+    }
+
+    #[test]
+    fn test_granularity_fifteen_minutes() {
+        // Test with 15-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 29, 0).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 15);
+
+        // For minute 29, with granularity 15, block_n = 29 / 15 = 1
+        // block_start = 1 * 15 = 15
+        // block_end = (1 + 1) * 15 = 30
+        assert_eq!(range.start.minute(), 15);
+        assert_eq!(range.end.minute(), 30);
+    }
+
+    #[test]
+    fn test_granularity_thirty_minutes() {
+        // Test with 30-minute granularity
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 31, 0).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 30);
+
+        // For minute 31, with granularity 30, block_n = 31 / 30 = 1
+        // block_start = 1 * 30 = 30
+        // block_end = (1 + 1) * 30 = 60, which should wrap to 0 in the next hour
+        assert_eq!(range.start.minute(), 30);
+        assert_eq!(range.end.minute(), 0);
+        assert_eq!(range.start.hour(), 12);
+        assert_eq!(range.end.hour(), 13); // Should be next hour
+    }
+
+    #[test]
+    fn test_granularity_edge_case() {
+        // Test edge case where minute is exactly at granularity boundary
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 15, 0).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 15);
+
+        assert_eq!(range.start.minute(), 15);
+        assert_eq!(range.end.minute(), 30);
+    }
+
+    #[test]
+    fn test_granularity_hour_boundary() {
+        // Test case where end would exceed hour boundary
+        let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 12, 59, 59).unwrap();
+        let range = TimeRange::granularity_range(timestamp, 20);
+
+        // For minute 59, block_n = 59 / 20 = 2
+        // block_start = 2 * 20 = 40
+        // block_end = (2 + 1) * 20 = 60, which should wrap to 0 in the next hour
+        assert_eq!(range.start.minute(), 40);
+        assert_eq!(range.end.minute(), 0);
+        assert_eq!(range.start.hour(), 12);
+        assert_eq!(range.end.hour(), 13);
+    }
 }
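Usage sketch (reviewer's note, not part of the patch): the snippet below shows how the pieces introduced here are meant to compose. `TimeRange::granularity_range` picks the minute window, `DiskWriter::try_new` stages writes behind a `.part` extension, and dropping the writer finishes the IPC stream and renames the file to `.data.arrows`. The `stage_batch` helper, the `stream_dir` path and the `rb` batch are hypothetical; every other name is defined in this diff.

use chrono::Utc;

use crate::{
    parseable::staging::{writer::DiskWriter, StagingError},
    utils::time::TimeRange,
    OBJECT_STORE_DATA_GRANULARITY,
};

// Hypothetical helper: stage one RecordBatch under an arbitrary stream directory.
fn stage_batch(
    stream_dir: &std::path::Path,
    rb: &arrow_array::RecordBatch,
) -> Result<(), StagingError> {
    // The writer is valid for the granularity window that contains "now".
    let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);

    // try_new swaps the extension to `.part` while data is still being appended.
    let mut writer =
        DiskWriter::try_new(stream_dir.join("example.data.arrows"), &rb.schema(), range)?;
    writer.write(rb)?;

    // Dropping the writer finishes the arrow stream and renames
    // `example.data.part` back to `example.data.arrows`.
    drop(writer);
    Ok(())
}

With `flush(false)`, `Stream` only drops writers whose window is no longer current (`is_current()`); `flush(true)`, as used on shutdown via `flush_and_convert`, drops every writer regardless of its window.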