diff --git a/server/src/event.rs b/server/src/event.rs
index 4bcb8beda..80acdf050 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -18,23 +18,143 @@
  */
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
-use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::parquet::file::serialized_reader::SerializedFileReader;
+use lazy_static::lazy_static;
 use log::error;
-use std::fs;
+use std::collections::HashMap;
+use std::fs::OpenOptions;
 use std::io::BufReader;
 use std::sync::Arc;
+use std::sync::Mutex;
+use std::sync::RwLock;
 
 use crate::metadata;
+use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::response;
 use crate::storage::ObjectStorage;
 use crate::Error;
 
+type LocalWriter<T> = Mutex<Option<StreamWriter<T>>>;
+
+lazy_static! {
+    #[derive(Default)]
+    pub static ref STREAM_WRITERS: RwLock<HashMap<String, LocalWriter<std::fs::File>>> =
+        RwLock::new(HashMap::new());
+}
+
+impl STREAM_WRITERS {
+    // append to an existing stream
+    fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> {
+        let hashmap_guard = STREAM_WRITERS.read().unwrap();
+        match hashmap_guard.get(stream) {
+            Some(localwriter) => {
+                let mut writer_guard = localwriter.lock().unwrap();
+                if let Some(ref mut writer) = *writer_guard {
+                    writer.write(record).map_err(|_| ())?;
+                } else {
+                    drop(writer_guard);
+                    drop(hashmap_guard);
+                    STREAM_WRITERS::set_entry(stream, record).unwrap();
+                }
+            }
+            None => {
+                drop(hashmap_guard);
+                STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap();
+            }
+        };
+        Ok(())
+    }
+
+    // create a new entry with a new stream_writer
+    // todo: error type
+    // Only create entry for valid streams
+    fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> {
+        let mut hashmap_guard = STREAM_WRITERS.write().unwrap();
+
+        if STREAM_INFO.schema(&stream).is_err() {
+            return Err(());
+        }
+
+        let file = OpenOptions::new()
+            .append(true)
+            .create_new(true)
+            .open(data_file_path(&stream))
+            .map_err(|_| ())?;
+
+        let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
+        stream_writer.write(record).map_err(|_| ())?;
+
+        hashmap_guard.insert(stream, Mutex::new(Some(stream_writer)));
+
+        Ok(())
+    }
+
+    // Deleting a logstream requires that metadata is deleted first
+    pub fn delete_entry(stream: &str) -> Result<(), ()> {
+        let mut hashmap_guard = STREAM_WRITERS.write().unwrap();
+
+        if STREAM_INFO.schema(stream).is_ok() {
+            return Err(());
+        }
+
+        hashmap_guard.remove(stream);
+
+        Ok(())
+    }
+
+    fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> {
+        let file = OpenOptions::new()
+            .append(true)
+            .create_new(true)
+            .open(data_file_path(stream))
+            .map_err(|_| ())?;
+
+        let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
+        stream_writer.write(record).map_err(|_| ())?;
+
+        STREAM_WRITERS
+            .read()
+            .expect("Current Thread should not hold any lock")
+            .get(stream)
+            .expect("set entry is only called on valid entries")
+            .lock()
+            .expect("Poisoning is not handled yet")
+            .replace(stream_writer); // replace the stream writer behind this mutex
+
+        Ok(())
+    }
+
+    // Unset the entry so that a new writer is created on the next append
+    pub fn unset_entry(stream: &str) {
+        let guard = STREAM_WRITERS.read().unwrap();
+        let stream_writer = match guard.get(stream) {
+            Some(writer) => writer,
+            None => return,
+        };
+        stream_writer
+            .lock()
+            .expect("Poisoning is not handled yet")
+            .take();
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum StreamWriterError {}
+
+fn data_file_path(stream_name: &str) -> String {
+    format!(
+        "{}/{}",
+        CONFIG
+            .parseable
+            .local_stream_data_path(stream_name)
+            .to_string_lossy(),
+        "data.records"
+    )
+}
+
 #[derive(Clone)]
 pub struct Event {
     pub body: String,
@@ -44,17 +164,6 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    fn data_file_path(&self) -> String {
-        format!(
-            "{}/{}",
-            CONFIG
-                .parseable
-                .local_stream_data_path(&self.stream_name)
-                .to_string_lossy(),
-            "data.parquet"
-        )
-    }
-
     pub async fn process(
         &self,
         storage: &impl ObjectStorage,
@@ -65,12 +174,11 @@ impl Event {
         })?;
 
         let event = self.get_reader(inferred_schema.clone());
-        let size = self.body_size();
 
         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
         let is_first_event = stream_schema.is_none();
 
-        let compressed_size = if let Some(existing_schema) = stream_schema {
+        if let Some(existing_schema) = stream_schema {
             // validate schema before processing the event
             if existing_schema != inferred_schema {
                 return Err(Error::SchemaMismatch(self.stream_name.clone()));
@@ -84,11 +192,6 @@ impl Event {
                 .await?
         };
 
-        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
-        {
-            error!("Couldn't update stream stats. {:?}", e);
-        }
-
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
             error!("Error checking for alerts. {:?}", e);
         }
@@ -115,59 +218,44 @@ impl Event {
         storage: &impl ObjectStorage,
     ) -> Result<u64, Error> {
         let rb = event.next()?.ok_or(Error::MissingRecord)?;
+        let stream_name = &self.stream_name;
 
-        // Store record batch to Parquet file on local cache
-        let compressed_size = self.convert_arrow_parquet(rb)?;
+        // Store record batch on local cache
+        STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
 
         // Put the inferred schema to object store
-        let stream_name = &self.stream_name;
-
         storage
             .put_schema(stream_name.clone(), &schema)
             .await
             .map_err(|e| response::EventError {
                 msg: format!(
                     "Failed to upload schema for log stream {} due to err: {}",
-                    self.stream_name, e
+                    stream_name, e
                 ),
             })?;
 
         // set the schema in memory for this stream
         metadata::STREAM_INFO
-            .set_schema(&self.stream_name, schema)
+            .set_schema(stream_name, schema)
            .map_err(|e| response::EventError {
                msg: format!(
                    "Failed to set schema for log stream {} due to err: {}",
-                    &self.stream_name, e
+                    stream_name, e
                ),
            })?;
 
-        Ok(compressed_size)
+        Ok(0)
     }
 
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
     fn process_event(&self, mut event: json::Reader<&[u8]>) -> Result<u64, Error> {
-        let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;
-
-        let compressed_size = match self.convert_parquet_rb_reader() {
-            Ok(mut arrow_reader) => {
-                let mut total_size = 0;
-                let rb = arrow_reader.get_record_reader(2048).unwrap();
-                for prev_rb in rb {
-                    let new_rb = RecordBatch::concat(
-                        &std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
-                        &[next_event_rb.clone(), prev_rb.unwrap()],
-                    )?;
-                    total_size += self.convert_arrow_parquet(new_rb)?;
-                }
+        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+        let stream_name = &self.stream_name;
 
-                total_size
-            }
-            Err(_) => self.convert_arrow_parquet(next_event_rb)?,
-        };
+        STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?;
 
-        Ok(compressed_size)
+        Ok(0)
     }
 
     // inferSchema is a constructor to Schema
@@ -187,32 +275,4 @@ impl Event {
             json::reader::DecoderOptions::new().with_batch_size(1024),
         )
     }
-
-    fn body_size(&self) -> u64 {
-        self.body.as_bytes().len() as u64
-    }
-
-    // convert arrow record batch to parquet
-    // and write it to local cache path as a data.parquet file.
-    fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result<u64, Error> {
-        let parquet_path = self.data_file_path();
-        let parquet_file = fs::File::create(&parquet_path)?;
-        let props = WriterProperties::builder().build();
-        let mut writer =
-            ArrowWriter::try_new(parquet_file, Arc::new(self.infer_schema()?), Some(props))?;
-        writer.write(&rb)?;
-        writer.close()?;
-
-        let compressed_size = fs::metadata(parquet_path)?.len();
-
-        Ok(compressed_size)
-    }
-
-    pub fn convert_parquet_rb_reader(&self) -> Result<ParquetFileArrowReader, Error> {
-        let file = fs::File::open(&self.data_file_path())?;
-        let file_reader = SerializedFileReader::new(file)?;
-        let arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-
-        Ok(arrow_reader)
-    }
 }
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index 288e979c7..ff0b59af0 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -22,9 +22,9 @@ use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, HttpResponse, Responder};
 
 use crate::alerts::Alerts;
-use crate::response;
 use crate::s3::S3;
 use crate::storage::{ObjectStorage, StorageDir};
+use crate::{event, response};
 use crate::{metadata, validator};
 
 pub async fn delete(req: HttpRequest) -> HttpResponse {
@@ -59,20 +59,27 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
             .to_http();
     }
 
-    let stream_dir = StorageDir::new(&stream_name);
-    if fs::remove_dir_all(&stream_dir.data_path).is_err() {
+    if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
         log::warn!(
-            "failed to delete local data for stream {}. Clean {} manually",
+            "failed to delete log stream {} from metadata due to err: {}",
             stream_name,
-            stream_dir.data_path.to_string_lossy()
+            e
         )
     }
 
-    if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
+    if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() {
         log::warn!(
-            "failed to delete log stream {} from metadata due to err: {}",
+            "failed to delete log stream event writers for stream {}",
+            stream_name
+        )
+    }
+
+    let stream_dir = StorageDir::new(stream_name.clone());
+    if fs::remove_dir_all(&stream_dir.data_path).is_err() {
+        log::warn!(
+            "failed to delete local data for stream {}. Clean {} manually",
             stream_name,
-            e
+            stream_dir.data_path.to_string_lossy()
         )
     }
 
diff --git a/server/src/main.rs b/server/src/main.rs
index 11a16775f..92faffa21 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -73,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
         warn!("could not populate local metadata. {:?}", e);
     }
 
-    // Move all exiting data.parquet file to their respective tmp directory
+    // Move all existing data.records files to their respective tmp directory
     // they will be synced to object store on next s3 sync cycle
     startup_sync();
 
@@ -112,11 +112,12 @@ fn startup_sync() {
     }
 
     for stream in metadata::STREAM_INFO.list_streams() {
-        let dir = StorageDir::new(&stream);
-        // if data.parquet file is not present then skip this stream
-        if !dir.parquet_path_exists() {
+        let dir = StorageDir::new(stream.clone());
+        // if data.records file is not present then skip this stream
+        if !dir.local_data_exists() {
             continue;
         }
+
         if let Err(e) = dir.create_temp_dir() {
             log::error!(
                 "Error creating tmp directory for {} due to error [{}]",
@@ -126,7 +127,7 @@ fn startup_sync() {
             continue;
         }
         // create prefix for this file from its last modified time
-        let path = dir.data_path.join("data.parquet");
+        let path = dir.data_path.join("data.records");
 
         // metadata.modified gives us system time
         // This may not work on all platfomns
@@ -156,7 +157,7 @@ fn startup_sync() {
         let local_uri = str::replace(&uri, "/", ".");
         let hostname = utils::hostname_unchecked();
         let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
-        if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) {
+        if let Err(err) = dir.move_local_to_temp(parquet_file_local) {
             log::warn!(
                 "Failed to move parquet file at {} to tmp directory due to error {}",
                 path.display(),
@@ -269,7 +270,7 @@ async fn run_http() -> anyhow::Result<()> {
         (_, _) => None,
     };
 
-    let http_server = HttpServer::new(move || create_app!());
+    let http_server = HttpServer::new(move || create_app!()).workers(num_cpus::get() - 1);
 
     if let Some(builder) = ssl_acceptor {
         http_server
             .bind_openssl(&CONFIG.parseable.address, builder)?
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 153ac3417..103fb8e4a 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -20,16 +20,21 @@ use crate::alerts::Alerts;
 use crate::metadata::{Stats, STREAM_INFO};
 use crate::option::CONFIG;
 use crate::query::Query;
-use crate::utils;
+use crate::{event, utils};
 
 use async_trait::async_trait;
 use chrono::{Duration, Timelike, Utc};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::error::ArrowError;
+use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::errors::ParquetError;
+use datafusion::parquet::file::properties::WriterProperties;
 use serde::Serialize;
 
 use std::fmt::Debug;
-use std::fs;
+use std::fs::{self, File};
 use std::io;
 use std::iter::Iterator;
 use std::path::{Path, PathBuf};
@@ -37,7 +42,7 @@ use std::path::{Path, PathBuf};
 extern crate walkdir;
 use walkdir::WalkDir;
 
-/// local sync interval to move data.parquet to /tmp dir of that stream.
+/// local sync interval to move data.records to /tmp dir of that stream.
 /// 60 sec is a reasonable value.
 pub const LOCAL_SYNC_INTERVAL: u64 = 60;
 
@@ -79,10 +84,10 @@ pub trait ObjectStorage: Sync + 'static {
 
         // entries here means all the streams present on local disk
         for stream in streams {
-            let sync = StorageSync::new(&stream);
+            let sync = StorageSync::new(stream.clone());
 
-            // if data.parquet file not present, skip this stream
-            if !sync.dir.parquet_path_exists() {
+            // if data.records file not present, skip this stream
+            if !sync.dir.local_data_exists() {
                 continue;
             }
 
@@ -95,14 +100,21 @@ pub trait ObjectStorage: Sync + 'static {
                 continue;
             }
 
-            if let Err(e) = sync.move_parquet_to_temp() {
-                log::error!(
-                    "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
-                    sync.dir.data_path.to_string_lossy(),
-                    sync.dir.temp_dir.to_string_lossy(),
-                    e
-                );
-                continue;
+            match sync.move_local_to_temp() {
+                Ok(parquet_size) => {
+                    if let Err(e) = STREAM_INFO.update_stats(&stream, 0, parquet_size) {
+                        log::error!("Couldn't update stream stats. {:?}", e);
+                    }
+                }
+                Err(e) => {
+                    log::error!(
+                        "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
+                        sync.dir.data_path.to_string_lossy(),
+                        sync.dir.temp_dir.to_string_lossy(),
+                        e
+                    );
+                    continue;
+                }
             }
         }
 
@@ -117,7 +129,7 @@ pub trait ObjectStorage: Sync + 'static {
         let streams = STREAM_INFO.list_streams();
 
         for stream in streams {
-            let dir = StorageDir::new(&stream);
+            let dir = StorageDir::new(stream.clone());
 
             for file in WalkDir::new(dir.temp_dir)
                 .min_depth(1)
@@ -126,6 +138,14 @@ pub trait ObjectStorage: Sync + 'static {
                 .max_depth(1)
                 .filter_map(|file| file.ok())
                 .map(|file| file.path().to_path_buf())
                 .filter(|file| file.is_file())
+                .filter(|file| {
+                    let is_tmp = match file.extension() {
+                        Some(ext) => ext.eq_ignore_ascii_case("tmp"),
+                        None => false,
+                    };
+
+                    !is_tmp
+                })
             {
                 let filename = file.file_name().unwrap().to_str().unwrap();
                 let file_suffix = str::replacen(filename, ".", "/", 3);
@@ -152,50 +172,101 @@ pub struct LogStream {
 
 #[derive(Debug)]
 pub struct StorageDir {
+    pub stream_name: String,
     pub data_path: PathBuf,
     pub temp_dir: PathBuf,
 }
 
+// StorageDir is a type which can move files from the data path to the temp dir
 impl StorageDir {
-    pub fn new(stream_name: &str) -> Self {
-        let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
+    pub fn new(stream_name: String) -> Self {
+        let data_path = CONFIG.parseable.local_stream_data_path(&stream_name);
         let temp_dir = data_path.join("tmp");
 
         Self {
+            stream_name,
             data_path,
             temp_dir,
         }
     }
 
+    // create tmp dir if it does not exist
     pub fn create_temp_dir(&self) -> io::Result<()> {
         fs::create_dir_all(&self.temp_dir)
     }
 
-    pub fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
-        fs::rename(
-            self.data_path.join("data.parquet"),
-            self.temp_dir.join(filename),
-        )
+    pub fn move_local_to_temp(&self, filename: String) -> Result<u64, MoveDataError> {
+        let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp");
+        fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)
+            .map_err(|_| MoveDataError::Rename)?;
+        event::STREAM_WRITERS::unset_entry(&self.stream_name);
+        let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?;
+        let reader = StreamReader::try_new(file, None)?;
+        let schema = reader.schema();
+        let records = reader.filter_map(|record| match record {
+            Ok(record) => Some(record),
+            Err(e) => {
+                log::warn!("error when reading from arrow stream {:?}", e);
+                None
+            }
+        });
+
+        let parquet_path = self.temp_dir.join(filename);
+        let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+        let props = WriterProperties::builder().build();
+        let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
+
+        for ref record in records {
+            writer.write(record)?;
+        }
+
+        writer.close()?;
+
+        fs::remove_file(record_tmp_file_path).map_err(|_| MoveDataError::Delete)?;
+
+        let compressed_size = fs::metadata(parquet_path)
+            .map_err(|_| MoveDataError::Metadata)?
+            .len();
+
+        Ok(compressed_size)
     }
 
-    pub fn parquet_path_exists(&self) -> bool {
-        self.data_path.join("data.parquet").exists()
+    pub fn local_data_exists(&self) -> bool {
+        self.data_path.join("data.records").exists()
     }
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum MoveDataError {
+    #[error("Failed to rename file")]
+    Rename,
+    #[error("Unable to Open file after moving")]
+    Open,
+    #[error("Unable to create recordbatch stream")]
+    Arrow(#[from] ArrowError),
+    #[error("Could not generate parquet file")]
+    Parquet(#[from] ParquetError),
+    #[error("Could not create parquet file")]
+    Create,
+    #[error("Could not delete temp arrow file")]
+    Delete,
+    #[error("Could not fetch metadata of moved parquet file")]
+    Metadata,
+}
+
 struct StorageSync {
     pub dir: StorageDir,
     time: chrono::DateTime<Utc>,
 }
 
 impl StorageSync {
-    fn new(stream_name: &str) -> Self {
+    fn new(stream_name: String) -> Self {
         let dir = StorageDir::new(stream_name);
         let time = Utc::now();
 
         Self { dir, time }
     }
 
-    fn move_parquet_to_temp(&self) -> io::Result<()> {
+    fn move_local_to_temp(&self) -> Result<u64, MoveDataError> {
         let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64);
         let uri = utils::date_to_prefix(time.date())
             + &utils::hour_to_prefix(time.hour())
@@ -203,7 +274,7 @@ impl StorageSync {
         let local_uri = str::replace(&uri, "/", ".");
         let hostname = utils::hostname_unchecked();
         let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
-        self.dir.move_parquet_to_temp(parquet_file_local)
+        self.dir.move_local_to_temp(parquet_file_local)
     }
 }