2 changes: 2 additions & 0 deletions server/src/main.rs
@@ -72,6 +72,8 @@ async fn main() -> anyhow::Result<()> {
if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
warn!("could not populate local metadata. {:?}", e);
}
// track all parquet files already in the data directory
storage::CACHED_FILES.track_parquet();

let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
4 changes: 0 additions & 4 deletions server/src/option.rs
@@ -245,10 +245,6 @@ impl<S> Server<S>
where
S: Clone + clap::Args + StorageOpt,
{
pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
self.local_disk_path.join(stream_name)
}

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_disk_path.join(stream_name)
}
240 changes: 204 additions & 36 deletions server/src/query.rs
@@ -16,22 +16,36 @@
*
*/

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use chrono::TimeZone;
use chrono::{DateTime, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::listing::ListingTableConfig;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::TableType;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use serde_json::Value;
use std::any::Any;
use std::fs::File;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use crate::option::CONFIG;
use crate::storage;
use crate::storage::ObjectStorage;
use crate::storage::ObjectStorageError;
use crate::storage::StorageDir;
use crate::utils::TimePeriod;
use crate::validator;

@@ -75,56 +89,201 @@ impl Query {
&self,
storage: &impl ObjectStorage,
) -> Result<Vec<RecordBatch>, ExecuteError> {
let dir = StorageDir::new(&self.stream_name);

// take a look at local dir and figure out what local cache we could use for this query
let arrow_files: Vec<PathBuf> = dir
.arrow_files()
.into_iter()
.filter(|path| path_intersects_query(path, self.start, self.end))
.collect();

let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| {
path.set_extension("parquet");
path
});

let parquet_files = dir
.parquet_files()
.into_iter()
.filter(|path| path_intersects_query(path, self.start, self.end));

let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();

let mut results = vec![];
storage.query(self, &mut results).await?;

// query cache only if end_time could have been after last sync.
let duration_since = Utc::now() - self.end;
if duration_since.num_seconds() < CONFIG.parseable.upload_interval as i64 {
self.execute_on_cache(&mut results).await?;
if !(arrow_files.is_empty() && parquet_files.is_empty()) {
self.execute_on_cache(
arrow_files,
parquet_files,
self.schema.clone(),
&mut results,
)
.await?;
}

storage.query(self, &mut results).await?;
Ok(results)
}

async fn execute_on_cache(&self, results: &mut Vec<RecordBatch>) -> Result<(), ExecuteError> {
async fn execute_on_cache(
&self,
arrow_files: Vec<PathBuf>,
parquet_files: Vec<PathBuf>,
schema: Arc<Schema>,
results: &mut Vec<RecordBatch>,
) -> Result<(), ExecuteError> {
let ctx = SessionContext::new();
let file_format = ParquetFormat::default().with_enable_pruning(true);
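// wrap the cached arrow and parquet files in a table provider and register it under the stream name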
let table = Arc::new(QueryTableProvider::new(arrow_files, parquet_files, schema));
ctx.register_table(
&*self.stream_name,
Arc::clone(&table) as Arc<dyn TableProvider>,
)
.map_err(ObjectStorageError::DataFusionError)?;
// execute the query and collect results
let df = ctx.sql(self.query.as_str()).await?;
results.extend(df.collect().await?);
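// release the parquet files preserved by QueryTableProvider::new now that results have been collected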
table.remove_preserve();
Ok(())
}
}

let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
};
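// a cached file belongs to this query if the timestamp encoded in its filename falls within the query window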
fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
let time = time_from_path(path);
starttime <= time && time <= endtime
}

let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name);
fn time_from_path(path: &Path) -> DateTime<Utc> {
let prefix = path
.file_name()
.expect("all given path are file")
.to_str()
.expect("filename is valid");

// first 33 characters of the filename, i.e. date=YYYY-MM-DD.hour=HH.minute=MM
let prefix = &prefix[..33];
Utc.datetime_from_str(prefix, "date=%F.hour=%H.minute=%M")
.expect("valid prefix is parsed")
}

let table_path = match ListingTableUrl::parse(
cache_path.to_str().expect("path should be valid unicode"),
) {
Ok(table_path) => table_path,
Err(e) => {
log::warn!("could not parse local filesystem path. Maybe directory does not exist. Error {}", e);
return Ok(());
}
};
#[derive(Debug)]
struct QueryTableProvider {
arrow_files: Vec<PathBuf>,
parquet_files: Vec<PathBuf>,
schema: Arc<Schema>,
}

let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(Arc::clone(&self.schema));
impl QueryTableProvider {
fn new(arrow_files: Vec<PathBuf>, parquet_files: Vec<PathBuf>, schema: Arc<Schema>) -> Self {
// By the time this query executes, the arrow files could have been converted to parquet files.
// Preserve those parquet files in the cache as well, in case the conversion happens while the query runs.

let table = ListingTable::try_new(config)?;
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &parquet_files {
parquet_cached.upsert(file)
}

ctx.register_table(&*self.stream_name, Arc::new(table))
.map_err(ObjectStorageError::DataFusionError)?;
Self {
arrow_files,
parquet_files,
schema,
}
}

// execute the query and collect results
let df = ctx.sql(self.query.as_str()).await?;
results.extend(df.collect().await?);
pub fn remove_preserve(&self) {
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &self.parquet_files {
parquet_cached.remove(file)
}
}

Ok(())
pub async fn create_physical_plan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut mem_records: Vec<Vec<RecordBatch>> = Vec::new();
let mut parquet_files = self.parquet_files.clone();
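// read each cached arrow file into in-memory record batches; files that cannot be opened are skipped and unreadable batches are logged and dropped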
for file in &self.arrow_files {
let Ok(arrow_file) = File::open(file) else { continue; };
let reader = StreamReader::try_new(arrow_file, None)?;
let records = reader
.filter_map(|record| match record {
Ok(record) => Some(record),
Err(e) => {
log::warn!("warning from arrow stream {:?}", e);
None
}
})
.collect();
mem_records.push(records);

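// the data from this arrow file is already in memory, so drop its parquet counterpart to avoid scanning the same data twice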
let mut file = file.clone();
file.set_extension("parquet");

parquet_files.retain(|p| p != &file)
}

let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
let memexec = memtable.scan(ctx, projection, filters, limit).await?;

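// serve the in-memory batches through a MemTable; if parquet files remain, scan them with a ListingTable and union both plans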
if parquet_files.is_empty() {
Ok(memexec)
} else {
let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 1,
};

let paths = parquet_files
.clone()
.into_iter()
.map(|path| {
ListingTableUrl::parse(path.to_str().expect("path should be valid unicode"))
.expect("path is valid for filesystem listing")
})
.collect();

let config = ListingTableConfig::new_with_multi_paths(paths)
.with_listing_options(listing_options)
.with_schema(Arc::clone(&self.schema));

let listtable = ListingTable::try_new(config)?;
let listexec = listtable.scan(ctx, projection, filters, limit).await?;

Ok(Arc::new(UnionExec::new(vec![memexec, listexec])))
}
}
}

#[async_trait]
impl TableProvider for QueryTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
self.create_physical_plan(ctx, projection, filters, limit)
.await
}
}

@@ -160,14 +319,23 @@ pub mod error {

#[cfg(test)]
mod tests {
use super::Query;
use super::{time_from_path, Query};
use crate::{alerts::Alerts, metadata::STREAM_INFO};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::datatypes::{DataType, Field};
use rstest::*;
use serde_json::Value;
use std::path::PathBuf;
use std::str::FromStr;

#[test]
fn test_time_from_parquet_path() {
let path = PathBuf::from("date=2022-01-01.hour=00.minute=00.hostname.data.parquet");
let time = time_from_path(path.as_path());
assert_eq!(time.timestamp(), 1640995200);
}

// Query prefix generation tests
#[fixture]
fn schema() -> Schema {
let field_a = Field::new("a", DataType::Int64, false);