From 036202ef6df4cc814329a97d11347d7d5bf560ab Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sun, 28 Aug 2022 19:31:20 +0530 Subject: [PATCH 1/2] Move data.parquet to last modified time on startup --- server/src/main.rs | 68 ++++++++++++++++++++++++++++++++++++++++++- server/src/storage.rs | 6 ++-- 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 573a34f60..f4c66adf8 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,13 +22,16 @@ use actix_web::{middleware, web, App, HttpServer}; use actix_web_httpauth::extractors::basic::BasicAuth; use actix_web_httpauth::middleware::HttpAuthentication; use actix_web_static_files::ResourceFiles; +use chrono::{DateTime, Timelike, Utc}; use clokwerk::{AsyncScheduler, Scheduler, TimeUnits}; use log::warn; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; include!(concat!(env!("OUT_DIR"), "/generated.rs")); +use std::fs; use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::path::Path; use std::thread::{self, JoinHandle}; use std::time::Duration; use tokio::sync::oneshot; @@ -51,7 +54,7 @@ mod validator; use error::Error; use option::CONFIG; use s3::S3; -use storage::ObjectStorage; +use storage::{ObjectStorage, StorageDir}; // Global configurations const MAX_EVENT_PAYLOAD_SIZE: usize = 102400; @@ -69,6 +72,10 @@ async fn main() -> anyhow::Result<()> { warn!("could not populate local metadata. {:?}", e); } + // Move all exiting data.parquet file to their respective tmp directory + // they will be synced to object store on next s3 sync cycle + startup_sync(); + let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync(); let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync(); @@ -98,6 +105,65 @@ async fn main() -> anyhow::Result<()> { } } +fn startup_sync() { + if !Path::new(&CONFIG.parseable.local_disk_path).exists() { + return; + } + + for stream in metadata::STREAM_INFO.list_streams() { + let dir = StorageDir::new(&stream); + // if data.parquet file is not present then skip this stream + if !dir.parquet_path_exists() { + continue; + } + if let Err(e) = dir.create_temp_dir() { + log::error!( + "Error creating tmp directory for {} due to error [{}]", + &stream, + e + ); + continue; + } + // create prefix for this file from its last modified time + let path = dir.data_path.join("data.parquet"); + + // metadata.modified gives us system time + // This may not work on all platfomns + let last_modified: DateTime = match fs::metadata(&path) + .and_then(|meta| meta.modified()) + .map(DateTime::from) + { + Ok(time) => time, + Err(err) => { + log::warn!( + "Failed to get last modified time for {} due to {:?}. Skipping!", + path.display(), + err + ); + continue; + } + }; + + let uri = utils::date_to_prefix(last_modified.date()) + + &utils::hour_to_prefix(last_modified.hour()) + + &utils::minute_to_prefix( + last_modified.minute(), + storage::OBJECT_STORE_DATA_GRANULARITY, + ) + .unwrap(); + let local_uri = str::replace(&uri, "/", "."); + let hostname = utils::hostname_unchecked(); + let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname); + if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) { + log::warn!( + "Failed to move parquet file at {} to tmp directory due to error {}", + path.display(), + err + ) + } + } +} + fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); diff --git a/server/src/storage.rs b/server/src/storage.rs index 5dd9bc0d5..e74a88679 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -164,18 +164,18 @@ impl StorageDir { } } - fn create_temp_dir(&self) -> io::Result<()> { + pub fn create_temp_dir(&self) -> io::Result<()> { fs::create_dir_all(&self.temp_dir) } - fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> { + pub fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> { fs::rename( self.data_path.join("data.parquet"), self.temp_dir.join(filename), ) } - fn parquet_path_exists(&self) -> bool { + pub fn parquet_path_exists(&self) -> bool { self.data_path.join("data.parquet").exists() } } From 28993e8e527f90a8776c16f5415128ca92646624 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 29 Aug 2022 17:04:43 +0530 Subject: [PATCH 2/2] Use filetime --- server/Cargo.toml | 1 + server/src/main.rs | 16 +++++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 423d1316e..eb74eabeb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,6 +28,7 @@ object_store = { version = "0.4", features=["aws"] } derive_more = "0.99.17" env_logger = "0.9.0" futures = "0.3" +filetime = "0.2.17" http = "0.2.4" lazy_static = "1.4.0" log = "0.4.14" diff --git a/server/src/main.rs b/server/src/main.rs index f4c66adf8..11a16775f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,8 +22,9 @@ use actix_web::{middleware, web, App, HttpServer}; use actix_web_httpauth::extractors::basic::BasicAuth; use actix_web_httpauth::middleware::HttpAuthentication; use actix_web_static_files::ResourceFiles; -use chrono::{DateTime, Timelike, Utc}; +use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; use clokwerk::{AsyncScheduler, Scheduler, TimeUnits}; +use filetime::FileTime; use log::warn; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; @@ -129,14 +130,11 @@ fn startup_sync() { // metadata.modified gives us system time // This may not work on all platfomns - let last_modified: DateTime = match fs::metadata(&path) - .and_then(|meta| meta.modified()) - .map(DateTime::from) - { - Ok(time) => time, + let metadata = match fs::metadata(&path) { + Ok(meta) => meta, Err(err) => { log::warn!( - "Failed to get last modified time for {} due to {:?}. Skipping!", + "Failed to get file metadata for {} due to {:?}. Skipping!", path.display(), err ); @@ -144,6 +142,10 @@ fn startup_sync() { } }; + let last_modified = FileTime::from_last_modification_time(&metadata); + let last_modified = NaiveDateTime::from_timestamp(last_modified.unix_seconds(), 0); + let last_modified: DateTime = DateTime::from_utc(last_modified, Utc); + let uri = utils::date_to_prefix(last_modified.date()) + &utils::hour_to_prefix(last_modified.hour()) + &utils::minute_to_prefix(