Merged
2 changes: 1 addition & 1 deletion server/src/main.rs
@@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
{
if std::env::var(DEBUG_PYROSCOPE_URL).is_ok() {
let url = std::env::var(DEBUG_PYROSCOPE_URL).ok();
- Some(start_profiling(url.unwrap()));
+ start_profiling(url.unwrap());
}
}
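The removed line wrapped the call in Some(...) and immediately discarded the value, which had no effect; the fix simply calls the function. The remaining is_ok()/unwrap() pair could also be collapsed into a single lookup; a minimal sketch, not part of this diff, assuming start_profiling takes the URL as a String as the call above implies:

// One env lookup instead of is_ok() followed by a second var() + unwrap().
if let Ok(url) = std::env::var(DEBUG_PYROSCOPE_URL) {
    start_profiling(url);
}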

80 changes: 80 additions & 0 deletions server/src/option.rs
@@ -195,6 +195,12 @@ pub struct Server {

/// Whether the server should check for updates
pub check_update: bool,

/// Number of rows in a Parquet row group
pub row_group_size: usize,

/// Parquet compression algorithm
pub parquet_compression: Compression,
}

impl FromArgMatches for Server {
@@ -231,6 +237,24 @@ impl FromArgMatches for Server {
.get_one::<bool>(Self::CHECK_UPDATE)
.cloned()
.expect("default for check update");
self.row_group_size = m
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
.expect("default for row_group size");
self.parquet_compression = match m
.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
.as_str()
{
"uncompressed" => Compression::UNCOMPRESSED,
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP,
"lzo" => Compression::LZO,
"brotli" => Compression::BROTLI,
"lz4" => Compression::LZ4,
"zstd" => Compression::ZSTD,
_ => unreachable!(),
};

Ok(())
}
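The string-to-variant mapping above could equally live behind the standard FromStr trait, keeping from_arg_matches shorter. A sketch, not part of this diff, assuming Compression is the enum defined later in this file:

use std::str::FromStr;

impl FromStr for Compression {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Accepts the same lowercase names the CLI value_parser allows.
        Ok(match s {
            "uncompressed" => Compression::UNCOMPRESSED,
            "snappy" => Compression::SNAPPY,
            "gzip" => Compression::GZIP,
            "lzo" => Compression::LZO,
            "brotli" => Compression::BROTLI,
            "lz4" => Compression::LZ4,
            "zstd" => Compression::ZSTD,
            other => return Err(format!("unknown compression algorithm: {other}")),
        })
    }
}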
@@ -246,6 +270,8 @@ impl Server {
pub const USERNAME: &str = "username";
pub const PASSWORD: &str = "password";
pub const CHECK_UPDATE: &str = "check-update";
pub const ROW_GROUP_SIZE: &str = "row-group-size";
pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo";
pub const DEFAULT_USERNAME: &str = "admin";
pub const DEFAULT_PASSWORD: &str = "admin";

@@ -334,6 +360,60 @@ impl Server {
.value_parser(value_parser!(bool))
.help("Password for the basic authentication on the server"),
)
.arg(
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
.env("P_PARQUET_ROW_GROUP_SIZE")
.value_name("NUMBER")
.required(false)
.default_value("16384")
.value_parser(value_parser!(usize))
.help("Number of rows in a row groups"),
)
.arg(
Arg::new(Self::PARQUET_COMPRESSION_ALGO)
.long(Self::PARQUET_COMPRESSION_ALGO)
.env("P_PARQUET_COMPRESSION_ALGO")
.value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]")
.required(false)
.default_value("lz4")
.value_parser([
"uncompressed",
"snappy",
"gzip",
"lzo",
"brotli",
"lz4",
"zstd"])
.help("Parquet compression algorithm"),
)
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
pub enum Compression {
UNCOMPRESSED,
SNAPPY,
GZIP,
LZO,
BROTLI,
#[default]
LZ4,
ZSTD,
}

impl From<Compression> for datafusion::parquet::basic::Compression {
fn from(value: Compression) -> Self {
match value {
Compression::UNCOMPRESSED => datafusion::parquet::basic::Compression::UNCOMPRESSED,
Compression::SNAPPY => datafusion::parquet::basic::Compression::SNAPPY,
Compression::GZIP => datafusion::parquet::basic::Compression::GZIP,
Compression::LZO => datafusion::parquet::basic::Compression::LZO,
Compression::BROTLI => datafusion::parquet::basic::Compression::BROTLI,
Compression::LZ4 => datafusion::parquet::basic::Compression::LZ4,
Compression::ZSTD => datafusion::parquet::basic::Compression::ZSTD,
}
}
}

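These options surface as --row-group-size (env P_PARQUET_ROW_GROUP_SIZE, default 16384) and --compression-algo (env P_PARQUET_COMPRESSION_ALGO, default lz4). Since Default is derived with #[default] on LZ4, the enum default agrees with the CLI default; a minimal test sketch, not part of this diff, to pin that down:

#[cfg(test)]
mod compression_tests {
    use super::Compression;

    #[test]
    fn default_matches_cli_default() {
        // #[default] on LZ4 must stay in sync with the "lz4" default_value above.
        assert_eq!(Compression::default(), Compression::LZ4);
    }
}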
5 changes: 4 additions & 1 deletion server/src/storage/object_storage.rs
@@ -321,7 +321,10 @@ pub trait ObjectStorage: Sync + 'static {
fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
parquet_table.upsert(&parquet_path);

- let props = WriterProperties::builder().build();
+ let props = WriterProperties::builder()
+     .set_max_row_group_size(CONFIG.parseable.row_group_size)
+     .set_compression(CONFIG.parseable.parquet_compression.into())
+     .build();
let schema = Arc::new(record_reader.merged_schema());
let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
