Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 1 addition & 38 deletions hermes/src/network/pythnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use {
Result,
},
borsh::BorshDeserialize,
byteorder::BE,
futures::stream::StreamExt,
pythnet_sdk::messages::Message,
solana_account_decoder::UiAccountEncoding,
solana_client::{
nonblocking::{
Expand Down Expand Up @@ -157,44 +155,9 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
}
};

// The validators writes the accumulator messages using Borsh with
// the following struct. We cannot directly have messages as Vec<Messages>
// because they are serialized using big-endian byte order and Borsh
// uses little-endian byte order.
#[derive(BorshDeserialize)]
struct RawAccumulatorMessages {
pub magic: [u8; 4],
pub slot: u64,
pub ring_size: u32,
pub raw_messages: Vec<Vec<u8>>,
}

let accumulator_messages = RawAccumulatorMessages::try_from_slice(&account.data);
let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
match accumulator_messages {
Ok(accumulator_messages) => {
let messages = accumulator_messages
.raw_messages
.iter()
.map(|message| {
pythnet_sdk::wire::from_slice::<BE, Message>(message.as_slice())
})
.collect::<Result<Vec<Message>, _>>();

let messages = match messages {
Ok(messages) => messages,
Err(err) => {
log::error!("Failed to parse messages: {:?}", err);
continue;
}
};

let accumulator_messages = AccumulatorMessages {
magic: accumulator_messages.magic,
slot: accumulator_messages.slot,
ring_size: accumulator_messages.ring_size,
messages,
};

let (candidate, _) = Pubkey::find_program_address(
&[
b"AccumulatorState",
Expand Down
20 changes: 13 additions & 7 deletions hermes/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ use {
anyhow,
Result,
},
byteorder::BigEndian,
pyth_sdk::PriceIdentifier,
pythnet_sdk::{
messages::{
Message,
MessageType,
},
wire::v1::{
WormholeMessage,
WormholePayload,
wire::{
from_slice,
v1::{
WormholeMessage,
WormholePayload,
},
},
},
std::{
Expand Down Expand Up @@ -199,12 +203,14 @@ impl Store {

let message_states = completed_state
.accumulator_messages
.messages
.iter()
.raw_messages
.into_iter()
.enumerate()
.map(|(idx, message)| {
.map(|(idx, raw_message)| {
Ok(MessageState::new(
*message,
from_slice::<BigEndian, _>(raw_message.as_ref())
.map_err(|e| anyhow!("Failed to deserialize message: {:?}", e))?,
raw_message,
ProofSet {
wormhole_merkle_proof: wormhole_merkle_message_states_proofs
.get(idx)
Expand Down
22 changes: 8 additions & 14 deletions hermes/src/store/proof/wormhole_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,20 @@ pub fn construct_message_states_proofs(
let accumulator_messages = &completed_accumulator_state.accumulator_messages;
let wormhole_merkle_state = &completed_accumulator_state.wormhole_merkle_state;

let raw_messages = accumulator_messages
.messages
.iter()
.map(|m| {
to_vec::<_, byteorder::BE>(m).map_err(|e| anyhow!("Failed to serialize message: {}", e))
})
.collect::<Result<Vec<Vec<u8>>>>()?;

// Check whether the state is valid
let merkle_acc =
match MerkleTree::<Keccak160>::from_set(raw_messages.iter().map(|m| m.as_ref())) {
Some(merkle_acc) => merkle_acc,
None => return Ok(vec![]), // It only happens when the message set is empty
};
let merkle_acc = match MerkleTree::<Keccak160>::from_set(
accumulator_messages.raw_messages.iter().map(|m| m.as_ref()),
) {
Some(merkle_acc) => merkle_acc,
None => return Ok(vec![]), // It only happens when the message set is empty
};

if merkle_acc.root.as_bytes() != wormhole_merkle_state.root.root {
return Err(anyhow!("Invalid merkle root"));
}

raw_messages
accumulator_messages
.raw_messages
.iter()
.map(|m| {
Ok(WormholeMerkleMessageProof {
Expand Down
36 changes: 20 additions & 16 deletions hermes/src/store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
types::{
AccumulatorMessages,
ProofSet,
RawMessage,
RequestTime,
Slot,
UnixTimestamp,
Expand Down Expand Up @@ -71,6 +72,7 @@ pub struct MessageStateTime {
pub struct MessageState {
pub slot: Slot,
pub message: Message,
pub raw_message: RawMessage,
pub proof_set: ProofSet,
pub received_at: UnixTimestamp,
}
Expand All @@ -92,13 +94,15 @@ impl MessageState {

pub fn new(
message: Message,
raw_message: RawMessage,
proof_set: ProofSet,
slot: Slot,
received_at: UnixTimestamp,
) -> Self {
Self {
slot,
message,
raw_message,
proof_set,
received_at,
}
Expand Down Expand Up @@ -151,20 +155,20 @@ mod test {
wormhole_merkle_state: None,
};

assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());

let accumulator_state = AccumulatorState {
slot: 1,
accumulator_messages: Some(AccumulatorMessages {
slot: 1,
magic: [0; 4],
ring_size: 10,
messages: vec![],
slot: 1,
magic: [0; 4],
ring_size: 10,
raw_messages: vec![],
}),
wormhole_merkle_state: None,
};

assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());

let accumulator_state = AccumulatorState {
slot: 1,
Expand All @@ -179,15 +183,15 @@ mod test {
}),
};

assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());

let accumulator_state = AccumulatorState {
slot: 1,
accumulator_messages: Some(AccumulatorMessages {
slot: 1,
magic: [0; 4],
ring_size: 10,
messages: vec![],
slot: 1,
magic: [0; 4],
ring_size: 10,
raw_messages: vec![],
}),
wormhole_merkle_state: Some(WormholeMerkleState {
vaa: vec![],
Expand All @@ -200,14 +204,14 @@ mod test {
};

assert_eq!(
CompletedAccumulatorState::try_from(accumulator_state.clone()).unwrap(),
CompletedAccumulatorState::try_from(accumulator_state).unwrap(),
CompletedAccumulatorState {
slot: 1,
accumulator_messages: AccumulatorMessages {
slot: 1,
magic: [0; 4],
ring_size: 10,
messages: vec![],
slot: 1,
magic: [0; 4],
ring_size: 10,
raw_messages: vec![],
},
wormhole_merkle_state: WormholeMerkleState {
vaa: vec![],
Expand Down
9 changes: 5 additions & 4 deletions hermes/src/store/storage/local_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ mod test {
) -> MessageState {
MessageState {
slot,
raw_message: vec![],
message: Message::PriceFeedMessage(PriceFeedMessage {
feed_id,
publish_time,
Expand Down Expand Up @@ -574,10 +575,10 @@ mod test {
// Change the state to have accumulator messages
// We mutate the existing state because the normal flow is like this.
accumulator_state.accumulator_messages = Some(AccumulatorMessages {
magic: [0; 4],
slot: 10,
ring_size: 3,
messages: vec![],
magic: [0; 4],
slot: 10,
ring_size: 3,
raw_messages: vec![],
});

// Store the accumulator state again.
Expand Down
24 changes: 15 additions & 9 deletions hermes/src/store/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use {
super::proof::wormhole_merkle::WormholeMerkleMessageProof,
pythnet_sdk::messages::{
Message,
PriceFeedMessage,
},
borsh::BorshDeserialize,
pythnet_sdk::messages::PriceFeedMessage,
};

#[derive(Clone, PartialEq, Debug)]
Expand All @@ -20,12 +18,20 @@ pub enum RequestTime {
FirstAfter(UnixTimestamp),
}

#[derive(Clone, PartialEq, Debug)]
pub type RawMessage = Vec<u8>;

/// Accumulator messages coming from Pythnet validators.
///
/// The validators writes the accumulator messages using Borsh with
/// the following struct. We cannot directly have messages as Vec<Messages>
/// because they are serialized using big-endian byte order and Borsh
/// uses little-endian byte order.
#[derive(Clone, PartialEq, Debug, BorshDeserialize)]
pub struct AccumulatorMessages {
pub magic: [u8; 4],
pub slot: Slot,
pub ring_size: u32,
pub messages: Vec<Message>,
pub magic: [u8; 4],
pub slot: u64,
pub ring_size: u32,
pub raw_messages: Vec<RawMessage>,
}

impl AccumulatorMessages {
Expand Down