diff --git a/server/src/event.rs b/server/src/event.rs
index 0b27c5a89..6fac8a684 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -16,6 +16,7 @@
  *
  *
  */
+use actix_web::rt::spawn;
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::ArrowError;
@@ -24,17 +25,19 @@ use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use lazy_static::lazy_static;
-use log::error;
 use std::collections::HashMap;
 use std::fs::OpenOptions;
 use std::io::BufReader;
+use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::sync::Mutex;
 use std::sync::MutexGuard;
 use std::sync::RwLock;
 
 use crate::metadata;
+use crate::metadata::LOCK_EXPECT;
 use crate::option::CONFIG;
+use crate::s3;
 use crate::storage::ObjectStorage;
 
 use self::error::EventError;
@@ -190,7 +193,7 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
+    pub async fn process(&self) -> Result<(), EventError> {
         let inferred_schema = self.infer_schema()?;
 
         let event = self.get_reader(inferred_schema.clone());
@@ -207,12 +210,11 @@ impl Event {
         } else {
             // if stream schema is none then it is first event,
             // process first event and store schema in obect store
-            self.process_first_event(event, inferred_schema, storage)
-                .await?
+            self.process_first_event::<s3::S3>(event, inferred_schema)?
         };
 
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
-            error!("Error checking for alerts. {:?}", e);
+            log::error!("Error checking for alerts. {:?}", e);
         }
 
         Ok(())
@@ -221,25 +223,67 @@ impl Event {
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
    // schema is then enforced on rest of the events sent to this log stream.
-    async fn process_first_event(
+    fn process_first_event<S: ObjectStorage>(
         &self,
         mut event: json::Reader<&[u8]>,
         schema: Schema,
-        storage: &impl ObjectStorage,
     ) -> Result<u64, EventError> {
-        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
-        let stream_name = &self.stream_name;
-
-        // Store record batch on local cache
-        STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
+        // note for functions _schema_with_map and _set_schema_with_map:
+        // these are to be called while holding a write lock specifically.
+        // this guarantees two things:
+        // - no other metadata operation can happen in between
+        // - the map always has an entry for this stream
 
-        // Put the inferred schema to object store
-        storage.put_schema(stream_name.clone(), &schema).await?;
+        let stream_name = &self.stream_name;
 
-        // set the schema in memory for this stream
-        metadata::STREAM_INFO.set_schema(stream_name, schema)?;
+        let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT);
+        // if the schema is not none after acquiring the lock,
+        // then some other thread has already completed this function.
+        if _schema_with_map(stream_name, &stream_metadata).is_some() {
+            // drop the lock
+            drop(stream_metadata);
+            // try to post the event the usual way
+            log::info!("first event is redirected to process_event");
+            self.process_event(event)
+        } else {
+            // the stream's schema is still none,
+            // so this execution should be treated as the first event.
+
+            // Store record batch on local cache
+            log::info!("creating local writer for this first event");
+            let rb = event.next()?.ok_or(EventError::MissingRecord)?;
+            STREAM_WRITERS::append_to_local(stream_name, &rb)?;
+
+            log::info!("schema is set in memory map for logstream {}", stream_name);
+            _set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata);
+            // drop the write guard before spawning the async upload task
+            drop(stream_metadata);
+
+            log::info!(
+                "setting schema on objectstore for logstream {}",
+                stream_name
+            );
+            let storage = S::new();
+
+            let stream_name = stream_name.clone();
+            spawn(async move {
+                if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
+                    // If this call fails there is currently no reliable way to make the local state consistent again.
+                    // This needs a fix once stronger safety guarantees are provided by localwriter and s3_sync.
+                    // Reasoning -
+                    // - After the lock is dropped, many events may process through
+                    // - Processed events may sync before metadata deletion
+                    log::error!(
+                        "Parseable failed to upload schema to objectstore due to error {}",
+                        e
+                    );
+                    log::error!("Please manually delete this logstream and create a new one.");
+                    metadata::STREAM_INFO.delete_stream(&stream_name);
+                }
+            });
 
-        Ok(0)
+            Ok(0)
+        }
     }
 
     // event process all events after the 1st event. Concatenates record batches
@@ -273,6 +317,31 @@ impl Event {
     }
 }
 
+// Special function which reads from the metadata map while holding the lock
+#[inline]
+pub fn _schema_with_map(
+    stream_name: &str,
+    map: &impl Deref<Target = HashMap<String, metadata::LogStreamMetadata>>,
+) -> Option<Schema> {
+    map.get(stream_name)
+        .expect("map has entry for this stream name")
+        .schema
+        .to_owned()
+}
+
+#[inline]
+// Special function which writes to the metadata map while holding the lock
+pub fn _set_schema_with_map(
+    stream_name: &str,
+    schema: Schema,
+    map: &mut impl DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>,
+) {
+    map.get_mut(stream_name)
+        .expect("map has entry for this stream name")
+        .schema
+        .replace(schema);
+}
+
 pub mod error {
     use crate::metadata::error::stream_info::MetadataError;
     use crate::storage::ObjectStorageError;
diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs
index 5a3115655..1b6556258 100644
--- a/server/src/handlers/event.rs
+++ b/server/src/handlers/event.rs
@@ -64,20 +64,18 @@ pub async fn post_event(
         collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?,
     )]);
 
-    let s3 = S3::new();
-
     if let Some(array) = body.as_array() {
         for body in array {
             let body = merge(body.clone(), metadata.clone());
             let body = merge(body, tags.clone());
             let body = flatten_json_body(web::Json(body)).unwrap();
 
-            let e = event::Event {
+            let event = event::Event {
                 body,
                 stream_name: stream_name.clone(),
             };
 
-            e.process(&s3).await?;
+            event.process().await?;
         }
     } else {
         let body = merge(body.clone(), metadata);
@@ -88,7 +86,7 @@
             stream_name,
         };
 
-        event.process(&s3).await?;
+        event.process().await?;
     }
 
     Ok(HttpResponse::Ok().finish())
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index b5c8b637d..ae5f644fe 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -168,8 +168,6 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
     if s3.get_schema(&stream_name).await.is_err() {
         // Fail if unable to create log stream on object store backend
         if let Err(e) = s3.create_stream(&stream_name).await {
-            // delete the stream from metadata because we couldn't create it on object store backend
-            metadata::STREAM_INFO.delete_stream(&stream_name);
             return response::ServerResponse {
                 msg: format!(
                     "failed to create log stream {} due to err: {}",
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 34e7f06e5..3e1e124d5 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -90,6 +90,7 @@ impl STREAM_INFO {
         Ok(())
     }
 
+    #[allow(dead_code)]
     pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
diff --git a/server/src/s3.rs b/server/src/s3.rs
index 5c4cd1fa2..1a4c35c9c 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -365,6 +365,10 @@ impl S3 {
 
 #[async_trait]
 impl ObjectStorage for S3 {
+    fn new() -> Self {
+        Self::new()
+    }
+
     async fn check(&self) -> Result<(), ObjectStorageError> {
         self.client
             .head_bucket()
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 18ab005eb..f60903f65 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -52,6 +52,7 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60
 
 #[async_trait]
 pub trait ObjectStorage: Sync + 'static {
+    fn new() -> Self;
     async fn check(&self) -> Result<(), ObjectStorageError>;
     async fn put_schema(
         &self,
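
Note on the concurrency pattern in process_first_event: this is double-checked locking. The schema check in process() happens without the write lock, so two racing first events can both reach process_first_event; re-checking under STREAM_INFO's write lock guarantees exactly one caller performs first-event setup, while the loser is redirected to process_event. Below is a minimal sketch of the pattern in isolation, using simplified stand-in types (a bare RwLock<HashMap> and String schemas instead of Parseable's STREAM_INFO and arrow Schema); StreamInfo and set_schema_once are illustrative names, not from the codebase.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Simplified stand-in for STREAM_INFO: stream name -> schema, None until the
// first event fixes it.
struct StreamInfo(RwLock<HashMap<String, Option<String>>>);

impl StreamInfo {
    // Returns true only for the caller that actually performed first-event
    // setup; racing callers get false and should take the normal event path.
    fn set_schema_once(&self, stream: &str, schema: String) -> bool {
        // Re-check *after* taking the write lock: another thread may have
        // stored the schema between the caller's unlocked check and here.
        let mut map = self.0.write().expect("lock poisoned");
        let slot = map.get_mut(stream).expect("stream is registered");
        if slot.is_some() {
            return false;
        }
        slot.replace(schema);
        true
    }
}

fn main() {
    let info = StreamInfo(RwLock::new(HashMap::from([("app".to_owned(), None)])));
    assert!(info.set_schema_once("app", "v1".to_owned()));
    assert!(!info.set_schema_once("app", "v2".to_owned())); // race lost
}
```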
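The spawned schema upload is fire-and-forget: the write guard is dropped first, the PUT runs in the background, and failure is handled only by logging and deleting the in-memory stream, with the window the patch itself acknowledges where events may slip through before the delete. A minimal sketch of that shape, with tokio::spawn standing in for actix_web::rt::spawn and hypothetical put_schema/delete_stream functions in place of the real ObjectStorage and STREAM_INFO calls:

```rust
use std::time::Duration;

// Hypothetical stand-in for ObjectStorage::put_schema; always fails here so
// the rollback branch below actually runs.
async fn put_schema(_stream: &str, _schema: &str) -> Result<(), &'static str> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Err("simulated network failure")
}

// Hypothetical stand-in for metadata::STREAM_INFO.delete_stream.
fn delete_stream(stream: &str) {
    eprintln!("rolling back in-memory metadata for {stream}");
}

#[tokio::main]
async fn main() {
    let stream = "app".to_owned();
    let schema = "v1".to_owned();

    // The request path returns as soon as the task is spawned; upload failure
    // is only logged and rolled back, never surfaced to the client.
    let task = tokio::spawn(async move {
        if let Err(e) = put_schema(&stream, &schema).await {
            eprintln!("failed to upload schema: {e}");
            delete_stream(&stream);
        }
    });

    // Awaited here only so the demo does not exit before the task finishes.
    task.await.unwrap();
}
```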
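Finally, the fn new() -> Self added to ObjectStorage is an associated constructor: it lets generic code such as process_first_event::<S> conjure its own backend via S::new() instead of threading a storage argument through every call, which is why the handler no longer builds an S3 per request. One subtlety in the s3.rs hunk: Self::new() inside impl ObjectStorage for S3 is not infinite recursion, because path resolution prefers the inherent S3::new (from the impl S3 block) over the trait method. A compilable sketch of both points, using placeholder types rather than Parseable's:

```rust
trait ObjectStorage: Sync + 'static {
    fn new() -> Self;
    fn kind(&self) -> &'static str;
}

struct S3;

impl S3 {
    // Inherent constructor, analogous to the existing S3::new in s3.rs.
    fn new() -> Self {
        S3
    }
}

impl ObjectStorage for S3 {
    fn new() -> Self {
        // Resolves to the inherent S3::new above; inherent associated
        // functions take precedence over trait items in path resolution.
        Self::new()
    }
    fn kind(&self) -> &'static str {
        "s3"
    }
}

// Generic code can now construct its backend itself, as in
// process_first_event::<S>.
fn schema_uploader<S: ObjectStorage>() -> &'static str {
    S::new().kind()
}

fn main() {
    assert_eq!(schema_uploader::<S3>(), "s3");
}
```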