From 9995150a6a2f6e8aad622399e249216c1402d12c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 10 Mar 2023 14:34:54 +0530 Subject: [PATCH 1/2] Add option to set row group size and compression --- server/src/main.rs | 2 +- server/src/option.rs | 80 ++++++++++++++++++++++++++++ server/src/storage/object_storage.rs | 5 +- 3 files changed, 85 insertions(+), 2 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 769e029ea..0fbf490e6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> { { if std::env::var(DEBUG_PYROSCOPE_URL).is_ok() { let url = std::env::var(DEBUG_PYROSCOPE_URL).ok(); - Some(start_profiling(url.unwrap())); + start_profiling(url.unwrap()); } } diff --git a/server/src/option.rs b/server/src/option.rs index 9df303f6b..122fb940e 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -195,6 +195,12 @@ pub struct Server { /// Server should check for update or not pub check_update: bool, + + /// Rows in Parquet Rowgroup + pub row_group_size: usize, + + /// Parquet compression algorithm + pub parquet_compression: Compression, } impl FromArgMatches for Server { @@ -231,6 +237,24 @@ impl FromArgMatches for Server { .get_one::<bool>(Self::CHECK_UPDATE) .cloned() .expect("default for check update"); + self.row_group_size = m + .get_one::<usize>(Self::ROW_GROUP_SIZE) + .cloned() + .expect("default for row_group size"); + self.parquet_compression = match m + .get_one::<String>(Self::PARQUET_COMPRESSION_ALGO) + .expect("default for compression algo") + .as_str() + { + "uncompressed" => Compression::UNCOMPRESSED, + "snappy" => Compression::SNAPPY, + "gzip" => Compression::GZIP, + "lzo" => Compression::LZO, + "brotli" => Compression::BROTLI, + "lz4" => Compression::LZ4, + "zstd" => Compression::ZSTD, + _ => unreachable!(), + }; Ok(()) } @@ -246,6 +270,8 @@ impl Server { pub const USERNAME: &str = "username"; pub const PASSWORD: &str = "password"; pub const CHECK_UPDATE: &str = "check-update"; + 
pub const ROW_GROUP_SIZE: &str = "row-group-size"; + pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo"; pub const DEFAULT_USERNAME: &str = "admin"; pub const DEFAULT_PASSWORD: &str = "admin"; @@ -334,6 +360,60 @@ impl Server { .value_parser(value_parser!(bool)) .help("Password for the basic authentication on the server"), ) + .arg( + Arg::new(Self::ROW_GROUP_SIZE) + .long(Self::ROW_GROUP_SIZE) + .env("P_ROW_GROUP_SIZE") + .value_name("NUMBER") + .required(false) + .default_value("16384") + .value_parser(value_parser!(usize)) + .help("Number of rows in a row groups"), + ) + .arg( + Arg::new(Self::PARQUET_COMPRESSION_ALGO) + .long(Self::PARQUET_COMPRESSION_ALGO) + .env("P_PARQUET_COMPRESSION_ALGO") + .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") + .required(false) + .default_value("lz4") + .value_parser([ + "uncompressed", + "snappy", + "gzip", + "lzo", + "brotli", + "lz4", + "zstd"]) + .help("Parquet compression algorithm"), + ) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +#[allow(non_camel_case_types, clippy::upper_case_acronyms)] +pub enum Compression { + UNCOMPRESSED, + SNAPPY, + GZIP, + LZO, + BROTLI, + #[default] + LZ4, + ZSTD, +} + +impl From<Compression> for datafusion::parquet::basic::Compression { + fn from(value: Compression) -> Self { + match value { + Compression::UNCOMPRESSED => datafusion::parquet::basic::Compression::UNCOMPRESSED, + Compression::SNAPPY => datafusion::parquet::basic::Compression::SNAPPY, + Compression::GZIP => datafusion::parquet::basic::Compression::GZIP, + Compression::LZO => datafusion::parquet::basic::Compression::LZO, + Compression::BROTLI => datafusion::parquet::basic::Compression::BROTLI, + Compression::LZ4 => datafusion::parquet::basic::Compression::LZ4, + Compression::ZSTD => datafusion::parquet::basic::Compression::ZSTD, + } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index f4013d5ac..c8696868e 100644 --- 
a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -321,7 +321,10 @@ pub trait ObjectStorage: Sync + 'static { fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; parquet_table.upsert(&parquet_path); - let props = WriterProperties::builder().build(); + let props = WriterProperties::builder() + .set_max_row_group_size(CONFIG.parseable.row_group_size) + .set_compression(CONFIG.parseable.parquet_compression.into()) + .build(); let schema = Arc::new(record_reader.merged_schema()); let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; From 100480bb7b0ab8b0a059559c7e1abd34f83a3359 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 10 Mar 2023 15:00:43 +0530 Subject: [PATCH 2/2] Change to P_PARQUET_ROW_GROUP_SIZE --- server/src/option.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/option.rs b/server/src/option.rs index 122fb940e..27159714b 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -363,7 +363,7 @@ impl Server { .arg( Arg::new(Self::ROW_GROUP_SIZE) .long(Self::ROW_GROUP_SIZE) - .env("P_ROW_GROUP_SIZE") + .env("P_PARQUET_ROW_GROUP_SIZE") .value_name("NUMBER") .required(false) .default_value("16384")