1111 pubsub_client:: PubsubClientError ,
1212 rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ,
1313 rpc_filter:: { Memcmp , RpcFilterType } ,
14+ rpc_response:: { Response , RpcKeyedAccount } ,
1415 } ,
1516 solana_sdk:: pubkey:: Pubkey ,
1617 std:: { fs, str:: FromStr , time:: Duration } ,
@@ -40,6 +41,62 @@ fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey {
4041 . 0
4142}
4243
44+ const FAILED_TO_DECODE : & str = "Failed to decode account data" ;
45+ const INVALID_UNRELIABLE_DATA_FORMAT : & str = "Invalid unreliable data format" ;
46+
47+ #[ derive( Debug ) ]
48+ enum VerifyUpdateError {
49+ InvalidMessagePDA ,
50+ InvalidEmitterChain ,
51+ InvalidAccumulatorAddress ,
52+ #[ allow( dead_code) ]
53+ DecodingError ( String ) ,
54+ }
55+
56+ fn decode_and_verify_update (
57+ wormhole_pid : & Pubkey ,
58+ accumulator_address : & Pubkey ,
59+ update : Response < RpcKeyedAccount > ,
60+ ) -> Result < PostedMessageUnreliableData , VerifyUpdateError > {
61+ if find_message_pda ( wormhole_pid, update. context . slot ) . to_string ( ) != update. value . pubkey {
62+ return Err ( VerifyUpdateError :: InvalidMessagePDA ) ;
63+ }
64+ let data = update
65+ . value
66+ . account
67+ . data
68+ . decode ( )
69+ . ok_or ( VerifyUpdateError :: DecodingError (
70+ FAILED_TO_DECODE . to_string ( ) ,
71+ ) ) ?;
72+ let unreliable_data: PostedMessageUnreliableData =
73+ BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) . map_err ( |e| {
74+ VerifyUpdateError :: DecodingError ( format ! ( "{}: {}" , INVALID_UNRELIABLE_DATA_FORMAT , e) )
75+ } ) ?;
76+
77+ if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
78+ return Err ( VerifyUpdateError :: InvalidEmitterChain ) ;
79+ }
80+
81+ if accumulator_address != & Pubkey :: from ( unreliable_data. emitter_address ) {
82+ return Err ( VerifyUpdateError :: InvalidAccumulatorAddress ) ;
83+ }
84+
85+ Ok ( unreliable_data)
86+ }
87+
88+ fn new_body ( unreliable_data : & PostedMessageUnreliableData ) -> Body < & RawMessage > {
89+ Body {
90+ timestamp : unreliable_data. submission_time ,
91+ nonce : unreliable_data. nonce ,
92+ emitter_chain : unreliable_data. emitter_chain . into ( ) ,
93+ emitter_address : Address ( unreliable_data. emitter_address ) ,
94+ sequence : unreliable_data. sequence ,
95+ consistency_level : unreliable_data. consistency_level ,
96+ payload : RawMessage :: new ( unreliable_data. payload . as_slice ( ) ) ,
97+ }
98+ }
99+
43100async fn run_listener ( input : RunListenerInput ) -> Result < ( ) , PubsubClientError > {
44101 let client = PubsubClient :: new ( input. ws_url . as_str ( ) ) . await ?;
45102 let ( mut stream, unsubscribe) = client
@@ -63,49 +120,22 @@ async fn run_listener(input: RunListenerInput) -> Result<(), PubsubClientError>
63120 . await ?;
64121
65122 while let Some ( update) = stream. next ( ) . await {
66- if find_message_pda ( & input. wormhole_pid , update. context . slot ) . to_string ( )
67- != update. value . pubkey
68- {
69- continue ; // Skip updates that are not for the expected PDA
70- }
71-
72- let unreliable_data: PostedMessageUnreliableData = {
73- let data = match update. value . account . data . decode ( ) {
74- Some ( data) => data,
75- None => {
76- tracing:: error!( "Failed to decode account data" ) ;
77- continue ;
78- }
79- } ;
80-
81- match BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) {
123+ let unreliable_data =
124+ match decode_and_verify_update ( & input. wormhole_pid , & input. accumulator_address , update)
125+ {
82126 Ok ( data) => data,
83127 Err ( e) => {
84- tracing:: error!( error = ?e, "Invalid unreliable data format" ) ;
128+ if !matches ! ( e, VerifyUpdateError :: InvalidMessagePDA ) {
129+ tracing:: error!( error = ?e, "Received an invalid update" ) ;
130+ }
85131 continue ;
86132 }
87- }
88- } ;
89-
90- if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
91- continue ;
92- }
93- if input. accumulator_address != Pubkey :: from ( unreliable_data. emitter_address ) {
94- continue ;
95- }
133+ } ;
96134
97135 tokio:: spawn ( {
98136 let api_client = input. api_client . clone ( ) ;
99137 async move {
100- let body = Body {
101- timestamp : unreliable_data. submission_time ,
102- nonce : unreliable_data. nonce ,
103- emitter_chain : unreliable_data. emitter_chain . into ( ) ,
104- emitter_address : Address ( unreliable_data. emitter_address ) ,
105- sequence : unreliable_data. sequence ,
106- consistency_level : unreliable_data. consistency_level ,
107- payload : RawMessage :: new ( unreliable_data. payload . as_slice ( ) ) ,
108- } ;
138+ let body = new_body ( & unreliable_data) ;
109139 match Observation :: try_new ( body. clone ( ) , input. secret_key ) {
110140 Ok ( observation) => {
111141 if let Err ( e) = api_client. post_observation ( observation) . await {
@@ -171,3 +201,202 @@ async fn main() {
171201 }
172202 }
173203}
204+
205+ #[ cfg( test) ]
206+ mod tests {
207+ use base64:: Engine ;
208+ use borsh:: BorshSerialize ;
209+ use solana_account_decoder:: { UiAccount , UiAccountData } ;
210+
211+ use super :: * ;
212+ use crate :: posted_message:: MessageData ;
213+
214+ fn get_wormhole_pid ( ) -> Pubkey {
215+ Pubkey :: from_str ( "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU" ) . unwrap ( )
216+ }
217+
218+ fn get_accumulator_address ( ) -> Pubkey {
219+ Pubkey :: from_str ( "G9LV2mp9ua1znRAfYwZz5cPiJMAbo1T6mbjdQsDZuMJg" ) . unwrap ( )
220+ }
221+
222+ fn get_payload ( ) -> Vec < u8 > {
223+ vec ! [
224+ 65 , 85 , 87 , 86 , 0 , 0 , 0 , 0 , 0 , 13 , 74 , 15 , 90 , 0 , 0 , 39 , 16 , 172 , 145 , 156 , 108 , 253 ,
225+ 178 , 4 , 138 , 51 , 74 , 110 , 116 , 101 , 139 , 121 , 254 , 152 , 165 , 24 , 190 ,
226+ ]
227+ }
228+
229+ fn get_unreliable_data ( ) -> PostedMessageUnreliableData {
230+ PostedMessageUnreliableData {
231+ message : MessageData {
232+ submission_time : 1749732585 ,
233+ nonce : 0 ,
234+ emitter_chain : Chain :: Pythnet . into ( ) ,
235+ emitter_address : [
236+ 225 , 1 , 250 , 237 , 172 , 88 , 81 , 227 , 43 , 155 , 35 , 181 , 249 , 65 , 26 , 140 , 43 ,
237+ 172 , 74 , 174 , 62 , 212 , 221 , 123 , 129 , 29 , 209 , 167 , 46 , 164 , 170 , 113 ,
238+ ] ,
239+ sequence : 138184361 ,
240+ consistency_level : 1 ,
241+ payload : get_payload ( ) ,
242+ vaa_version : 1 ,
243+ vaa_time : 0 ,
244+ vaa_signature_account : [ 0 ; 32 ] ,
245+ } ,
246+ }
247+ }
248+
249+ fn get_update ( unreliable_data : PostedMessageUnreliableData ) -> Response < RpcKeyedAccount > {
250+ let message = unreliable_data. try_to_vec ( ) . unwrap ( ) ;
251+ let message = base64:: engine:: general_purpose:: STANDARD . encode ( & message) ;
252+ Response {
253+ context : solana_client:: rpc_response:: RpcResponseContext {
254+ slot : 123456 ,
255+ api_version : None ,
256+ } ,
257+ value : RpcKeyedAccount {
258+ pubkey : find_message_pda ( & get_wormhole_pid ( ) , 123456 ) . to_string ( ) ,
259+ account : UiAccount {
260+ lamports : 0 ,
261+ data : UiAccountData :: Binary ( message, UiAccountEncoding :: Base64 ) ,
262+ owner : get_accumulator_address ( ) . to_string ( ) ,
263+ executable : false ,
264+ rent_epoch : 0 ,
265+ space : None ,
266+ } ,
267+ } ,
268+ }
269+ }
270+
271+ #[ test]
272+ fn test_find_message_pda ( ) {
273+ assert_eq ! (
274+ find_message_pda( & get_wormhole_pid( ) , 123456 ) . to_string( ) ,
275+ "Ed9gRoBySmUjSVFxovuhTk6AcFkv9uE8EovvshtHWLNT"
276+ ) ;
277+ }
278+
279+ #[ test]
280+ fn test_get_body ( ) {
281+ let unreliable_data = get_unreliable_data ( ) ;
282+ let body = new_body ( & unreliable_data) ;
283+ assert_eq ! ( body. timestamp, unreliable_data. submission_time) ;
284+ assert_eq ! ( body. nonce, unreliable_data. nonce) ;
285+ assert_eq ! ( body. emitter_chain, Chain :: Pythnet ) ;
286+ assert_eq ! (
287+ body. emitter_address,
288+ Address ( unreliable_data. emitter_address)
289+ ) ;
290+ assert_eq ! ( body. sequence, unreliable_data. sequence) ;
291+ assert_eq ! ( body. payload, RawMessage :: new( get_payload( ) . as_slice( ) ) ) ;
292+ }
293+
294+ #[ test]
295+ fn test_decode_and_verify_update ( ) {
296+ let expected_unreliable_data = get_unreliable_data ( ) ;
297+ let update = get_update ( expected_unreliable_data. clone ( ) ) ;
298+ let result =
299+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
300+
301+ assert ! ( result. is_ok( ) ) ;
302+ let unreliable_data = result. unwrap ( ) ;
303+
304+ assert_eq ! (
305+ expected_unreliable_data. consistency_level,
306+ unreliable_data. consistency_level
307+ ) ;
308+ assert_eq ! (
309+ expected_unreliable_data. emitter_chain,
310+ unreliable_data. emitter_chain
311+ ) ;
312+ assert_eq ! (
313+ expected_unreliable_data. emitter_address,
314+ unreliable_data. emitter_address
315+ ) ;
316+ assert_eq ! ( expected_unreliable_data. sequence, unreliable_data. sequence) ;
317+ assert_eq ! (
318+ expected_unreliable_data. submission_time,
319+ unreliable_data. submission_time
320+ ) ;
321+ assert_eq ! ( expected_unreliable_data. nonce, unreliable_data. nonce) ;
322+ assert_eq ! ( expected_unreliable_data. payload, unreliable_data. payload) ;
323+ assert_eq ! (
324+ expected_unreliable_data. vaa_version,
325+ unreliable_data. vaa_version
326+ ) ;
327+ assert_eq ! ( expected_unreliable_data. vaa_time, unreliable_data. vaa_time) ;
328+ assert_eq ! (
329+ expected_unreliable_data. vaa_signature_account,
330+ unreliable_data. vaa_signature_account
331+ ) ;
332+ }
333+
334+ #[ test]
335+ fn test_decode_and_verify_update_invalid_pda ( ) {
336+ let mut update = get_update ( get_unreliable_data ( ) ) ;
337+ update. context . slot += 1 ;
338+ let result =
339+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
340+ assert ! ( matches!( result, Err ( VerifyUpdateError :: InvalidMessagePDA ) ) ) ;
341+ }
342+
343+ #[ test]
344+ fn test_decode_and_verify_update_failed_decode ( ) {
345+ let mut update = get_update ( get_unreliable_data ( ) ) ;
346+ update. value . account . data =
347+ UiAccountData :: Binary ( "invalid_base64" . to_string ( ) , UiAccountEncoding :: Base64 ) ;
348+ let result =
349+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
350+ assert ! (
351+ matches!( result, Err ( VerifyUpdateError :: DecodingError ( ref msg) ) if msg == FAILED_TO_DECODE ) ,
352+ ) ;
353+ }
354+
355+ #[ test]
356+ fn test_decode_and_verify_update_invalid_unreliable_data ( ) {
357+ let mut update = get_update ( get_unreliable_data ( ) ) ;
358+ let message = base64:: engine:: general_purpose:: STANDARD . encode ( vec ! [ 4 , 1 , 2 , 3 , 4 ] ) ;
359+ update. value . account . data = UiAccountData :: Binary ( message, UiAccountEncoding :: Base64 ) ;
360+ let result =
361+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
362+ let error_message = format ! (
363+ "{}: {}" ,
364+ INVALID_UNRELIABLE_DATA_FORMAT ,
365+ "Magic mismatch. Expected [109, 115, 117] but got [4, 1, 2]"
366+ ) ;
367+ assert ! (
368+ matches!( result, Err ( VerifyUpdateError :: DecodingError ( ref msg) )
369+ if * msg == error_message)
370+ ) ;
371+ }
372+
373+ #[ test]
374+ fn test_decode_and_verify_update_invalid_emitter_chain ( ) {
375+ let mut unreliable_data = get_unreliable_data ( ) ;
376+ unreliable_data. emitter_chain = Chain :: Solana . into ( ) ;
377+ let result = decode_and_verify_update (
378+ & get_wormhole_pid ( ) ,
379+ & get_accumulator_address ( ) ,
380+ get_update ( unreliable_data) ,
381+ ) ;
382+ assert ! ( matches!(
383+ result,
384+ Err ( VerifyUpdateError :: InvalidEmitterChain )
385+ ) ) ;
386+ }
387+
388+ #[ test]
389+ fn test_decode_and_verify_update_invalid_emitter_address ( ) {
390+ let mut unreliable_data = get_unreliable_data ( ) ;
391+ unreliable_data. emitter_address = Pubkey :: new_unique ( ) . to_bytes ( ) ;
392+ let result = decode_and_verify_update (
393+ & get_wormhole_pid ( ) ,
394+ & get_accumulator_address ( ) ,
395+ get_update ( unreliable_data) ,
396+ ) ;
397+ assert ! ( matches!(
398+ result,
399+ Err ( VerifyUpdateError :: InvalidAccumulatorAddress )
400+ ) ) ;
401+ }
402+ }
0 commit comments