From a7f827f0548ba20e2c13b7b4256fc582f7d6db4c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 17 May 2023 18:48:34 +0530 Subject: [PATCH] Remove mem ingestion and query --- server/src/event/writer.rs | 101 ++---------------- server/src/event/writer/mem_writer.rs | 118 --------------------- server/src/query.rs | 44 +------- server/src/query/table_provider.rs | 143 -------------------------- server/src/storage/object_storage.rs | 15 +-- server/src/storage/staging.rs | 57 +--------- 6 files changed, 18 insertions(+), 460 deletions(-) delete mode 100644 server/src/event/writer/mem_writer.rs delete mode 100644 server/src/query/table_provider.rs diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index caf5a9246..2b44fc542 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -18,63 +18,21 @@ */ mod file_writer; -mod mem_writer; use std::{ collections::HashMap, sync::{Mutex, RwLock}, }; -use crate::{ - option::CONFIG, - storage::staging::{self, ReadBuf}, -}; - use self::{errors::StreamWriterError, file_writer::FileWriter}; use arrow_array::RecordBatch; -use chrono::{NaiveDateTime, Utc}; use derive_more::{Deref, DerefMut}; -use mem_writer::MemWriter; use once_cell::sync::Lazy; -type InMemWriter = MemWriter<8192>; - pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); -pub enum StreamWriter { - Mem(InMemWriter), - Disk(FileWriter, InMemWriter), -} - -impl StreamWriter { - pub fn push( - &mut self, - stream_name: &str, - schema_key: &str, - rb: RecordBatch, - ) -> Result<(), StreamWriterError> { - match self { - StreamWriter::Mem(mem) => { - mem.push(schema_key, rb); - } - StreamWriter::Disk(disk, mem) => { - disk.push(stream_name, schema_key, &rb)?; - mem.push(schema_key, rb); - } - } - Ok(()) - } -} - -// Each entry in writer table is initialized with some context -// This is helpful for generating prefix when writer is finalized -pub struct WriterContext { - stream_name: String, - time: NaiveDateTime, -} - #[derive(Deref, DerefMut, Default)] -pub struct WriterTable(RwLock, WriterContext)>>); +pub struct WriterTable(RwLock>>); impl WriterTable { // append to a existing stream @@ -87,36 +45,26 @@ impl WriterTable { let hashmap_guard = self.read().unwrap(); match hashmap_guard.get(stream_name) { - Some((stream_writer, _)) => { + Some(stream_writer) => { stream_writer .lock() .unwrap() - .push(stream_name, schema_key, record)?; + .push(stream_name, schema_key, &record)?; } None => { drop(hashmap_guard); let mut map = self.write().unwrap(); // check for race condition // if map contains entry then just - if let Some((writer, _)) = map.get(stream_name) { + if let Some(writer) = map.get(stream_name) { writer .lock() .unwrap() - .push(stream_name, schema_key, record)?; + .push(stream_name, schema_key, &record)?; } else { - // there is no entry so this can be inserted safely - let context = WriterContext { - stream_name: stream_name.to_owned(), - time: Utc::now().naive_utc(), - }; - let mut writer = if CONFIG.parseable.in_mem_ingestion { - StreamWriter::Mem(InMemWriter::default()) - } else { - StreamWriter::Disk(FileWriter::default(), InMemWriter::default()) - }; - - writer.push(stream_name, schema_key, record)?; - map.insert(stream_name.to_owned(), (Mutex::new(writer), context)); + let mut writer = FileWriter::default(); + writer.push(stream_name, schema_key, &record)?; + map.insert(stream_name.to_owned(), Mutex::new(writer)); } } }; @@ -131,40 +79,11 @@ impl WriterTable { let mut table = self.write().unwrap(); let map = std::mem::take(&mut 
*table); drop(table); - for (writer, context) in map.into_values() { + for writer in map.into_values() { let writer = writer.into_inner().unwrap(); - match writer { - StreamWriter::Mem(mem) => { - let rb = mem.finalize(); - let mut read_bufs = staging::MEMORY_READ_BUFFERS.write().unwrap(); - - read_bufs - .entry(context.stream_name) - .or_insert(Vec::default()) - .push(ReadBuf { - time: context.time, - buf: rb, - }); - } - StreamWriter::Disk(disk, _) => disk.close_all(), - } + writer.close_all(); } } - - pub fn clone_read_buf(&self, stream_name: &str) -> Option { - let hashmap_guard = self.read().unwrap(); - let (writer, context) = hashmap_guard.get(stream_name)?; - let writer = writer.lock().unwrap(); - let mem = match &*writer { - StreamWriter::Mem(mem) => mem, - StreamWriter::Disk(_, mem) => mem, - }; - - Some(ReadBuf { - time: context.time, - buf: mem.recordbatch_cloned(), - }) - } } pub mod errors { diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs deleted file mode 100644 index 4bd159f46..000000000 --- a/server/src/event/writer/mem_writer.rs +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - * - */ - -use std::{collections::HashMap, sync::Arc}; - -use arrow_array::{RecordBatch, TimestampMillisecondArray}; -use arrow_schema::Schema; -use arrow_select::concat::concat_batches; -use itertools::kmerge_by; - -use crate::utils::arrow::adapt_batch; - -#[derive(Default)] -pub struct MemWriter { - read_buffer: Vec, - mutable_buffer: HashMap>, -} - -impl MemWriter { - pub fn push(&mut self, schema_key: &str, rb: RecordBatch) { - if self.mutable_buffer.len() + rb.num_rows() > N { - // init new mutable columns with schema of current - let schema = self.current_mutable_schema(); - // replace new mutable buffer with current one as that is full - let mutable_buffer = std::mem::take(&mut self.mutable_buffer); - let batches = mutable_buffer.values().collect(); - self.read_buffer.push(merge_rb(batches, Arc::new(schema))); - } - - if let Some(buf) = self.mutable_buffer.get_mut(schema_key) { - buf.push(rb); - } else { - self.mutable_buffer.insert(schema_key.to_owned(), vec![rb]); - } - } - - pub fn recordbatch_cloned(&self) -> Vec { - let mut read_buffer = self.read_buffer.clone(); - let schema = self.current_mutable_schema(); - let batches = self.mutable_buffer.values().collect(); - let rb = merge_rb(batches, Arc::new(schema)); - let schema = rb.schema(); - if rb.num_rows() > 0 { - read_buffer.push(rb) - } - - read_buffer - .into_iter() - .map(|rb| adapt_batch(&schema, rb)) - .collect() - } - - pub fn finalize(self) -> Vec { - let schema = self.current_mutable_schema(); - let mut read_buffer = self.read_buffer; - let batches = self.mutable_buffer.values().collect(); - let rb = merge_rb(batches, Arc::new(schema)); - let schema = rb.schema(); - if rb.num_rows() > 0 { - read_buffer.push(rb) - } - read_buffer - .into_iter() - .map(|rb| adapt_batch(&schema, rb)) - .collect() - } - - fn current_mutable_schema(&self) -> Schema { - Schema::try_merge( - self.mutable_buffer - .values() - .flat_map(|rb| rb.first()) - .map(|rb| rb.schema().as_ref().clone()), - ) - .unwrap() - } -} - -fn merge_rb(rb: Vec<&Vec>, schema: Arc) -> RecordBatch { - let sorted_rb: Vec = kmerge_by(rb, |a: &&RecordBatch, b: &&RecordBatch| { - let a: &TimestampMillisecondArray = a - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - let b: &TimestampMillisecondArray = b - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - a.value(0) < b.value(0) - }) - .map(|batch| adapt_batch(&schema, batch.clone())) - .collect(); - - // must be true for this to work - // each rb is of same schema. 
( adapt_schema should do this ) - // datatype is same - concat_batches(&schema, sorted_rb.iter()).unwrap() -} diff --git a/server/src/query.rs b/server/src/query.rs index a419925ea..6498ce236 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -16,29 +16,23 @@ * */ -pub mod table_provider; - use chrono::TimeZone; use chrono::{DateTime, Utc}; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::TableProvider; use datafusion::prelude::*; use itertools::Itertools; use serde_json::Value; use std::path::Path; use std::sync::Arc; -use crate::event::STREAM_WRITERS; use crate::option::CONFIG; -use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS}; use crate::storage::ObjectStorageError; use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY}; use crate::utils::TimePeriod; use crate::validator; use self::error::{ExecuteError, ParseError}; -use self::table_provider::QueryTableProvider; type Key = &'static str; fn get_value(value: &Value, key: Key) -> Result<&str, Key> { @@ -90,18 +84,10 @@ impl Query { ); let prefixes = self.get_prefixes(); - let table = QueryTableProvider::new( - prefixes, - storage, - get_all_read_buf(&self.stream_name, self.start, self.end), - Arc::clone(&self.schema), - ); + let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) }; - ctx.register_table( - &*self.stream_name, - Arc::new(table) as Arc, - ) - .map_err(ObjectStorageError::DataFusionError)?; + ctx.register_table(&*self.stream_name, Arc::new(table)) + .map_err(ObjectStorageError::DataFusionError)?; // execute the query and collect results let df = ctx.sql(self.query.as_str()).await?; // dataframe qualifies name by adding table name before columns. \ @@ -181,30 +167,6 @@ pub mod error { } } -fn get_all_read_buf(stream_name: &str, start: DateTime, end: DateTime) -> Vec { - let now = Utc::now(); - let include_mutable = start <= now && now <= end; - // copy from mutable buffer - let mut queryable_read_buffer = Vec::new(); - - if let Some(mem) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) { - for read_buffer in mem { - let time = read_buffer.time; - if start.naive_utc() <= time && time <= end.naive_utc() { - queryable_read_buffer.push(read_buffer.clone()) - } - } - } - - if include_mutable { - if let Some(x) = STREAM_WRITERS.clone_read_buf(stream_name) { - queryable_read_buffer.push(x); - } - } - - queryable_read_buffer -} - #[cfg(test)] mod tests { use super::time_from_path; diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs deleted file mode 100644 index eb79385d1..000000000 --- a/server/src/query/table_provider.rs +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use async_trait::async_trait; -use datafusion::arrow::datatypes::{Schema, SchemaRef}; -use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::error::DataFusionError; -use datafusion::execution::context::SessionState; -use datafusion::logical_expr::TableType; -use datafusion::physical_plan::empty::EmptyExec; -use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::Expr; -use std::any::Any; -use std::sync::Arc; - -use crate::storage::staging::ReadBuf; -use crate::storage::ObjectStorage; -use crate::utils::arrow::adapt_batch; - -pub struct QueryTableProvider { - storage_prefixes: Vec, - storage: Arc, - readable_buffer: Vec, - schema: Arc, -} - -impl QueryTableProvider { - pub fn new( - storage_prefixes: Vec, - storage: Arc, - readable_buffer: Vec, - schema: Arc, - ) -> Self { - Self { - storage_prefixes, - storage, - readable_buffer, - schema, - } - } - - async fn create_physical_plan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result, DataFusionError> { - let memexec = self.get_mem_exec(ctx, projection, filters, limit).await?; - let table = self - .storage - .query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?; - - let mut exec = Vec::new(); - if let Some(memexec) = memexec { - exec.push(memexec); - } - - if let Some(ref storage_listing) = table { - exec.push( - storage_listing - .scan(ctx, projection, filters, limit) - .await?, - ); - } - - if exec.is_empty() { - Ok(Arc::new(EmptyExec::new(false, Arc::clone(&self.schema)))) - } else { - Ok(Arc::new(UnionExec::new(exec))) - } - } - - async fn get_mem_exec( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result>, DataFusionError> { - if self.readable_buffer.is_empty() { - return Ok(None); - } - - let mem_records: Vec> = self - .readable_buffer - .iter() - .map(|r| { - r.buf - .iter() - .cloned() - .map(|rb| adapt_batch(&self.schema, rb)) - .collect() - }) - .collect(); - - let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?; - let memexec = memtable.scan(ctx, projection, filters, limit).await?; - Ok(Some(memexec)) - } -} - -#[async_trait] -impl TableProvider for QueryTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> datafusion::error::Result> { - self.create_physical_plan(ctx, projection, filters, limit) - .await - } -} diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index ea920378f..39e2b6877 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -17,9 +17,8 @@ */ use super::{ - retention::Retention, - staging::{self, convert_disk_files_to_parquet, convert_mem_to_parquet}, - LogStream, ObjectStorageError, ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, + retention::Retention, staging::convert_disk_files_to_parquet, LogStream, ObjectStorageError, + ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use crate::{ alerts::Alerts, @@ -246,16 +245,6 @@ pub trait ObjectStorage: Sync + 'static { let mut stream_stats = HashMap::new(); - for (stream_name, bufs) in staging::take_all_read_bufs() { - for buf in bufs { - let schema = 
convert_mem_to_parquet(&stream_name, buf) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - if let Some(schema) = schema { - commit_schema_to_storage(&stream_name, schema).await?; - } - } - } - for stream in &streams { let dir = StorageDir::new(stream); let schema = convert_disk_files_to_parquet(stream, &dir) diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 7326c82aa..71100a6a7 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -22,13 +22,11 @@ use std::{ fs, path::{Path, PathBuf}, process, - sync::{Arc, RwLock}, + sync::Arc, }; -use arrow_array::RecordBatch; use arrow_schema::{ArrowError, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; -use once_cell::sync::Lazy; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -42,36 +40,12 @@ use crate::{ metrics, option::CONFIG, storage::OBJECT_STORE_DATA_GRANULARITY, - utils::{ - self, - arrow::{adapt_batch, MergedRecordReader}, - }, + utils::{self, arrow::MergedRecordReader}, }; const ARROW_FILE_EXTENSION: &str = "data.arrows"; const PARQUET_FILE_EXTENSION: &str = "data.parquet"; -// in mem global that hold all the in mem buffer that are ready to convert -pub static MEMORY_READ_BUFFERS: Lazy>>> = - Lazy::new(RwLock::default); - -// this function takes all the read bufs per stream -pub fn take_all_read_bufs() -> Vec<(String, Vec)> { - let mut res = Vec::new(); - for (stream_name, bufs) in MEMORY_READ_BUFFERS.write().unwrap().iter_mut() { - let stream_name = stream_name.to_owned(); - let bufs = std::mem::take(bufs); - res.push((stream_name, bufs)); - } - res -} - -#[derive(Debug, Clone)] -pub struct ReadBuf { - pub time: NaiveDateTime, - pub buf: Vec, -} - #[derive(Debug)] pub struct StorageDir { pub data_path: PathBuf, @@ -187,6 +161,7 @@ impl StorageDir { } } +#[allow(unused)] pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { let data_path = CONFIG.parseable.local_stream_data_path(stream_name); let dir = StorageDir::file_time_suffix(time, PARQUET_FILE_EXTENSION); @@ -251,32 +226,6 @@ pub fn convert_disk_files_to_parquet( } } -pub fn convert_mem_to_parquet( - stream: &str, - read_buf: ReadBuf, -) -> Result, MoveDataError> { - let ReadBuf { time, buf } = read_buf; - let Some(last_schema) = buf.last().map(|last| last.schema()) else { return Ok(None) }; - let record_reader = buf.into_iter().map(|rb| adapt_batch(&last_schema, rb)); - - let parquet_path = to_parquet_path(stream, time); - if let Some(path) = parquet_path.parent() { - fs::create_dir_all(path)?; - } - let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - - let props = parquet_writer_props().build(); - let mut writer = ArrowWriter::try_new(parquet_file, last_schema.clone(), Some(props))?; - - for ref record in record_reader { - writer.write(record)?; - } - - writer.close()?; - - Ok(Some(last_schema.as_ref().clone())) -} - fn parquet_writer_props() -> WriterPropertiesBuilder { WriterProperties::builder() .set_max_row_group_size(CONFIG.parseable.row_group_size)
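
For context on the write path this patch leaves in place, the sketch below is a minimal, hedged illustration in plain Rust -- it is not the actual Parseable source. FileWriter is a simplified stand-in (the real one writes Arrow record batches to .arrows staging files), and a plain &str stands in for RecordBatch. It shows the disk-only flow that WriterTable keeps after the in-memory variant is removed: a shared read lock for the common append, an upgrade to the write lock with a re-check to handle the race the diff comments mention, and an unset() that takes the whole map so writers can be closed without holding the lock.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

#[derive(Default)]
struct FileWriter {
    // stand-in for the per-schema Arrow file writers held by the real struct
    batches_written: usize,
}

impl FileWriter {
    fn push(&mut self, _stream_name: &str, _schema_key: &str, _record: &str) {
        // the real implementation appends the RecordBatch to a staged .arrows file
        self.batches_written += 1;
    }
}

#[derive(Default)]
struct WriterTable(RwLock<HashMap<String, Mutex<FileWriter>>>);

impl WriterTable {
    fn append_to_local(&self, stream_name: &str, schema_key: &str, record: &str) {
        // fast path: shared lock, the stream already has a writer
        let map = self.0.read().unwrap();
        if let Some(writer) = map.get(stream_name) {
            writer.lock().unwrap().push(stream_name, schema_key, record);
            return;
        }
        drop(map);

        // slow path: exclusive lock, re-check to avoid racing another inserter
        let mut map = self.0.write().unwrap();
        if let Some(writer) = map.get(stream_name) {
            writer.lock().unwrap().push(stream_name, schema_key, record);
        } else {
            let mut writer = FileWriter::default();
            writer.push(stream_name, schema_key, record);
            map.insert(stream_name.to_owned(), Mutex::new(writer));
        }
    }

    fn unset(&self) {
        // take the map out so writers can be finalized outside the lock
        let mut table = self.0.write().unwrap();
        let map = std::mem::take(&mut *table);
        drop(table);
        for writer in map.into_values() {
            let w = writer.into_inner().unwrap();
            println!("closing writer after {} batches", w.batches_written);
        }
    }
}

fn main() {
    let table = WriterTable::default();
    table.append_to_local("app_logs", "schema_v1", "{\"level\":\"info\"}");
    table.append_to_local("app_logs", "schema_v1", "{\"level\":\"warn\"}");
    table.unset();
}

With the MemWriter branch gone, every append lands in a FileWriter and the query side reads only from object storage (via storage.query_table), which is why QueryTableProvider, MEMORY_READ_BUFFERS, and convert_mem_to_parquet can all be deleted in this patch.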