From 97d0e5aac141cf3dfa6c90f4e5508876a34d6dee Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 10 Feb 2023 12:58:43 +0530 Subject: [PATCH 1/2] Fix executor block --- server/src/event.rs | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 0941e6a0f..52b4db1fb 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -213,13 +213,31 @@ fn commit_schema( .expect("map of schemas is serializable"); // try to put to storage let storage = CONFIG.storage().get_object_store(); - let res = futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map)); + + let _stream_name = stream_name.to_owned(); + let _schema_map = schema_map.to_owned(); + let handle = std::thread::spawn(move || { + let rt = actix_web::rt::System::new(); + rt.block_on(storage.put_schema_map(&_stream_name, &_schema_map)) + }); + + let res = match handle.join() { + Ok(res) => res.map_err(|err| EventError::ObjectStorage(err)), + Err(_) => { + log::error!("commit schema thread panicked"); + Err(EventError::InternalError) + } + }; // revert if err - if res.is_err() { - stream_metadata.remove_unchecked(stream_name, schema_key) + if let Err(ref err) = res { + stream_metadata.remove_unchecked(stream_name, schema_key); + log::error!( + "Failed to commit schema during new event ingestion: {}", + err + ) } - // return result - res.map_err(|err| err.into()) + + res } } @@ -288,8 +306,10 @@ pub mod error { Arrow(#[from] ArrowError), #[error("Schema Mismatch")] SchemaMismatch, - #[error("Schema Mismatch: {0}")] + #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), + #[error("Internal Error")] + InternalError, } } From dcc800aac0d8b87cd5bde0e3495ab17db4f60b16 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 10 Feb 2023 13:04:48 +0530 Subject: [PATCH 2/2] Fix --- server/src/event.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 52b4db1fb..49750af2f 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -215,14 +215,13 @@ fn commit_schema( let storage = CONFIG.storage().get_object_store(); let _stream_name = stream_name.to_owned(); - let _schema_map = schema_map.to_owned(); let handle = std::thread::spawn(move || { let rt = actix_web::rt::System::new(); - rt.block_on(storage.put_schema_map(&_stream_name, &_schema_map)) + rt.block_on(storage.put_schema_map(&_stream_name, &schema_map)) }); let res = match handle.join() { - Ok(res) => res.map_err(|err| EventError::ObjectStorage(err)), + Ok(res) => res.map_err(EventError::ObjectStorage), Err(_) => { log::error!("commit schema thread panicked"); Err(EventError::InternalError)