@@ -223,11 +223,8 @@ async fn main() -> Result<(), ErrBox> {
223223 }
224224 Action :: Attest {
225225 ref attestation_cfg,
226- n_retries,
227- retry_interval_secs,
228226 confirmation_timeout_secs,
229227 metrics_bind_addr,
230- daemon,
231228 } => {
232229 // Load the attestation config yaml
233230 let attestation_cfg: AttestationConfig =
@@ -249,26 +246,7 @@ async fn main() -> Result<(), ErrBox> {
249246 Duration :: from_millis ( attestation_cfg. min_rpc_interval_ms ) ,
250247 ) ) ;
251248
252- if daemon {
253- handle_attest_daemon_mode (
254- rpc_cfg,
255- payer,
256- p2w_addr,
257- attestation_cfg,
258- metrics_bind_addr,
259- )
260- . await ?;
261- } else {
262- handle_attest_non_daemon_mode (
263- attestation_cfg,
264- rpc_cfg,
265- p2w_addr,
266- payer,
267- n_retries,
268- Duration :: from_secs ( retry_interval_secs) ,
269- )
270- . await ?;
271- }
249+ handle_attest ( rpc_cfg, payer, p2w_addr, attestation_cfg, metrics_bind_addr) . await ?;
272250 }
273251 Action :: GetEmitter => unreachable ! { } , // It is handled early in this function.
274252 Action :: SetIsActive {
@@ -296,7 +274,7 @@ async fn main() -> Result<(), ErrBox> {
296274}
297275
298276/// Continuously send batch attestations for symbols of an attestation config.
299- async fn handle_attest_daemon_mode (
277+ async fn handle_attest (
300278 rpc_cfg : Arc < RLMutex < RpcCfg > > ,
301279 payer : Keypair ,
302280 p2w_addr : Pubkey ,
@@ -463,76 +441,6 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
463441 RpcClient :: new_with_timeout_and_commitment ( url, timeout, commitment)
464442}
465443
466- /// Non-daemon attestation scheduling
467- async fn handle_attest_non_daemon_mode (
468- attestation_cfg : AttestationConfig ,
469- rpc_cfg : Arc < RLMutex < RpcCfg > > ,
470- p2w_addr : Pubkey ,
471- payer : Keypair ,
472- n_retries : usize ,
473- retry_interval : Duration ,
474- ) -> Result < ( ) , ErrBox > {
475- let p2w_cfg = get_config_account ( & lock_and_make_rpc ( & rpc_cfg) . await , & p2w_addr) . await ?;
476-
477- let batch_config =
478- attestation_config_to_batches ( & rpc_cfg, & attestation_cfg, p2w_cfg. max_batch_size as usize )
479- . await
480- . unwrap_or_else ( |_| {
481- attestation_cfg. instantiate_batches ( & [ ] , p2w_cfg. max_batch_size as usize )
482- } ) ;
483-
484- let batches: Vec < _ > = batch_config
485- . into_iter ( )
486- . map ( |x| BatchState :: new ( & x) )
487- . collect ( ) ;
488- let batch_count = batches. len ( ) ;
489-
490- // For enforcing min_msg_reuse_interval_ms, we keep a piece of
491- // state that creates or reuses accounts if enough time had
492- // passed
493- let message_q_mtx = Arc :: new ( Mutex :: new ( P2WMessageQueue :: new (
494- Duration :: from_millis ( attestation_cfg. min_msg_reuse_interval_ms ) ,
495- attestation_cfg. max_msg_accounts as usize ,
496- ) ) ) ;
497-
498- let retry_jobs = batches. into_iter ( ) . enumerate ( ) . map ( |( idx, batch_state) | {
499- attestation_retry_job ( AttestationRetryJobArgs {
500- batch_no : idx + 1 ,
501- batch_count,
502- group_name : batch_state. group_name ,
503- symbols : batch_state. symbols ,
504- n_retries,
505- retry_interval,
506- rpc_cfg : rpc_cfg. clone ( ) ,
507- p2w_addr,
508- p2w_config : p2w_cfg. clone ( ) ,
509- payer : Keypair :: from_bytes ( & payer. to_bytes ( ) ) . unwrap ( ) ,
510- message_q_mtx : message_q_mtx. clone ( ) ,
511- } )
512- } ) ;
513-
514- let results = futures:: future:: join_all ( retry_jobs) . await ;
515-
516- // After completing, we count any errors coming from the sched
517- // futs.
518- let errors: Vec < _ > = results
519- . iter ( )
520- . enumerate ( )
521- . filter_map ( |( idx, r) | {
522- r. as_ref ( )
523- . err ( )
524- . map ( |e| format ! ( "Error {}: {:?}\n " , idx + 1 , e) )
525- } )
526- . collect ( ) ;
527-
528- if !errors. is_empty ( ) {
529- let err_lines = errors. join ( "\n " ) ;
530- let msg = format ! ( "{} batches failed:\n {}" , errors. len( ) , err_lines) ;
531- error ! ( "{}" , msg) ;
532- return Err ( msg. into ( ) ) ;
533- }
534- Ok ( ( ) )
535- }
536444
537445/// Generate batches to attest by retrieving the on-chain product account data and grouping it
538446/// according to the configuration in `attestation_cfg`.
@@ -692,69 +600,6 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
692600 }
693601}
694602
695- pub struct AttestationRetryJobArgs {
696- pub batch_no : usize ,
697- pub batch_count : usize ,
698- pub group_name : String ,
699- pub symbols : Vec < P2WSymbol > ,
700- pub n_retries : usize ,
701- pub retry_interval : Duration ,
702- pub rpc_cfg : Arc < RLMutex < RpcCfg > > ,
703- pub p2w_addr : Pubkey ,
704- pub p2w_config : Pyth2WormholeConfig ,
705- pub payer : Keypair ,
706- pub message_q_mtx : Arc < Mutex < P2WMessageQueue > > ,
707- }
708-
709- /// A future that cranks a batch up to n_retries times, pausing for
710- /// retry_interval in between; Used exclusively in non-daemon mode
711- async fn attestation_retry_job ( args : AttestationRetryJobArgs ) -> Result < ( ) , ErrBoxSend > {
712- let AttestationRetryJobArgs {
713- batch_no,
714- batch_count,
715- group_name,
716- symbols,
717- n_retries,
718- retry_interval,
719- rpc_cfg,
720- p2w_addr,
721- p2w_config,
722- payer,
723- message_q_mtx,
724- } = args;
725-
726- let mut res = Err (
727- "attestation_retry_job INTERNAL: Could not get a single attestation job result"
728- . to_string ( )
729- . into ( ) ,
730- ) ;
731-
732- for _i in 0 ..=n_retries {
733- res = attestation_job ( AttestationJobArgs {
734- rlmtx : rpc_cfg. clone ( ) ,
735- batch_no,
736- batch_count,
737- group_name : group_name. clone ( ) ,
738- p2w_addr,
739- config : p2w_config. clone ( ) ,
740- payer : Keypair :: from_bytes ( & payer. to_bytes ( ) ) . unwrap ( ) , // Keypair has no clone
741- symbols : symbols. clone ( ) ,
742- max_jobs_sema : Arc :: new ( Semaphore :: new ( 1 ) ) , // Not important for non-daemon mode
743- message_q_mtx : message_q_mtx. clone ( ) ,
744- } )
745- . await ;
746-
747- // Finish early on success
748- if res. is_ok ( ) {
749- break ;
750- }
751-
752- tokio:: time:: sleep ( retry_interval) . await ;
753- }
754-
755- res
756- }
757-
758603/// Arguments for attestation_job(). This struct rules out same-type
759604/// ordering errors due to the large argument count
760605pub struct AttestationJobArgs {
0 commit comments