55 changes: 31 additions & 24 deletions server/src/event/writer.rs
@@ -30,7 +30,7 @@ use crate::storage::StorageDir;
use self::errors::StreamWriterError;

type ArrowWriter<T> = StreamWriter<T>;
type LocalWriter<T> = Mutex<Option<ArrowWriter<T>>>;
type LocalWriter<T, S> = Mutex<Option<(usize, Vec<S>, ArrowWriter<T>)>>;

lazy_static! {
#[derive(Default)]
@@ -41,29 +41,37 @@ impl STREAM_WRITERS {
// append to an existing stream
pub fn append_to_local(
stream: &str,
schema_key: &str,
schema_key: &String,
record: &RecordBatch,
) -> Result<(), StreamWriterError> {
let hashmap_guard = STREAM_WRITERS
.read()
.map_err(|_| StreamWriterError::RwPoisoned)?;

match hashmap_guard.get(stream, schema_key) {
match hashmap_guard.get(stream) {
Some(localwriter) => {
let mut writer_guard = localwriter
.lock()
.map_err(|_| StreamWriterError::MutexPoisoned)?;

// if a writer already exists, write without dropping any lock
// the hashmap cannot be borrowed mutably at any point until this finishes
if let Some(ref mut writer) = *writer_guard {
writer.write(record).map_err(StreamWriterError::Writer)?;
if let Some((ref mut order, ref mut hashes, ref mut writer)) = *writer_guard {
if hashes.contains(schema_key) {
writer.write(record).map_err(StreamWriterError::Writer)?;
} else {
*order += 1;
hashes.push(schema_key.to_owned());
*writer = init_new_stream_writer_file(stream, *order, record)?;
}
} else {
// pass on this mutex to set the entry so that it can be reused
// we hold a guard for the underlying entry, thus the
// hashmap must not be available as mutable to any other thread
let writer = init_new_stream_writer_file(stream, schema_key, record)?;
writer_guard.replace(writer); // replace the stream writer behind this mutex
let order = 0;
let writer = init_new_stream_writer_file(stream, order, record)?;
writer_guard.replace((order, vec![schema_key.to_owned()], writer));
// replace the stream writer behind this mutex
}
}
// entry is not present thus we create it
@@ -87,9 +95,13 @@ impl STREAM_WRITERS {
.write()
.map_err(|_| StreamWriterError::RwPoisoned)?;

let writer = init_new_stream_writer_file(&stream, &schema_key, record)?;
let order = StorageDir::new(&stream)
.last_order_by_current_time()
.unwrap_or_default();

hashmap_guard.insert(stream, schema_key, Mutex::new(Some(writer)));
let writer = init_new_stream_writer_file(&stream, order, record)?;

hashmap_guard.insert(stream, Mutex::new(Some((order, vec![schema_key], writer))));

Ok(())
}
@@ -104,7 +116,7 @@ impl STREAM_WRITERS {
.map_err(|_| StreamWriterError::RwPoisoned)?;

for writer in table.iter() {
if let Some(mut streamwriter) = writer
if let Some((_, _, mut streamwriter)) = writer
.lock()
.map_err(|_| StreamWriterError::MutexPoisoned)?
.take()
@@ -120,36 +132,31 @@ impl STREAM_WRITERS {
pub struct WriterTable<A, B, T>
where
A: Eq + std::hash::Hash,
B: Eq + std::hash::Hash,
T: Write,
{
table: HashMap<A, HashMap<B, LocalWriter<T>>>,
table: HashMap<A, LocalWriter<T, B>>,
}

impl<A, B, T> WriterTable<A, B, T>
where
A: Eq + std::hash::Hash,
B: Eq + std::hash::Hash,
T: Write,
{
pub fn new() -> Self {
let table = HashMap::new();
Self { table }
}

fn get<X, Y>(&self, a: &X, b: &Y) -> Option<&LocalWriter<T>>
fn get<X>(&self, a: &X) -> Option<&LocalWriter<T, B>>
where
A: Borrow<X>,
B: Borrow<Y>,
X: Eq + std::hash::Hash + ?Sized,
Y: Eq + std::hash::Hash + ?Sized,
{
self.table.get(a)?.get(b)
self.table.get(a)
}

fn insert(&mut self, a: A, b: B, v: LocalWriter<T>) {
let inner = self.table.entry(a).or_default();
inner.insert(b, v);
fn insert(&mut self, a: A, v: LocalWriter<T, B>) {
self.table.insert(a, v);
}

pub fn delete_stream<X>(&mut self, stream: &X)
@@ -160,18 +167,18 @@ where
self.table.remove(stream);
}

fn iter(&self) -> impl Iterator<Item = &LocalWriter<T>> {
self.table.values().flat_map(|inner| inner.values())
fn iter(&self) -> impl Iterator<Item = &LocalWriter<T, B>> {
self.table.values()
}
}

fn init_new_stream_writer_file(
stream_name: &str,
schema_key: &str,
order: usize,
record: &RecordBatch,
) -> Result<ArrowWriter<std::fs::File>, StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key);
let path = dir.path_by_current_time(order);

std::fs::create_dir_all(dir.data_path)?;

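The writer.rs change above replaces the per-(stream, schema) writer map with one writer per stream plus a rotation counter and the list of schema keys already seen; a record carrying a new schema key bumps the counter and starts a new arrow file. Below is a minimal, self-contained sketch of that rotation logic under placeholder types — it is not the crate's actual ArrowWriter plumbing, and the open/write callbacks stand in for init_new_stream_writer_file and StreamWriter::write.

use std::collections::HashMap;
use std::sync::Mutex;

// Per-stream state: rotation counter, schema keys seen so far, current writer.
struct LocalState<W> {
    order: usize,
    seen_schemas: Vec<String>,
    writer: W,
}

struct WriterTable<W> {
    table: Mutex<HashMap<String, LocalState<W>>>,
}

impl<W> WriterTable<W> {
    fn new() -> Self {
        WriterTable { table: Mutex::new(HashMap::new()) }
    }

    // Reuse the current writer when the schema key has been seen before;
    // otherwise bump `order`, remember the key, and open a fresh writer.
    fn append(
        &self,
        stream: &str,
        schema_key: &str,
        mut open: impl FnMut(usize) -> W,
        mut write: impl FnMut(&mut W),
    ) {
        let mut table = self.table.lock().unwrap();
        match table.get_mut(stream) {
            Some(state) if state.seen_schemas.iter().any(|s| s.as_str() == schema_key) => {
                write(&mut state.writer);
            }
            Some(state) => {
                state.order += 1;
                state.seen_schemas.push(schema_key.to_owned());
                state.writer = open(state.order);
                write(&mut state.writer);
            }
            None => {
                let mut writer = open(0);
                write(&mut writer);
                table.insert(
                    stream.to_owned(),
                    LocalState {
                        order: 0,
                        seen_schemas: vec![schema_key.to_owned()],
                        writer,
                    },
                );
            }
        }
    }
}

fn main() {
    let table: WriterTable<Vec<&str>> = WriterTable::new();
    table.append("logs", "schema_a", |_| Vec::new(), |w| w.push("row"));
    // A record with a new schema key rotates to order 1 and a fresh writer.
    table.append("logs", "schema_b", |_| Vec::new(), |w| w.push("row"));
    println!("ok");
}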
39 changes: 7 additions & 32 deletions server/src/query.rs
@@ -26,20 +26,16 @@ use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
use itertools::Itertools;
use serde_json::Value;
use std::collections::hash_map::RandomState;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;

use crate::option::CONFIG;
use crate::storage::ObjectStorageError;
use crate::storage::StorageDir;
use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
use crate::utils::TimePeriod;
use crate::validator;

use self::error::{ExecuteError, ParseError};
use table_provider::QueryTableProvider;

type Key = &'static str;
fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -89,41 +85,18 @@ impl Query {
&self,
storage: Arc<dyn ObjectStorage + Send>,
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
let dir = StorageDir::new(&self.stream_name);
// take a look at local dir and figure out what local cache we could use for this query
let staging_arrows = dir
.arrow_files_grouped_by_time()
.into_iter()
.filter(|(path, _)| path_intersects_query(path, self.start, self.end))
.sorted_by(|(a, _), (b, _)| Ord::cmp(a, b))
.collect_vec();

let staging_parquet_set: HashSet<&PathBuf, RandomState> =
HashSet::from_iter(staging_arrows.iter().map(|(p, _)| p));

let other_staging_parquet = dir
.parquet_files()
.into_iter()
.filter(|path| path_intersects_query(path, self.start, self.end))
.filter(|path| !staging_parquet_set.contains(path))
.collect_vec();

let ctx = SessionContext::with_config_rt(
SessionConfig::default(),
CONFIG.storage().get_datafusion_runtime(),
);

let table = Arc::new(QueryTableProvider::new(
staging_arrows,
other_staging_parquet,
self.get_prefixes(),
storage,
Arc::new(self.get_schema().clone()),
));
let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else {
return Ok((Vec::new(), Vec::new()));
};

ctx.register_table(
&*self.stream_name,
Arc::clone(&table) as Arc<dyn TableProvider>,
Arc::new(table) as Arc<dyn TableProvider>,
)
.map_err(ObjectStorageError::DataFusionError)?;
// execute the query and collect results
Expand All @@ -144,11 +117,13 @@ impl Query {
}
}

#[allow(unused)]
fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTime<Utc>) -> bool {
let time = time_from_path(path);
starttime <= time && time <= endtime
}

#[allow(unused)]
fn time_from_path(path: &Path) -> DateTime<Utc> {
let prefix = path
.file_name()
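After this change, execute() boils down to: obtain a TableProvider from the object storage layer (ObjectStorage::query_table, replacing the locally assembled QueryTableProvider), register it under the stream name, run the SQL, and collect record batches. A hedged, self-contained sketch of that DataFusion flow follows; it uses an in-memory MemTable with placeholder data instead of Parseable's storage-backed provider, and assumes the datafusion and tokio crates.

use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Placeholder table standing in for the provider returned by query_table().
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;

    // Register under the stream name, then run the query and collect batches,
    // mirroring the register_table + collect calls in Query::execute.
    let ctx = SessionContext::new();
    let _ = ctx.register_table("demo_stream", Arc::new(table))?;

    let results = ctx
        .sql("SELECT COUNT(*) FROM demo_stream")
        .await?
        .collect()
        .await?;
    println!("{} result batch(es)", results.len());
    Ok(())
}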
27 changes: 9 additions & 18 deletions server/src/query/table_provider.rs
@@ -16,9 +16,10 @@
*
*/

#![allow(unused)]

use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
@@ -33,11 +34,10 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use itertools::Itertools;
use std::any::Any;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

use crate::storage::ObjectStorage;
use crate::storage::{MergedRecordReader, ObjectStorage};

pub struct QueryTableProvider {
// parquet - ( arrow files )
@@ -88,10 +88,11 @@ impl QueryTableProvider {
let mut parquet_files = Vec::new();

for (staging_parquet, arrow_files) in &self.staging_arrows {
if !load_arrows(arrow_files, &self.schema, &mut mem_records) {
if !load_arrows(arrow_files, Arc::clone(&self.schema), &mut mem_records) {
parquet_files.push(staging_parquet.clone())
}
}

parquet_files.extend(self.other_staging_parquet.clone());

let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
@@ -204,21 +205,11 @@ fn local_parquet_table(parquet_files: &[PathBuf], schema: &SchemaRef) -> Option<

fn load_arrows(
files: &[PathBuf],
schema: &Schema,
schema: Arc<Schema>,
mem_records: &mut Vec<Vec<RecordBatch>>,
) -> bool {
let mut stream_readers = Vec::with_capacity(files.len());

for file in files {
let Ok(arrow_file) = File::open(file) else { return false; };
let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return false; };
stream_readers.push(reader);
}

let reader = crate::storage::MergedRecordReader {
readers: stream_readers,
};
let records = reader.merged_iter(schema).collect();
mem_records.push(records);
let Ok(reader) = MergedRecordReader::try_new(files.to_owned()) else { return false };
let Ok(iter) = reader.get_owned_iterator(schema) else { return false };
mem_records.push(iter.collect());
true
}
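The rewritten load_arrows keeps the same fallback contract as before: for each staging group, try to read the arrow files into memory via MergedRecordReader, and if that fails, keep the group's parquet path so the query still covers that data. A small illustrative sketch of the pattern, with placeholder names rather than the crate's types:

use std::path::PathBuf;

// For every (staging parquet, arrow files) group, prefer the in-memory arrow
// records; if they cannot be loaded, fall back to the staging parquet file.
fn partition_sources(
    groups: Vec<(PathBuf, Vec<PathBuf>)>,
    mut try_load: impl FnMut(&[PathBuf]) -> bool,
) -> Vec<PathBuf> {
    let mut parquet_fallback = Vec::new();
    for (staging_parquet, arrow_files) in groups {
        if !try_load(&arrow_files) {
            parquet_fallback.push(staging_parquet);
        }
    }
    parquet_fallback
}

fn main() {
    let groups = vec![(PathBuf::from("a.parquet"), vec![PathBuf::from("a.arrows")])];
    // Pretend the arrow files are unreadable, so the parquet path is kept.
    let fallback = partition_sources(groups, |_| false);
    assert_eq!(fallback, vec![PathBuf::from("a.parquet")]);
}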
34 changes: 27 additions & 7 deletions server/src/storage.rs
@@ -221,18 +221,18 @@ impl StorageDir {
format!("{local_uri}{hostname}.data.arrows")
}

fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String {
format!("{}.{}", stream_hash, Self::file_time_suffix(time))
fn filename_by_time(order: usize, time: NaiveDateTime) -> String {
let order_prefix = format!("{order:03}");
format!("{}.{}", order_prefix, Self::file_time_suffix(time))
}

fn filename_by_current_time(stream_hash: &str) -> String {
fn filename_by_current_time(order: usize) -> String {
let datetime = Utc::now();
Self::filename_by_time(stream_hash, datetime.naive_utc())
Self::filename_by_time(order, datetime.naive_utc())
}

pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf {
self.data_path
.join(Self::filename_by_current_time(stream_hash))
pub fn path_by_current_time(&self, order: usize) -> PathBuf {
self.data_path.join(Self::filename_by_current_time(order))
}

pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -248,6 +248,7 @@ impl StorageDir {
paths
}

#[allow(unused)]
pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> {
// hashmap <time, vec[paths]>
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -308,6 +309,25 @@ impl StorageDir {
parquet_path.set_extension("parquet");
parquet_path
}

pub fn last_order_by_current_time(&self) -> Option<usize> {
let time = Utc::now().naive_utc();
let hot_filename = StorageDir::file_time_suffix(time);
let arrow_files = self.arrow_files();

arrow_files
.iter()
.filter_map(|path| {
let filename = path.file_name().unwrap().to_str().unwrap();
if filename.ends_with(&hot_filename) {
let (number, _) = filename.split_once('.').unwrap();
Some(number.parse().unwrap())
} else {
None
}
})
.max()
}
}

#[derive(Debug, thiserror::Error)]
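On the storage.rs side, arrow file names now begin with a zero-padded order prefix instead of the schema hash, and last_order_by_current_time recovers the highest order among files that share the current time suffix. A hedged sketch of that parsing is below; the suffix string is a made-up placeholder, not the real file_time_suffix format.

// Highest order prefix among arrow files belonging to the current time slot.
fn last_order(filenames: &[&str], hot_suffix: &str) -> Option<usize> {
    filenames
        .iter()
        .filter(|name| name.ends_with(hot_suffix))
        .filter_map(|name| name.split_once('.'))
        .filter_map(|(order, _)| order.parse::<usize>().ok())
        .max()
}

fn main() {
    let suffix = "date=2023-04-06.hour=10.minute=15.host.data.arrows";
    let files = [
        "000.date=2023-04-06.hour=10.minute=15.host.data.arrows",
        "001.date=2023-04-06.hour=10.minute=15.host.data.arrows",
    ];
    // The next writer created for this time slot would therefore use order 2.
    assert_eq!(last_order(&files, suffix), Some(1));
}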