1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -34,6 +34,7 @@ log = "0.4.14"
num_cpus = "1.0.0"
openssl = { version = "0.10" }
os_info = "3.0.7"
hostname = "0.3"
parquet = "15.0"
rand = "0.8.4"
rust-flatten-json = "0.2.0"
5 changes: 4 additions & 1 deletion server/src/event.rs
@@ -51,7 +51,10 @@ impl Event {
fn data_file_path(&self) -> String {
format!(
"{}/{}",
CONFIG.parseable.local_stream_data_path(&self.stream_name),
CONFIG
.parseable
.local_stream_data_path(&self.stream_name)
.to_string_lossy(),
"data.parquet"
)
}
4 changes: 4 additions & 0 deletions server/src/metadata.rs
@@ -170,6 +170,10 @@ impl STREAM_INFO {
Ok(())
}

pub fn list_streams(&self) -> Vec<String> {
self.read().unwrap().keys().map(String::clone).collect()
}
Member: let's not add unwrap.

Contributor Author: unwrapping here is fine because the same thread does not hold any lock. Some measures can be considered to handle poisoning across all instances where a read is needed.
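
One hedged option for that poisoning concern, sketched below under the assumption that STREAM_INFO wraps an RwLock<HashMap<String, _>> (the real metadata value type may differ), is to recover the guard from a poisoned lock instead of unwrapping:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-in for the STREAM_INFO map; the real value type holds stream metadata.
struct StreamInfo(RwLock<HashMap<String, ()>>);

impl StreamInfo {
    // List streams without panicking if a writer previously poisoned the lock:
    // a poisoned RwLock still yields a usable guard via `into_inner()` on the error.
    pub fn list_streams(&self) -> Vec<String> {
        let map = self
            .0
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        map.keys().cloned().collect()
    }
}
```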


pub fn update_stats(
&self,
stream_name: &str,
Expand Down
12 changes: 6 additions & 6 deletions server/src/option.rs
@@ -110,7 +110,7 @@ impl Config {
Local Data Path: {}
Object Storage: {}/{}",
"Storage:".to_string().blue().bold(),
self.parseable.local_disk_path,
self.parseable.local_disk_path.to_string_lossy(),
self.storage.endpoint_url(),
self.storage.bucket_name()
)
@@ -181,7 +181,7 @@ pub struct Opt {
/// for incoming events and local cache while querying data pulled
/// from object storage backend
#[structopt(long, env = "P_LOCAL_STORAGE", default_value = "./data")]
pub local_disk_path: String,
pub local_disk_path: PathBuf,

/// Optional interval after which server would upload uncommited data to
/// remote object storage platform. Defaults to 1min.
@@ -198,12 +198,12 @@ pub struct Opt {
}

impl Opt {
pub fn get_cache_path(&self, stream_name: &str) -> String {
format!("{}/{}", self.local_disk_path, stream_name)
pub fn get_cache_path(&self, stream_name: &str) -> PathBuf {
Member: this and the function below return the exact same thing. Let's remove one of these.

self.local_disk_path.join(stream_name)
}

pub fn local_stream_data_path(&self, stream_name: &str) -> String {
format!("{}/{}", self.local_disk_path, stream_name)
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_disk_path.join(stream_name)
}
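
A minimal sketch of the consolidation suggested above, assuming call sites of the removed accessor are pointed at the surviving one (which of the two names survives is a choice for this PR):

```rust
use std::path::PathBuf;

struct Opt {
    local_disk_path: PathBuf,
}

impl Opt {
    // Keep a single accessor: the cache path and the stream data path are both
    // `<local_disk_path>/<stream_name>`, so the duplicate method can be dropped.
    fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
        self.local_disk_path.join(stream_name)
    }
}
```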

pub fn get_scheme(&self) -> String {
6 changes: 5 additions & 1 deletion server/src/query.rs
@@ -94,7 +94,11 @@ impl Query {

ctx.register_listing_table(
&self.stream_name,
CONFIG.parseable.get_cache_path(&self.stream_name).as_str(),
CONFIG
.parseable
.get_cache_path(&self.stream_name)
.to_str()
.unwrap(),
listing_options,
None,
)
159 changes: 64 additions & 95 deletions server/src/storage.rs
@@ -17,7 +17,7 @@
*/

use crate::alerts::Alerts;
use crate::metadata::Stats;
use crate::metadata::{Stats, STREAM_INFO};
use crate::option::CONFIG;
use crate::query::Query;
use crate::utils;
@@ -32,7 +32,7 @@ use std::fmt::Debug;
use std::fs;
use std::io;
use std::iter::Iterator;
use std::path::Path;
use std::path::{Path, PathBuf};

extern crate walkdir;
use walkdir::WalkDir;
@@ -72,34 +72,31 @@ pub trait ObjectStorage: Sync + 'static {
return Ok(());
}

let entries = fs::read_dir(&CONFIG.parseable.local_disk_path)?
.map(|res| res.map(|e| e.path()))
.collect::<Result<Vec<_>, io::Error>>()?;
let streams = STREAM_INFO.list_streams();

// entries here means all the streams present on local disk
for entry in entries {
let path = entry.into_os_string().into_string().unwrap();
let init_sync = StorageSync::new(path);
for stream in streams {
let sync = StorageSync::new(stream.clone());

// if data.parquet file not present, skip this stream
if !init_sync.parquet_path_exists() {
if !sync.dir.parquet_path_exists() {
continue;
}

let dir = init_sync.get_dir_name();
if let Err(e) = dir.create_dir_name_tmp() {
if let Err(e) = sync.dir.create_temp_dir() {
log::error!(
"Error copying parquet file {} due to error [{}]",
dir.parquet_path,
"Error creating tmp directory for {} due to error [{}]",
&stream,
e
);
continue;
}

if let Err(e) = dir.move_parquet_to_tmp() {
if let Err(e) = sync.move_parquet_to_temp() {
log::error!(
"Error copying parquet from stream dir to tmp in path {} due to error [{}]",
dir.dir_name_local,
"Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
sync.dir.data_path.to_string_lossy(),
sync.dir.temp_dir.to_string_lossy(),
e
);
continue;
@@ -114,35 +111,30 @@ pub trait ObjectStorage: Sync + 'static {
return Ok(());
}

let entries = fs::read_dir(&CONFIG.parseable.local_disk_path)?
.map(|res| res.map(|e| e.path()))
.collect::<Result<Vec<_>, io::Error>>()?;
let streams = STREAM_INFO.list_streams();

for entry in entries {
let path = entry.into_os_string().into_string().unwrap();
let init_sync = StorageSync::new(path);
for stream in streams {
let dir = StorageDir::new(stream.clone());

let dir = init_sync.get_dir_name();

for file in WalkDir::new(&format!("{}/tmp", &dir.dir_name_local))
for file in WalkDir::new(dir.temp_dir)
.min_depth(1)
.max_depth(1)
.into_iter()
.filter_map(|file| file.ok())
.map(|file| file.path().to_path_buf())
.filter(|file| file.is_file())
{
if file.metadata().unwrap().is_file() {
let file_local = format!("{}", file.path().display());
let file_s3 = file_local.replace("/tmp", "");
let final_s3_path =
file_s3.replace(&format!("{}/", CONFIG.parseable.local_disk_path), "");
let f_path = str::replace(&final_s3_path, ".", "/");
let f_new_path = f_path.replace("/parquet", ".parquet");
let _put_parquet_file = self.upload_file(&f_new_path, &file_local).await?;
if let Err(e) = dir.delete_parquet_file(file_local.clone()) {
log::error!(
"Error deleting parquet file in path {} due to error [{}]",
file_local,
e
);
}
let filename = file.file_name().unwrap().to_str().unwrap();
let file_suffix = str::replacen(filename, ".", "/", 3);
Member: let's avoid adding new unwraps.

Contributor Author (@trueleo, Aug 24, 2022): the filename is a valid UTF-8 filename generated when the local sync happens. The only way this could fail is if someone created an invalid file in the tmp directory.

Member: Someone could try this, we can't underestimate our users :)
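
A hedged alternative to the `to_str().unwrap()` on the filename, sketched with a hypothetical helper (`file_suffix` is not part of this PR): skip and log any entry whose name is not valid UTF-8 instead of panicking. At the call site the loop could simply `continue` when this returns `None`.

```rust
use std::path::Path;

// Derive the object-store suffix from a file in the tmp directory, skipping
// (and logging) any entry whose filename is not valid UTF-8.
fn file_suffix(file: &Path) -> Option<String> {
    let filename = file.file_name()?.to_str().or_else(|| {
        log::warn!("skipping non-UTF-8 filename: {}", file.display());
        None
    })?;
    Some(filename.replacen('.', "/", 3))
}
```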

let s3_path = format!("{}/{}", stream, file_suffix);
Member: let's use the join approach everywhere.

Contributor Author: the s3 path is the key for the S3 put object. It does not represent any local file; it is generated from the filenames of the files that are yet to be synced. It is fine for it to be a string.

Member: right
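
For illustration only, here is how such a key comes together for a hypothetical tmp filename (the exact prefix labels produced by `date_to_prefix`, `hour_to_prefix`, and `minute_to_prefix` are assumptions, not taken from this PR):

```rust
fn main() {
    let stream = "app_logs";
    // Hypothetical filename written by the local sync step: the slash-separated
    // time prefix is flattened into dots, then the hostname and ".data.parquet".
    let filename = "date=2022-08-24.hour=10.minute=54.ip-10-0-0-1.data.parquet";

    // The first three dots become path separators again, restoring the prefix.
    let file_suffix = str::replacen(filename, ".", "/", 3);
    let s3_path = format!("{}/{}", stream, file_suffix);

    assert_eq!(
        s3_path,
        "app_logs/date=2022-08-24/hour=10/minute=54/ip-10-0-0-1.data.parquet"
    );
    println!("{}", s3_path);
}
```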


let _put_parquet_file = self.upload_file(&s3_path, file.to_str().unwrap()).await?;
if let Err(e) = fs::remove_file(&file) {
log::error!(
"Error deleting parquet file in path {} due to error [{}]",
file.to_string_lossy(),
e
);
}
}
}
@@ -156,82 +148,59 @@ pub struct LogStream {
}

#[derive(Debug)]
struct DirName {
dir_name_tmp_local: String,
dir_name_local: String,
parquet_path: String,
parquet_file_local: String,
struct StorageDir {
pub data_path: PathBuf,
pub temp_dir: PathBuf,
}

impl DirName {
fn move_parquet_to_tmp(&self) -> io::Result<()> {
fs::rename(
&self.parquet_path,
format!("{}/{}", self.dir_name_tmp_local, self.parquet_file_local),
)
impl StorageDir {
fn new(stream_name: String) -> Self {
let data_path = CONFIG.parseable.local_stream_data_path(&stream_name);
let temp_dir = data_path.join("tmp");

Self {
data_path,
temp_dir,
}
}

fn create_dir_name_tmp(&self) -> io::Result<()> {
fs::create_dir_all(&self.dir_name_tmp_local)
fn create_temp_dir(&self) -> io::Result<()> {
fs::create_dir_all(&self.temp_dir)
}

fn delete_parquet_file(&self, path: String) -> io::Result<()> {
fs::remove_file(path)
fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
fs::rename(
self.data_path.join("data.parquet"),
self.temp_dir.join(filename),
)
}

fn parquet_path_exists(&self) -> bool {
self.data_path.join("data.parquet").exists()
}
}

struct StorageSync {
path: String,
pub dir: StorageDir,
time: chrono::DateTime<Utc>,
}

impl StorageSync {
fn new(path: String) -> Self {
Self {
path,
time: Utc::now(),
}
fn new(stream_name: String) -> Self {
let dir = StorageDir::new(stream_name);
let time = Utc::now();
Self { dir, time }
}

fn parquet_path_exists(&self) -> bool {
let new_parquet_path = format!("{}/data.parquet", &self.path);

Path::new(&new_parquet_path).exists()
}

fn get_dir_name(&self) -> DirName {
let local_path = format!("{}/", CONFIG.parseable.local_disk_path);
let _storage_path = format!("{}/", CONFIG.storage.bucket_name());
let stream_name = self.path.replace(&local_path, "");
let parquet_path = format!("{}/data.parquet", self.path);
// subtract OBJECT_STORE_DATA_GRANULARITY from current time here,
// this is because, when we're creating this file
// the data in the file is from OBJECT_STORE_DATA_GRANULARITY time ago.
fn move_parquet_to_temp(&self) -> io::Result<()> {
let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64);
let uri = utils::date_to_prefix(time.date())
+ &utils::hour_to_prefix(time.hour())
+ &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap();

let local_uri = str::replace(&uri, "/", ".");

let dir_name_tmp_local = format!("{}{}/tmp", local_path, stream_name);

let storage_dir_name_s3 = format!("{}/{}", stream_name, uri);

let random_string = utils::random_string();

let parquet_file_local = format!("{}{}.parquet", local_uri, random_string);

let _parquet_file_s3 = format!("{}{}.parquet", storage_dir_name_s3, random_string);

let dir_name_local = local_path + &stream_name;

DirName {
dir_name_tmp_local,
dir_name_local,
parquet_path,
parquet_file_local,
}
let hostname = utils::hostname_unchecked();
let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
self.dir.move_parquet_to_temp(parquet_file_local)
}
}

16 changes: 9 additions & 7 deletions server/src/utils.rs
@@ -19,7 +19,6 @@
use actix_web::web;
use actix_web::HttpRequest;
use chrono::{Date, DateTime, Timelike, Utc};
use rand::{distributions::Alphanumeric, Rng};
use serde_json::{json, Value};
use std::collections::HashMap;

@@ -67,12 +66,15 @@ fn merge(v: &Value, fields: &HashMap<String, String>) -> Value {
}
}

pub fn random_string() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect()
#[allow(dead_code)]
pub fn hostname() -> Option<String> {
hostname::get()
.ok()
.and_then(|hostname| hostname.into_string().ok())
}

pub fn hostname_unchecked() -> String {
hostname::get().unwrap().into_string().unwrap()
}

/// Convert minutes to a slot range