From 0cdbcf3160f776e6c51ff075f3b780ff182f5333 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 3 Oct 2022 11:30:18 +0530
Subject: [PATCH 1/3] Lock metadata on first event.

When multiple requests reach event processing too quickly, many threads
race to register the first event. This is problematic because it means
the server may drop many requests after erroring/panicking on the local
writer.

This PR fixes the issue by taking a write lock on the metadata beforehand
in the first-event case. This prevents other threads from reading the
metadata and thus blocks other requests until the first event has been
processed.

Todo: This can be a separate issue, but since metadata expects all of its
methods to be panic free, there should not be any panic inside the s3
client when processing the first event.
---
 server/src/event.rs          | 83 ++++++++++++++++++++++++++++--------
 server/src/handlers/event.rs |  8 ++--
 server/src/metadata.rs       |  1 +
 server/src/s3.rs             |  4 ++
 server/src/storage.rs        |  1 +
 5 files changed, 75 insertions(+), 22 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index 0b27c5a89..a9f14c9a1 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -24,17 +24,19 @@ use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use lazy_static::lazy_static;
-use log::error;
 use std::collections::HashMap;
 use std::fs::OpenOptions;
 use std::io::BufReader;
+use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::sync::Mutex;
 use std::sync::MutexGuard;
 use std::sync::RwLock;
 
 use crate::metadata;
+use crate::metadata::LOCK_EXPECT;
 use crate::option::CONFIG;
+use crate::s3;
 use crate::storage::ObjectStorage;
 
 use self::error::EventError;
@@ -190,7 +192,7 @@ pub struct Event {
 
 // Events holds the schema related to each event for a single log stream
 impl Event {
-    pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
+    pub async fn process(&self) -> Result<(), EventError> {
         let inferred_schema = self.infer_schema()?;
 
         let event = self.get_reader(inferred_schema.clone());
@@ -207,12 +209,12 @@ impl Event {
         } else {
             // if stream schema is none then it is first event,
             // process first event and store schema in object store
-            self.process_first_event(event, inferred_schema, storage)
+            self.process_first_event::<s3::S3>(event, inferred_schema)
                 .await?
         };
 
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
-            error!("Error checking for alerts. {:?}", e);
+            log::error!("Error checking for alerts. {:?}", e);
         }
 
         Ok(())
@@ -221,25 +223,47 @@ impl Event {
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on the rest of the events sent to this log stream.
-    async fn process_first_event(
+    async fn process_first_event<S: ObjectStorage>(
         &self,
         mut event: json::Reader<&[u8]>,
         schema: Schema,
-        storage: &impl ObjectStorage,
     ) -> Result<u64, EventError> {
-        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
-        let stream_name = &self.stream_name;
-
-        // Store record batch on local cache
-        STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
+        // note for functions _schema_with_map and _set_schema_with_map,
+        // these are to be called while holding a write lock specifically.
+        // this guarantees two things
+        // - no other metadata operation can happen in between
+        // - map always has an entry for this stream
 
-        // Put the inferred schema to object store
-        storage.put_schema(stream_name.clone(), &schema).await?;
-
-        // set the schema in memory for this stream
-        metadata::STREAM_INFO.set_schema(stream_name, schema)?;
+        let stream_name = &self.stream_name;
 
-        Ok(0)
+        let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT);
+        // if the schema is not none after acquiring the lock
+        // then some other thread has already completed this function.
+        if _schema_with_map(stream_name, &stream_metadata).is_some() {
+            // drop the lock
+            drop(stream_metadata);
+            // Try to post the event the usual way
+            log::info!("first event is redirected to process_event");
+            self.process_event(event)
+        } else {
+            // stream metadata is still none,
+            // this means this execution should be considered as first event.
+            log::info!(
+                "setting schema on objectstore for logstream {}",
+                stream_name
+            );
+            let storage = S::new();
+            storage.put_schema(stream_name.clone(), &schema).await?;
+            // Store record batch on local cache
+            log::info!("creating local writer for this first event");
+            let rb = event.next()?.ok_or(EventError::MissingRecord)?;
+            STREAM_WRITERS::append_to_local(stream_name, &rb)?;
+
+            log::info!("schema is set in memory map for logstream {}", stream_name);
+            _set_schema_with_map(stream_name, schema, &mut stream_metadata);
+
+            Ok(0)
+        }
     }
 
     // process_event processes all events after the 1st event. Concatenates record batches
@@ -273,6 +297,31 @@ impl Event {
     }
 }
 
+// Special function which reads from the metadata map while holding the lock
+#[inline]
+pub fn _schema_with_map(
+    stream_name: &str,
+    map: &impl Deref<Target = HashMap<String, metadata::LogStreamMetadata>>,
+) -> Option<Schema> {
+    map.get(stream_name)
+        .expect("map has entry for this stream name")
+        .schema
+        .to_owned()
+}
+
+#[inline]
+// Special function which writes to the metadata map while holding the lock
+pub fn _set_schema_with_map(
+    stream_name: &str,
+    schema: Schema,
+    map: &mut impl DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>,
+) {
+    map.get_mut(stream_name)
+        .expect("map has entry for this stream name")
+        .schema
+        .replace(schema);
+}
+
 pub mod error {
     use crate::metadata::error::stream_info::MetadataError;
     use crate::storage::ObjectStorageError;
diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs
index 5a3115655..1b6556258 100644
--- a/server/src/handlers/event.rs
+++ b/server/src/handlers/event.rs
@@ -64,20 +64,18 @@ pub async fn post_event(
         collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?,
     )]);
 
-    let s3 = S3::new();
-
     if let Some(array) = body.as_array() {
         for body in array {
             let body = merge(body.clone(), metadata.clone());
             let body = merge(body, tags.clone());
             let body = flatten_json_body(web::Json(body)).unwrap();
 
-            let e = event::Event {
+            let event = event::Event {
                 body,
                 stream_name: stream_name.clone(),
             };
 
-            e.process(&s3).await?;
+            event.process().await?;
         }
     } else {
         let body = merge(body.clone(), metadata);
@@ -88,7 +86,7 @@ pub async fn post_event(
             stream_name,
         };
 
-        event.process(&s3).await?;
+        event.process().await?;
     }
 
     Ok(HttpResponse::Ok().finish())
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 34e7f06e5..3e1e124d5 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -90,6 +90,7 @@ impl STREAM_INFO {
         Ok(())
     }
 
+    #[allow(dead_code)]
     pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
diff --git a/server/src/s3.rs b/server/src/s3.rs
index 5c4cd1fa2..1a4c35c9c 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -365,6 +365,10 @@ impl S3 {
 
 #[async_trait]
 impl ObjectStorage for S3 {
+    fn new() -> Self {
+        Self::new()
+    }
+
     async fn check(&self) -> Result<(), ObjectStorageError> {
         self.client
             .head_bucket()
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 18ab005eb..f60903f65 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -52,6 +52,7 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
 
 #[async_trait]
 pub trait ObjectStorage: Sync + 'static {
+    fn new() -> Self;
     async fn check(&self) -> Result<(), ObjectStorageError>;
     async fn put_schema(
         &self,
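
Note on the pattern in PATCH 1/3: the fix is double-checked locking on the
metadata RwLock — take the write lock before checking, re-check whether another
thread already registered the schema, and only run first-event setup if it has
not. Below is a minimal, self-contained sketch of that pattern; the types and
names (StreamInfo, on_event, the Option<String> schema slot) are illustrative
stand-ins, not Parseable's actual API:

    use std::collections::HashMap;
    use std::sync::RwLock;

    // Toy stand-in for STREAM_INFO: stream name -> optional schema.
    struct StreamInfo(RwLock<HashMap<String, Option<String>>>);

    impl StreamInfo {
        fn on_event(&self, stream: &str, inferred: String) {
            // Take the write lock *before* checking, so concurrent first
            // events serialize here instead of racing past a read lock.
            let mut map = self.0.write().expect("metadata lock poisoned");
            if map.get(stream).map_or(false, |schema| schema.is_some()) {
                // Another thread won the race; release the lock and take
                // the normal (non-first) event path.
                drop(map);
                // ... process_event would run here
            } else {
                // Still first: publish the schema while the write lock
                // blocks every other reader of the metadata map.
                map.insert(stream.to_owned(), Some(inferred));
            }
        }
    }

    fn main() {
        let info = StreamInfo(RwLock::new(HashMap::new()));
        info.on_event("app-logs", r#"{"ts": "utf8"}"#.to_string());
        info.on_event("app-logs", r#"{"ts": "utf8"}"#.to_string()); // redirected path
    }
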
From fc30dc2ea34d5461798d420bd45cca4a72eff211 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 3 Oct 2022 14:43:29 +0530
Subject: [PATCH 2/3] Drop writer guard before calling s3

---
 server/src/event.rs              | 30 +++++++++++++++++++++++-------
 server/src/handlers/logstream.rs |  2 --
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index a9f14c9a1..d9c20590b 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -248,19 +248,35 @@ impl Event {
         } else {
             // stream metadata is still none,
             // this means this execution should be considered as first event.
-            log::info!(
-                "setting schema on objectstore for logstream {}",
-                stream_name
-            );
-            let storage = S::new();
-            storage.put_schema(stream_name.clone(), &schema).await?;
+
             // Store record batch on local cache
             log::info!("creating local writer for this first event");
             let rb = event.next()?.ok_or(EventError::MissingRecord)?;
             STREAM_WRITERS::append_to_local(stream_name, &rb)?;
 
             log::info!("schema is set in memory map for logstream {}", stream_name);
-            _set_schema_with_map(stream_name, schema, &mut stream_metadata);
+            _set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata);
+            // drop the lock before going across an await point
+            drop(stream_metadata);
+
+            log::info!(
+                "setting schema on objectstore for logstream {}",
+                stream_name
+            );
+            let storage = S::new();
+            if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
+                // If this call fails then there is currently no right way to make the local state consistent.
+                // This needs a fix after more constraints and safety guarantees are provided by localwriter and s3_sync.
+                // Reasoning -
+                // - After dropping the lock many events may get processed
+                // - Processed events may sync before metadata deletion
+                log::error!(
+                    "Parseable failed to upload schema to objectstore due to error {}",
+                    e
+                );
+                log::error!("Please manually delete this logstream and create a new one.");
+                metadata::STREAM_INFO.delete_stream(stream_name);
+            }
 
             Ok(0)
         }
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index b5c8b637d..ae5f644fe 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -168,8 +168,6 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
     if s3.get_schema(&stream_name).await.is_err() {
         // Fail if unable to create log stream on object store backend
         if let Err(e) = s3.create_stream(&stream_name).await {
-            // delete the stream from metadata because we couldn't create it on object store backend
-            metadata::STREAM_INFO.delete_stream(&stream_name);
             return response::ServerResponse {
                 msg: format!(
                     "failed to create log stream {} due to err: {}",
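
Note on the ordering in PATCH 2/3: a std::sync guard must not be held across an
.await, so the writer guard is dropped after the in-memory update and before
the object-store call. A short sketch of that shape; upload, first_event, and
the Option<String> schema slot are placeholders, and tokio is used here only as
a stand-in runtime for the server's actual executor:

    use std::sync::{Arc, RwLock};

    // Pretend object-store call; stands in for put_schema.
    async fn upload(_schema: String) {}

    async fn first_event(meta: Arc<RwLock<Option<String>>>, schema: String) {
        let mut guard = meta.write().expect("metadata lock poisoned");
        *guard = Some(schema.clone()); // 1. update in-memory state while locked
        drop(guard);                   // 2. release before any await point
        upload(schema).await;          // 3. slow, fallible call happens unlocked
    }

    #[tokio::main]
    async fn main() {
        let meta = Arc::new(RwLock::new(None));
        first_event(meta.clone(), r#"{"ts": "utf8"}"#.to_string()).await;
        assert!(meta.read().unwrap().is_some());
    }
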
From f2844e1227de7af9329410a4aea55441f9533147 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 3 Oct 2022 15:22:50 +0530
Subject: [PATCH 3/3] Move to sync context and drop lock

---
 server/src/event.rs | 36 ++++++++++++++++++++----------------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index d9c20590b..6fac8a684 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -16,6 +16,7 @@
  *
  *
  */
+use actix_web::rt::spawn;
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::ArrowError;
@@ -209,8 +210,7 @@ impl Event {
         } else {
             // if stream schema is none then it is first event,
             // process first event and store schema in object store
-            self.process_first_event::<s3::S3>(event, inferred_schema)
-                .await?
+            self.process_first_event::<s3::S3>(event, inferred_schema)?
         };
 
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
@@ -223,7 +223,7 @@ impl Event {
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on the rest of the events sent to this log stream.
-    async fn process_first_event<S: ObjectStorage>(
+    fn process_first_event<S: ObjectStorage>(
         &self,
         mut event: json::Reader<&[u8]>,
         schema: Schema,
@@ -264,19 +264,23 @@ impl Event {
                 stream_name
             );
             let storage = S::new();
-            if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
-                // If this call fails then there is currently no right way to make the local state consistent.
-                // This needs a fix after more constraints and safety guarantees are provided by localwriter and s3_sync.
-                // Reasoning -
-                // - After dropping the lock many events may get processed
-                // - Processed events may sync before metadata deletion
-                log::error!(
-                    "Parseable failed to upload schema to objectstore due to error {}",
-                    e
-                );
-                log::error!("Please manually delete this logstream and create a new one.");
-                metadata::STREAM_INFO.delete_stream(stream_name);
-            }
+
+            let stream_name = stream_name.clone();
+            spawn(async move {
+                if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
+                    // If this call fails then there is currently no right way to make the local state consistent.
+                    // This needs a fix after more constraints and safety guarantees are provided by localwriter and s3_sync.
+                    // Reasoning -
+                    // - After dropping the lock many events may get processed
+                    // - Processed events may sync before metadata deletion
+                    log::error!(
+                        "Parseable failed to upload schema to objectstore due to error {}",
+                        e
+                    );
+                    log::error!("Please manually delete this logstream and create a new one.");
+                    metadata::STREAM_INFO.delete_stream(&stream_name);
+                }
+            });
 
             Ok(0)
         }
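
Note on the pattern in PATCH 3/3: process_first_event becomes synchronous and
detaches the schema upload onto the runtime, so the request path never crosses
an await point while the metadata is involved; on failure the detached task can
only log and roll back shared state. A sketch of that fire-and-forget shape,
using tokio::spawn as a stand-in for actix_web::rt::spawn; put_schema and
register_first_event here are placeholders, not Parseable code:

    use std::time::Duration;

    // Placeholder for the object-store call.
    async fn put_schema(_stream: &str) -> Result<(), String> {
        Ok(())
    }

    // Synchronous caller: state is updated first, then the fallible
    // upload is detached onto the executor.
    fn register_first_event(stream: String) {
        tokio::spawn(async move {
            if let Err(e) = put_schema(&stream).await {
                // The only remediation left is logging plus rollback of the
                // in-memory metadata, mirroring delete_stream in the patch.
                eprintln!("failed to upload schema for {stream}: {e}");
            }
        });
    }

    #[tokio::main]
    async fn main() {
        register_first_event("app-logs".to_string());
        tokio::time::sleep(Duration::from_millis(10)).await; // let the task finish
    }
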