Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,30 @@ fn commit_schema(
.expect("map of schemas is serializable");
// try to put to storage
let storage = CONFIG.storage().get_object_store();
let res = futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map));

let _stream_name = stream_name.to_owned();
let handle = std::thread::spawn(move || {
let rt = actix_web::rt::System::new();
rt.block_on(storage.put_schema_map(&_stream_name, &schema_map))
});

let res = match handle.join() {
Ok(res) => res.map_err(EventError::ObjectStorage),
Err(_) => {
log::error!("commit schema thread panicked");
Err(EventError::InternalError)
}
};
// revert if err
if res.is_err() {
stream_metadata.remove_unchecked(stream_name, schema_key)
if let Err(ref err) = res {
stream_metadata.remove_unchecked(stream_name, schema_key);
log::error!(
"Failed to commit schema during new event ingestion: {}",
err
)
}
// return result
res.map_err(|err| err.into())

res
}
}

Expand Down Expand Up @@ -288,8 +305,10 @@ pub mod error {
Arrow(#[from] ArrowError),
#[error("Schema Mismatch")]
SchemaMismatch,
#[error("Schema Mismatch: {0}")]
#[error("ObjectStorage Error: {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Internal Error")]
InternalError,
}
}

Expand Down