From 61cd28bb6b760b4dc3cc038f92ab0ce3968d6a2e Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 17:17:27 +0530 Subject: [PATCH 01/11] Update writer to wait and acquire a lock --- server/src/event/writer.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 2d1b46d4e..7eae45277 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -23,6 +23,7 @@ mod mem_writer; use std::{ collections::HashMap, sync::{Arc, Mutex, RwLock}, + time::Duration, }; use crate::utils; @@ -79,7 +80,7 @@ impl WriterTable { Some(stream_writer) => { stream_writer .lock() - .unwrap() + .unwrap() // /yyyyyyyyyyy .push(stream_name, schema_key, record)?; } None => { @@ -121,17 +122,21 @@ impl WriterTable { stream_name: &str, schema: &Arc, ) -> Option> { - let records = self - .0 - .read() - .unwrap() - .get(stream_name)? - .lock() - .unwrap() - .mem - .recordbatch_cloned(schema); - - Some(records) + let read_guard = self.0.read().unwrap(); + let stream_guard = read_guard.get(stream_name)?; + + loop { + match stream_guard.lock() { + Ok(guard) => { + let records = guard.mem.recordbatch_cloned(schema); + return Some(records); + } + Err(_ /*poisoned */) => { + std::thread::sleep(Duration::from_millis(1000 * 10)); + continue; + } + } + } } } From e2e22e692b279e9f8ee11eede98de8ca2794f85c Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 19:55:25 +0530 Subject: [PATCH 02/11] Decreased the thread sleep duration to 100ms --- server/src/event/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 7eae45277..7bd8a9b3e 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -132,7 +132,7 @@ impl WriterTable { return Some(records); } Err(_ /*poisoned */) => { - std::thread::sleep(Duration::from_millis(1000 * 10)); + std::thread::sleep(Duration::from_millis(100)); continue; } } From 0895a1feaea948aeede4d271898d7dd91eda5e9f Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 19:56:53 +0530 Subject: [PATCH 03/11] Add the wait for acquiring the mutex lock in append_to_local func --- server/src/event/writer.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 7bd8a9b3e..aa052a8b5 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -78,10 +78,18 @@ impl WriterTable { match hashmap_guard.get(stream_name) { Some(stream_writer) => { - stream_writer - .lock() - .unwrap() // /yyyyyyyyyyy - .push(stream_name, schema_key, record)?; + loop { + match stream_writer.lock() { + Ok(mut stream_writer) => { + stream_writer.push(stream_name, schema_key, record)?; + break; + } + Err(_ /*poisoned */) => { + std::thread::sleep(Duration::from_millis(100)); + continue; + } + } + } } None => { drop(hashmap_guard); From a65f4f0c483b713adb27670fa35d64c49a6dba43 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 16:57:37 +0530 Subject: [PATCH 04/11] Removed Generic Buffer Size in MemWriter struct --- server/src/event/writer/mem_writer.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index 1f5ce4532..f60eed162 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -30,15 +30,15 @@ use crate::utils::arrow::adapt_batch; /// Any new schema is updated in the schema map. /// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer #[derive(Debug)] -pub struct MemWriter { +pub struct MemWriter { schema: Schema, // for checking uniqueness of schema schema_map: HashSet, read_buffer: Vec, - mutable_buffer: MutableBuffer, + mutable_buffer: MutableBuffer, } -impl Default for MemWriter { +impl Default for MemWriter { fn default() -> Self { Self { schema: Schema::empty(), @@ -49,7 +49,7 @@ impl Default for MemWriter { } } -impl MemWriter { +impl MemWriter { pub fn push(&mut self, schema_key: &str, rb: RecordBatch) { if !self.schema_map.contains(schema_key) { self.schema_map.insert(schema_key.to_owned()); @@ -83,15 +83,20 @@ fn concat_records(schema: &Arc, record: &[RecordBatch]) -> RecordBatch { } #[derive(Debug, Default)] -struct MutableBuffer { +struct MutableBuffer { pub inner: Vec, pub rows: usize, } -impl MutableBuffer { +impl MutableBuffer { fn push(&mut self, rb: RecordBatch) -> Option> { - if self.rows + rb.num_rows() >= N { - let left = N - self.rows; + let buf_size = std::env::var("P_BUFFER_SIZE") + .unwrap_or("16384".to_owned()) + .parse::() + .unwrap(); + + if self.rows + rb.num_rows() >= buf_size { + let left = buf_size - self.rows; let right = rb.num_rows() - left; let left_slice = rb.slice(0, left); let right_slice = if left < rb.num_rows() { From 80040a36fbc05e9e640c628095ef64260d5cff50 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 16:58:49 +0530 Subject: [PATCH 05/11] Undo the retry mechanism for getting locks --- server/src/event/writer.rs | 43 +++++++++++++------------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index aa052a8b5..2d1b46d4e 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -23,7 +23,6 @@ mod mem_writer; use std::{ collections::HashMap, sync::{Arc, Mutex, RwLock}, - time::Duration, }; use crate::utils; @@ -78,18 +77,10 @@ impl WriterTable { match hashmap_guard.get(stream_name) { Some(stream_writer) => { - loop { - match stream_writer.lock() { - Ok(mut stream_writer) => { - stream_writer.push(stream_name, schema_key, record)?; - break; - } - Err(_ /*poisoned */) => { - std::thread::sleep(Duration::from_millis(100)); - continue; - } - } - } + stream_writer + .lock() + .unwrap() + .push(stream_name, schema_key, record)?; } None => { drop(hashmap_guard); @@ -130,21 +121,17 @@ impl WriterTable { stream_name: &str, schema: &Arc, ) -> Option> { - let read_guard = self.0.read().unwrap(); - let stream_guard = read_guard.get(stream_name)?; - - loop { - match stream_guard.lock() { - Ok(guard) => { - let records = guard.mem.recordbatch_cloned(schema); - return Some(records); - } - Err(_ /*poisoned */) => { - std::thread::sleep(Duration::from_millis(100)); - continue; - } - } - } + let records = self + .0 + .read() + .unwrap() + .get(stream_name)? + .lock() + .unwrap() + .mem + .recordbatch_cloned(schema); + + Some(records) } } From f80ab5f2ecde363d69b687deffecf613ed906ca4 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 16:59:27 +0530 Subject: [PATCH 06/11] In struct Writer remove hard coded buffer size --- server/src/event/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 2d1b46d4e..df7c00983 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -38,7 +38,7 @@ pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); #[derive(Default)] pub struct Writer { - pub mem: MemWriter<16384>, + pub mem: MemWriter, pub disk: FileWriter, } From 76169af2584ff705e948ac5584d0f04b631b6b31 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 17:37:06 +0530 Subject: [PATCH 07/11] Extracted env var name to a static variable and cleaned up unwrap or --- server/src/event/writer/mem_writer.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index f60eed162..d7876fe48 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -25,6 +25,8 @@ use itertools::Itertools; use crate::utils::arrow::adapt_batch; +static BUF_SIZE: &str = "P_BUFFER_SIZE"; + /// Structure to keep recordbatches in memory. /// /// Any new schema is updated in the schema map. @@ -90,8 +92,8 @@ struct MutableBuffer { impl MutableBuffer { fn push(&mut self, rb: RecordBatch) -> Option> { - let buf_size = std::env::var("P_BUFFER_SIZE") - .unwrap_or("16384".to_owned()) + let buf_size = std::env::var(BUF_SIZE) + .unwrap_or_else(|_| String::from("16384")) .parse::() .unwrap(); From 88b5fb104ad6a3c8d9e86d1ad152ee9240089e29 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 21:22:02 +0530 Subject: [PATCH 08/11] Removed access to the env for the maxima value --- server/src/event/writer/mem_writer.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index d7876fe48..155f38ea2 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -23,9 +23,7 @@ use arrow_schema::Schema; use arrow_select::concat::concat_batches; use itertools::Itertools; -use crate::utils::arrow::adapt_batch; - -static BUF_SIZE: &str = "P_BUFFER_SIZE"; +use crate::{utils::arrow::adapt_batch, option::CONFIG}; /// Structure to keep recordbatches in memory. /// @@ -92,13 +90,10 @@ struct MutableBuffer { impl MutableBuffer { fn push(&mut self, rb: RecordBatch) -> Option> { - let buf_size = std::env::var(BUF_SIZE) - .unwrap_or_else(|_| String::from("16384")) - .parse::() - .unwrap(); + let maxima = CONFIG.parseable.records_per_request; - if self.rows + rb.num_rows() >= buf_size { - let left = buf_size - self.rows; + if self.rows + rb.num_rows() >= maxima { + let left = maxima - self.rows; let right = rb.num_rows() - left; let left_slice = rb.slice(0, left); let right_slice = if left < rb.num_rows() { From 559fd02fcb3a23e913d1f4b4041bb90ef6ea25c9 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 21:23:00 +0530 Subject: [PATCH 09/11] Update CLI to accept records per request parameter --- server/src/option.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/src/option.rs b/server/src/option.rs index 8b3983170..1de228388 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -229,6 +229,9 @@ pub struct Server { /// Parquet compression algorithm pub parquet_compression: Compression, + + /// Max value a record can be before spliting the request + pub records_per_request: usize, } impl FromArgMatches for Server { @@ -247,6 +250,10 @@ impl FromArgMatches for Server { let openid_client_secret = m.get_one::(Self::OPENID_CLIENT_SECRET).cloned(); let openid_issuer = m.get_one::(Self::OPENID_ISSUER).cloned(); + self.records_per_request = m + .get_one(Self::RECORDS_PER_REQUEST) + .cloned() + .expect("default value for records per request"); self.address = m .get_one::(Self::ADDRESS) .cloned() @@ -362,6 +369,7 @@ impl Server { pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; pub const DEFAULT_USERNAME: &'static str = "admin"; pub const DEFAULT_PASSWORD: &'static str = "admin"; + pub const RECORDS_PER_REQUEST: &'static str = "records-per-request"; pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) @@ -509,6 +517,16 @@ impl Server { .value_parser(validation::url) .help("OIDC provider's host address"), ) + .arg( + Arg::new(Self::RECORDS_PER_REQUEST) + .long(Self::RECORDS_PER_REQUEST) + .env("P_RECORDS_PER_REQUEST") + .value_name("NUMBER") + .default_value("16384") + .required(false) + .value_parser(value_parser!(usize)) + .help("maximum size records are split up per request"), + ) .arg( Arg::new(Self::DOMAIN_URI) .long(Self::DOMAIN_URI) From b1c74c70cbbbbdb3501876b44edd58b84f907680 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 21:25:17 +0530 Subject: [PATCH 10/11] ran cargo fmt --- server/src/event/writer/mem_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index 155f38ea2..ba69177cb 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -23,7 +23,7 @@ use arrow_schema::Schema; use arrow_select::concat::concat_batches; use itertools::Itertools; -use crate::{utils::arrow::adapt_batch, option::CONFIG}; +use crate::{option::CONFIG, utils::arrow::adapt_batch}; /// Structure to keep recordbatches in memory. /// From 013c6b0e34a96b8ec0a8d82eac8138a95689d3c7 Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Thu, 18 Jan 2024 14:56:07 +0530 Subject: [PATCH 11/11] temp --- server/src/option.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 1de228388..cda8b7a64 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -230,7 +230,7 @@ pub struct Server { /// Parquet compression algorithm pub parquet_compression: Compression, - /// Max value a record can be before spliting the request + /// Max value a record can be before splitting the request pub records_per_request: usize, } @@ -251,7 +251,7 @@ impl FromArgMatches for Server { let openid_issuer = m.get_one::(Self::OPENID_ISSUER).cloned(); self.records_per_request = m - .get_one(Self::RECORDS_PER_REQUEST) + .get_one(Self::BUFFER_SIZE) .cloned() .expect("default value for records per request"); self.address = m @@ -369,7 +369,7 @@ impl Server { pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; pub const DEFAULT_USERNAME: &'static str = "admin"; pub const DEFAULT_PASSWORD: &'static str = "admin"; - pub const RECORDS_PER_REQUEST: &'static str = "records-per-request"; + pub const BUFFER_SIZE: &'static str = "buffer-size"; pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) @@ -518,14 +518,14 @@ impl Server { .help("OIDC provider's host address"), ) .arg( - Arg::new(Self::RECORDS_PER_REQUEST) - .long(Self::RECORDS_PER_REQUEST) - .env("P_RECORDS_PER_REQUEST") + Arg::new(Self::BUFFER_SIZE) + .long(Self::BUFFER_SIZE) + .env("P_BUFFER_SIZE") .value_name("NUMBER") .default_value("16384") .required(false) .value_parser(value_parser!(usize)) - .help("maximum size records are split up per request"), + .help("buffer size for internal request buffer"), ) .arg( Arg::new(Self::DOMAIN_URI)