diff --git a/server/src/main.rs b/server/src/main.rs
index 473484f26..5410b6fd9 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -72,6 +72,8 @@ async fn main() -> anyhow::Result<()> {
     if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
         warn!("could not populate local metadata. {:?}", e);
     }
+    // track all parquet files already in the data directory
+    storage::CACHED_FILES.track_parquet();
 
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
     let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
diff --git a/server/src/option.rs b/server/src/option.rs
index fb5e63f9a..314d83c68 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -245,10 +245,6 @@ impl<S> Server<S>
 where
     S: Clone + clap::Args + StorageOpt,
 {
-    pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
-        self.local_disk_path.join(stream_name)
-    }
-
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_disk_path.join(stream_name)
     }
diff --git a/server/src/query.rs b/server/src/query.rs
index 9b21f9b01..a79941174 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -16,22 +16,36 @@
  *
  */
 
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::datasource::listing::ListingTableConfig;
 use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableType;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use serde_json::Value;
+use std::any::Any;
+use std::fs::File;
+use std::path::Path;
+use std::path::PathBuf;
 use std::sync::Arc;
 
-use crate::option::CONFIG;
 use crate::storage;
 use crate::storage::ObjectStorage;
 use crate::storage::ObjectStorageError;
+use crate::storage::StorageDir;
 use crate::utils::TimePeriod;
 use crate::validator;
 
@@ -75,56 +89,201 @@ impl Query {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<Vec<RecordBatch>, ExecuteError> {
+        let dir = StorageDir::new(&self.stream_name);
+
+        // take a look at the local dir and figure out what cached files we can use for this query
+        let arrow_files: Vec<PathBuf> = dir
+            .arrow_files()
+            .into_iter()
+            .filter(|path| path_intersects_query(path, self.start, self.end))
+            .collect();
+
+        let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| {
+            path.set_extension("parquet");
+            path
+        });
+
+        let parquet_files = dir
+            .parquet_files()
+            .into_iter()
+            .filter(|path| path_intersects_query(path, self.start, self.end));
+
+        let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
+
         let mut results = vec![];
-        storage.query(self, &mut results).await?;
-
-        // query cache only if end_time coulld have been after last sync.
-        let duration_since = Utc::now() - self.end;
-        if duration_since.num_seconds() < CONFIG.parseable.upload_interval as i64 {
-            self.execute_on_cache(&mut results).await?;
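+        // Query whatever matching data is on local disk first; results from
+        // object storage are appended to the same buffer below.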
+        if !(arrow_files.is_empty() && parquet_files.is_empty()) {
+            self.execute_on_cache(
+                arrow_files,
+                parquet_files,
+                self.schema.clone(),
+                &mut results,
+            )
+            .await?;
         }
 
+        storage.query(self, &mut results).await?;
         Ok(results)
     }
 
-    async fn execute_on_cache(&self, results: &mut Vec<RecordBatch>) -> Result<(), ExecuteError> {
+    async fn execute_on_cache(
+        &self,
+        arrow_files: Vec<PathBuf>,
+        parquet_files: Vec<PathBuf>,
+        schema: Arc<Schema>,
+        results: &mut Vec<RecordBatch>,
+    ) -> Result<(), ExecuteError> {
         let ctx = SessionContext::new();
-        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
+        ctx.register_table(
+            &*self.stream_name,
+            Arc::clone(&table) as Arc<dyn TableProvider>,
+        )
+        .map_err(ObjectStorageError::DataFusionError)?;
+        // execute the query and collect results
+        let df = ctx.sql(self.query.as_str()).await?;
+        results.extend(df.collect().await?);
+        table.remove_preserve();
+        Ok(())
+    }
+}
 
-        let listing_options = ListingOptions {
-            file_extension: ".parquet".to_owned(),
-            format: Arc::new(file_format),
-            table_partition_cols: vec![],
-            collect_stat: true,
-            target_partitions: 1,
-        };
+fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
+    let time = time_from_path(path);
+    starttime <= time && time <= endtime
+}
 
-        let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name);
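+// Recover the minute-granularity timestamp encoded in a cached file's name.
+// File names look like `date=2022-01-01.hour=00.minute=00.<rest>`, so the
+// first 33 bytes always cover the `date=%F.hour=%H.minute=%M` prefix.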
Error {}", e); - return Ok(()); - } - }; +#[derive(Debug)] +struct QueryTableProvider { + arrow_files: Vec, + parquet_files: Vec, + schema: Arc, +} - let config = ListingTableConfig::new(table_path) - .with_listing_options(listing_options) - .with_schema(Arc::clone(&self.schema)); +impl QueryTableProvider { + fn new(arrow_files: Vec, parquet_files: Vec, schema: Arc) -> Self { + // By the time this query executes the arrow files could be converted to parquet files + // we want to preserve these files as well in case - let table = ListingTable::try_new(config)?; + let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); + for file in &parquet_files { + parquet_cached.upsert(file) + } - ctx.register_table(&*self.stream_name, Arc::new(table)) - .map_err(ObjectStorageError::DataFusionError)?; + Self { + arrow_files, + parquet_files, + schema, + } + } - // execute the query and collect results - let df = ctx.sql(self.query.as_str()).await?; - results.extend(df.collect().await?); + pub fn remove_preserve(&self) { + let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); + for file in &self.parquet_files { + parquet_cached.remove(file) + } + } - Ok(()) + pub async fn create_physical_plan( + &self, + ctx: &SessionState, + projection: &Option>, + filters: &[Expr], + limit: Option, + ) -> Result, DataFusionError> { + let mut mem_records: Vec> = Vec::new(); + let mut parquet_files = self.parquet_files.clone(); + for file in &self.arrow_files { + let Ok(arrow_file) = File::open(file) else { continue; }; + let reader = StreamReader::try_new(arrow_file, None)?; + let records = reader + .filter_map(|record| match record { + Ok(record) => Some(record), + Err(e) => { + log::warn!("warning from arrow stream {:?}", e); + None + } + }) + .collect(); + mem_records.push(records); + + let mut file = file.clone(); + file.set_extension("parquet"); + + parquet_files.retain(|p| p != &file) + } + + let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?; + let memexec = memtable.scan(ctx, projection, filters, limit).await?; + + if parquet_files.is_empty() { + Ok(memexec) + } else { + let listing_options = ListingOptions { + file_extension: ".parquet".to_owned(), + format: Arc::new(ParquetFormat::default().with_enable_pruning(true)), + table_partition_cols: vec![], + collect_stat: true, + target_partitions: 1, + }; + + let paths = parquet_files + .clone() + .into_iter() + .map(|path| { + ListingTableUrl::parse(path.to_str().expect("path should is valid unicode")) + .expect("path is valid for filesystem listing") + }) + .collect(); + + let config = ListingTableConfig::new_with_multi_paths(paths) + .with_listing_options(listing_options) + .with_schema(Arc::clone(&self.schema)); + + let listtable = ListingTable::try_new(config).unwrap(); + let listexec = listtable.scan(ctx, projection, filters, limit).await?; + + Ok(Arc::new(UnionExec::new(vec![memexec, listexec]))) + } + } +} + +#[async_trait] +impl TableProvider for QueryTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + ctx: &SessionState, + projection: &Option>, + filters: &[Expr], + limit: Option, + ) -> datafusion::error::Result> { + self.create_physical_plan(ctx, projection, filters, limit) + .await } } @@ -160,14 +319,23 @@ pub mod error { #[cfg(test)] mod tests { - use super::Query; + use super::{time_from_path, Query}; use 
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
     }
 }
 
@@ -160,14 +319,23 @@ pub mod error {
 
 #[cfg(test)]
 mod tests {
-    use super::Query;
+    use super::{time_from_path, Query};
     use crate::{alerts::Alerts, metadata::STREAM_INFO};
     use datafusion::arrow::datatypes::Schema;
     use datafusion::arrow::datatypes::{DataType, Field};
     use rstest::*;
     use serde_json::Value;
+    use std::path::PathBuf;
    use std::str::FromStr;
 
+    #[test]
+    fn test_time_from_parquet_path() {
+        let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
+        let time = time_from_path(path.as_path());
+        assert_eq!(time.timestamp(), 1640995200);
+    }
+
+    // Query prefix generation tests
     #[fixture]
     fn schema() -> Schema {
         let field_a = Field::new("a", DataType::Int64, false);
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 634400357..22f54244d 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -21,6 +21,7 @@ use crate::metadata::{LOCK_EXPECT, STREAM_INFO};
 use crate::option::CONFIG;
 use crate::query::Query;
 use crate::stats::Stats;
+use crate::storage::file_link::{FileLink, FileTable};
 use crate::utils;
 
 use async_trait::async_trait;
@@ -32,6 +33,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::errors::ParquetError;
 use datafusion::parquet::file::properties::WriterProperties;
+use lazy_static::lazy_static;
 use serde::Serialize;
 
 use std::collections::HashMap;
@@ -39,6 +41,9 @@ use std::fmt::Debug;
 use std::fs::{self, File};
 use std::iter::Iterator;
 use std::path::{Path, PathBuf};
+use std::sync::Mutex;
+
+use self::file_link::CacheState;
 
 /// local sync interval to move data.records to /tmp dir of that stream.
 /// 60 sec is a reasonable value.
@@ -48,6 +53,21 @@ pub const LOCAL_SYNC_INTERVAL: u64 = 60;
 /// used for storage. Defaults to 1 min.
 pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
 
+lazy_static! {
+    pub static ref CACHED_FILES: Mutex<FileTable<FileLink>> = Mutex::new(FileTable::new());
+}
+
+impl CACHED_FILES {
+    pub fn track_parquet(&self) {
+        let mut table = self.lock().expect("no poisoning");
+        STREAM_INFO
+            .list_streams()
+            .into_iter()
+            .flat_map(|ref stream_name| StorageDir::new(stream_name).parquet_files().into_iter())
+            .for_each(|ref path| table.upsert(path))
+    }
+}
+
 #[async_trait]
 pub trait ObjectStorage: Sync + 'static {
     fn new() -> Self;
@@ -90,7 +110,20 @@ pub trait ObjectStorage: Sync + 'static {
         // get dir
         let dir = StorageDir::new(stream);
         // walk dir, find all .arrows files and convert to parquet
-        for file in dir.arrow_files() {
+
+        let mut arrow_files = dir.arrow_files();
+        // Do not include the file that is currently being written to
+        let hot_file = dir.path_by_current_time();
+        let hot_filename = hot_file.file_name().expect("file name is not None");
+
+        arrow_files.retain(|file| {
+            !file
+                .file_name()
+                .expect("file name is not None")
+                .eq(hot_filename)
+        });
+
+        for file in arrow_files {
             let arrow_file = File::open(&file).map_err(|_| MoveDataError::Open)?;
             let reader = StreamReader::try_new(arrow_file, None)?;
             let schema = reader.schema();
@@ -104,9 +137,11 @@ pub trait ObjectStorage: Sync + 'static {
 
             let mut parquet_path = file.clone();
             parquet_path.set_extension("parquet");
-
+            let mut parquet_table = CACHED_FILES.lock().unwrap();
             let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+            parquet_table.upsert(&parquet_path);
+
             let props = WriterProperties::builder().build();
             let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
@@ -120,6 +155,11 @@ pub trait ObjectStorage: Sync + 'static {
         }
 
         for file in dir.parquet_files() {
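+            // Upload is a small state machine over CACHED_FILES: a file
+            // starts Idle, is marked Uploading for the duration of the put
+            // and Uploaded afterwards; `remove` then deletes it from disk
+            // only once no in-flight query still holds a link to it.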
+            let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata;
+            if metadata != CacheState::Idle {
+                continue;
+            }
+
             let filename = file
                 .file_name()
                 .expect("only parquet files are returned by iterator")
@@ -127,21 +167,24 @@ pub trait ObjectStorage: Sync + 'static {
                 .expect("filename is valid string");
             let file_suffix = str::replacen(filename, ".", "/", 3);
             let s3_path = format!("{}/{}", stream, file_suffix);
-
+            CACHED_FILES
+                .lock()
+                .unwrap()
+                .get_mut(&file)
+                .set_metadata(CacheState::Uploading);
             let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?;
+            CACHED_FILES
+                .lock()
+                .unwrap()
+                .get_mut(&file)
+                .set_metadata(CacheState::Uploaded);
 
             stream_stats
                 .entry(stream)
                 .and_modify(|size| *size += file.metadata().map_or(0, |meta| meta.len()))
                 .or_insert_with(|| file.metadata().map_or(0, |meta| meta.len()));
 
-            if let Err(e) = fs::remove_file(&file) {
-                log::error!(
-                    "Error deleting parquet file in path {} due to error [{}]",
-                    file.to_string_lossy(),
-                    e
-                );
-            }
+            CACHED_FILES.lock().unwrap().remove(&file);
         }
     }
@@ -205,23 +248,12 @@ impl StorageDir {
         let Ok(dir) = self.data_path
             .read_dir() else { return vec![] };
 
-        let mut paths: Vec<PathBuf> = dir
+        let paths: Vec<PathBuf> = dir
             .flatten()
             .map(|file| file.path())
             .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows")))
             .collect();
 
-        // Do not include file which is being written to
-        let hot_file = self.path_by_current_time();
-        let hot_filename = hot_file.file_name().expect("is a not none filename");
-
-        paths.retain(|file| {
-            !file
-                .file_name()
-                .expect("is a not none filename")
-                .eq(hot_filename)
-        });
-
         paths
     }
@@ -236,6 +268,94 @@ impl StorageDir {
     }
 }
 
+pub mod file_link {
+    use std::{
+        collections::HashMap,
+        path::{Path, PathBuf},
+    };
+
+    pub trait Link {
+        fn links(&self) -> usize;
+        fn increase_link_count(&mut self) -> usize;
+        fn decrease_link_count(&mut self) -> usize;
+    }
+
+    #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
+    pub enum CacheState {
+        #[default]
+        Idle,
+        Uploading,
+        Uploaded,
+    }
+
+    #[derive(Debug)]
+    pub struct FileLink {
+        link: usize,
+        pub metadata: CacheState,
+    }
+
+    impl Default for FileLink {
+        fn default() -> Self {
+            Self {
+                link: 1,
+                metadata: CacheState::Idle,
+            }
+        }
+    }
+
+    impl FileLink {
+        pub fn set_metadata(&mut self, state: CacheState) {
+            self.metadata = state
+        }
+    }
+
+    impl Link for FileLink {
+        fn links(&self) -> usize {
+            self.link
+        }
+
+        fn increase_link_count(&mut self) -> usize {
+            self.link = self.link.saturating_add(1);
+            self.link
+        }
+
+        fn decrease_link_count(&mut self) -> usize {
+            self.link = self.link.saturating_sub(1);
+            self.link
+        }
+    }
+
+    pub struct FileTable<L: Link + Default> {
+        inner: HashMap<PathBuf, L>,
+    }
+
+    impl<L: Link + Default> FileTable<L> {
+        pub fn new() -> Self {
+            Self {
+                inner: HashMap::default(),
+            }
+        }
+
+        pub fn upsert(&mut self, path: &Path) {
+            if let Some(entry) = self.inner.get_mut(path) {
+                entry.increase_link_count();
+            } else {
+                self.inner.insert(path.to_path_buf(), L::default());
+            }
+        }
+
+        pub fn remove(&mut self, path: &Path) {
+            let Some(link_count) = self
+                .inner
+                .get_mut(path)
+                .map(|entry| entry.decrease_link_count()) else { return };
+            if link_count == 0 {
+                let _ = std::fs::remove_file(path);
+                self.inner.remove(path);
+            }
+        }
+
+        pub fn get_mut(&mut self, path: &Path) -> &mut L {
+            self.inner.get_mut(path).unwrap()
+        }
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum MoveDataError {
     #[error("Unable to Open file after moving")]