Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use parquet::{
};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
metrics,
option::CONFIG,
storage::OBJECT_STORE_DATA_GRANULARITY,
Expand All @@ -47,6 +48,9 @@ use crate::{
},
};

// Filename suffixes appended after the time-based portion of staged file names
// (see `StorageDir::file_time_suffix` usage below). Despite the name, these are
// multi-part suffixes ("data.arrows" / "data.parquet"), not plain extensions.
// Centralizing them fixes the earlier drift where one call site used
// "data.arrow" (missing the trailing 's') and never matched written files.
const ARROW_FILE_EXTENSION: &str = "data.arrows";
const PARQUET_FILE_EXTENSION: &str = "data.parquet";

// in mem global that hold all the in mem buffer that are ready to convert
pub static MEMORY_READ_BUFFERS: Lazy<RwLock<HashMap<String, Vec<ReadBuf>>>> =
Lazy::new(RwLock::default);
Expand Down Expand Up @@ -93,7 +97,7 @@ impl StorageDir {
format!(
"{}.{}",
stream_hash,
Self::file_time_suffix(time, "data.arrows")
Self::file_time_suffix(time, ARROW_FILE_EXTENSION)
)
}

Expand Down Expand Up @@ -140,8 +144,8 @@ impl StorageDir {
&self,
exclude: NaiveDateTime,
) -> HashMap<PathBuf, Vec<PathBuf>> {
let hot_filename = StorageDir::file_time_suffix(exclude, "data.arrow");
// hashmap <time, vec[paths]> but exclude where hotfilename matches
let hot_filename = StorageDir::file_time_suffix(exclude, ARROW_FILE_EXTENSION);
// hashmap <time, vec[paths]> but exclude where hot filename matches
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
let mut arrow_files = self.arrow_files();
arrow_files.retain(|path| {
Expand Down Expand Up @@ -185,7 +189,7 @@ impl StorageDir {

/// Builds the on-disk path for the parquet file of `stream_name` covering
/// the window identified by `time`.
///
/// The path is `<local_stream_data_path>/<time-suffix>.data.parquet`, where
/// the final component comes from `StorageDir::file_time_suffix` with the
/// shared `PARQUET_FILE_EXTENSION` suffix — keeping it consistent with the
/// arrow staging file names produced elsewhere in this impl.
pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
// Root directory for this stream's locally staged data, taken from config.
let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
// NOTE(review): named `dir`, but this is the time-suffixed *file name*
// component (ends in ".data.parquet"), not a directory — confirm intent.
let dir = StorageDir::file_time_suffix(time, PARQUET_FILE_EXTENSION);

data_path.join(dir)
}
Expand Down Expand Up @@ -278,7 +282,7 @@ fn parquet_writer_props() -> WriterPropertiesBuilder {
.set_max_row_group_size(CONFIG.parseable.row_group_size)
.set_compression(CONFIG.parseable.parquet_compression.into())
.set_column_encoding(
ColumnPath::new(vec!["p_timestamp".to_string()]),
ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]),
Encoding::DELTA_BINARY_PACKED,
)
}
Expand Down