From 49b801ad9ac57c2339ba98fd395d8b58ddd7812d Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Tue, 10 Jun 2025 17:30:52 -0500 Subject: [PATCH] Further fixes: backoff reset, use broadcast::channel, default config, drain pending updates --- pyth-lazer-agent/src/lazer_publisher.rs | 38 +++++++------- pyth-lazer-agent/src/main.rs | 2 +- pyth-lazer-agent/src/relayer_session.rs | 66 +++++++++++++------------ 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/pyth-lazer-agent/src/lazer_publisher.rs b/pyth-lazer-agent/src/lazer_publisher.rs index 3173421bad..1e95145feb 100644 --- a/pyth-lazer-agent/src/lazer_publisher.rs +++ b/pyth-lazer-agent/src/lazer_publisher.rs @@ -1,5 +1,5 @@ use crate::config::{CHANNEL_CAPACITY, Config}; -use crate::relayer_session::RelayerSender; +use crate::relayer_session::RelayerSessionTask; use anyhow::{Context, Result, bail}; use ed25519_dalek::{Signer, SigningKey}; use protobuf::well_known_types::timestamp::Timestamp; @@ -11,6 +11,7 @@ use pyth_lazer_publisher_sdk::transaction::{ Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction, }; use solana_keypair::read_keypair_file; +use tokio::sync::broadcast; use tokio::{ select, sync::mpsc::{self, Receiver, Sender}, @@ -25,20 +26,22 @@ pub struct LazerPublisher { impl LazerPublisher { pub async fn new(config: &Config) -> Self { - let relayer_senders = futures::future::join_all( - config - .relayer_urls - .iter() - .map(async |url| RelayerSender::new(url, &config.authorization_token).await), - ) - .await; + let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY); + for url in config.relayer_urls.iter() { + let mut task = RelayerSessionTask { + url: url.clone(), + token: config.authorization_token.to_owned(), + receiver: relayer_sender.subscribe(), + }; + tokio::spawn(async move { task.run().await }); + } let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY); let mut task = LazerPublisherTask { config: config.clone(), receiver, pending_updates: Vec::new(), - relayer_senders, + relayer_sender, }; tokio::spawn(async move { task.run().await }); Self { sender } @@ -55,7 +58,7 @@ struct LazerPublisherTask { config: Config, receiver: Receiver, pending_updates: Vec, - relayer_senders: Vec, + relayer_sender: broadcast::Sender, } impl LazerPublisherTask { @@ -108,7 +111,7 @@ impl LazerPublisherTask { } let publisher_update = PublisherUpdate { - updates: self.pending_updates.clone(), + updates: self.pending_updates.drain(..).collect(), publisher_timestamp: MessageField::some(Timestamp::now()), special_fields: Default::default(), }; @@ -137,14 +140,13 @@ impl LazerPublisherTask { payload: Some(buf), special_fields: Default::default(), }; - futures::future::join_all( - self.relayer_senders - .iter_mut() - .map(|relayer_sender| relayer_sender.sender.send(signed_lazer_transaction.clone())), - ) - .await; + match self.relayer_sender.send(signed_lazer_transaction.clone()) { + Ok(_) => (), + Err(e) => { + tracing::error!("Error sending transaction to relayer receivers: {e}"); + } + } - self.pending_updates.clear(); Ok(()) } } diff --git a/pyth-lazer-agent/src/main.rs b/pyth-lazer-agent/src/main.rs index 41ee36a92c..a6082b747c 100644 --- a/pyth-lazer-agent/src/main.rs +++ b/pyth-lazer-agent/src/main.rs @@ -16,7 +16,7 @@ mod websocket_utils; #[derive(Parser)] #[command(version)] struct Cli { - #[clap(short, long, default_value = "config.toml")] + #[clap(short, long, default_value = "config/config.toml")] config: String, } diff --git a/pyth-lazer-agent/src/relayer_session.rs b/pyth-lazer-agent/src/relayer_session.rs index 29b28bf6df..1b1339442f 100644 --- a/pyth-lazer-agent/src/relayer_session.rs +++ b/pyth-lazer-agent/src/relayer_session.rs @@ -1,4 +1,3 @@ -use crate::config::CHANNEL_CAPACITY; use anyhow::{Result, bail}; use backoff::ExponentialBackoffBuilder; use backoff::backoff::Backoff; @@ -7,12 +6,10 @@ use futures_util::{SinkExt, StreamExt}; use http::HeaderValue; use protobuf::Message; use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::net::TcpStream; -use tokio::{ - select, - sync::mpsc::{self, Receiver, Sender}, -}; +use tokio::select; +use tokio::sync::broadcast; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, connect_async_with_config, @@ -20,23 +17,6 @@ use tokio_tungstenite::{ }; use url::Url; -pub struct RelayerSender { - pub(crate) sender: Sender, -} - -impl RelayerSender { - pub async fn new(url: &Url, token: &str) -> Self { - let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY); - let mut task = RelayerSessionTask { - url: url.clone(), - token: token.to_owned(), - receiver, - }; - tokio::spawn(async move { task.run().await }); - Self { sender } - } -} - type RelayerWsSender = SplitSink>, TungsteniteMessage>; type RelayerWsReceiver = SplitStream>>; @@ -78,11 +58,11 @@ impl RelayerWsSession { } } -struct RelayerSessionTask { +pub struct RelayerSessionTask { // connection state - url: Url, - token: String, - receiver: Receiver, + pub url: Url, + pub token: String, + pub receiver: broadcast::Receiver, } impl RelayerSessionTask { @@ -95,6 +75,8 @@ impl RelayerSessionTask { .with_max_elapsed_time(None) .build(); + const FAILURE_RESET_TIME: Duration = Duration::from_secs(300); + let mut first_failure_time = Instant::now(); let mut failure_count = 0; loop { @@ -104,6 +86,12 @@ impl RelayerSessionTask { return; } Err(e) => { + if first_failure_time.elapsed() > FAILURE_RESET_TIME { + failure_count = 0; + first_failure_time = Instant::now(); + backoff.reset(); + } + failure_count += 1; let next_backoff = backoff.next_backoff().unwrap_or(max_interval); tracing::error!( @@ -129,11 +117,25 @@ impl RelayerSessionTask { loop { select! { - Some(transaction) = self.receiver.recv() => { - if let Err(e) = relayer_ws_session.send_transaction(transaction).await - { - tracing::error!("Error publishing transaction to Lazer relayer: {e:?}"); - bail!("Failed to publish transaction to Lazer relayer: {e:?}"); + recv_result = self.receiver.recv() => { + match recv_result { + Ok(transaction) => { + if let Err(e) = relayer_ws_session.send_transaction(transaction).await { + tracing::error!("Error publishing transaction to Lazer relayer: {e:?}"); + bail!("Failed to publish transaction to Lazer relayer: {e:?}"); + } + }, + Err(e) => { + match e { + broadcast::error::RecvError::Closed => { + tracing::error!("transaction broadcast channel closed"); + bail!("transaction broadcast channel closed"); + } + broadcast::error::RecvError::Lagged(skipped_count) => { + tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages"); + } + } + } } } // Handle messages from the relayers, such as errors if we send a bad update