From 8bceec123b8be37a511651194c9c340e22b56f98 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 6 Dec 2022 19:38:05 +0530
Subject: [PATCH 1/2] Re-implement support for query on local cache

This PR integrates a basic custom execution plan for DataFusion, built on the
existing listing features.

Whenever a query runs, it fetches the arrow files that fall inside the queried
time range and loads them into memory. In practice only two or three arrow
files have to be loaded, so this part is handled through a memory execution
plan. The other local source is the cached parquet files, which may or may not
have been synced to object storage at query time; these are queried through
the already existing listing table. Finally, the result of local execution is
combined with the result of execution against object storage.

To prevent files from being removed while a query is still using them, there
is now a basic global file tracker which removes a file only once it has been
uploaded and no ongoing query is using it.

- [ ] Greater pruning based on query time, limit and offset
- [ ] Combined execution model (will be added after further improvements)
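
To make the cache-selection step concrete, here is a small illustrative sketch (not part of the patch) of the pruning rule: the timestamp is recovered from the `date=...hour=...minute=...` prefix that local arrow/parquet files are named with, and a file is considered only if that timestamp falls inside the query window. The helper name `time_from_filename`, the sample filenames, and the `main` driver are made up for illustration; the parsing mirrors `time_from_path` added to `server/src/query.rs` below and assumes chrono 0.4.

```rust
use chrono::{DateTime, TimeZone, Utc};

/// Recover the minute-granularity timestamp encoded in a cached file's name,
/// e.g. "date=2022-01-01.hour=10.minute=30.hostname.data.arrows".
fn time_from_filename(name: &str) -> DateTime<Utc> {
    // "date=YYYY-MM-DD.hour=HH.minute=MM" is always the first 33 characters
    let prefix = &name[..33];
    Utc.datetime_from_str(prefix, "date=%F.hour=%H.minute=%M")
        .expect("filename starts with a valid time prefix")
}

fn main() {
    // Hypothetical query window: 10:00 to 11:00 UTC on 2022-01-01.
    let start = Utc.ymd(2022, 1, 1).and_hms(10, 0, 0);
    let end = Utc.ymd(2022, 1, 1).and_hms(11, 0, 0);

    // Only files whose encoded time falls inside [start, end] are loaded for the query.
    for name in [
        "date=2022-01-01.hour=09.minute=59.host.data.arrows",
        "date=2022-01-01.hour=10.minute=30.host.data.arrows",
    ] {
        let time = time_from_filename(name);
        println!("{} -> in range: {}", name, start <= time && time <= end);
    }
}
```
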
---
 server/src/main.rs    |   2 +
 server/src/option.rs  |   4 -
 server/src/query.rs   | 240 +++++++++++++++++++++++++++++++++++-------
 server/src/storage.rs | 164 +++++++++++++++++++++++++----
 4 files changed, 348 insertions(+), 62 deletions(-)

diff --git a/server/src/main.rs b/server/src/main.rs
index 473484f26..5410b6fd9 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -72,6 +72,8 @@ async fn main() -> anyhow::Result<()> {
     if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
         warn!("could not populate local metadata. {:?}", e);
     }
+    // track all parquet files already in the data directory
+    storage::CACHED_FILES.track_parquet();
 
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
     let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
diff --git a/server/src/option.rs b/server/src/option.rs
index fb5e63f9a..314d83c68 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -245,10 +245,6 @@ impl<S> Server<S>
 where
     S: Clone + clap::Args + StorageOpt,
 {
-    pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
-        self.local_disk_path.join(stream_name)
-    }
-
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_disk_path.join(stream_name)
     }
diff --git a/server/src/query.rs b/server/src/query.rs
index 9b21f9b01..2663aeaca 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -16,22 +16,36 @@
  *
  */
 
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::datasource::listing::ListingTableConfig;
 use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableType;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use serde_json::Value;
+use std::any::Any;
+use std::fs::File;
+use std::path::Path;
+use std::path::PathBuf;
 use std::sync::Arc;
 
-use crate::option::CONFIG;
 use crate::storage;
 use crate::storage::ObjectStorage;
 use crate::storage::ObjectStorageError;
+use crate::storage::StorageDir;
 use crate::utils::TimePeriod;
 use crate::validator;
 
@@ -75,56 +89,201 @@ impl Query {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<Vec<RecordBatch>, ExecuteError> {
+        let dir = StorageDir::new(&self.stream_name);
+
+        // take a look at local dir and figure out what local cache we could use for this query
+        let arrow_files: Vec<PathBuf> = dir
+            .arrow_files()
+            .into_iter()
+            .filter(|path| path_intersects_query(path, self.start, self.end))
+            .collect();
+
+        let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| {
+            path.set_extension("parquet");
+            path
+        });
+
+        let parquet_files = dir
+            .parquet_files()
+            .into_iter()
+            .filter(|path| path_intersects_query(path, self.start, self.end));
+
+        let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
+
         let mut results = vec![];
-        storage.query(self, &mut results).await?;
 
-        // query cache only if end_time coulld have been after last sync.
-        let duration_since = Utc::now() - self.end;
-        if duration_since.num_seconds() < CONFIG.parseable.upload_interval as i64 {
-            self.execute_on_cache(&mut results).await?;
+        if !(arrow_files.is_empty() && parquet_files.is_empty()) {
+            self.execute_on_cache(
+                arrow_files,
+                parquet_files,
+                self.schema.clone(),
+                &mut results,
+            )
+            .await?;
         }
 
+        storage.query(self, &mut results).await?;
         Ok(results)
     }
 
-    async fn execute_on_cache(&self, results: &mut Vec<RecordBatch>) -> Result<(), ExecuteError> {
+    async fn execute_on_cache(
+        &self,
+        arrow_files: Vec<PathBuf>,
+        parquet_files: Vec<PathBuf>,
+        schema: Arc<Schema>,
+        results: &mut Vec<RecordBatch>,
+    ) -> Result<(), ExecuteError> {
         let ctx = SessionContext::new();
-        let file_format = ParquetFormat::default().with_enable_pruning(true);
+        let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
+        ctx.register_table(
+            &*self.stream_name,
+            Arc::clone(&table) as Arc<dyn TableProvider>,
+        )
+        .map_err(ObjectStorageError::DataFusionError)?;
+        // execute the query and collect results
+        let df = ctx.sql(self.query.as_str()).await?;
+        results.extend(df.collect().await?);
+        table.remove_preserve();
+        Ok(())
+    }
+}
 
-        let listing_options = ListingOptions {
-            file_extension: ".parquet".to_owned(),
-            format: Arc::new(file_format),
-            table_partition_cols: vec![],
-            collect_stat: true,
-            target_partitions: 1,
-        };
+fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
+    let time = time_from_path(path);
+    starttime <= time && time <= endtime
+}
 
-        let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name);
+fn time_from_path(path: &Path) -> DateTime<Utc> {
+    let prefix = path
+        .file_name()
+        .expect("all given path are file")
+        .to_str()
+        .expect("filename is valid");
+
+    // substring of filename i.e date=xxxx.hour=xx.minute=xx
+    let prefix = &prefix[..33];
+    Utc.datetime_from_str(prefix, "date=%F.hour=%H.minute=%M")
+        .expect("valid prefix is parsed")
+}
 
-        let table_path = match ListingTableUrl::parse(
-            cache_path.to_str().expect("path should is valid unicode"),
-        ) {
-            Ok(table_path) => table_path,
-            Err(e) => {
-                log::warn!("could not parse local filesystem path. Maybe directory does not exist. Error {}", e);
-                return Ok(());
-            }
-        };
+#[derive(Debug)]
+struct QueryTableProvider {
+    arrow_files: Vec<PathBuf>,
+    parquet_files: Vec<PathBuf>,
+    schema: Arc<Schema>,
+}
 
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(listing_options)
-            .with_schema(Arc::clone(&self.schema));
+impl QueryTableProvider {
+    fn new(arrow_files: Vec<PathBuf>, parquet_files: Vec<PathBuf>, schema: Arc<Schema>) -> Self {
+        // By the time this query executes the arrow files could be converted to parquet files
+        // we want to preserve these files as well in case
 
-        let table = ListingTable::try_new(config)?;
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &parquet_files {
+            parquet_cached.upsert(file)
+        }
 
-        ctx.register_table(&*self.stream_name, Arc::new(table))
-            .map_err(ObjectStorageError::DataFusionError)?;
+        Self {
+            arrow_files,
+            parquet_files,
+            schema,
+        }
+    }
 
-        // execute the query and collect results
-        let df = ctx.sql(self.query.as_str()).await?;
-        results.extend(df.collect().await?);
+    pub fn remove_preserve(&self) {
+        let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
+        for file in &self.parquet_files {
+            parquet_cached.remove(file)
+        }
+    }
 
-        Ok(())
+    pub async fn create_physical_plan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
+        let mut parquet_files = self.parquet_files.clone();
+        for file in &self.arrow_files {
+            let Ok(arrow_file) = File::open(file) else { continue; };
+            let reader = StreamReader::try_new(arrow_file, None)?;
+            let records = reader
+                .filter_map(|record| match record {
+                    Ok(record) => Some(record),
+                    Err(e) => {
+                        log::warn!("warning from arrow stream {:?}", e);
+                        None
+                    }
+                })
+                .collect();
+            mem_records.push(records);
+
+            let mut file = file.clone();
+            file.set_extension("parquet");
+
+            parquet_files.retain(|p| p != &file)
+        }
+
+        let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
+        let memexec = memtable.scan(ctx, projection, filters, limit).await?;
+
+        if parquet_files.is_empty() {
+            Ok(memexec)
+        } else {
+            let listing_options = ListingOptions {
+                file_extension: ".parquet".to_owned(),
+                format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
+                table_partition_cols: vec![],
+                collect_stat: true,
+                target_partitions: 1,
+            };
+
+            let paths = parquet_files
+                .clone()
+                .into_iter()
+                .map(|path| {
+                    ListingTableUrl::parse(path.to_str().expect("path should is valid unicode"))
+                        .expect("path is valid for filesystem listing")
+                })
+                .collect();
+
+            let config = ListingTableConfig::new_with_multi_paths(paths)
+                .with_listing_options(listing_options)
+                .with_schema(Arc::clone(&self.schema));
+
+            let listtable = ListingTable::try_new(config).unwrap();
+            let listexec = listtable.scan(ctx, projection, filters, limit).await?;
+
+            Ok(Arc::new(UnionExec::new(vec![memexec, listexec])))
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for QueryTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(ctx, projection, filters, limit)
+            .await
     }
 }
 
@@ -160,14 +319,23 @@ pub mod error {
 
 #[cfg(test)]
 mod tests {
-    use super::Query;
+    use super::{time_from_path, Query};
     use crate::{alerts::Alerts, metadata::STREAM_INFO};
     use datafusion::arrow::datatypes::Schema;
     use datafusion::arrow::datatypes::{DataType, Field};
     use rstest::*;
     use serde_json::Value;
+    use std::path::PathBuf;
     use std::str::FromStr;
 
+    #[test]
+    fn test_time_from_parquet_path() {
+        let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
+        let time = time_from_path(path.as_path());
+        assert_eq!(time.timestamp(), 100);
+    }
+
+    // Query prefix generation tests
     #[fixture]
     fn schema() -> Schema {
         let field_a = Field::new("a", DataType::Int64, false);
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 634400357..22f54244d 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -21,6 +21,7 @@ use crate::metadata::{LOCK_EXPECT, STREAM_INFO};
 use crate::option::CONFIG;
 use crate::query::Query;
 use crate::stats::Stats;
+use crate::storage::file_link::{FileLink, FileTable};
 use crate::utils;
 
 use async_trait::async_trait;
@@ -32,6 +33,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::errors::ParquetError;
 use datafusion::parquet::file::properties::WriterProperties;
+use lazy_static::lazy_static;
 use serde::Serialize;
 
 use std::collections::HashMap;
@@ -39,6 +41,9 @@ use std::fmt::Debug;
 use std::fs::{self, File};
 use std::iter::Iterator;
 use std::path::{Path, PathBuf};
+use std::sync::Mutex;
+
+use self::file_link::CacheState;
 
 /// local sync interval to move data.records to /tmp dir of that stream.
 /// 60 sec is a reasonable value.
@@ -48,6 +53,21 @@ pub const LOCAL_SYNC_INTERVAL: u64 = 60;
 /// used for storage. Defaults to 1 min.
 pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
 
+lazy_static! {
+    pub static ref CACHED_FILES: Mutex<FileTable<FileLink>> = Mutex::new(FileTable::new());
+}
+
+impl CACHED_FILES {
+    pub fn track_parquet(&self) {
+        let mut table = self.lock().expect("no poisoning");
+        STREAM_INFO
+            .list_streams()
+            .into_iter()
+            .flat_map(|ref stream_name| StorageDir::new(stream_name).parquet_files().into_iter())
+            .for_each(|ref path| table.upsert(path))
+    }
+}
+
 #[async_trait]
 pub trait ObjectStorage: Sync + 'static {
     fn new() -> Self;
@@ -90,7 +110,20 @@
         // get dir
         let dir = StorageDir::new(stream);
         // walk dir, find all .arrows files and convert to parquet
-        for file in dir.arrow_files() {
+
+        let mut arrow_files = dir.arrow_files();
+        // Do not include file which is being written to
+        let hot_file = dir.path_by_current_time();
+        let hot_filename = hot_file.file_name().expect("is a not none filename");
+
+        arrow_files.retain(|file| {
+            !file
+                .file_name()
+                .expect("is a not none filename")
+                .eq(hot_filename)
+        });
+
+        for file in arrow_files {
             let arrow_file = File::open(&file).map_err(|_| MoveDataError::Open)?;
             let reader = StreamReader::try_new(arrow_file, None)?;
             let schema = reader.schema();
@@ -104,9 +137,11 @@
 
             let mut parquet_path = file.clone();
             parquet_path.set_extension("parquet");
-
+            let mut parquet_table = CACHED_FILES.lock().unwrap();
             let parquet_file =
                 fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+            parquet_table.upsert(&parquet_path);
+
             let props = WriterProperties::builder().build();
             let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
 
@@ -120,6 +155,11 @@
         }
 
         for file in dir.parquet_files() {
+            let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata;
+            if metadata != CacheState::Idle {
+                continue;
+            }
+
             let filename = file
                 .file_name()
                 .expect("only parquet files are returned by iterator")
@@ -127,21 +167,24 @@
                 .to_str()
                 .expect("filename is valid string");
             let file_suffix = str::replacen(filename, ".", "/", 3);
             let s3_path = format!("{}/{}", stream, file_suffix);
-
+            CACHED_FILES
+                .lock()
+                .unwrap()
+                .get_mut(&file)
+                .set_metadata(CacheState::Uploading);
             let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?;
+            CACHED_FILES
+                .lock()
+                .unwrap()
+                .get_mut(&file)
+                .set_metadata(CacheState::Uploaded);
 
             stream_stats
                 .entry(stream)
                 .and_modify(|size| *size += file.metadata().map_or(0, |meta| meta.len()))
                 .or_insert_with(|| file.metadata().map_or(0, |meta| meta.len()));
 
-            if let Err(e) = fs::remove_file(&file) {
-                log::error!(
-                    "Error deleting parquet file in path {} due to error [{}]",
-                    file.to_string_lossy(),
-                    e
-                );
-            }
+            CACHED_FILES.lock().unwrap().remove(&file);
         }
     }
@@ -205,23 +248,12 @@ impl StorageDir {
         let Ok(dir) = self.data_path
            .read_dir() else { return vec![] };
 
-        let mut paths: Vec<PathBuf> = dir
+        let paths: Vec<PathBuf> = dir
             .flatten()
             .map(|file| file.path())
             .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows")))
             .collect();
 
-        // Do not include file which is being written to
-        let hot_file = self.path_by_current_time();
-        let hot_filename = hot_file.file_name().expect("is a not none filename");
-
-        paths.retain(|file| {
-            !file
-                .file_name()
-                .expect("is a not none filename")
-                .eq(hot_filename)
-        });
-
         paths
     }
@@ -236,6 +268,94 @@
     }
 }
 
+pub mod file_link {
+    use std::{
+        collections::HashMap,
+        path::{Path, PathBuf},
+    };
+
+    pub trait Link {
+        fn links(&self) -> usize;
+        fn increase_link_count(&mut self) -> usize;
+        fn decreate_link_count(&mut self) -> usize;
+    }
+
+    #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
+    pub enum CacheState {
+        #[default]
+        Idle,
+        Uploading,
+        Uploaded,
+    }
+
+    #[derive(Debug)]
+    pub struct FileLink {
+        link: usize,
+        pub metadata: CacheState,
+    }
+
+    impl Default for FileLink {
+        fn default() -> Self {
+            Self {
+                link: 1,
+                metadata: CacheState::Idle,
+            }
+        }
+    }
+
+    impl FileLink {
+        pub fn set_metadata(&mut self, state: CacheState) {
+            self.metadata = state
+        }
+    }
+
+    impl Link for FileLink {
+        fn links(&self) -> usize {
+            self.link
+        }
+
+        fn increase_link_count(&mut self) -> usize {
+            self.link.saturating_add(1)
+        }
+
+        fn decreate_link_count(&mut self) -> usize {
+            self.link.saturating_sub(1)
+        }
+    }
+
+    pub struct FileTable<L: Link + Default> {
+        inner: HashMap<PathBuf, L>,
+    }
+
+    impl<L: Link + Default> FileTable<L> {
+        pub fn new() -> Self {
+            Self {
+                inner: HashMap::default(),
+            }
+        }
+
+        pub fn upsert(&mut self, path: &Path) {
+            if let Some(entry) = self.inner.get_mut(path) {
+                entry.increase_link_count();
+            } else {
+                self.inner.insert(path.to_path_buf(), L::default());
+            }
+        }
+
+        pub fn remove(&mut self, path: &Path) {
+            let Some(link_count) = self.inner.get_mut(path).map(|entry| entry.decreate_link_count()) else { return };
+            if link_count == 0 {
+                let _ = std::fs::remove_file(path);
+                self.inner.remove(path);
+            }
+        }
+
+        pub fn get_mut(&mut self, path: &Path) -> &mut L {
+            self.inner.get_mut(path).unwrap()
+        }
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum MoveDataError {
     #[error("Unable to Open file after moving")]
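
The tracking behaviour the PR text describes — delete a local parquet file only after it has been uploaded and no running query still references it — boils down to per-path reference counting. Here is a self-contained sketch of that intent, written independently of the `FileLink`/`FileTable` types added in `server/src/storage.rs` above; the `RefTracker` name, its methods, and the sample path are illustrative only:

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};

/// Illustrative tracker: each path carries a reference count.
/// The sync loop and every in-flight query take a reference; the file is
/// removed from disk only when the last reference is dropped.
#[derive(Default)]
struct RefTracker {
    counts: HashMap<PathBuf, usize>,
}

impl RefTracker {
    fn acquire(&mut self, path: &Path) {
        *self.counts.entry(path.to_path_buf()).or_insert(0) += 1;
    }

    /// Returns true when the last reference is gone and the file may be deleted.
    fn release(&mut self, path: &Path) -> bool {
        match self.counts.get(path).copied() {
            Some(count) if count > 1 => {
                self.counts.insert(path.to_path_buf(), count - 1);
                false
            }
            Some(_) => {
                self.counts.remove(path);
                true // caller can now std::fs::remove_file(path)
            }
            None => false,
        }
    }
}

fn main() {
    let mut tracker = RefTracker::default();
    let file = Path::new("stream/date=2022-01-01.hour=00.minute=00.data.parquet");

    tracker.acquire(file); // sync loop writes the parquet file
    tracker.acquire(file); // a query starts using it
    assert!(!tracker.release(file)); // upload done, but the query still holds it
    assert!(tracker.release(file)); // query done: safe to delete now
    println!("file can be deleted once the last reference is released");
}
```
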
From 361b4e67422687916e660998e45e41a869c17230 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Tue, 6 Dec 2022 20:38:25 +0530
Subject: [PATCH 2/2] Test fix

---
 server/src/query.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/query.rs b/server/src/query.rs
index 2663aeaca..a79941174 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -332,7 +332,7 @@ mod tests {
     fn test_time_from_parquet_path() {
         let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
         let time = time_from_path(path.as_path());
-        assert_eq!(time.timestamp(), 100);
+        assert_eq!(time.timestamp(), 1640995200);
     }
 
     // Query prefix generation tests
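
As a quick sanity check on the corrected expectation (illustrative, not part of the patch series): 1640995200 is the Unix timestamp of 2022-01-01 00:00:00 UTC, which is what `time_from_path` recovers from the `date=2022-01-01.hour=00.minute=00` prefix used in the test filename. A minimal standalone verification, assuming chrono 0.4:

```rust
use chrono::{TimeZone, Utc};

fn main() {
    // Same prefix and format string that time_from_path uses in server/src/query.rs.
    let time = Utc
        .datetime_from_str("date=2022-01-01.hour=00.minute=00", "date=%F.hour=%H.minute=%M")
        .expect("prefix parses");
    assert_eq!(time.timestamp(), 1640995200); // 2022-01-01T00:00:00Z
    println!("ok: {}", time);
}
```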