Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions hermes/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,23 +264,22 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
// Update the aggregate state
let mut aggregate_state = state.aggregate_state.write().await;

// Check if the update is new or out of order
match aggregate_state.latest_completed_slot {
// Send update event to subscribers. We are purposefully ignoring the result
// because there might be no subscribers.
let _ = match aggregate_state.latest_completed_slot {
None => {
aggregate_state.latest_completed_slot.replace(slot);
state.api_update_tx.send(AggregationEvent::New { slot })?;
state.api_update_tx.send(AggregationEvent::New { slot })
}
Some(latest) if slot > latest => {
state.prune_removed_keys(message_state_keys).await;
aggregate_state.latest_completed_slot.replace(slot);
state.api_update_tx.send(AggregationEvent::New { slot })?;
state.api_update_tx.send(AggregationEvent::New { slot })
}
_ => {
state
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot })?;
}
}
_ => state
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot }),
};

aggregate_state.latest_completed_slot = aggregate_state
.latest_completed_slot
Expand Down
23 changes: 14 additions & 9 deletions hermes/src/network/wormhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ pub async fn process_message(state: Arc<State>, vaa_bytes: Vec<u8>) -> Result<()
)?;

// Finally, store the resulting VAA in Hermes.
store_vaa(state.clone(), vaa.sequence, vaa_bytes).await?;
let sequence = vaa.sequence;
tokio::spawn(async move {
store_vaa(state.clone(), sequence, vaa_bytes).await;
});

Ok(())
}
Expand Down Expand Up @@ -334,22 +337,24 @@ pub fn verify_vaa<'a>(
}

#[tracing::instrument(skip(state, vaa_bytes))]
pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) -> Result<()> {
pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) {
// Check VAA hasn't already been seen, this may have been checked previously
// but due to async nature It's possible other threads have mutated the state
// but due to async nature it's possible other threads have mutated the state
// since this VAA started processing.
let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
ensure!(
!observed_vaa_seqs.contains(&sequence),
"Previously observed VAA: {}",
sequence,
);
if observed_vaa_seqs.contains(&sequence) {
return;
}

// Clear old cached VAA sequences.
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
observed_vaa_seqs.pop_first();
}

// Hand the VAA to the aggregate store.
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
if let Err(e) =
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
{
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
}
}