diff --git a/server/src/main.rs b/server/src/main.rs
index 769e029ea..0fbf490e6 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
     {
         if std::env::var(DEBUG_PYROSCOPE_URL).is_ok() {
             let url = std::env::var(DEBUG_PYROSCOPE_URL).ok();
-            Some(start_profiling(url.unwrap()));
+            start_profiling(url.unwrap());
         }
     }
 
diff --git a/server/src/option.rs b/server/src/option.rs
index 9df303f6b..27159714b 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -195,6 +195,12 @@ pub struct Server {
 
     /// Server should check for update or not
    pub check_update: bool,
+
+    /// Rows in a Parquet row group
+    pub row_group_size: usize,
+
+    /// Parquet compression algorithm
+    pub parquet_compression: Compression,
 }
 
 impl FromArgMatches for Server {
@@ -231,6 +237,24 @@ impl FromArgMatches for Server {
             .get_one::<bool>(Self::CHECK_UPDATE)
             .cloned()
             .expect("default for check update");
+        self.row_group_size = m
+            .get_one::<usize>(Self::ROW_GROUP_SIZE)
+            .cloned()
+            .expect("default for row_group size");
+        self.parquet_compression = match m
+            .get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
+            .expect("default for compression algo")
+            .as_str()
+        {
+            "uncompressed" => Compression::UNCOMPRESSED,
+            "snappy" => Compression::SNAPPY,
+            "gzip" => Compression::GZIP,
+            "lzo" => Compression::LZO,
+            "brotli" => Compression::BROTLI,
+            "lz4" => Compression::LZ4,
+            "zstd" => Compression::ZSTD,
+            _ => unreachable!(),
+        };
 
         Ok(())
     }
@@ -246,6 +270,8 @@ impl Server {
     pub const USERNAME: &str = "username";
     pub const PASSWORD: &str = "password";
     pub const CHECK_UPDATE: &str = "check-update";
+    pub const ROW_GROUP_SIZE: &str = "row-group-size";
+    pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo";
     pub const DEFAULT_USERNAME: &str = "admin";
     pub const DEFAULT_PASSWORD: &str = "admin";
 
@@ -334,6 +360,60 @@ impl Server {
                     .value_parser(value_parser!(bool))
                     .help("Password for the basic authentication on the server"),
             )
+            .arg(
+                Arg::new(Self::ROW_GROUP_SIZE)
+                    .long(Self::ROW_GROUP_SIZE)
+                    .env("P_PARQUET_ROW_GROUP_SIZE")
+                    .value_name("NUMBER")
+                    .required(false)
+                    .default_value("16384")
+                    .value_parser(value_parser!(usize))
+                    .help("Number of rows in a row group"),
+            )
+            .arg(
+                Arg::new(Self::PARQUET_COMPRESSION_ALGO)
+                    .long(Self::PARQUET_COMPRESSION_ALGO)
+                    .env("P_PARQUET_COMPRESSION_ALGO")
+                    .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]")
+                    .required(false)
+                    .default_value("lz4")
+                    .value_parser([
+                        "uncompressed",
+                        "snappy",
+                        "gzip",
+                        "lzo",
+                        "brotli",
+                        "lz4",
+                        "zstd"])
+                    .help("Parquet compression algorithm"),
+            )
     }
 }
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
+pub enum Compression {
+    UNCOMPRESSED,
+    SNAPPY,
+    GZIP,
+    LZO,
+    BROTLI,
+    #[default]
+    LZ4,
+    ZSTD,
+}
+
+impl From<Compression> for datafusion::parquet::basic::Compression {
+    fn from(value: Compression) -> Self {
+        match value {
+            Compression::UNCOMPRESSED => datafusion::parquet::basic::Compression::UNCOMPRESSED,
+            Compression::SNAPPY => datafusion::parquet::basic::Compression::SNAPPY,
+            Compression::GZIP => datafusion::parquet::basic::Compression::GZIP,
+            Compression::LZO => datafusion::parquet::basic::Compression::LZO,
+            Compression::BROTLI => datafusion::parquet::basic::Compression::BROTLI,
+            Compression::LZ4 => datafusion::parquet::basic::Compression::LZ4,
+            Compression::ZSTD => datafusion::parquet::basic::Compression::ZSTD,
+        }
+    }
+}
 
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index f4013d5ac..c8696868e 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -321,7 +321,10 @@ pub trait ObjectStorage: Sync + 'static {
         fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
         parquet_table.upsert(&parquet_path);
 
-        let props = WriterProperties::builder().build();
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(CONFIG.parseable.row_group_size)
+            .set_compression(CONFIG.parseable.parquet_compression.into())
+            .build();
 
         let schema = Arc::new(record_reader.merged_schema());
         let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
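Note: the sketch below is illustrative only and not part of the patch. It shows how the two new options end up on the Parquet writer, assuming the `Compression` enum and its `From` impl from option.rs above; the free-standing `writer_props` helper is hypothetical.

// Hypothetical helper mirroring the object_storage.rs change above.
use datafusion::parquet::file::properties::WriterProperties;

fn writer_props(row_group_size: usize, compression: Compression) -> WriterProperties {
    WriterProperties::builder()
        // --row-group-size / P_PARQUET_ROW_GROUP_SIZE, default 16384
        .set_max_row_group_size(row_group_size)
        // --compression-algo / P_PARQUET_COMPRESSION_ALGO, default "lz4",
        // converted through the From<Compression> impl added above
        .set_compression(compression.into())
        .build()
}

With the defaults this produces 16384-row row groups compressed with LZ4; passing, say, `--compression-algo zstd` routes through the `"zstd" => Compression::ZSTD` match arm instead.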