From e01ceac7dbd8fef1d5ab3c195c9bc615ff6d59cd Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 25 Dec 2023 11:58:36 +0530
Subject: [PATCH 01/15] Add option for keeping cache of recent data

This PR adds a local cache / hot tier for Parseable. The option is enabled by
setting the following env vars:

P_CACHE_DIR  - local path for the file cache
P_CACHE_SIZE - cache size in human readable form (mb/mib/gb/gib)

When these flags are set, the sync flow moves parquet files from staging into
the specified cache directory instead of deleting them. On insertion, least
recently used (LRU) cache entries are deleted until the cache size constraint
is satisfied. LocalCacheManager is responsible for updating and persisting
this data structure.

See the notes appended after PATCH 15 for small standalone sketches of the
cache size parsing and the LRU eviction loop.
---
 .gitignore                                 |   1 +
 Cargo.lock                                 |  27 ++
 server/Cargo.toml                          |   2 +
 server/src/localcache.rs                   | 189 +++++++++++
 server/src/main.rs                         |   1 +
 server/src/option.rs                       |  50 +++
 server/src/query.rs                        |   1 -
 server/src/query/stream_schema_provider.rs | 365 ++++++++++++++-------
 server/src/query/table_provider.rs         | 140 --------
 server/src/storage/localfs.rs              |   9 +-
 server/src/storage/object_storage.rs       |  30 +-
 server/src/storage/s3.rs                   |   6 +
 12 files changed, 543 insertions(+), 278 deletions(-)
 create mode 100644 server/src/localcache.rs
 delete mode 100644 server/src/query/table_provider.rs

diff --git a/.gitignore b/.gitignore
index fb4be9828..27ff2bd97 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 target
 data
 staging
+limitcache
 examples
 cert.pem
 key.pem
diff --git a/Cargo.lock b/Cargo.lock
index 8b4fe8cfe..15405c107 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1792,6 +1792,15 @@ version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 
+[[package]]
+name = "hashbrown"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
+dependencies = [
+ "ahash 0.8.3",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.14.0"
@@ -1802,6 +1811,16 @@ dependencies = [
 "allocator-api2",
 ]
 
+[[package]]
+name = "hashlru"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec243be29f0218c651d6dc31eafb562c4363b2e96cd42a92b6948964d28f4c5a"
+dependencies = [
+ "hashbrown 0.13.2",
+ "serde",
+]
+
 [[package]]
 name = "heck"
 version = "0.4.1"
@@ -1889,6 +1908,12 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
 
+[[package]]
+name = "human-size"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9994b79e8c1a39b3166c63ae7823bb2b00831e2a96a31399c50fe69df408eaeb"
+
 [[package]]
 name = "humantime"
 version = "2.1.0"
@@ -2671,10 +2696,12 @@ dependencies = [
 "fs_extra",
 "futures",
 "futures-util",
+ "hashlru",
 "hex",
 "hostname",
 "http",
 "http-auth-basic",
+ "human-size",
 "humantime",
 "humantime-serde",
 "itertools 0.10.5",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 890d09cff..a948a39d3 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -96,10 +96,12 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
 xz2 = { version = "*", features = ["static"] }
 nom = "7.1.3"
 humantime = "2.1.0"
+human-size = "0.4"
 openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
 url = "2.4.0"
 http-auth-basic = "0.3.3"
 serde_repr = "0.1.17"
+hashlru = { version = "0.11.0", features = ["serde"] }
[build-dependencies] cargo_toml = "0.15" diff --git a/server/src/localcache.rs b/server/src/localcache.rs new file mode 100644 index 000000000..106c35de3 --- /dev/null +++ b/server/src/localcache.rs @@ -0,0 +1,189 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{io, path::PathBuf, sync::Arc}; + +use fs_extra::file::CopyOptions; +use futures_util::TryFutureExt; +use hashlru::Cache; +use itertools::{Either, Itertools}; +use object_store::ObjectStore; +use once_cell::sync::OnceCell; +use tokio::{fs, sync::Mutex}; + +use crate::option::CONFIG; + +pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct LocalCache { + version: String, + current_size: u64, + capacity: u64, + files: Cache, +} + +impl LocalCache { + fn new_with_size(capacity: u64) -> Self { + Self { + version: "v1".to_string(), + current_size: 0, + capacity, + files: Cache::new(100), + } + } + + fn can_push(&self, size: u64) -> bool { + self.capacity >= self.current_size + size + } +} + +pub struct LocalCacheManager { + object_store: Arc, + cache_path: PathBuf, + cache_capacity: u64, + copy_options: CopyOptions, + semaphore: Mutex<()>, +} + +impl LocalCacheManager { + pub fn global() -> Option<&'static LocalCacheManager> { + static INSTANCE: OnceCell = OnceCell::new(); + + let Some(cache_path) = &CONFIG.parseable.local_cache_path else { + return None; + }; + + Some(INSTANCE.get_or_init(|| { + let cache_path = cache_path.clone(); + std::fs::create_dir_all(&cache_path).unwrap(); + let object_store = CONFIG.storage().get_store(); + LocalCacheManager { + object_store, + cache_path, + cache_capacity: CONFIG.parseable.local_cache_size, + copy_options: CopyOptions { + overwrite: true, + skip_exist: false, + ..CopyOptions::new() + }, + semaphore: Mutex::new(()), + } + })) + } + + pub async fn get_cache(&self, stream: &str) -> Result { + let path = cache_file_path(&self.cache_path, stream).unwrap(); + let res = self + .object_store + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + let cache = match res { + Ok(bytes) => serde_json::from_slice(&bytes)?, + Err(object_store::Error::NotFound { .. }) => { + LocalCache::new_with_size(self.cache_capacity) + } + Err(err) => return Err(err.into()), + }; + Ok(cache) + } + + pub async fn put_cache(&self, stream: &str, cache: &LocalCache) -> Result<(), CacheError> { + let path = cache_file_path(&self.cache_path, stream).unwrap(); + let bytes = serde_json::to_vec(cache)?.into(); + Ok(self.object_store.put(&path, bytes).await?) 
+ } + + pub async fn move_to_cache( + &self, + stream: &str, + key: String, + staging_path: PathBuf, + ) -> Result<(), CacheError> { + let lock = self.semaphore.lock().await; + let mut cache_path = self.cache_path.join(stream); + fs::create_dir_all(&cache_path).await?; + cache_path.push(staging_path.file_name().unwrap()); + fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?; + let file_size = std::fs::metadata(&cache_path)?.len(); + let mut cache = self.get_cache(stream).await?; + + while !cache.can_push(file_size) { + if let Some((_, file_for_removal)) = cache.files.pop_lru() { + let lru_file_size = std::fs::metadata(&file_for_removal)?.len(); + cache.current_size = cache.current_size.saturating_sub(lru_file_size); + log::info!("removing cache entry"); + tokio::spawn(fs::remove_file(file_for_removal)); + } else { + log::error!("Cache size too small"); + break; + } + } + + if cache.files.is_full() { + cache.files.resize(cache.files.capacity() * 2); + } + cache.files.push(key, cache_path); + cache.current_size += file_size; + self.put_cache(stream, &cache).await?; + drop(lock); + Ok(()) + } + + pub async fn partition_on_cached( + &self, + stream: &str, + collection: Vec, + key: fn(&T) -> &String, + ) -> Result<(Vec<(T, PathBuf)>, Vec), CacheError> { + let lock = self.semaphore.lock().await; + let mut cache = self.get_cache(stream).await?; + let (cached, remainder): (Vec<_>, Vec<_>) = collection.into_iter().partition_map(|item| { + let key = key(&item); + match cache.files.get(key).cloned() { + Some(path) => Either::Left((item, path)), + None => Either::Right(item), + } + }); + self.put_cache(stream, &cache).await?; + drop(lock); + Ok((cached, remainder)) + } +} + +fn cache_file_path( + root: impl AsRef, + stream: &str, +) -> Result { + let mut path = root.as_ref().join(stream); + path.set_file_name(STREAM_CACHE_FILENAME); + object_store::path::Path::from_absolute_path(path) +} + +#[derive(Debug, thiserror::Error)] +pub enum CacheError { + #[error("{0}")] + Serde(#[from] serde_json::Error), + #[error("{0}")] + IOError(#[from] io::Error), + #[error("{0}")] + MoveError(#[from] fs_extra::error::Error), + #[error("{0}")] + ObjectStoreError(#[from] object_store::Error), +} diff --git a/server/src/main.rs b/server/src/main.rs index 0d2bee0b3..4ebd9e545 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,6 +33,7 @@ mod catalog; mod event; mod handlers; mod livetail; +mod localcache; mod metadata; mod metrics; mod migration; diff --git a/server/src/option.rs b/server/src/option.rs index 01ac2b832..6a0be27f4 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -174,6 +174,12 @@ pub struct Server { /// for incoming events and local cache pub local_staging_path: PathBuf, + /// The local cache path is used for speeding up query on latest data + pub local_cache_path: Option, + + /// Size for local cache + pub local_cache_size: u64, + /// Interval in seconds after which uncommited data would be /// uploaded to the storage platform. 
pub upload_interval: u64, @@ -220,6 +226,7 @@ impl FromArgMatches for Server { } fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { + self.local_cache_path = m.get_one::(Self::CACHE).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); self.domain_address = m.get_one::(Self::DOMAIN_URI).cloned(); @@ -235,6 +242,10 @@ impl FromArgMatches for Server { .get_one::(Self::STAGING) .cloned() .expect("default value for staging"); + self.local_cache_size = m + .get_one::(Self::CACHE_SIZE) + .cloned() + .expect("default value for cache size"); self.upload_interval = m .get_one::(Self::UPLOAD_INTERVAL) .cloned() @@ -319,6 +330,8 @@ impl Server { pub const ADDRESS: &'static str = "address"; pub const DOMAIN_URI: &'static str = "origin"; pub const STAGING: &'static str = "local-staging-path"; + pub const CACHE: &'static str = "cache-path"; + pub const CACHE_SIZE: &'static str = "cache-size"; pub const UPLOAD_INTERVAL: &'static str = "upload-interval"; pub const USERNAME: &'static str = "username"; pub const PASSWORD: &'static str = "password"; @@ -384,6 +397,25 @@ impl Server { .value_parser(validation::canonicalize_path) .help("The local staging path is used as a temporary landing point for incoming events and local cache") .next_line_help(true), + ) + .arg( + Arg::new(Self::CACHE) + .long(Self::CACHE) + .env("P_CACHE_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path to be used for caching latest files") + .next_line_help(true), + ) + .arg( + Arg::new(Self::CACHE_SIZE) + .long(Self::CACHE_SIZE) + .env("P_CACHE_SIZE") + .value_name("size") + .default_value("1Gib") + .value_parser(validation::human_size_to_bytes) + .help("Size for cache in human readable format (e.g 1GiB, 2GiB, 100MB)") + .next_line_help(true), ) .arg( Arg::new(Self::UPLOAD_INTERVAL) @@ -569,8 +601,11 @@ pub mod validation { fs::{canonicalize, create_dir_all}, net::ToSocketAddrs, path::PathBuf, + str::FromStr, }; + use human_size::SpecificSize; + pub fn file_path(s: &str) -> Result { if s.is_empty() { return Err("empty path".to_owned()); @@ -606,4 +641,19 @@ pub mod validation { pub fn url(s: &str) -> Result { url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string()) } + + pub fn human_size_to_bytes(s: &str) -> Result { + use human_size::multiples; + fn parse_and_map( + s: &str, + ) -> Result { + SpecificSize::::from_str(s).map(|x| x.to_bytes()) + } + + parse_and_map::(s) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .map_err(|_| "Could not parse given size".to_string()) + } } diff --git a/server/src/query.rs b/server/src/query.rs index 663873dae..3e17e3c73 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -19,7 +19,6 @@ mod filter_optimizer; mod listing_table_builder; mod stream_schema_provider; -mod table_provider; use chrono::{DateTime, Utc}; use chrono::{NaiveDateTime, TimeZone}; diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 14924cf2c..92214dd7b 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -18,6 +18,7 @@ use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc}; +use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef, SortOptions}; use bytes::Bytes; use chrono::{NaiveDateTime, Timelike, Utc}; @@ -31,18 +32,18 @@ use datafusion::{ file_format::{parquet::ParquetFormat, FileFormat}, 
listing::PartitionedFile, physical_plan::FileScanConfig, - TableProvider, + MemTable, TableProvider, }, error::DataFusionError, execution::{context::SessionState, object_store::ObjectStoreUrl}, logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType}, optimizer::utils::conjunction, physical_expr::{create_physical_expr, PhysicalSortExpr}, - physical_plan::{self, ExecutionPlan}, + physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics}, prelude::{Column, Expr}, scalar::ScalarValue, }; -use futures_util::{stream::FuturesOrdered, StreamExt, TryStreamExt}; +use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use object_store::{path::Path, ObjectStore}; use url::Url; @@ -53,12 +54,13 @@ use crate::{ Snapshot, }, event::{self, DEFAULT_TIMESTAMP_KEY}, + localcache::LocalCacheManager, metadata::STREAM_INFO, option::CONFIG, storage::ObjectStorage, }; -use super::{listing_table_builder::ListingTableBuilder, table_provider::QueryTableProvider}; +use super::listing_table_builder::ListingTableBuilder; // schema provider for stream based on global data pub struct GlobalSchemaProvider { @@ -101,107 +103,101 @@ struct StandardTableProvider { url: Url, } -impl StandardTableProvider { - #[allow(clippy::too_many_arguments)] - async fn remote_physical_plan( - &self, - glob_storage: Arc, - object_store: Arc, - snapshot: &catalog::snapshot::Snapshot, - time_filters: &[PartialTimeFilter], - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &SessionState, - ) -> Result>, DataFusionError> { - let items = snapshot.manifests(time_filters); - let manifest_files = collect_manifest_files( - object_store, - items - .into_iter() - .sorted_by_key(|file| file.time_lower_bound) - .map(|item| item.manifest_path) - .collect(), - ) - .await?; +#[allow(clippy::too_many_arguments)] +async fn create_parquet_physical_plan( + object_store_url: ObjectStoreUrl, + partitions: Vec>, + statistics: Statistics, + schema: Arc, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &SessionState, +) -> Result, DataFusionError> { + let filters = if let Some(expr) = conjunction(filters.to_vec()) { + let table_df_schema = schema.as_ref().clone().to_dfschema()?; + let filters = + create_physical_expr(&expr, &table_df_schema, &schema, state.execution_props())?; + Some(filters) + } else { + None + }; - let mut manifest_files: Vec<_> = manifest_files - .into_iter() - .flat_map(|file| file.files) - .rev() - .collect(); + let sort_expr = PhysicalSortExpr { + expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?, + options: SortOptions { + descending: true, + nulls_first: true, + }, + }; - for filter in filters { - manifest_files.retain(|file| !file.can_be_pruned(filter)) - } + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + // create the execution plan + let plan = file_format + .create_physical_plan( + state, + FileScanConfig { + object_store_url, + file_schema: schema.clone(), + file_groups: partitions, + statistics, + projection: projection.cloned(), + limit, + output_ordering: vec![vec![sort_expr]], + table_partition_cols: Vec::new(), + infinite_source: false, + }, + filters.as_ref(), + ) + .await?; - if let Some(limit) = limit { - let limit = limit as u64; - let mut curr_limit = 0; - let mut pos = None; - - for (index, file) in manifest_files.iter().enumerate() { - curr_limit += file.num_rows(); - if curr_limit >= limit { - pos = Some(index); - break; - } 
- } + Ok(plan) +} - if let Some(pos) = pos { - manifest_files.truncate(pos + 1); +async fn collect_from_snapshot( + snapshot: &catalog::snapshot::Snapshot, + time_filters: &[PartialTimeFilter], + object_store: Arc, + filters: &[Expr], + limit: Option, +) -> Result, DataFusionError> { + let items = snapshot.manifests(time_filters); + let manifest_files = collect_manifest_files( + object_store, + items + .into_iter() + .sorted_by_key(|file| file.time_lower_bound) + .map(|item| item.manifest_path) + .collect(), + ) + .await?; + let mut manifest_files: Vec<_> = manifest_files + .into_iter() + .flat_map(|file| file.files) + .rev() + .collect(); + for filter in filters { + manifest_files.retain(|file| !file.can_be_pruned(filter)) + } + if let Some(limit) = limit { + let limit = limit as u64; + let mut curr_limit = 0; + let mut pos = None; + + for (index, file) in manifest_files.iter().enumerate() { + curr_limit += file.num_rows(); + if curr_limit >= limit { + pos = Some(index); + break; } } - if manifest_files.is_empty() { - return Ok(None); + if let Some(pos) = pos { + manifest_files.truncate(pos + 1); } - - let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1); - - let filters = if let Some(expr) = conjunction(filters.to_vec()) { - let table_df_schema = self.schema.as_ref().clone().to_dfschema()?; - let filters = create_physical_expr( - &expr, - &table_df_schema, - &self.schema, - state.execution_props(), - )?; - Some(filters) - } else { - None - }; - - let sort_expr = PhysicalSortExpr { - expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)?, - options: SortOptions { - descending: true, - nulls_first: true, - }, - }; - - let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); - // create the execution plan - let plan = file_format - .create_physical_plan( - state, - FileScanConfig { - object_store_url: ObjectStoreUrl::parse(&glob_storage.store_url()).unwrap(), - file_schema: self.schema.clone(), - file_groups: partitioned_files, - statistics, - projection: projection.cloned(), - limit, - output_ordering: vec![vec![sort_expr]], - table_partition_cols: Vec::new(), - infinite_source: false, - }, - filters.as_ref(), - ) - .await?; - - Ok(Some(plan)) } + + Ok(manifest_files) } fn partitioned_files( @@ -288,23 +284,32 @@ impl TableProvider for StandardTableProvider { filters: &[Expr], limit: Option, ) -> Result, DataFusionError> { + let mut memory_exec = None; + let mut cache_exec = None; + let time_filters = extract_primary_filter(filters); if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. 
Table scanning requires atleast one time bound".to_string())); } - let memtable = if include_now(filters) { - event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) - } else { - None + if include_now(filters) { + if let Some(records) = + event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) + { + let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?; + memory_exec = Some( + reversed_mem_table + .scan(state, projection, filters, limit) + .await?, + ); + } }; - let storage = state + let object_store = state .runtime_env() .object_store_registry .get_store(&self.url) .unwrap(); - let glob_storage = CONFIG.storage().get_object_store(); // Fetch snapshot @@ -313,34 +318,90 @@ impl TableProvider for StandardTableProvider { .await .map_err(|err| DataFusionError::Plan(err.to_string()))?; - let remote_table = if is_overlapping_query(&snapshot.manifest_list, &time_filters) { - // Is query timerange is overlapping with older data. - if let Some(table) = ListingTableBuilder::new(self.stream.clone()) - .populate_via_listing(glob_storage.clone(), storage, &time_filters) - .await? - .build(self.schema.clone(), |x| glob_storage.query_prefixes(x))? - { - Some(table.scan(state, projection, filters, limit).await?) - } else { - None - } - } else { - self.remote_physical_plan( + // Is query timerange is overlapping with older data. + if is_overlapping_query(&snapshot.manifest_list, &time_filters) { + return legacy_listing_table( + self.stream.clone(), + memory_exec, glob_storage, - storage, - &snapshot, + object_store, &time_filters, + self.schema.clone(), + state, + projection, + filters, + limit, + ) + .await; + } + + let mut manifest_files = + collect_from_snapshot(&snapshot, &time_filters, object_store, filters, limit).await?; + + if manifest_files.is_empty() { + return final_plan(vec![memory_exec], projection, self.schema.clone()); + } + + if let Some(cache_manager) = LocalCacheManager::global() { + let (cached, remainder) = cache_manager + .partition_on_cached(&self.stream, manifest_files, |file| &file.file_path) + .await + .map_err(|err| DataFusionError::External(Box::new(err)))?; + + manifest_files = remainder; + + let cached = cached + .into_iter() + .map(|(mut file, cache_path)| { + let cache_path = + object_store::path::Path::from_absolute_path(cache_path).unwrap(); + file.file_path = cache_path.to_string(); + file + }) + .collect(); + + let (partitioned_files, statistics) = partitioned_files(cached, &self.schema, 1); + let plan = create_parquet_physical_plan( + ObjectStoreUrl::parse("file:///").unwrap(), + partitioned_files, + statistics, + self.schema.clone(), projection, filters, limit, state, ) - .await? - }; + .await?; - QueryTableProvider::try_new(memtable, remote_table, self.schema.clone())? - .scan(state, projection, filters, limit) - .await + cache_exec = Some(plan) + } + + if manifest_files.is_empty() { + return final_plan( + vec![memory_exec, cache_exec], + projection, + self.schema.clone(), + ); + } + + let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1); + let remote_exec = create_parquet_physical_plan( + ObjectStoreUrl::parse(&glob_storage.store_url()).unwrap(), + partitioned_files, + statistics, + self.schema.clone(), + projection, + filters, + limit, + state, + ) + .await?; + + Ok(final_plan( + vec![memory_exec, cache_exec, Some(remote_exec)], + projection, + self.schema.clone(), + )?) 
} fn supports_filter_pushdown( @@ -358,6 +419,66 @@ impl TableProvider for StandardTableProvider { } } +#[allow(clippy::too_many_arguments)] +async fn legacy_listing_table( + stream: String, + mem_exec: Option>, + glob_storage: Arc, + object_store: Arc, + time_filters: &[PartialTimeFilter], + schema: Arc, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, +) -> Result, DataFusionError> { + let remote_table = ListingTableBuilder::new(stream) + .populate_via_listing(glob_storage.clone(), object_store, time_filters) + .and_then(|builder| async { + let table = builder.build(schema.clone(), |x| glob_storage.query_prefixes(x))?; + let res = match table { + Some(table) => Some(table.scan(state, projection, filters, limit).await?), + _ => None, + }; + Ok(res) + }) + .await?; + + final_plan(vec![mem_exec, remote_table], projection, schema) +} + +fn final_plan( + execution_plans: Vec>>, + projection: Option<&Vec>, + schema: Arc, +) -> Result, DataFusionError> { + let mut execution_plans = execution_plans.into_iter().flatten().collect_vec(); + let exec: Arc = if execution_plans.is_empty() { + let schema = match projection { + Some(projection) => Arc::new(schema.project(projection)?), + None => schema, + }; + Arc::new(EmptyExec::new(false, schema)) + } else if execution_plans.len() == 1 { + execution_plans.pop().unwrap() + } else { + Arc::new(UnionExec::new(execution_plans)) + }; + + Ok(exec) +} + +fn reversed_mem_table( + mut records: Vec, + schema: Arc, +) -> Result { + records[..].reverse(); + records + .iter_mut() + .for_each(|batch| *batch = crate::utils::arrow::reverse_reader::reverse(batch)); + MemTable::try_new(schema, vec![records]) +} + #[derive(Debug)] pub enum PartialTimeFilter { Low(Bound), diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs deleted file mode 100644 index 7ffc0c70f..000000000 --- a/server/src/query/table_provider.rs +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use async_trait::async_trait; -use datafusion::arrow::datatypes::{Schema, SchemaRef}; -use datafusion::arrow::record_batch::RecordBatch; - -use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::error::DataFusionError; -use datafusion::execution::context::SessionState; -use datafusion::logical_expr::{TableProviderFilterPushDown, TableType}; -use datafusion::physical_plan::empty::EmptyExec; -use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::Expr; - -use std::any::Any; -use std::sync::Arc; -use std::vec; - -use crate::utils::arrow::reverse_reader::reverse; - -pub struct QueryTableProvider { - staging: Option, - // remote table - storage: Option>, - schema: Arc, -} - -impl QueryTableProvider { - pub fn try_new( - staging: Option>, - storage: Option>, - schema: Arc, - ) -> Result { - // in place reverse transform - let staging = if let Some(mut staged_batches) = staging { - staged_batches[..].reverse(); - staged_batches - .iter_mut() - .for_each(|batch| *batch = reverse(batch)); - Some(staged_batches) - } else { - None - }; - - let memtable = staging - .map(|records| MemTable::try_new(schema.clone(), vec![records])) - .transpose()?; - - Ok(Self { - staging: memtable, - storage, - schema, - }) - } - - async fn create_physical_plan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result, DataFusionError> { - let mut exec = vec![]; - - if let Some(table) = &self.staging { - exec.push(table.scan(ctx, projection, filters, limit).await?) - } - - if let Some(storage_listing) = self.storage.clone() { - exec.push(storage_listing); - } - - let exec: Arc = if exec.is_empty() { - let schema = match projection { - Some(projection) => Arc::new(self.schema.project(projection)?), - None => self.schema.clone(), - }; - Arc::new(EmptyExec::new(false, schema)) - } else if exec.len() == 1 { - exec.pop().unwrap() - } else { - Arc::new(UnionExec::new(exec)) - }; - - Ok(exec) - } -} - -#[async_trait] -impl TableProvider for QueryTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> datafusion::error::Result> { - self.create_physical_plan(ctx, projection, filters, limit) - .await - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> datafusion::error::Result> { - Ok(filters - .iter() - .map(|_| TableProviderFilterPushDown::Inexact) - .collect()) - } -} diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 6c163e21f..b87917423 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -25,7 +25,7 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; -use fs_extra::file::{move_file, CopyOptions}; +use fs_extra::file::CopyOptions; use futures::{stream::FuturesUnordered, TryStreamExt}; use relative_path::RelativePath; use tokio::fs::{self, DirEntry}; @@ -64,6 +64,10 @@ impl ObjectStorageProvider for FSConfig { Arc::new(LocalFS::new(self.root.clone())) } + fn get_store(&self) -> Arc { + Arc::new(object_store::local::LocalFileSystem::new()) + } + fn get_endpoint(&self) -> String { self.root.to_str().unwrap().to_string() } @@ -189,8 +193,7 @@ impl 
ObjectStorage for LocalFS { if let Some(path) = to_path.parent() { fs::create_dir_all(path).await? } - let _ = move_file(path, to_path, &op)?; - + let _ = fs_extra::file::copy(path, to_path, &op)?; Ok(()) } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 57fbdd3f3..810ac85ec 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -24,6 +24,7 @@ use super::{ use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, + localcache::LocalCacheManager, metadata::STREAM_INFO, metrics::{storage::StorageMetrics, STORAGE_SIZE}, option::CONFIG, @@ -35,6 +36,7 @@ use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; +use object_store::ObjectStore; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde_json::Value; @@ -57,6 +59,7 @@ const MANIFEST_FILE: &str = "manifest.json"; pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { fn get_datafusion_runtime(&self) -> RuntimeConfig; fn get_object_store(&self) -> Arc; + fn get_store(&self) -> Arc; fn get_endpoint(&self) -> String; fn register_store_metrics(&self, handler: &PrometheusMetrics); } @@ -334,20 +337,23 @@ pub trait ObjectStorage: Sync + 'static { .to_str() .expect("filename is valid string"); let file_suffix = str::replacen(filename, ".", "/", 3); - let objectstore_path = format!("{stream}/{file_suffix}"); - let manifest = catalog::create_from_parquet_file( - self.absolute_url(RelativePath::from_path(&objectstore_path).unwrap()) - .to_string(), - file, - ) - .unwrap(); - self.upload_file(&objectstore_path, file).await?; + let stream_relative_path = format!("{stream}/{file_suffix}"); + self.upload_file(&stream_relative_path, file).await?; + let absolute_path = self + .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) + .to_string(); let store = CONFIG.storage().get_object_store(); + let manifest = + catalog::create_from_parquet_file(absolute_path.clone(), file).unwrap(); catalog::update_snapshot(store, stream, manifest).await?; - } - - for file in parquet_files { - let _ = fs::remove_file(file); + if let Some(manager) = LocalCacheManager::global() { + manager + .move_to_cache(stream, absolute_path, file.to_owned()) + .await + .unwrap() + } else { + let _ = fs::remove_file(file); + } } } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 79b38426d..f4f1f6648 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -200,6 +200,12 @@ impl ObjectStorageProvider for S3Config { }) } + fn get_store(&self) -> Arc { + let s3 = self.get_default_builder().build().unwrap(); + // limit objectstore to a concurrent request limit + Arc::new(LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS)) + } + fn get_endpoint(&self) -> String { format!("{}/{}", self.endpoint_url, self.bucket_name) } From 1b722daf9a71ded7cb3a3bd511b92a1f973fde79 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 12:42:47 +0530 Subject: [PATCH 02/15] Add banner --- server/src/banner.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/server/src/banner.rs b/server/src/banner.rs index 8cee4a756..7eb160c1a 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -18,6 +18,7 @@ */ use crossterm::style::Stylize; +use human_size::SpecificSize; use crate::about; use crate::utils::uid::Uid; @@ -100,5 +101,20 @@ async fn 
storage_info(config: &Config) { config.staging_dir().to_string_lossy(), storage.get_endpoint(), latency - ) + ); + + if let Some(path) = &config.parseable.local_cache_path { + let size: SpecificSize = + SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte) + .unwrap() + .into(); + + eprintln!( + " + Cache: \"{}\" + Cache Size: \"{}\"", + path.display(), + size + ); + } } From 5ff79be812914029d77a30b24db4361eca095b94 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 12:45:02 +0530 Subject: [PATCH 03/15] Remove unused --- server/src/localcache.rs | 2 +- server/src/storage/localfs.rs | 4 ---- server/src/storage/object_storage.rs | 2 -- server/src/storage/s3.rs | 6 ------ 4 files changed, 1 insertion(+), 13 deletions(-) diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 106c35de3..8407bd644 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -72,7 +72,7 @@ impl LocalCacheManager { Some(INSTANCE.get_or_init(|| { let cache_path = cache_path.clone(); std::fs::create_dir_all(&cache_path).unwrap(); - let object_store = CONFIG.storage().get_store(); + let object_store = Arc::new(object_store::local::LocalFileSystem::new()); LocalCacheManager { object_store, cache_path, diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index b87917423..5ec98bfcc 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -64,10 +64,6 @@ impl ObjectStorageProvider for FSConfig { Arc::new(LocalFS::new(self.root.clone())) } - fn get_store(&self) -> Arc { - Arc::new(object_store::local::LocalFileSystem::new()) - } - fn get_endpoint(&self) -> String { self.root.to_str().unwrap().to_string() } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 810ac85ec..1d2a17ba8 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -36,7 +36,6 @@ use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; -use object_store::ObjectStore; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde_json::Value; @@ -59,7 +58,6 @@ const MANIFEST_FILE: &str = "manifest.json"; pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { fn get_datafusion_runtime(&self) -> RuntimeConfig; fn get_object_store(&self) -> Arc; - fn get_store(&self) -> Arc; fn get_endpoint(&self) -> String; fn register_store_metrics(&self, handler: &PrometheusMetrics); } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index f4f1f6648..79b38426d 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -200,12 +200,6 @@ impl ObjectStorageProvider for S3Config { }) } - fn get_store(&self) -> Arc { - let s3 = self.get_default_builder().build().unwrap(); - // limit objectstore to a concurrent request limit - Arc::new(LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS)) - } - fn get_endpoint(&self) -> String { format!("{}/{}", self.endpoint_url, self.bucket_name) } From e7fec8e2fda3baead5b2bb3213104296ded74565 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 12:45:52 +0530 Subject: [PATCH 04/15] Add more parse option --- server/src/option.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/option.rs b/server/src/option.rs index 6a0be27f4..3dbd672b9 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -654,6 +654,8 @@ pub mod validation { 
.or(parse_and_map::(s)) .or(parse_and_map::(s)) .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) .map_err(|_| "Could not parse given size".to_string()) } } From cbdee0b230593b0d8ef3b050f7a6f75208171785 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 13:09:58 +0530 Subject: [PATCH 05/15] Fix banner --- server/src/banner.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/banner.rs b/server/src/banner.rs index 7eb160c1a..4c2d9f256 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -110,9 +110,10 @@ async fn storage_info(config: &Config) { .into(); eprintln!( - " - Cache: \"{}\" + "\ + {:8}Cache: \"{}\" Cache Size: \"{}\"", + "", path.display(), size ); From b659668cb06b2631cee21fd8dc579a47501e58b0 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 18:15:10 +0530 Subject: [PATCH 06/15] Add Counter for cache hit --- server/src/metrics/mod.rs | 11 +++++++++++ server/src/query/stream_schema_provider.rs | 2 ++ 2 files changed, 13 insertions(+) diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index cc51bbc97..78a05e5a3 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -67,6 +67,14 @@ pub static QUERY_EXECUTE_TIME: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static QUERY_CACHE_HIT: Lazy = Lazy::new(|| { + IntCounterVec::new( + Opts::new("QUERY_CACHE_HIT", "Full Cache hit").namespace(METRICS_NAMESPACE), + &["stream"], + ) + .expect("metric can be created") +}); + pub static ALERTS_STATES: Lazy = Lazy::new(|| { IntCounterVec::new( Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE), @@ -91,6 +99,9 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(QUERY_EXECUTE_TIME.clone())) .expect("metric can be registered"); + registry + .register(Box::new(QUERY_CACHE_HIT.clone())) + .expect("metric can be registered"); registry .register(Box::new(ALERTS_STATES.clone())) .expect("metric can be registered"); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 92214dd7b..365e5319d 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -56,6 +56,7 @@ use crate::{ event::{self, DEFAULT_TIMESTAMP_KEY}, localcache::LocalCacheManager, metadata::STREAM_INFO, + metrics::QUERY_CACHE_HIT, option::CONFIG, storage::ObjectStorage, }; @@ -377,6 +378,7 @@ impl TableProvider for StandardTableProvider { } if manifest_files.is_empty() { + QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); return final_plan( vec![memory_exec, cache_exec], projection, From 8c042305154d1c276e2b2ac9b39ad6f7c67896b2 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 18:25:05 +0530 Subject: [PATCH 07/15] Add comment --- server/src/localcache.rs | 1 + server/src/query/stream_schema_provider.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 8407bd644..f8e46b49e 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -35,6 +35,7 @@ pub struct LocalCache { version: String, current_size: u64, capacity: u64, + /// Mapping between storage path and cache path. 
files: Cache, } diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 365e5319d..48128f8e0 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -343,12 +343,15 @@ impl TableProvider for StandardTableProvider { return final_plan(vec![memory_exec], projection, self.schema.clone()); } + // Based on entries in the manifest files, find them in the cache and create a physical plan. if let Some(cache_manager) = LocalCacheManager::global() { let (cached, remainder) = cache_manager .partition_on_cached(&self.stream, manifest_files, |file| &file.file_path) .await .map_err(|err| DataFusionError::External(Box::new(err)))?; + // Assign remaining entries back to manifest list + // This is to be used for remote query manifest_files = remainder; let cached = cached From 4c8037ec344e1a0d133f49fb0bf4e7dde7d07823 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 18:36:44 +0530 Subject: [PATCH 08/15] Add size requirement --- server/src/option.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 3dbd672b9..f1909ee18 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -29,6 +29,8 @@ use crate::oidc::{self, OpenidConfig}; use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL}; use crate::utils::validate_path_is_writeable; +pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB + pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); #[derive(Debug)] @@ -606,6 +608,8 @@ pub mod validation { use human_size::SpecificSize; + use crate::option::MIN_CACHE_SIZE_BYTES; + pub fn file_path(s: &str) -> Result { if s.is_empty() { return Err("empty path".to_owned()); @@ -650,12 +654,20 @@ pub mod validation { SpecificSize::::from_str(s).map(|x| x.to_bytes()) } - parse_and_map::(s) + let size = parse_and_map::(s) .or(parse_and_map::(s)) .or(parse_and_map::(s)) .or(parse_and_map::(s)) .or(parse_and_map::(s)) .or(parse_and_map::(s)) - .map_err(|_| "Could not parse given size".to_string()) + .map_err(|_| "Could not parse given size".to_string())?; + + if size < MIN_CACHE_SIZE_BYTES { + return Err( + "Specified value of cache size is smaller than current minimum of 1GiB".to_string(), + ); + } + + Ok(size) } } From 334fc2bc837710bba4fc40de2fcd37db94c18443 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 18:50:35 +0530 Subject: [PATCH 09/15] Use capacity from cache manager --- server/src/localcache.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/server/src/localcache.rs b/server/src/localcache.rs index f8e46b49e..8f7e5fa4e 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -34,24 +34,18 @@ pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; pub struct LocalCache { version: String, current_size: u64, - capacity: u64, /// Mapping between storage path and cache path. files: Cache, } impl LocalCache { - fn new_with_size(capacity: u64) -> Self { + fn new() -> Self { Self { version: "v1".to_string(), current_size: 0, - capacity, files: Cache::new(100), } } - - fn can_push(&self, size: u64) -> bool { - self.capacity >= self.current_size + size - } } pub struct LocalCacheManager { @@ -97,9 +91,7 @@ impl LocalCacheManager { .await; let cache = match res { Ok(bytes) => serde_json::from_slice(&bytes)?, - Err(object_store::Error::NotFound { .. 
}) => { - LocalCache::new_with_size(self.cache_capacity) - } + Err(object_store::Error::NotFound { .. }) => LocalCache::new(), Err(err) => return Err(err.into()), }; Ok(cache) @@ -125,7 +117,7 @@ impl LocalCacheManager { let file_size = std::fs::metadata(&cache_path)?.len(); let mut cache = self.get_cache(stream).await?; - while !cache.can_push(file_size) { + while cache.current_size + file_size > self.cache_capacity { if let Some((_, file_for_removal)) = cache.files.pop_lru() { let lru_file_size = std::fs::metadata(&file_for_removal)?.len(); cache.current_size = cache.current_size.saturating_sub(lru_file_size); From 9d990434b7e83db012c80265be3ce5b042e00aa4 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 25 Dec 2023 21:28:37 +0530 Subject: [PATCH 10/15] Rename --- server/src/localcache.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 8f7e5fa4e..010b1e3a0 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -16,13 +16,13 @@ * */ -use std::{io, path::PathBuf, sync::Arc}; +use std::{io, path::PathBuf}; use fs_extra::file::CopyOptions; use futures_util::TryFutureExt; use hashlru::Cache; use itertools::{Either, Itertools}; -use object_store::ObjectStore; +use object_store::{local::LocalFileSystem, ObjectStore}; use once_cell::sync::OnceCell; use tokio::{fs, sync::Mutex}; @@ -49,7 +49,7 @@ impl LocalCache { } pub struct LocalCacheManager { - object_store: Arc, + filesystem: LocalFileSystem, cache_path: PathBuf, cache_capacity: u64, copy_options: CopyOptions, @@ -67,9 +67,8 @@ impl LocalCacheManager { Some(INSTANCE.get_or_init(|| { let cache_path = cache_path.clone(); std::fs::create_dir_all(&cache_path).unwrap(); - let object_store = Arc::new(object_store::local::LocalFileSystem::new()); LocalCacheManager { - object_store, + filesystem: LocalFileSystem::new(), cache_path, cache_capacity: CONFIG.parseable.local_cache_size, copy_options: CopyOptions { @@ -85,7 +84,7 @@ impl LocalCacheManager { pub async fn get_cache(&self, stream: &str) -> Result { let path = cache_file_path(&self.cache_path, stream).unwrap(); let res = self - .object_store + .filesystem .get(&path) .and_then(|resp| resp.bytes()) .await; @@ -100,7 +99,7 @@ impl LocalCacheManager { pub async fn put_cache(&self, stream: &str, cache: &LocalCache) -> Result<(), CacheError> { let path = cache_file_path(&self.cache_path, stream).unwrap(); let bytes = serde_json::to_vec(cache)?.into(); - Ok(self.object_store.put(&path, bytes).await?) + Ok(self.filesystem.put(&path, bytes).await?) 
} pub async fn move_to_cache( From 8ca5f8aa116c0c6c2fddb7bfc3941b1cf8861c94 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 26 Dec 2023 12:08:19 +0530 Subject: [PATCH 11/15] Disable cache for local-store --- server/src/option.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server/src/option.rs b/server/src/option.rs index f1909ee18..da33b3178 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -64,6 +64,15 @@ impl Config { .exit() } + if server.local_cache_path.is_some() { + parseable_cli_command() + .error( + ErrorKind::ValueValidation, + "Cannot use cache with local-store subcommand.", + ) + .exit() + } + Config { parseable: server, storage: Arc::new(storage), From bd41dce0d170c641f349aa3b1034395d547c2c12 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 26 Dec 2023 21:27:49 +0530 Subject: [PATCH 12/15] refactor --- server/src/localcache.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 010b1e3a0..dd5560e52 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -48,6 +48,12 @@ impl LocalCache { } } +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct CacheMeta { + version: String, + size_capacity: u64, +} + pub struct LocalCacheManager { filesystem: LocalFileSystem, cache_path: PathBuf, @@ -60,9 +66,7 @@ impl LocalCacheManager { pub fn global() -> Option<&'static LocalCacheManager> { static INSTANCE: OnceCell = OnceCell::new(); - let Some(cache_path) = &CONFIG.parseable.local_cache_path else { - return None; - }; + let cache_path = CONFIG.parseable.local_cache_path.as_ref()?; Some(INSTANCE.get_or_init(|| { let cache_path = cache_path.clone(); @@ -164,7 +168,7 @@ fn cache_file_path( stream: &str, ) -> Result { let mut path = root.as_ref().join(stream); - path.set_file_name(STREAM_CACHE_FILENAME); + path.push(STREAM_CACHE_FILENAME); object_store::path::Path::from_absolute_path(path) } From fba9260331cd8f1fa6e7ab347c034063b7f52fd2 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 27 Dec 2023 16:52:19 +0530 Subject: [PATCH 13/15] Dynamically set local cache for stream --- server/src/handlers/http.rs | 15 +++++++ server/src/handlers/http/logstream.rs | 27 +++++++++++++ server/src/localcache.rs | 57 +++++++++++++++++++++++++++ server/src/main.rs | 7 ++++ server/src/metadata.rs | 24 ++++++++++- server/src/storage.rs | 3 ++ server/src/storage/object_storage.rs | 52 +++++++++++++++++++----- 7 files changed, 174 insertions(+), 11 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index dfefd2ace..19250a62e 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -200,6 +200,21 @@ pub fn configure_routes( .to(logstream::get_retention) .authorize_for_stream(Action::GetRetention), ), + ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_enable_cache) + .authorize_for_stream(Action::GetSchema), + ) + // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetSchema), + ), ); // User API diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e1dd1a1cd..97ac52056 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -227,6 +227,33 @@ pub async fn put_retention( )) } +pub async fn 
get_cache_enabled(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?; + Ok((web::Json(cache_enabled), StatusCode::OK)) +} + +pub async fn put_enable_cache( + req: HttpRequest, + body: web::Json, +) -> Result { + let enable_cache = body.into_inner(); + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let storage = CONFIG.storage().get_object_store(); + + let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?; + stream_metadata.cache_enabled = enable_cache; + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + + STREAM_INFO.set_stream_cache(&stream_name, enable_cache)?; + Ok(( + format!("Cache setting updated for log stream {stream_name}"), + StatusCode::OK, + )) +} + pub async fn get_stats(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); diff --git a/server/src/localcache.rs b/server/src/localcache.rs index dd5560e52..d15963f21 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -29,6 +29,7 @@ use tokio::{fs, sync::Mutex}; use crate::option::CONFIG; pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; +pub const CACHE_META_FILENAME: &str = ".cache_meta.json"; #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct LocalCache { @@ -54,6 +55,15 @@ pub struct CacheMeta { size_capacity: u64, } +impl CacheMeta { + fn new() -> Self { + Self { + version: "v1".to_string(), + size_capacity: 0, + } + } +} + pub struct LocalCacheManager { filesystem: LocalFileSystem, cache_path: PathBuf, @@ -85,6 +95,46 @@ impl LocalCacheManager { })) } + pub async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> { + fs::create_dir_all(&self.cache_path).await?; + let path = cache_meta_path(&self.cache_path).unwrap(); + let resp = self + .filesystem + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + + let updated_cache = match resp { + Ok(bytes) => { + let mut meta: CacheMeta = serde_json::from_slice(&bytes)?; + if !meta.size_capacity == config_capacity { + meta.size_capacity = config_capacity; + Some(meta) + } else { + None + } + } + Err(object_store::Error::NotFound { .. }) => { + let mut meta = CacheMeta::new(); + meta.size_capacity = config_capacity; + Some(meta) + } + Err(err) => return Err(err.into()), + }; + + if let Some(updated_cache) = updated_cache { + log::info!( + "Cache is updated to new size of {} Bytes", + &updated_cache.size_capacity + ); + self.filesystem + .put(&path, serde_json::to_vec(&updated_cache)?.into()) + .await? 
+ } + + Ok(()) + } + pub async fn get_cache(&self, stream: &str) -> Result { let path = cache_file_path(&self.cache_path, stream).unwrap(); let res = self @@ -172,6 +222,13 @@ fn cache_file_path( object_store::path::Path::from_absolute_path(path) } +fn cache_meta_path( + root: impl AsRef, +) -> Result { + let path = root.as_ref().join(CACHE_META_FILENAME); + object_store::path::Path::from_absolute_path(path) +} + #[derive(Debug, thiserror::Error)] pub enum CacheError { #[error("{0}")] diff --git a/server/src/main.rs b/server/src/main.rs index 4ebd9e545..6856e587e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -49,6 +49,8 @@ mod validator; use option::CONFIG; +use crate::localcache::LocalCacheManager; + #[actix_web::main] async fn main() -> anyhow::Result<()> { env_logger::init(); @@ -60,6 +62,11 @@ async fn main() -> anyhow::Result<()> { banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); + if let Some(cache_manager) = LocalCacheManager::global() { + cache_manager + .validate(CONFIG.parseable.local_cache_size) + .await?; + }; let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 81855f5c9..5bdfdb515 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -42,6 +42,7 @@ pub struct StreamInfo(RwLock>); pub struct LogStreamMetadata { pub schema: HashMap>, pub alerts: Alerts, + pub cache_enabled: bool, } // It is very unlikely that panic will occur when dealing with metadata. @@ -80,6 +81,22 @@ impl StreamInfo { Ok(!self.schema(stream_name)?.fields.is_empty()) } + pub fn cache_enabled(&self, stream_name: &str) -> Result { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.cache_enabled) + } + + pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + let stream = map + .get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?; + stream.cache_enabled = enable; + Ok(()) + } + pub fn schema(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); let schema = map @@ -131,6 +148,7 @@ impl StreamInfo { for stream in storage.list_streams().await? 
{ let alerts = storage.get_alerts(&stream.name).await?; let schema = storage.get_schema(&stream.name).await?; + let meta = storage.get_stream_metadata(&stream.name).await?; let schema = update_schema_from_staging(&stream.name, schema); let schema = HashMap::from_iter( @@ -140,7 +158,11 @@ impl StreamInfo { .map(|v| (v.name().to_owned(), v.clone())), ); - let metadata = LogStreamMetadata { schema, alerts }; + let metadata = LogStreamMetadata { + schema, + alerts, + cache_enabled: meta.cache_enabled, + }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/storage.rs b/server/src/storage.rs index 21f054887..10db5770f 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -74,6 +74,8 @@ pub struct ObjectStoreFormat { pub stats: Stats, #[serde(default)] pub snapshot: Snapshot, + #[serde(default)] + pub cache_enabled: bool, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -115,6 +117,7 @@ impl Default for ObjectStoreFormat { permissions: vec![Permisssion::new("parseable".to_string())], stats: Stats::default(), snapshot: Snapshot::default(), + cache_enabled: false, } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 1d2a17ba8..e6f713715 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -36,6 +36,7 @@ use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; +use itertools::Itertools; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde_json::Value; @@ -197,6 +198,15 @@ pub trait ObjectStorage: Sync + 'static { Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json")) } + async fn put_stream_manifest( + &self, + stream_name: &str, + manifest: &ObjectStoreFormat, + ) -> Result<(), ObjectStorageError> { + let path = stream_json_path(stream_name); + self.put_object(&path, to_bytes(manifest)).await + } + async fn get_stats(&self, stream_name: &str) -> Result { let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; let stream_metadata: Value = @@ -219,7 +229,6 @@ pub trait ObjectStorage: Sync + 'static { .expect("is object") .get("retention") .cloned(); - if let Some(retention) = retention { Ok(serde_json::from_value(retention).unwrap()) } else { @@ -307,10 +316,15 @@ pub trait ObjectStorage: Sync + 'static { } let streams = STREAM_INFO.list_streams(); - let mut stream_stats = HashMap::new(); + let cache_manager = LocalCacheManager::global(); + let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new(); + for stream in &streams { + let cache_enabled = STREAM_INFO + .cache_enabled(stream) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; let dir = StorageDir::new(stream); let schema = convert_disk_files_to_parquet(stream, &dir) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; @@ -328,7 +342,7 @@ pub trait ObjectStorage: Sync + 'static { .or_insert_with(|| compressed_size); }); - for file in &parquet_files { + for file in parquet_files { let filename = file .file_name() .expect("only parquet files are returned by iterator") @@ -336,19 +350,19 @@ pub trait ObjectStorage: Sync + 'static { .expect("filename is valid string"); let file_suffix = str::replacen(filename, ".", "/", 3); let stream_relative_path = format!("{stream}/{file_suffix}"); - self.upload_file(&stream_relative_path, file).await?; + 
self.upload_file(&stream_relative_path, &file).await?; let absolute_path = self .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) .to_string(); let store = CONFIG.storage().get_object_store(); let manifest = - catalog::create_from_parquet_file(absolute_path.clone(), file).unwrap(); + catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); catalog::update_snapshot(store, stream, manifest).await?; - if let Some(manager) = LocalCacheManager::global() { - manager - .move_to_cache(stream, absolute_path, file.to_owned()) - .await - .unwrap() + if cache_enabled && cache_manager.is_some() { + cache_updates + .entry(stream) + .or_default() + .push((absolute_path, file)) } else { let _ = fs::remove_file(file); } @@ -367,6 +381,24 @@ pub trait ObjectStorage: Sync + 'static { } } + if let Some(manager) = cache_manager { + let cache_updates = cache_updates + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect_vec(); + + tokio::spawn(async move { + for (stream, files) in cache_updates { + for (storage_path, file) in files { + manager + .move_to_cache(&stream, storage_path, file.to_owned()) + .await + .unwrap() + } + } + }); + } + Ok(()) } } From 7975ac3f21b1411ef2edfd7ff7abef31db18891c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 27 Dec 2023 20:08:23 +0530 Subject: [PATCH 14/15] Add perms --- server/src/rbac/role.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 968e58e34..5251bec65 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -29,6 +29,8 @@ pub enum Action { DeleteStream, GetRetention, PutRetention, + GetCacheEnabled, + PutCacheEnabled, PutAlert, GetAlert, PutUser, @@ -101,6 +103,8 @@ impl RoleBuilder { | Action::GetStats | Action::GetRetention | Action::PutRetention + | Action::GetCacheEnabled + | Action::PutCacheEnabled | Action::PutAlert | Action::GetAlert | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), @@ -169,6 +173,8 @@ pub mod model { Action::GetStats, Action::GetRetention, Action::PutRetention, + Action::PutCacheEnabled, + Action::GetCacheEnabled, Action::PutAlert, Action::GetAlert, Action::GetAbout, From bebdbf24e39dabe174f52e438e8d25e481cd1b49 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 27 Dec 2023 20:11:26 +0530 Subject: [PATCH 15/15] Change action --- server/src/handlers/http.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 19250a62e..06fbf1221 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -207,13 +207,13 @@ pub fn configure_routes( .route( web::put() .to(logstream::put_enable_cache) - .authorize_for_stream(Action::GetSchema), + .authorize_for_stream(Action::PutCacheEnabled), ) // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream .route( web::get() .to(logstream::get_cache_enabled) - .authorize_for_stream(Action::GetSchema), + .authorize_for_stream(Action::GetCacheEnabled), ), );
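
Note on cache size parsing (illustration only, not part of the patches above):
PATCH 01 parses P_CACHE_SIZE with the human-size crate and PATCH 08 rejects
values below MIN_CACHE_SIZE_BYTES. The following is a minimal standalone
sketch of that flow, assuming human-size 0.4 as pinned in PATCH 01; the
multiples chained here are only a subset of what the server accepts (PATCH 04
adds more), and the error strings are illustrative.

    // size_check.rs -- standalone sketch mirroring validation::human_size_to_bytes.
    use std::str::FromStr;

    use human_size::multiples::{Gigabyte, Mebibyte, Megabyte};
    use human_size::SpecificSize;

    // 1 GB floor, mirroring MIN_CACHE_SIZE_BYTES in server/src/option.rs (PATCH 08).
    const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3);

    fn human_size_to_bytes(s: &str) -> Result<u64, String> {
        // Try each multiple in turn and convert the first successful parse to bytes.
        let size = SpecificSize::<Mebibyte>::from_str(s)
            .map(|x| x.to_bytes())
            .or_else(|_| SpecificSize::<Megabyte>::from_str(s).map(|x| x.to_bytes()))
            .or_else(|_| SpecificSize::<Gigabyte>::from_str(s).map(|x| x.to_bytes()))
            .map_err(|_| "Could not parse given size".to_string())?;

        if size < MIN_CACHE_SIZE_BYTES {
            return Err("cache size is smaller than the 1GiB minimum".to_string());
        }
        Ok(size)
    }

    fn main() {
        assert!(human_size_to_bytes("1024MiB").is_ok()); // accepted
        assert!(human_size_to_bytes("2GB").is_ok());     // accepted
        assert!(human_size_to_bytes("100MB").is_err());  // parses, but below the minimum
        assert!(human_size_to_bytes("lots").is_err());   // not a size
    }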
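
Note on the LRU eviction loop (illustration only, not part of the patches
above): LocalCacheManager::move_to_cache (PATCH 01, simplified in PATCH 09)
pops least recently used entries from the hashlru cache until the incoming
parquet file fits within the configured capacity, deleting the evicted files
from disk. A minimal standalone sketch of that loop, assuming hashlru 0.11 as
pinned in PATCH 01 and using byte counts in place of real files:

    // lru_sketch.rs -- standalone sketch of the eviction loop in move_to_cache.
    use hashlru::Cache;

    /// Pop least recently used entries until `incoming` bytes fit within `capacity`.
    /// The server additionally removes the evicted parquet file from disk here.
    fn make_room(files: &mut Cache<String, u64>, current_size: &mut u64, capacity: u64, incoming: u64) {
        while *current_size + incoming > capacity {
            match files.pop_lru() {
                Some((key, size)) => {
                    *current_size = current_size.saturating_sub(size);
                    println!("evicting {key} ({size} bytes)");
                }
                None => break, // cache capacity smaller than a single file
            }
        }
    }

    fn main() {
        // Keys stand in for object-store paths, values for file sizes in bytes.
        let mut files: Cache<String, u64> = Cache::new(100);
        let mut current_size = 0u64;
        let capacity = 250;

        for (name, size) in [("a.parquet", 100u64), ("b.parquet", 100), ("c.parquet", 100)] {
            make_room(&mut files, &mut current_size, capacity, size);
            files.push(name.to_string(), size);
            current_size += size;
        }

        // "a.parquet" was evicted to make room for "c.parquet".
        assert_eq!(files.get(&"a.parquet".to_string()), None);
        assert_eq!(current_size, 200);
    }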