Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions pyth-lazer-agent/src/lazer_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::{CHANNEL_CAPACITY, Config};
use crate::relayer_session::RelayerSender;
use crate::relayer_session::RelayerSessionTask;
use anyhow::{Context, Result, bail};
use ed25519_dalek::{Signer, SigningKey};
use protobuf::well_known_types::timestamp::Timestamp;
Expand All @@ -11,6 +11,7 @@ use pyth_lazer_publisher_sdk::transaction::{
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
};
use solana_keypair::read_keypair_file;
use tokio::sync::broadcast;
use tokio::{
select,
sync::mpsc::{self, Receiver, Sender},
Expand All @@ -25,20 +26,22 @@ pub struct LazerPublisher {

impl LazerPublisher {
pub async fn new(config: &Config) -> Self {
let relayer_senders = futures::future::join_all(
config
.relayer_urls
.iter()
.map(async |url| RelayerSender::new(url, &config.authorization_token).await),
)
.await;
let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
for url in config.relayer_urls.iter() {
let mut task = RelayerSessionTask {
url: url.clone(),
token: config.authorization_token.to_owned(),
receiver: relayer_sender.subscribe(),
};
tokio::spawn(async move { task.run().await });
}

let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
let mut task = LazerPublisherTask {
config: config.clone(),
receiver,
pending_updates: Vec::new(),
relayer_senders,
relayer_sender,
};
tokio::spawn(async move { task.run().await });
Self { sender }
Expand All @@ -55,7 +58,7 @@ struct LazerPublisherTask {
config: Config,
receiver: Receiver<FeedUpdate>,
pending_updates: Vec<FeedUpdate>,
relayer_senders: Vec<RelayerSender>,
relayer_sender: broadcast::Sender<SignedLazerTransaction>,
}

impl LazerPublisherTask {
Expand Down Expand Up @@ -108,7 +111,7 @@ impl LazerPublisherTask {
}

let publisher_update = PublisherUpdate {
updates: self.pending_updates.clone(),
updates: self.pending_updates.drain(..).collect(),
publisher_timestamp: MessageField::some(Timestamp::now()),
special_fields: Default::default(),
};
Expand Down Expand Up @@ -137,14 +140,13 @@ impl LazerPublisherTask {
payload: Some(buf),
special_fields: Default::default(),
};
futures::future::join_all(
self.relayer_senders
.iter_mut()
.map(|relayer_sender| relayer_sender.sender.send(signed_lazer_transaction.clone())),
)
.await;
match self.relayer_sender.send(signed_lazer_transaction.clone()) {
Ok(_) => (),
Err(e) => {
tracing::error!("Error sending transaction to relayer receivers: {e}");
}
}

self.pending_updates.clear();
Ok(())
}
}
2 changes: 1 addition & 1 deletion pyth-lazer-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod websocket_utils;
#[derive(Parser)]
#[command(version)]
struct Cli {
#[clap(short, long, default_value = "config.toml")]
#[clap(short, long, default_value = "config/config.toml")]
config: String,
}

Expand Down
66 changes: 34 additions & 32 deletions pyth-lazer-agent/src/relayer_session.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::config::CHANNEL_CAPACITY;
use anyhow::{Result, bail};
use backoff::ExponentialBackoffBuilder;
use backoff::backoff::Backoff;
Expand All @@ -7,36 +6,17 @@ use futures_util::{SinkExt, StreamExt};
use http::HeaderValue;
use protobuf::Message;
use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::{
select,
sync::mpsc::{self, Receiver, Sender},
};
use tokio::select;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async_with_config,
tungstenite::Message as TungsteniteMessage,
};
use url::Url;

pub struct RelayerSender {
pub(crate) sender: Sender<SignedLazerTransaction>,
}

impl RelayerSender {
pub async fn new(url: &Url, token: &str) -> Self {
let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
let mut task = RelayerSessionTask {
url: url.clone(),
token: token.to_owned(),
receiver,
};
tokio::spawn(async move { task.run().await });
Self { sender }
}
}

type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;

Expand Down Expand Up @@ -78,11 +58,11 @@ impl RelayerWsSession {
}
}

struct RelayerSessionTask {
pub struct RelayerSessionTask {
// connection state
url: Url,
token: String,
receiver: Receiver<SignedLazerTransaction>,
pub url: Url,
pub token: String,
pub receiver: broadcast::Receiver<SignedLazerTransaction>,
}

impl RelayerSessionTask {
Expand All @@ -95,6 +75,8 @@ impl RelayerSessionTask {
.with_max_elapsed_time(None)
.build();

const FAILURE_RESET_TIME: Duration = Duration::from_secs(300);
let mut first_failure_time = Instant::now();
let mut failure_count = 0;

loop {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still doesn't use retry, I know. 😄

Expand All @@ -104,6 +86,12 @@ impl RelayerSessionTask {
return;
}
Err(e) => {
if first_failure_time.elapsed() > FAILURE_RESET_TIME {
Copy link

Copilot AI Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently failure_count is only reset after the timer expires; on a successful WebSocket session you should also reset failure_count (and possibly first_failure_time) so that retries start fresh after each successful connection.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A successful session will go indefinitely so I'll leave this as is.

failure_count = 0;
first_failure_time = Instant::now();
backoff.reset();
}

failure_count += 1;
let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
tracing::error!(
Expand All @@ -129,11 +117,25 @@ impl RelayerSessionTask {

loop {
select! {
Some(transaction) = self.receiver.recv() => {
if let Err(e) = relayer_ws_session.send_transaction(transaction).await
{
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
recv_result = self.receiver.recv() => {
match recv_result {
Ok(transaction) => {
if let Err(e) = relayer_ws_session.send_transaction(transaction).await {
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
}
},
Err(e) => {
match e {
broadcast::error::RecvError::Closed => {
tracing::error!("transaction broadcast channel closed");
bail!("transaction broadcast channel closed");
}
broadcast::error::RecvError::Lagged(skipped_count) => {
tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages");
}
}
}
}
}
// Handle messages from the relayers, such as errors if we send a bad update
Expand Down