diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 2d1b46d4e..df7c00983 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -38,7 +38,7 @@ pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
 
 #[derive(Default)]
 pub struct Writer {
-    pub mem: MemWriter<16384>,
+    pub mem: MemWriter,
     pub disk: FileWriter,
 }
 
diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs
index 1f5ce4532..ba69177cb 100644
--- a/server/src/event/writer/mem_writer.rs
+++ b/server/src/event/writer/mem_writer.rs
@@ -23,22 +23,22 @@ use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
 use itertools::Itertools;
 
-use crate::utils::arrow::adapt_batch;
+use crate::{option::CONFIG, utils::arrow::adapt_batch};
 
 /// Structure to keep recordbatches in memory.
 ///
 /// Any new schema is updated in the schema map.
 /// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer
 #[derive(Debug)]
-pub struct MemWriter<const N: usize> {
+pub struct MemWriter {
     schema: Schema,
     // for checking uniqueness of schema
     schema_map: HashSet<String>,
     read_buffer: Vec<RecordBatch>,
-    mutable_buffer: MutableBuffer<N>,
+    mutable_buffer: MutableBuffer,
 }
 
-impl<const N: usize> Default for MemWriter<N> {
+impl Default for MemWriter {
     fn default() -> Self {
         Self {
             schema: Schema::empty(),
@@ -49,7 +49,7 @@ impl<const N: usize> Default for MemWriter<N> {
     }
 }
 
-impl<const N: usize> MemWriter<N> {
+impl MemWriter {
     pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
         if !self.schema_map.contains(schema_key) {
             self.schema_map.insert(schema_key.to_owned());
@@ -83,15 +83,17 @@ fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
 }
 
 #[derive(Debug, Default)]
-struct MutableBuffer<const N: usize> {
+struct MutableBuffer {
     pub inner: Vec<RecordBatch>,
     pub rows: usize,
 }
 
-impl<const N: usize> MutableBuffer<N> {
+impl MutableBuffer {
     fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
-        if self.rows + rb.num_rows() >= N {
-            let left = N - self.rows;
+        let maxima = CONFIG.parseable.records_per_request;
+
+        if self.rows + rb.num_rows() >= maxima {
+            let left = maxima - self.rows;
             let right = rb.num_rows() - left;
             let left_slice = rb.slice(0, left);
             let right_slice = if left < rb.num_rows() {
diff --git a/server/src/option.rs b/server/src/option.rs
index 8b3983170..cda8b7a64 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -229,6 +229,9 @@ pub struct Server {
 
     /// Parquet compression algorithm
     pub parquet_compression: Compression,
+
+    /// Max value a record can be before splitting the request
+    pub records_per_request: usize,
 }
 
 impl FromArgMatches for Server {
@@ -247,6 +250,10 @@ impl FromArgMatches for Server {
         let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
         let openid_issuer = m.get_one::<Url>(Self::OPENID_ISSUER).cloned();
 
+        self.records_per_request = m
+            .get_one(Self::BUFFER_SIZE)
+            .cloned()
+            .expect("default value for records per request");
         self.address = m
             .get_one::<String>(Self::ADDRESS)
             .cloned()
@@ -362,6 +369,7 @@ impl Server {
     pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
     pub const DEFAULT_USERNAME: &'static str = "admin";
    pub const DEFAULT_PASSWORD: &'static str = "admin";
+    pub const BUFFER_SIZE: &'static str = "buffer-size";
 
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_staging_path.join(stream_name)
@@ -509,6 +517,16 @@ impl Server {
                     .value_parser(validation::url)
                     .help("OIDC provider's host address"),
             )
+            .arg(
+                Arg::new(Self::BUFFER_SIZE)
+                    .long(Self::BUFFER_SIZE)
+                    .env("P_BUFFER_SIZE")
+                    .value_name("NUMBER")
+                    .default_value("16384")
+                    .required(false)
+                    .value_parser(value_parser!(usize))
+                    .help("buffer size for internal request buffer"),
+            )
             .arg(
                 Arg::new(Self::DOMAIN_URI)
                     .long(Self::DOMAIN_URI)
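Not part of the diff above: a minimal, self-contained sketch of the split arithmetic that `MutableBuffer::push` now drives from `CONFIG.parseable.records_per_request` (settable via `--buffer-size` / `P_BUFFER_SIZE`, default 16384) instead of the const generic `N`. The function `split_point`, its parameters, and the sample row counts are illustrative stand-ins, not code from this change.

```rust
// Illustrative only: mirrors the arithmetic in MutableBuffer::push, with the
// threshold passed in as a plain value instead of CONFIG.parseable.records_per_request.
fn split_point(
    buffered_rows: usize,
    incoming_rows: usize,
    records_per_request: usize,
) -> Option<(usize, usize)> {
    if buffered_rows + incoming_rows >= records_per_request {
        // `left` rows top off the current buffer (rb.slice(0, left));
        // `right` rows start the next one (rb.slice(left, right)).
        let left = records_per_request - buffered_rows;
        let right = incoming_rows - left;
        Some((left, right))
    } else {
        // The whole batch fits; the mutable buffer keeps growing as before.
        None
    }
}

fn main() {
    // With the default buffer size of 16384: 16000 rows already buffered and
    // 1000 incoming -> 384 rows finish the current buffer, 616 spill into a fresh one.
    assert_eq!(split_point(16_000, 1_000, 16_384), Some((384, 616)));
    assert_eq!(split_point(100, 200, 16_384), None);
}
```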