diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 493b0b971..db53bcf0a 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -30,7 +30,7 @@ use crate::storage::StorageDir;
 use self::errors::StreamWriterError;
 
 type ArrowWriter<T> = StreamWriter<T>;
-type LocalWriter<T> = Mutex<Option<ArrowWriter<T>>>;
+type LocalWriter<T> = Mutex<Option<(usize, Vec<String>, ArrowWriter<T>)>>;
 
 lazy_static! {
     #[derive(Default)]
@@ -41,14 +41,14 @@ impl STREAM_WRITERS {
     // append to a existing stream
     pub fn append_to_local(
         stream: &str,
-        schema_key: &str,
+        schema_key: &String,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = STREAM_WRITERS
             .read()
             .map_err(|_| StreamWriterError::RwPoisoned)?;
 
-        match hashmap_guard.get(stream, schema_key) {
+        match hashmap_guard.get(stream) {
             Some(localwriter) => {
                 let mut writer_guard = localwriter
                     .lock()
@@ -56,14 +56,22 @@ impl STREAM_WRITERS {
 
                 // if it's some writer then we write without dropping any lock
                 // hashmap cannot be brought mutably at any point until this finishes
-                if let Some(ref mut writer) = *writer_guard {
-                    writer.write(record).map_err(StreamWriterError::Writer)?;
+                if let Some((ref mut order, ref mut hashes, ref mut writer)) = *writer_guard {
+                    if hashes.contains(schema_key) {
+                        writer.write(record).map_err(StreamWriterError::Writer)?;
+                    } else {
+                        *order += 1;
+                        hashes.push(schema_key.to_owned());
+                        *writer = init_new_stream_writer_file(stream, *order, record)?;
+                    }
                 } else {
                     // pass on this mutex to set entry so that it can be reused
                     // we have a guard for underlying entry thus
                     // hashmap must not be availible as mutable to any other thread
-                    let writer = init_new_stream_writer_file(stream, schema_key, record)?;
-                    writer_guard.replace(writer); // replace the stream writer behind this mutex
+                    let order = 0;
+                    let writer = init_new_stream_writer_file(stream, order, record)?;
+                    writer_guard.replace((order, vec![schema_key.to_owned()], writer));
+                    // replace the stream writer behind this mutex
                 }
             }
             // entry is not present thus we create it
@@ -87,9 +95,13 @@ impl STREAM_WRITERS {
             .write()
             .map_err(|_| StreamWriterError::RwPoisoned)?;
 
-        let writer = init_new_stream_writer_file(&stream, &schema_key, record)?;
+        let order = StorageDir::new(&stream)
+            .last_order_by_current_time()
+            .unwrap_or_default();
 
-        hashmap_guard.insert(stream, schema_key, Mutex::new(Some(writer)));
+        let writer = init_new_stream_writer_file(&stream, order, record)?;
+
+        hashmap_guard.insert(stream, Mutex::new(Some((order, vec![schema_key], writer))));
 
         Ok(())
     }
@@ -104,7 +116,7 @@ impl STREAM_WRITERS {
             .map_err(|_| StreamWriterError::RwPoisoned)?;
 
         for writer in table.iter() {
-            if let Some(mut streamwriter) = writer
+            if let Some((_, _, mut streamwriter)) = writer
                 .lock()
                 .map_err(|_| StreamWriterError::MutexPoisoned)?
                 .take()
@@ -120,16 +132,14 @@ impl STREAM_WRITERS {
-pub struct WriterTable<A, B, T>
+pub struct WriterTable<A, T>
 where
     A: Eq + std::hash::Hash,
-    B: Eq + std::hash::Hash,
     T: Write,
 {
-    table: HashMap<A, HashMap<B, LocalWriter<T>>>,
+    table: HashMap<A, LocalWriter<T>>,
 }
 
-impl<A, B, T> WriterTable<A, B, T>
+impl<A, T> WriterTable<A, T>
 where
     A: Eq + std::hash::Hash,
-    B: Eq + std::hash::Hash,
     T: Write,
 {
     pub fn new() -> Self {
@@ -137,19 +147,16 @@ where
         Self { table }
     }
 
-    fn get<X, Y>(&self, a: &X, b: &Y) -> Option<&LocalWriter<T>>
+    fn get<X>(&self, a: &X) -> Option<&LocalWriter<T>>
     where
         A: Borrow<X>,
-        B: Borrow<Y>,
         X: Eq + std::hash::Hash + ?Sized,
-        Y: Eq + std::hash::Hash + ?Sized,
     {
-        self.table.get(a)?.get(b)
+        self.table.get(a)
    }
 
-    fn insert(&mut self, a: A, b: B, v: LocalWriter<T>) {
-        let inner = self.table.entry(a).or_default();
-        inner.insert(b, v);
+    fn insert(&mut self, a: A, v: LocalWriter<T>) {
+        self.table.insert(a, v);
    }
 
     pub fn delete_stream<X>(&mut self, stream: &X)
     where
@@ -160,18 +167,18 @@ where
         self.table.remove(stream);
     }
 
-    fn iter(&self) -> impl Iterator<Item = &LocalWriter<T>> {
-        self.table.values().flat_map(|inner| inner.values())
+    fn iter(&self) -> impl Iterator<Item = &LocalWriter<T>> {
+        self.table.values()
     }
 }
 
 fn init_new_stream_writer_file(
     stream_name: &str,
-    schema_key: &str,
+    order: usize,
     record: &RecordBatch,
 ) -> Result<ArrowWriter<std::fs::File>, StreamWriterError> {
     let dir = StorageDir::new(stream_name);
-    let path = dir.path_by_current_time(schema_key);
+    let path = dir.path_by_current_time(order);
 
     std::fs::create_dir_all(dir.data_path)?;
diff --git a/server/src/query.rs b/server/src/query.rs
index b94d314d5..922f05b87 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -26,20 +26,16 @@ use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
 use itertools::Itertools;
 use serde_json::Value;
-use std::collections::hash_map::RandomState;
-use std::collections::HashSet;
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::sync::Arc;
 
 use crate::option::CONFIG;
 use crate::storage::ObjectStorageError;
-use crate::storage::StorageDir;
 use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
 use crate::utils::TimePeriod;
 use crate::validator;
 
 use self::error::{ExecuteError, ParseError};
-use table_provider::QueryTableProvider;
 
 type Key = &'static str;
 fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -89,41 +85,18 @@ impl Query {
         &self,
         storage: Arc<dyn ObjectStorage + Send>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let dir = StorageDir::new(&self.stream_name);
-        // take a look at local dir and figure out what local cache we could use for this query
-        let staging_arrows = dir
-            .arrow_files_grouped_by_time()
-            .into_iter()
-            .filter(|(path, _)| path_intersects_query(path, self.start, self.end))
-            .sorted_by(|(a, _), (b, _)| Ord::cmp(a, b))
-            .collect_vec();
-
-        let staging_parquet_set: HashSet<&PathBuf, RandomState> =
-            HashSet::from_iter(staging_arrows.iter().map(|(p, _)| p));
-
-        let other_staging_parquet = dir
-            .parquet_files()
-            .into_iter()
-            .filter(|path| path_intersects_query(path, self.start, self.end))
-            .filter(|path| !staging_parquet_set.contains(path))
-            .collect_vec();
-
         let ctx = SessionContext::with_config_rt(
             SessionConfig::default(),
             CONFIG.storage().get_datafusion_runtime(),
         );
 
-        let table = Arc::new(QueryTableProvider::new(
-            staging_arrows,
-            other_staging_parquet,
-            self.get_prefixes(),
-            storage,
-            Arc::new(self.get_schema().clone()),
-        ));
+        let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else {
+            return Ok((Vec::new(), Vec::new()));
+        };
 
         ctx.register_table(
             &*self.stream_name,
-            Arc::clone(&table) as Arc<dyn TableProvider>,
+            Arc::new(table) as Arc<dyn TableProvider>,
         )
         .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
@@ -144,11 +117,13 @@ impl Query {
     }
 }
 
+#[allow(unused)]
 fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
     let time = time_from_path(path);
     starttime <= time && time <= endtime
 }
 
+#[allow(unused)]
 fn time_from_path(path: &Path) -> DateTime<Utc> {
     let prefix = path
         .file_name()
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
index 1d30c5475..909c6c1bc 100644
--- a/server/src/query/table_provider.rs
+++ b/server/src/query/table_provider.rs
@@ -16,9 +16,10 @@
  *
  */
 
+#![allow(unused)]
+
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
-use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{
@@ -33,11 +34,10 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
 use itertools::Itertools;
 use std::any::Any;
-use std::fs::File;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use crate::storage::ObjectStorage;
+use crate::storage::{MergedRecordReader, ObjectStorage};
 
 pub struct QueryTableProvider {
     // parquet - ( arrow files )
@@ -88,10 +88,11 @@ impl QueryTableProvider {
         let mut parquet_files = Vec::new();
 
         for (staging_parquet, arrow_files) in &self.staging_arrows {
-            if !load_arrows(arrow_files, &self.schema, &mut mem_records) {
+            if !load_arrows(arrow_files, Arc::clone(&self.schema), &mut mem_records) {
                 parquet_files.push(staging_parquet.clone())
             }
         }
+
         parquet_files.extend(self.other_staging_parquet.clone());
 
         let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
@@ -204,21 +205,11 @@ fn local_parquet_table(parquet_files: &[PathBuf], schema: &SchemaRef) -> Option<
 
 fn load_arrows(
     files: &[PathBuf],
-    schema: &Schema,
+    schema: Arc<Schema>,
     mem_records: &mut Vec<Vec<RecordBatch>>,
 ) -> bool {
-    let mut stream_readers = Vec::with_capacity(files.len());
-
-    for file in files {
-        let Ok(arrow_file) = File::open(file) else { return false; };
-        let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return false; };
-        stream_readers.push(reader);
-    }
-
-    let reader = crate::storage::MergedRecordReader {
-        readers: stream_readers,
-    };
-    let records = reader.merged_iter(schema).collect();
-    mem_records.push(records);
+    let Ok(reader) = MergedRecordReader::try_new(files.to_owned()) else { return false };
+    let Ok(iter) = reader.get_owned_iterator(schema) else { return false };
+    mem_records.push(iter.collect());
     true
 }
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 1fb8666be..d160b7f49 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -221,18 +221,18 @@ impl StorageDir {
         format!("{local_uri}{hostname}.data.arrows")
     }
 
-    fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String {
-        format!("{}.{}", stream_hash, Self::file_time_suffix(time))
+    fn filename_by_time(order: usize, time: NaiveDateTime) -> String {
+        let order_prefix = format!("{order:03}");
+        format!("{}.{}", order_prefix, Self::file_time_suffix(time))
     }
 
-    fn filename_by_current_time(stream_hash: &str) -> String {
+    fn filename_by_current_time(order: usize) -> String {
         let datetime = Utc::now();
-        Self::filename_by_time(stream_hash, datetime.naive_utc())
+        Self::filename_by_time(order, datetime.naive_utc())
     }
 
-    pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf {
-        self.data_path
-            .join(Self::filename_by_current_time(stream_hash))
+    pub fn path_by_current_time(&self, order: usize) -> PathBuf {
+        self.data_path.join(Self::filename_by_current_time(order))
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -248,6 +248,7 @@ impl StorageDir {
         paths
     }
 
+    #[allow(unused)]
     pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
         // hashmap
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -308,6 +309,25 @@ impl StorageDir {
         parquet_path.set_extension("parquet");
         parquet_path
     }
+
+    pub fn last_order_by_current_time(&self) -> Option<usize> {
+        let time = Utc::now().naive_utc();
+        let hot_filename = StorageDir::file_time_suffix(time);
+        let arrow_files = self.arrow_files();
+
+        arrow_files
+            .iter()
+            .filter_map(|path| {
+                let filename = path.file_name().unwrap().to_str().unwrap();
+                if filename.ends_with(&hot_filename) {
+                    let (number, _) = filename.split_once('.').unwrap();
+                    Some(number.parse().unwrap())
+                } else {
+                    None
+                }
+            })
+            .max()
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 884172f4d..ff969075b 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -30,20 +30,18 @@ use crate::{
 };
 
 use actix_web_prometheus::PrometheusMetrics;
+use arrow_schema::ArrowError;
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::{
-    arrow::{
-        array::TimestampMillisecondArray, ipc::reader::StreamReader, record_batch::RecordBatch,
-    },
+    arrow::{ipc::reader::StreamReader, record_batch::RecordBatch},
     datasource::listing::ListingTable,
     error::DataFusionError,
     execution::runtime_env::RuntimeEnv,
     parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
 };
-use itertools::kmerge_by;
-use lazy_static::__Deref;
+use itertools::Itertools;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
 use serde_json::Value;
@@ -268,7 +266,7 @@ pub trait ObjectStorage: Sync + 'static {
             .with_label_values(&[stream])
             .set(files.len() as i64);
 
-        let record_reader = MergedRecordReader::try_new(&files).unwrap();
+        let record_reader = MergedRecordReader::try_new(files.clone()).unwrap();
 
         let mut parquet_table = CACHED_FILES.lock().unwrap();
         let parquet_file =
@@ -276,10 +274,11 @@ pub trait ObjectStorage: Sync + 'static {
         parquet_table.upsert(&parquet_path);
 
         let props = WriterProperties::builder().build();
-        let schema = Arc::new(record_reader.merged_schema());
-        let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
+        let schema = record_reader.merged_schema()?;
+        let mut writer =
+            ArrowWriter::try_new(parquet_file, Arc::clone(&schema), Some(props))?;
 
-        for ref record in record_reader.merged_iter(&schema) {
+        for ref record in record_reader.get_iterator(schema) {
             writer.write(record)?;
         }
@@ -373,51 +372,39 @@ pub trait ObjectStorage: Sync + 'static {
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
-    pub readers: Vec<StreamReader<File>>,
+    pub files: Vec<PathBuf>,
 }
 
 impl MergedRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Result {
-        let mut readers = Vec::with_capacity(files.len());
-
-        for file in files {
-            let reader = StreamReader::try_new(File::open(file).unwrap(), None)?;
-            readers.push(reader);
-        }
+    pub fn try_new(mut files: Vec<PathBuf>) -> Result {
+        files.sort_by(|file1, file2| file1.file_name().unwrap().cmp(file2.file_name().unwrap()));
+        Ok(Self { files })
+    }
 
-        Ok(Self { readers })
+    pub fn merged_schema(&self) -> Result<Arc<Schema>, ArrowError> {
+        let last = StreamReader::try_new(File::open(self.files.last().unwrap()).unwrap(), None)?;
+        Ok(last.schema())
     }
 
-    pub fn merged_iter(self, schema: &Schema) -> impl Iterator<Item = RecordBatch> + '_ {
-        let adapted_readers = self
-            .readers
-            .into_iter()
-            .map(move |reader| reader.flatten().map(|batch| adapt_batch(schema, batch)));
-
-        kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| {
-            let a: &TimestampMillisecondArray = a
-                .column(0)
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap();
-
-            let b: &TimestampMillisecondArray = b
-                .column(0)
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap();
-
-            a.value(0) < b.value(0)
-        })
+    pub fn get_iterator(&self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> + '_ {
+        self.files
+            .iter()
+            .flat_map(|file| StreamReader::try_new(File::open(file).unwrap(), None).unwrap())
+            .flatten()
+            .map(move |batch| adapt_batch(&schema, batch))
     }
 
-    pub fn merged_schema(&self) -> Schema {
-        Schema::try_merge(
-            self.readers
-                .iter()
-                .map(|stream| stream.schema().deref().clone()),
-        )
-        .unwrap()
+    pub fn get_owned_iterator(
+        &self,
+        schema: Arc<Schema>,
+    ) -> Result<impl Iterator<Item = RecordBatch>, std::io::Error> {
+        let iterators: Vec<_> = self.files.iter().map(File::open).try_collect()?;
+
+        Ok(iterators
+            .into_iter()
+            .flat_map(|file| StreamReader::try_new(file, None).unwrap())
+            .flatten()
+            .map(move |batch| adapt_batch(&schema, batch)))
     }