diff --git a/server/src/event.rs b/server/src/event.rs index 0941e6a0f..49750af2f 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -213,13 +213,30 @@ fn commit_schema( .expect("map of schemas is serializable"); // try to put to storage let storage = CONFIG.storage().get_object_store(); - let res = futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map)); + + let _stream_name = stream_name.to_owned(); + let handle = std::thread::spawn(move || { + let rt = actix_web::rt::System::new(); + rt.block_on(storage.put_schema_map(&_stream_name, &schema_map)) + }); + + let res = match handle.join() { + Ok(res) => res.map_err(EventError::ObjectStorage), + Err(_) => { + log::error!("commit schema thread panicked"); + Err(EventError::InternalError) + } + }; // revert if err - if res.is_err() { - stream_metadata.remove_unchecked(stream_name, schema_key) + if let Err(ref err) = res { + stream_metadata.remove_unchecked(stream_name, schema_key); + log::error!( + "Failed to commit schema during new event ingestion: {}", + err + ) } - // return result - res.map_err(|err| err.into()) + + res } } @@ -288,8 +305,10 @@ pub mod error { Arrow(#[from] ArrowError), #[error("Schema Mismatch")] SchemaMismatch, - #[error("Schema Mismatch: {0}")] + #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), + #[error("Internal Error")] + InternalError, } }