Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ object_store = { version = "0.4", features=["aws"] }
derive_more = "0.99.17"
env_logger = "0.9.0"
futures = "0.3"
filetime = "0.2.17"
http = "0.2.4"
lazy_static = "1.4.0"
log = "0.4.14"
Expand Down
70 changes: 69 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ use actix_web::{middleware, web, App, HttpServer};
use actix_web_httpauth::extractors::basic::BasicAuth;
use actix_web_httpauth::middleware::HttpAuthentication;
use actix_web_static_files::ResourceFiles;
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
use filetime::FileTime;
use log::warn;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};

include!(concat!(env!("OUT_DIR"), "/generated.rs"));

use std::fs;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::path::Path;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use tokio::sync::oneshot;
Expand All @@ -51,7 +55,7 @@ mod validator;
use error::Error;
use option::CONFIG;
use s3::S3;
use storage::ObjectStorage;
use storage::{ObjectStorage, StorageDir};

// Global configurations
const MAX_EVENT_PAYLOAD_SIZE: usize = 102400;
Expand All @@ -69,6 +73,10 @@ async fn main() -> anyhow::Result<()> {
warn!("could not populate local metadata. {:?}", e);
}

// Move all existing data.parquet files to their respective tmp directory
// they will be synced to object store on next s3 sync cycle
startup_sync();

let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();

Expand Down Expand Up @@ -98,6 +106,66 @@ async fn main() -> anyhow::Result<()> {
}
}

fn startup_sync() {
if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
return;
}

for stream in metadata::STREAM_INFO.list_streams() {
let dir = StorageDir::new(&stream);
// if data.parquet file is not present then skip this stream
if !dir.parquet_path_exists() {
continue;
}
if let Err(e) = dir.create_temp_dir() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the behaviour here if the tmp directory exists already? I mean we need to ensure we don't overwrite, because it is highly probably that the directory exists and has some data that is still not sent to S3.

Copy link
Contributor Author

@trueleo trueleo Aug 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_dir_all recursively builds all the missing component of path. If there is already an existing tmp, it will not overwrite anything inside

log::error!(
"Error creating tmp directory for {} due to error [{}]",
&stream,
e
);
continue;
}
// create prefix for this file from its last modified time
let path = dir.data_path.join("data.parquet");

// metadata.modified gives us system time
// This may not work on all platfomns
let metadata = match fs::metadata(&path) {
Ok(meta) => meta,
Err(err) => {
log::warn!(
"Failed to get file metadata for {} due to {:?}. Skipping!",
path.display(),
err
);
continue;
}
};

let last_modified = FileTime::from_last_modification_time(&metadata);
let last_modified = NaiveDateTime::from_timestamp(last_modified.unix_seconds(), 0);
let last_modified: DateTime<Utc> = DateTime::from_utc(last_modified, Utc);

let uri = utils::date_to_prefix(last_modified.date())
+ &utils::hour_to_prefix(last_modified.hour())
+ &utils::minute_to_prefix(
last_modified.minute(),
storage::OBJECT_STORE_DATA_GRANULARITY,
)
.unwrap();
let local_uri = str::replace(&uri, "/", ".");
let hostname = utils::hostname_unchecked();
let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) {
log::warn!(
"Failed to move parquet file at {} to tmp directory due to error {}",
path.display(),
err
)
}
}
}

fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
Expand Down
6 changes: 3 additions & 3 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,18 @@ impl StorageDir {
}
}

fn create_temp_dir(&self) -> io::Result<()> {
pub fn create_temp_dir(&self) -> io::Result<()> {
fs::create_dir_all(&self.temp_dir)
}

fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
pub fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
fs::rename(
self.data_path.join("data.parquet"),
self.temp_dir.join(filename),
)
}

fn parquet_path_exists(&self) -> bool {
pub fn parquet_path_exists(&self) -> bool {
self.data_path.join("data.parquet").exists()
}
}
Expand Down