1- use crate :: config:: CHANNEL_CAPACITY ;
21use anyhow:: { Result , bail} ;
32use backoff:: ExponentialBackoffBuilder ;
43use backoff:: backoff:: Backoff ;
@@ -7,36 +6,17 @@ use futures_util::{SinkExt, StreamExt};
76use http:: HeaderValue ;
87use protobuf:: Message ;
98use pyth_lazer_publisher_sdk:: transaction:: SignedLazerTransaction ;
10- use std:: time:: Duration ;
9+ use std:: time:: { Duration , Instant } ;
1110use tokio:: net:: TcpStream ;
12- use tokio:: {
13- select,
14- sync:: mpsc:: { self , Receiver , Sender } ,
15- } ;
11+ use tokio:: select;
12+ use tokio:: sync:: broadcast;
1613use tokio_tungstenite:: tungstenite:: client:: IntoClientRequest ;
1714use tokio_tungstenite:: {
1815 MaybeTlsStream , WebSocketStream , connect_async_with_config,
1916 tungstenite:: Message as TungsteniteMessage ,
2017} ;
2118use url:: Url ;
2219
23- pub struct RelayerSender {
24- pub ( crate ) sender : Sender < SignedLazerTransaction > ,
25- }
26-
27- impl RelayerSender {
28- pub async fn new ( url : & Url , token : & str ) -> Self {
29- let ( sender, receiver) = mpsc:: channel ( CHANNEL_CAPACITY ) ;
30- let mut task = RelayerSessionTask {
31- url : url. clone ( ) ,
32- token : token. to_owned ( ) ,
33- receiver,
34- } ;
35- tokio:: spawn ( async move { task. run ( ) . await } ) ;
36- Self { sender }
37- }
38- }
39-
4020type RelayerWsSender = SplitSink < WebSocketStream < MaybeTlsStream < TcpStream > > , TungsteniteMessage > ;
4121type RelayerWsReceiver = SplitStream < WebSocketStream < MaybeTlsStream < TcpStream > > > ;
4222
@@ -78,11 +58,11 @@ impl RelayerWsSession {
7858 }
7959}
8060
81- struct RelayerSessionTask {
61+ pub struct RelayerSessionTask {
8262 // connection state
83- url : Url ,
84- token : String ,
85- receiver : Receiver < SignedLazerTransaction > ,
63+ pub url : Url ,
64+ pub token : String ,
65+ pub receiver : broadcast :: Receiver < SignedLazerTransaction > ,
8666}
8767
8868impl RelayerSessionTask {
@@ -95,6 +75,8 @@ impl RelayerSessionTask {
9575 . with_max_elapsed_time ( None )
9676 . build ( ) ;
9777
78+ const FAILURE_RESET_TIME : Duration = Duration :: from_secs ( 300 ) ;
79+ let mut first_failure_time = Instant :: now ( ) ;
9880 let mut failure_count = 0 ;
9981
10082 loop {
@@ -104,6 +86,12 @@ impl RelayerSessionTask {
10486 return ;
10587 }
10688 Err ( e) => {
89+ if first_failure_time. elapsed ( ) > FAILURE_RESET_TIME {
90+ failure_count = 0 ;
91+ first_failure_time = Instant :: now ( ) ;
92+ backoff. reset ( ) ;
93+ }
94+
10795 failure_count += 1 ;
10896 let next_backoff = backoff. next_backoff ( ) . unwrap_or ( max_interval) ;
10997 tracing:: error!(
@@ -129,11 +117,25 @@ impl RelayerSessionTask {
129117
130118 loop {
131119 select ! {
132- Some ( transaction) = self . receiver. recv( ) => {
133- if let Err ( e) = relayer_ws_session. send_transaction( transaction) . await
134- {
135- tracing:: error!( "Error publishing transaction to Lazer relayer: {e:?}" ) ;
136- bail!( "Failed to publish transaction to Lazer relayer: {e:?}" ) ;
120+ recv_result = self . receiver. recv( ) => {
121+ match recv_result {
122+ Ok ( transaction) => {
123+ if let Err ( e) = relayer_ws_session. send_transaction( transaction) . await {
124+ tracing:: error!( "Error publishing transaction to Lazer relayer: {e:?}" ) ;
125+ bail!( "Failed to publish transaction to Lazer relayer: {e:?}" ) ;
126+ }
127+ } ,
128+ Err ( e) => {
129+ match e {
130+ broadcast:: error:: RecvError :: Closed => {
131+ tracing:: error!( "transaction broadcast channel closed" ) ;
132+ bail!( "transaction broadcast channel closed" ) ;
133+ }
134+ broadcast:: error:: RecvError :: Lagged ( skipped_count) => {
135+ tracing:: warn!( "transaction broadcast channel lagged by {skipped_count} messages" ) ;
136+ }
137+ }
138+ }
137139 }
138140 }
139141 // Handle messages from the relayers, such as errors if we send a bad update
0 commit comments