1111 pubsub_client:: PubsubClientError ,
1212 rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ,
1313 rpc_filter:: { Memcmp , RpcFilterType } ,
14+ rpc_response:: { Response , RpcKeyedAccount } ,
1415 } ,
1516 solana_sdk:: pubkey:: Pubkey ,
1617 std:: { fs, str:: FromStr , time:: Duration } ,
@@ -40,6 +41,68 @@ fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey {
4041 . 0
4142}
4243
44+ const FAILED_TO_DECODE : & str = "Failed to decode account data" ;
45+ const INVALID_UNRELIABLE_DATA_FORMAT : & str = "Invalid unreliable data format" ;
46+ const INVALID_PDA_MESSAGE : & str = "Invalid PDA message" ;
47+ const INVALID_EMITTER_CHAIN : & str = "Invalid emitter chain" ;
48+ const INVALID_ACCUMULATOR_ADDRESS : & str = "Invalid accumulator address" ;
49+
50+ fn decode_and_verify_update (
51+ wormhole_pid : & Pubkey ,
52+ accumulator_address : & Pubkey ,
53+ update : Response < RpcKeyedAccount > ,
54+ ) -> anyhow:: Result < PostedMessageUnreliableData > {
55+ if find_message_pda ( wormhole_pid, update. context . slot ) . to_string ( ) != update. value . pubkey {
56+ return Err ( anyhow:: anyhow!( INVALID_PDA_MESSAGE ) ) ;
57+ }
58+ let data = update. value . account . data . decode ( ) . ok_or_else ( || {
59+ tracing:: error!(
60+ data = ?update. value. account. data,
61+ "Failed to decode account data" ,
62+ ) ;
63+ anyhow:: anyhow!( FAILED_TO_DECODE )
64+ } ) ?;
65+ let unreliable_data: PostedMessageUnreliableData =
66+ BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) . map_err ( |e| {
67+ tracing:: error!(
68+ data = ?data,
69+ error = ?e,
70+ "Failed to decode unreliable data" ,
71+ ) ;
72+ anyhow:: anyhow!( format!( "{}: {}" , INVALID_UNRELIABLE_DATA_FORMAT , e) )
73+ } ) ?;
74+
75+ if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
76+ tracing:: error!(
77+ emitter_chain = unreliable_data. emitter_chain,
78+ "Invalid emitter chain"
79+ ) ;
80+ return Err ( anyhow:: anyhow!( INVALID_EMITTER_CHAIN ) ) ;
81+ }
82+
83+ if accumulator_address != & Pubkey :: from ( unreliable_data. emitter_address ) {
84+ tracing:: error!(
85+ emitter_address = ?unreliable_data. emitter_address,
86+ "Invalid accumulator address"
87+ ) ;
88+ return Err ( anyhow:: anyhow!( INVALID_ACCUMULATOR_ADDRESS ) ) ;
89+ }
90+
91+ Ok ( unreliable_data)
92+ }
93+
94+ fn message_data_to_body ( unreliable_data : & PostedMessageUnreliableData ) -> Body < & RawMessage > {
95+ Body {
96+ timestamp : unreliable_data. submission_time ,
97+ nonce : unreliable_data. nonce ,
98+ emitter_chain : unreliable_data. emitter_chain . into ( ) ,
99+ emitter_address : Address ( unreliable_data. emitter_address ) ,
100+ sequence : unreliable_data. sequence ,
101+ consistency_level : unreliable_data. consistency_level ,
102+ payload : RawMessage :: new ( unreliable_data. payload . as_slice ( ) ) ,
103+ }
104+ }
105+
43106async fn run_listener ( input : RunListenerInput ) -> Result < ( ) , PubsubClientError > {
44107 let client = PubsubClient :: new ( input. ws_url . as_str ( ) ) . await ?;
45108 let ( mut stream, unsubscribe) = client
@@ -63,49 +126,17 @@ async fn run_listener(input: RunListenerInput) -> Result<(), PubsubClientError>
63126 . await ?;
64127
65128 while let Some ( update) = stream. next ( ) . await {
66- if find_message_pda ( & input. wormhole_pid , update. context . slot ) . to_string ( )
67- != update. value . pubkey
68- {
69- continue ; // Skip updates that are not for the expected PDA
70- }
71-
72- let unreliable_data: PostedMessageUnreliableData = {
73- let data = match update. value . account . data . decode ( ) {
74- Some ( data) => data,
75- None => {
76- tracing:: error!( "Failed to decode account data" ) ;
77- continue ;
78- }
79- } ;
80-
81- match BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) {
129+ let unreliable_data =
130+ match decode_and_verify_update ( & input. wormhole_pid , & input. accumulator_address , update)
131+ {
82132 Ok ( data) => data,
83- Err ( e) => {
84- tracing:: error!( error = ?e, "Invalid unreliable data format" ) ;
85- continue ;
86- }
87- }
88- } ;
89-
90- if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
91- continue ;
92- }
93- if input. accumulator_address != Pubkey :: from ( unreliable_data. emitter_address ) {
94- continue ;
95- }
133+ Err ( _) => continue ,
134+ } ;
96135
97136 tokio:: spawn ( {
98137 let api_client = input. api_client . clone ( ) ;
99138 async move {
100- let body = Body {
101- timestamp : unreliable_data. submission_time ,
102- nonce : unreliable_data. nonce ,
103- emitter_chain : unreliable_data. emitter_chain . into ( ) ,
104- emitter_address : Address ( unreliable_data. emitter_address ) ,
105- sequence : unreliable_data. sequence ,
106- consistency_level : unreliable_data. consistency_level ,
107- payload : RawMessage :: new ( unreliable_data. payload . as_slice ( ) ) ,
108- } ;
139+ let body = message_data_to_body ( & unreliable_data) ;
109140 match Observation :: try_new ( body. clone ( ) , input. secret_key ) {
110141 Ok ( observation) => {
111142 if let Err ( e) = api_client. post_observation ( observation) . await {
@@ -171,3 +202,192 @@ async fn main() {
171202 }
172203 }
173204}
205+
206+ #[ cfg( test) ]
207+ mod tests {
208+ use super :: * ;
209+
210+ use base64:: Engine ;
211+ use borsh:: BorshSerialize ;
212+ use solana_account_decoder:: { UiAccount , UiAccountData } ;
213+
214+ use crate :: posted_message:: MessageData ;
215+
216+ fn get_wormhole_pid ( ) -> Pubkey {
217+ Pubkey :: from_str ( "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU" ) . unwrap ( )
218+ }
219+
220+ fn get_accumulator_address ( ) -> Pubkey {
221+ Pubkey :: from_str ( "G9LV2mp9ua1znRAfYwZz5cPiJMAbo1T6mbjdQsDZuMJg" ) . unwrap ( )
222+ }
223+
224+ fn get_payload ( ) -> Vec < u8 > {
225+ vec ! [
226+ 65 , 85 , 87 , 86 , 0 , 0 , 0 , 0 , 0 , 13 , 74 , 15 , 90 , 0 , 0 , 39 , 16 , 172 , 145 , 156 , 108 , 253 ,
227+ 178 , 4 , 138 , 51 , 74 , 110 , 116 , 101 , 139 , 121 , 254 , 152 , 165 , 24 , 190 ,
228+ ]
229+ }
230+
231+ fn get_unreliable_data ( ) -> PostedMessageUnreliableData {
232+ PostedMessageUnreliableData {
233+ message : MessageData {
234+ submission_time : 1749732585 ,
235+ nonce : 0 ,
236+ emitter_chain : Chain :: Pythnet . into ( ) ,
237+ emitter_address : [
238+ 225 , 1 , 250 , 237 , 172 , 88 , 81 , 227 , 43 , 155 , 35 , 181 , 249 , 65 , 26 , 140 , 43 ,
239+ 172 , 74 , 174 , 62 , 212 , 221 , 123 , 129 , 29 , 209 , 167 , 46 , 164 , 170 , 113 ,
240+ ] ,
241+ sequence : 138184361 ,
242+ consistency_level : 1 ,
243+ payload : get_payload ( ) ,
244+ vaa_version : 1 ,
245+ vaa_time : 0 ,
246+ vaa_signature_account : [ 0 ; 32 ] ,
247+ } ,
248+ }
249+ }
250+
251+ fn get_update ( unreliable_data : PostedMessageUnreliableData ) -> Response < RpcKeyedAccount > {
252+ let message = unreliable_data. try_to_vec ( ) . unwrap ( ) ;
253+ let message = base64:: engine:: general_purpose:: STANDARD . encode ( & message) ;
254+ Response {
255+ context : solana_client:: rpc_response:: RpcResponseContext {
256+ slot : 123456 ,
257+ api_version : None ,
258+ } ,
259+ value : RpcKeyedAccount {
260+ pubkey : find_message_pda ( & get_wormhole_pid ( ) , 123456 ) . to_string ( ) ,
261+ account : UiAccount {
262+ lamports : 0 ,
263+ data : UiAccountData :: Binary ( message, UiAccountEncoding :: Base64 ) ,
264+ owner : get_accumulator_address ( ) . to_string ( ) ,
265+ executable : false ,
266+ rent_epoch : 0 ,
267+ space : None ,
268+ } ,
269+ } ,
270+ }
271+ }
272+
273+ #[ test]
274+ fn test_find_message_pda ( ) {
275+ assert_eq ! (
276+ find_message_pda( & get_wormhole_pid( ) , 123456 ) . to_string( ) ,
277+ "Ed9gRoBySmUjSVFxovuhTk6AcFkv9uE8EovvshtHWLNT"
278+ ) ;
279+ }
280+
281+ #[ test]
282+ fn test_get_body ( ) {
283+ let unreliable_data = get_unreliable_data ( ) ;
284+ let body = message_data_to_body ( & unreliable_data) ;
285+ assert_eq ! ( body. timestamp, unreliable_data. submission_time) ;
286+ assert_eq ! ( body. nonce, unreliable_data. nonce) ;
287+ assert_eq ! ( body. emitter_chain, Chain :: Pythnet ) ;
288+ assert_eq ! (
289+ body. emitter_address,
290+ Address ( unreliable_data. emitter_address)
291+ ) ;
292+ assert_eq ! ( body. sequence, unreliable_data. sequence) ;
293+ assert_eq ! ( body. payload, RawMessage :: new( get_payload( ) . as_slice( ) ) ) ;
294+ }
295+
296+ #[ test]
297+ fn test_decode_and_verify_update ( ) {
298+ let expected_unreliable_data = get_unreliable_data ( ) ;
299+ let update = get_update ( expected_unreliable_data. clone ( ) ) ;
300+ let result =
301+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
302+
303+ assert ! ( result. is_ok( ) ) ;
304+ let unreliable_data = result. unwrap ( ) ;
305+
306+ assert_eq ! (
307+ expected_unreliable_data. consistency_level,
308+ unreliable_data. consistency_level
309+ ) ;
310+ assert_eq ! (
311+ expected_unreliable_data. emitter_chain,
312+ unreliable_data. emitter_chain
313+ ) ;
314+ assert_eq ! (
315+ expected_unreliable_data. emitter_address,
316+ unreliable_data. emitter_address
317+ ) ;
318+ assert_eq ! ( expected_unreliable_data. sequence, unreliable_data. sequence) ;
319+ assert_eq ! (
320+ expected_unreliable_data. submission_time,
321+ unreliable_data. submission_time
322+ ) ;
323+ assert_eq ! ( expected_unreliable_data. nonce, unreliable_data. nonce) ;
324+ assert_eq ! ( expected_unreliable_data. payload, unreliable_data. payload) ;
325+ assert_eq ! (
326+ expected_unreliable_data. vaa_version,
327+ unreliable_data. vaa_version
328+ ) ;
329+ assert_eq ! ( expected_unreliable_data. vaa_time, unreliable_data. vaa_time) ;
330+ assert_eq ! (
331+ expected_unreliable_data. vaa_signature_account,
332+ unreliable_data. vaa_signature_account
333+ ) ;
334+ }
335+
336+ #[ test]
337+ fn test_decode_and_verify_update_invalid_pda ( ) {
338+ let mut update = get_update ( get_unreliable_data ( ) ) ;
339+ update. context . slot += 1 ;
340+ let result =
341+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
342+ assert_eq ! ( result. unwrap_err( ) . to_string( ) , INVALID_PDA_MESSAGE ) ;
343+ }
344+
345+ #[ test]
346+ fn test_decode_and_verify_update_failed_decode ( ) {
347+ let mut update = get_update ( get_unreliable_data ( ) ) ;
348+ update. value . account . data =
349+ UiAccountData :: Binary ( "invalid_base64" . to_string ( ) , UiAccountEncoding :: Base64 ) ;
350+ let result =
351+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
352+ assert_eq ! ( result. unwrap_err( ) . to_string( ) , FAILED_TO_DECODE ) ;
353+ }
354+
355+ #[ test]
356+ fn test_decode_and_verify_update_invalid_unreliable_data ( ) {
357+ let mut update = get_update ( get_unreliable_data ( ) ) ;
358+ let message = base64:: engine:: general_purpose:: STANDARD . encode ( vec ! [ 4 , 1 , 2 , 3 , 4 ] ) ;
359+ update. value . account . data = UiAccountData :: Binary ( message, UiAccountEncoding :: Base64 ) ;
360+ let result =
361+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
362+ let error_message = format ! (
363+ "{}: {}" ,
364+ INVALID_UNRELIABLE_DATA_FORMAT ,
365+ "Magic mismatch. Expected [109, 115, 117] but got [4, 1, 2]"
366+ ) ;
367+ assert_eq ! ( result. unwrap_err( ) . to_string( ) , error_message) ;
368+ }
369+
370+ #[ test]
371+ fn test_decode_and_verify_update_invalid_emitter_chain ( ) {
372+ let mut unreliable_data = get_unreliable_data ( ) ;
373+ unreliable_data. emitter_chain = Chain :: Solana . into ( ) ;
374+ let result = decode_and_verify_update (
375+ & get_wormhole_pid ( ) ,
376+ & get_accumulator_address ( ) ,
377+ get_update ( unreliable_data) ,
378+ ) ;
379+ assert_eq ! ( result. unwrap_err( ) . to_string( ) , INVALID_EMITTER_CHAIN ) ;
380+ }
381+
382+ #[ test]
383+ fn test_decode_and_verify_update_invalid_emitter_address ( ) {
384+ let mut unreliable_data = get_unreliable_data ( ) ;
385+ unreliable_data. emitter_address = Pubkey :: new_unique ( ) . to_bytes ( ) ;
386+ let result = decode_and_verify_update (
387+ & get_wormhole_pid ( ) ,
388+ & get_accumulator_address ( ) ,
389+ get_update ( unreliable_data) ,
390+ ) ;
391+ assert_eq ! ( result. unwrap_err( ) . to_string( ) , INVALID_ACCUMULATOR_ADDRESS ) ;
392+ }
393+ }
0 commit comments