diff --git a/server/Cargo.toml b/server/Cargo.toml index 8da2242e5..7505d007f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -34,6 +34,7 @@ log = "0.4.14" num_cpus = "1.0.0" openssl = { version = "0.10" } os_info = "3.0.7" +hostname = "0.3" parquet = "15.0" rand = "0.8.4" rust-flatten-json = "0.2.0" diff --git a/server/src/event.rs b/server/src/event.rs index 0b323c4f8..5d04845e5 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -51,7 +51,10 @@ impl Event { fn data_file_path(&self) -> String { format!( "{}/{}", - CONFIG.parseable.local_stream_data_path(&self.stream_name), + CONFIG + .parseable + .local_stream_data_path(&self.stream_name) + .to_string_lossy(), "data.parquet" ) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 16c2e59d7..f4408bebf 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -170,6 +170,10 @@ impl STREAM_INFO { Ok(()) } + pub fn list_streams(&self) -> Vec<String> { + self.read().unwrap().keys().map(String::clone).collect() + } + pub fn update_stats( &self, stream_name: &str, diff --git a/server/src/option.rs b/server/src/option.rs index a14942ce6..f3a8c8fa5 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -110,7 +110,7 @@ impl Config { Local Data Path: {} Object Storage: {}/{}", "Storage:".to_string().blue().bold(), - self.parseable.local_disk_path, + self.parseable.local_disk_path.to_string_lossy(), self.storage.endpoint_url(), self.storage.bucket_name() ) @@ -181,7 +181,7 @@ pub struct Opt { /// for incoming events and local cache while querying data pulled /// from object storage backend #[structopt(long, env = "P_LOCAL_STORAGE", default_value = "./data")] - pub local_disk_path: String, + pub local_disk_path: PathBuf, /// Optional interval after which server would upload uncommited data to /// remote object storage platform. Defaults to 1min. 
@@ -198,12 +198,12 @@ pub struct Opt { } impl Opt { - pub fn get_cache_path(&self, stream_name: &str) -> String { - format!("{}/{}", self.local_disk_path, stream_name) + pub fn get_cache_path(&self, stream_name: &str) -> PathBuf { + self.local_disk_path.join(stream_name) } - pub fn local_stream_data_path(&self, stream_name: &str) -> String { - format!("{}/{}", self.local_disk_path, stream_name) + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { + self.local_disk_path.join(stream_name) } pub fn get_scheme(&self) -> String { diff --git a/server/src/query.rs b/server/src/query.rs index ef82ff9d5..1bdc44180 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -94,7 +94,11 @@ impl Query { ctx.register_listing_table( &self.stream_name, - CONFIG.parseable.get_cache_path(&self.stream_name).as_str(), + CONFIG + .parseable + .get_cache_path(&self.stream_name) + .to_str() + .unwrap(), listing_options, None, ) diff --git a/server/src/storage.rs b/server/src/storage.rs index 01a579bc8..07971526c 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -17,7 +17,7 @@ */ use crate::alerts::Alerts; -use crate::metadata::Stats; +use crate::metadata::{Stats, STREAM_INFO}; use crate::option::CONFIG; use crate::query::Query; use crate::utils; @@ -32,7 +32,7 @@ use std::fmt::Debug; use std::fs; use std::io; use std::iter::Iterator; -use std::path::Path; +use std::path::{Path, PathBuf}; extern crate walkdir; use walkdir::WalkDir; @@ -72,34 +72,31 @@ pub trait ObjectStorage: Sync + 'static { return Ok(()); } - let entries = fs::read_dir(&CONFIG.parseable.local_disk_path)? 
- .map(|res| res.map(|e| e.path())) - .collect::<Result<Vec<PathBuf>, io::Error>>()?; + let streams = STREAM_INFO.list_streams(); // entries here means all the streams present on local disk - for entry in entries { - let path = entry.into_os_string().into_string().unwrap(); - let init_sync = StorageSync::new(path); + for stream in streams { + let sync = StorageSync::new(stream.clone()); // if data.parquet file not present, skip this stream - if !init_sync.parquet_path_exists() { + if !sync.dir.parquet_path_exists() { continue; } - let dir = init_sync.get_dir_name(); - if let Err(e) = dir.create_dir_name_tmp() { + if let Err(e) = sync.dir.create_temp_dir() { log::error!( - "Error copying parquet file {} due to error [{}]", - dir.parquet_path, + "Error creating tmp directory for {} due to error [{}]", + &stream, e ); continue; } - if let Err(e) = dir.move_parquet_to_tmp() { + if let Err(e) = sync.move_parquet_to_temp() { log::error!( - "Error copying parquet from stream dir to tmp in path {} due to error [{}]", - dir.dir_name_local, + "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]", + sync.dir.data_path.to_string_lossy(), + sync.dir.temp_dir.to_string_lossy(), e ); continue; @@ -114,35 +111,30 @@ pub trait ObjectStorage: Sync + 'static { return Ok(()); } - let entries = fs::read_dir(&CONFIG.parseable.local_disk_path)? 
- .map(|res| res.map(|e| e.path())) - .collect::<Result<Vec<PathBuf>, io::Error>>()?; + let streams = STREAM_INFO.list_streams(); - for entry in entries { - let path = entry.into_os_string().into_string().unwrap(); - let init_sync = StorageSync::new(path); + for stream in streams { + let dir = StorageDir::new(stream.clone()); - let dir = init_sync.get_dir_name(); - - for file in WalkDir::new(&format!("{}/tmp", &dir.dir_name_local)) + for file in WalkDir::new(dir.temp_dir) + .min_depth(1) + .max_depth(1) .into_iter() .filter_map(|file| file.ok()) + .map(|file| file.path().to_path_buf()) + .filter(|file| file.is_file()) { - if file.metadata().unwrap().is_file() { - let file_local = format!("{}", file.path().display()); - let file_s3 = file_local.replace("/tmp", ""); - let final_s3_path = - file_s3.replace(&format!("{}/", CONFIG.parseable.local_disk_path), ""); - let f_path = str::replace(&final_s3_path, ".", "/"); - let f_new_path = f_path.replace("/parquet", ".parquet"); - let _put_parquet_file = self.upload_file(&f_new_path, &file_local).await?; - if let Err(e) = dir.delete_parquet_file(file_local.clone()) { - log::error!( - "Error deleting parquet file in path {} due to error [{}]", - file_local, - e - ); - } + let filename = file.file_name().unwrap().to_str().unwrap(); + let file_suffix = str::replacen(filename, ".", "/", 3); + let s3_path = format!("{}/{}", stream, file_suffix); + + let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?; + if let Err(e) = fs::remove_file(&file) { + log::error!( + "Error deleting parquet file in path {} due to error [{}]", + file.to_string_lossy(), + e + ); } } } @@ -156,82 +148,59 @@ pub struct LogStream { } #[derive(Debug)] -struct DirName { - dir_name_tmp_local: String, - dir_name_local: String, - parquet_path: String, - parquet_file_local: String, +struct StorageDir { + pub data_path: PathBuf, + pub temp_dir: PathBuf, } -impl DirName { - fn move_parquet_to_tmp(&self) -> io::Result<()> { - fs::rename( 
&self.parquet_path, - format!("{}/{}", self.dir_name_tmp_local, self.parquet_file_local), - ) +impl StorageDir { + fn new(stream_name: String) -> Self { + let data_path = CONFIG.parseable.local_stream_data_path(&stream_name); + let temp_dir = data_path.join("tmp"); + + Self { + data_path, + temp_dir, + } } - fn create_dir_name_tmp(&self) -> io::Result<()> { - fs::create_dir_all(&self.dir_name_tmp_local) + fn create_temp_dir(&self) -> io::Result<()> { + fs::create_dir_all(&self.temp_dir) } - fn delete_parquet_file(&self, path: String) -> io::Result<()> { - fs::remove_file(path) + fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> { + fs::rename( + self.data_path.join("data.parquet"), + self.temp_dir.join(filename), + ) + } + + fn parquet_path_exists(&self) -> bool { + self.data_path.join("data.parquet").exists() } } struct StorageSync { - path: String, + pub dir: StorageDir, time: chrono::DateTime<Utc>, } impl StorageSync { - fn new(path: String) -> Self { - Self { - path, - time: Utc::now(), - } + fn new(stream_name: String) -> Self { + let dir = StorageDir::new(stream_name); + let time = Utc::now(); + Self { dir, time } } - fn parquet_path_exists(&self) -> bool { - let new_parquet_path = format!("{}/data.parquet", &self.path); - - Path::new(&new_parquet_path).exists() - } - - fn get_dir_name(&self) -> DirName { - let local_path = format!("{}/", CONFIG.parseable.local_disk_path); - let _storage_path = format!("{}/", CONFIG.storage.bucket_name()); - let stream_name = self.path.replace(&local_path, ""); - let parquet_path = format!("{}/data.parquet", self.path); - // subtract OBJECT_STORE_DATA_GRANULARITY from current time here, - // this is because, when we're creating this file - // the data in the file is from OBJECT_STORE_DATA_GRANULARITY time ago. 
+ fn move_parquet_to_temp(&self) -> io::Result<()> { let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64); let uri = utils::date_to_prefix(time.date()) + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); - let local_uri = str::replace(&uri, "/", "."); - - let dir_name_tmp_local = format!("{}{}/tmp", local_path, stream_name); - - let storage_dir_name_s3 = format!("{}/{}", stream_name, uri); - - let random_string = utils::random_string(); - - let parquet_file_local = format!("{}{}.parquet", local_uri, random_string); - - let _parquet_file_s3 = format!("{}{}.parquet", storage_dir_name_s3, random_string); - - let dir_name_local = local_path + &stream_name; - - DirName { - dir_name_tmp_local, - dir_name_local, - parquet_path, - parquet_file_local, - } + let hostname = utils::hostname_unchecked(); + let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname); + self.dir.move_parquet_to_temp(parquet_file_local) } } diff --git a/server/src/utils.rs b/server/src/utils.rs index b631d9e6e..af93288cf 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -19,7 +19,6 @@ use actix_web::web; use actix_web::HttpRequest; use chrono::{Date, DateTime, Timelike, Utc}; -use rand::{distributions::Alphanumeric, Rng}; use serde_json::{json, Value}; use std::collections::HashMap; @@ -67,12 +66,15 @@ fn merge(v: &Value, fields: &HashMap<String, Value>) -> Value { } } -pub fn random_string() -> String { - rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect() +#[allow(dead_code)] +pub fn hostname() -> Option<String> { + hostname::get() + .ok() + .and_then(|hostname| hostname.into_string().ok()) +} + +pub fn hostname_unchecked() -> String { + hostname::get().unwrap().into_string().unwrap() } /// Convert minutes to a slot range