1- use crate :: jrpc_handle:: jrpc_handler_inner;
2- use crate :: publisher_handle:: publisher_inner_handler;
3- use crate :: websocket_utils:: { handle_websocket_error, send_text} ;
1+ use crate :: jrpc_handle:: { JrpcConnectionContext , handle_jrpc} ;
2+ use crate :: publisher_handle:: handle_publisher;
43use crate :: {
5- config:: Config , lazer_publisher:: LazerPublisher , publisher_handle:: PublisherConnectionContext ,
4+ config:: Config , http_server, lazer_publisher:: LazerPublisher ,
5+ publisher_handle:: PublisherConnectionContext ,
66} ;
7- use anyhow:: { Context , Result , bail} ;
8- use futures_util:: io:: { BufReader , BufWriter } ;
7+ use anyhow:: { Context , Result } ;
98use hyper:: body:: Incoming ;
109use hyper:: { Response , StatusCode , body:: Bytes , server:: conn:: http1, service:: service_fn} ;
1110use hyper_util:: rt:: TokioIo ;
12- use pyth_lazer_protocol:: publisher:: { ServerResponse , UpdateDeserializationErrorResponse } ;
1311use soketto:: {
1412 BoxedError ,
1513 handshake:: http:: { Server , is_upgrade_request} ,
1614} ;
1715use std:: fmt:: Debug ;
18- use std:: pin:: Pin ;
1916use std:: { io, net:: SocketAddr } ;
2017use tokio:: net:: { TcpListener , TcpStream } ;
21- use tokio:: { pin, select} ;
22- use tokio_util:: compat:: TokioAsyncReadCompatExt ;
23- use tracing:: { debug, error, info, instrument, warn} ;
18+ use tracing:: { debug, info, instrument, warn} ;
2419
2520type FullBody = http_body_util:: Full < Bytes > ;
26- pub type InnerHandlerResult = Pin < Box < dyn Future < Output = Result < Option < String > > > + Send > > ;
2721
2822#[ derive( Debug , Copy , Clone ) ]
2923pub enum PublisherRequest {
@@ -53,8 +47,10 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
5347 let stream_addr = listener. accept ( ) . await ;
5448 let lazer_publisher_clone = lazer_publisher. clone ( ) ;
5549 let config = config. clone ( ) ;
56- tokio:: spawn ( async {
57- if let Err ( err) = try_handle_connection ( config, stream_addr, lazer_publisher_clone) . await {
50+ tokio:: spawn ( async move {
51+ if let Err ( err) =
52+ try_handle_connection ( config, stream_addr, lazer_publisher_clone) . await
53+ {
5854 warn ! ( "error while handling connection: {err:?}" ) ;
5955 }
6056 } ) ;
@@ -74,7 +70,12 @@ async fn try_handle_connection(
7470 TokioIo :: new ( stream) ,
7571 service_fn ( move |r| {
7672 let request = RelayerRequest ( r) ;
77- request_handler ( config. clone ( ) , request, remote_addr, lazer_publisher. clone ( ) )
73+ request_handler (
74+ config. clone ( ) ,
75+ request,
76+ remote_addr,
77+ lazer_publisher. clone ( ) ,
78+ )
7879 } ) ,
7980 )
8081 . with_upgrades ( )
@@ -136,27 +137,23 @@ async fn request_handler(
136137 request_type : publisher_request_type,
137138 _remote_addr : remote_addr,
138139 } ;
139-
140- tokio:: spawn ( handle_ws (
141- config,
140+ tokio:: spawn ( handle_publisher (
142141 server,
143142 request. 0 ,
144- lazer_publisher,
145143 publisher_connection_context,
146- publisher_inner_handler ,
144+ lazer_publisher ,
147145 ) ) ;
148146 Ok ( response. map ( |( ) | FullBody :: default ( ) ) )
149147 }
150148 Request :: JrpcV1 => {
151- tokio:: spawn ( handle_ws (
152- config,
149+ let publisher_connection_context = JrpcConnectionContext { } ;
150+ tokio:: spawn ( handle_jrpc (
151+ config. clone ( ) ,
153152 server,
154153 request. 0 ,
154+ publisher_connection_context,
155155 lazer_publisher,
156- ( ) ,
157- jrpc_handler_inner,
158156 ) ) ;
159-
160157 Ok ( response. map ( |( ) | FullBody :: default ( ) ) )
161158 }
162159 }
@@ -170,88 +167,3 @@ async fn request_handler(
170167 }
171168 }
172169}
173-
174- #[ instrument(
175- skip( server, request, lazer_publisher) ,
176- fields( component = "publisher_ws" )
177- ) ]
178- async fn handle_ws < T : Debug + Copy > (
179- config : Config ,
180- server : Server ,
181- request : http:: Request < Incoming > ,
182- lazer_publisher : LazerPublisher ,
183- context : T ,
184- inner_handler : fn ( Config , Vec < u8 > , LazerPublisher , T ) -> InnerHandlerResult ,
185- ) {
186- if let Err ( err) = try_handle_ws ( config, server, request, lazer_publisher, context, inner_handler) . await
187- {
188- handle_websocket_error ( err) ;
189- }
190- }
191-
192- #[ instrument(
193- skip( server, request, lazer_publisher) ,
194- fields( component = "publisher_ws" )
195- ) ]
196- async fn try_handle_ws < T : Debug + Copy > (
197- config : Config ,
198- server : Server ,
199- request : http:: Request < Incoming > ,
200- lazer_publisher : LazerPublisher ,
201- context : T ,
202- inner_handler : fn ( Config , Vec < u8 > , LazerPublisher , T ) -> InnerHandlerResult ,
203- ) -> Result < ( ) > {
204- let stream = hyper:: upgrade:: on ( request) . await ?;
205- let io = TokioIo :: new ( stream) ;
206- let stream = BufReader :: new ( BufWriter :: new ( io. compat ( ) ) ) ;
207- let ( mut ws_sender, mut ws_receiver) = server. into_builder ( stream) . finish ( ) ;
208-
209- let mut receive_buf = Vec :: new ( ) ;
210-
211- let mut error_count = 0u32 ;
212- const MAX_ERROR_LOG : u32 = 10u32 ;
213- const MAX_ERROR_DISCONNECT : u32 = 100u32 ;
214-
215- loop {
216- receive_buf. clear ( ) ;
217- {
218- // soketto is not cancel-safe, so we need to store the future and poll it
219- // in the inner loop.
220- let receive = async { ws_receiver. receive ( & mut receive_buf) . await } ;
221- pin ! ( receive) ;
222- loop {
223- select ! {
224- _result = & mut receive => {
225- break
226- }
227- }
228- }
229- }
230-
231- match inner_handler ( config. clone ( ) , receive_buf. clone ( ) , lazer_publisher. clone ( ) , context) . await {
232- Ok ( response) => {
233- if let Some ( response) = response {
234- send_text ( & mut ws_sender, & response) . await ?;
235- }
236- }
237- Err ( err) => {
238- error_count += 1 ;
239- if error_count <= MAX_ERROR_LOG {
240- warn ! ( "Error decoding message error: {err}" ) ;
241- }
242- if error_count >= MAX_ERROR_DISCONNECT {
243- error ! ( "Error threshold reached; disconnecting" ) ;
244- bail ! ( "Error threshold reached" ) ;
245- }
246- let error_json = & serde_json:: to_string :: < ServerResponse > (
247- & UpdateDeserializationErrorResponse {
248- error : format ! ( "failed to parse a binary message: {err}" ) ,
249- }
250- . into ( ) ,
251- ) ?;
252- send_text ( & mut ws_sender, error_json) . await ?;
253- continue ;
254- }
255- }
256- }
257- }
0 commit comments