diff --git a/server/Cargo.toml b/server/Cargo.toml index 423d1316e..eb74eabeb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,6 +28,7 @@ object_store = { version = "0.4", features=["aws"] } derive_more = "0.99.17" env_logger = "0.9.0" futures = "0.3" +filetime = "0.2.17" http = "0.2.4" lazy_static = "1.4.0" log = "0.4.14" diff --git a/server/src/main.rs b/server/src/main.rs index 573a34f60..11a16775f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,13 +22,17 @@ use actix_web::{middleware, web, App, HttpServer}; use actix_web_httpauth::extractors::basic::BasicAuth; use actix_web_httpauth::middleware::HttpAuthentication; use actix_web_static_files::ResourceFiles; +use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; use clokwerk::{AsyncScheduler, Scheduler, TimeUnits}; +use filetime::FileTime; use log::warn; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; include!(concat!(env!("OUT_DIR"), "/generated.rs")); +use std::fs; use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::path::Path; use std::thread::{self, JoinHandle}; use std::time::Duration; use tokio::sync::oneshot; @@ -51,7 +55,7 @@ mod validator; use error::Error; use option::CONFIG; use s3::S3; -use storage::ObjectStorage; +use storage::{ObjectStorage, StorageDir}; // Global configurations const MAX_EVENT_PAYLOAD_SIZE: usize = 102400; @@ -69,6 +73,10 @@ async fn main() -> anyhow::Result<()> { warn!("could not populate local metadata. 
{:?}", e);
     }
 
+    // Move all existing data.parquet files to their respective tmp directories;
+    // they will be synced to object store on next s3 sync cycle
+    startup_sync();
+
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
     let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
 
@@ -98,6 +106,88 @@ async fn main() -> anyhow::Result<()> {
     }
 }
 
+/// Moves leftover `data.parquet` files into their streams' tmp directories.
+///
+/// For every stream listed by `metadata::STREAM_INFO` whose data directory still
+/// contains a `data.parquet`, the file is renamed into the stream's tmp directory.
+/// The new name is built from the file's last modification time (as UTC date,
+/// hour and minute prefixes, with `/` replaced by `.`), followed by the value of
+/// `utils::hostname_unchecked()` and `.data.parquet`. Failures are logged and
+/// only skip the affected stream. Does nothing when the configured local disk
+/// path does not exist.
+fn startup_sync() {
+    if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
+        return;
+    }
+
+    for stream in metadata::STREAM_INFO.list_streams() {
+        let dir = StorageDir::new(&stream);
+        // if data.parquet file is not present then skip this stream
+        if !dir.parquet_path_exists() {
+            continue;
+        }
+        if let Err(e) = dir.create_temp_dir() {
+            log::error!(
+                "Error creating tmp directory for {} due to error [{}]",
+                &stream,
+                e
+            );
+            continue;
+        }
+        // create prefix for this file from its last modified time
+        let path = dir.data_path.join("data.parquet");
+
+        // metadata.modified gives us system time
+        // This may not work on all platforms
+        let metadata = match fs::metadata(&path) {
+            Ok(meta) => meta,
+            Err(err) => {
+                log::warn!(
+                    "Failed to get file metadata for {} due to {:?}. Skipping!",
+                    path.display(),
+                    err
+                );
+                continue;
+            }
+        };
+
+        let modified_secs = FileTime::from_last_modification_time(&metadata).unix_seconds();
+        // `NaiveDateTime::from_timestamp` panics when the seconds are out of range;
+        // a file with a bogus modification time must not abort server startup.
+        let last_modified = match NaiveDateTime::from_timestamp_opt(modified_secs, 0) {
+            Some(time) => time,
+            None => {
+                log::warn!(
+                    "Last modified time of {} is out of range. Skipping!",
+                    path.display()
+                );
+                continue;
+            }
+        };
+        let last_modified: DateTime<Utc> = DateTime::from_utc(last_modified, Utc);
+
+        // NOTE(review): `unwrap` assumes `minute_to_prefix` cannot fail for a minute
+        // value taken from a valid timestamp — confirm against its definition.
+        let uri = utils::date_to_prefix(last_modified.date())
+            + &utils::hour_to_prefix(last_modified.hour())
+            + &utils::minute_to_prefix(
+                last_modified.minute(),
+                storage::OBJECT_STORE_DATA_GRANULARITY,
+            )
+            .unwrap();
+        let local_uri = str::replace(&uri, "/", ".");
+        let hostname = utils::hostname_unchecked();
+        let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
+        if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) {
+            log::warn!(
+                "Failed to move parquet file at {} to tmp directory due to error {}",
+                path.display(),
+                err
+            );
+        }
+    }
+}
+
 fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
     let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
     let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 5dd9bc0d5..e74a88679 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -164,18 +164,18 @@ impl StorageDir {
         }
     }
 
-    fn create_temp_dir(&self) -> io::Result<()> {
+    pub fn create_temp_dir(&self) -> io::Result<()> {
         fs::create_dir_all(&self.temp_dir)
     }
 
-    fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
+    pub fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
         fs::rename(
             self.data_path.join("data.parquet"),
             self.temp_dir.join(filename),
         )
     }
 
-    fn parquet_path_exists(&self) -> bool {
+    pub fn parquet_path_exists(&self) -> bool {
         self.data_path.join("data.parquet").exists()
     }
 }