diff --git a/hermes/src/aggregate.rs b/hermes/src/aggregate.rs index 81b0b56a25..d88d5db26d 100644 --- a/hermes/src/aggregate.rs +++ b/hermes/src/aggregate.rs @@ -264,23 +264,22 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> { // Update the aggregate state let mut aggregate_state = state.aggregate_state.write().await; - // Check if the update is new or out of order - match aggregate_state.latest_completed_slot { + // Send update event to subscribers. We are purposefully ignoring the result + // because there might be no subscribers. + let _ = match aggregate_state.latest_completed_slot { None => { aggregate_state.latest_completed_slot.replace(slot); - state.api_update_tx.send(AggregationEvent::New { slot })?; + state.api_update_tx.send(AggregationEvent::New { slot }) } Some(latest) if slot > latest => { state.prune_removed_keys(message_state_keys).await; aggregate_state.latest_completed_slot.replace(slot); - state.api_update_tx.send(AggregationEvent::New { slot })?; + state.api_update_tx.send(AggregationEvent::New { slot }) } - _ => { - state - .api_update_tx - .send(AggregationEvent::OutOfOrder { slot })?; - } - } + _ => state + .api_update_tx + .send(AggregationEvent::OutOfOrder { slot }), + }; aggregate_state.latest_completed_slot = aggregate_state .latest_completed_slot diff --git a/hermes/src/network/wormhole.rs b/hermes/src/network/wormhole.rs index 1a6f03d9b4..d96aad2c79 100644 --- a/hermes/src/network/wormhole.rs +++ b/hermes/src/network/wormhole.rs @@ -225,7 +225,10 @@ pub async fn process_message(state: Arc, vaa_bytes: Vec) -> Result<() )?; // Finally, store the resulting VAA in Hermes. - store_vaa(state.clone(), vaa.sequence, vaa_bytes).await?; + let sequence = vaa.sequence; + tokio::spawn(async move { + store_vaa(state.clone(), sequence, vaa_bytes).await; + }); Ok(()) } @@ -334,16 +337,14 @@ pub fn verify_vaa<'a>( } #[tracing::instrument(skip(state, vaa_bytes))] -pub async fn store_vaa(state: Arc, sequence: u64, vaa_bytes: Vec) -> Result<()> { +pub async fn store_vaa(state: Arc, sequence: u64, vaa_bytes: Vec) { // Check VAA hasn't already been seen, this may have been checked previously - // but due to async nature It's possible other threads have mutated the state + // but due to async nature it's possible other threads have mutated the state // since this VAA started processing. let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await; - ensure!( - !observed_vaa_seqs.contains(&sequence), - "Previously observed VAA: {}", - sequence, - ); + if observed_vaa_seqs.contains(&sequence) { + return; + } // Clear old cached VAA sequences. while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { @@ -351,5 +352,9 @@ pub async fn store_vaa(state: Arc, sequence: u64, vaa_bytes: Vec) -> } // Hand the VAA to the aggregate store. - crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await + if let Err(e) = + crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await + { + tracing::error!(error = ?e, "Failed to store VAA in aggregate store."); + } }