5353} ;
5454
5555pub const PING_INTERVAL_DURATION : Duration = Duration :: from_secs ( 30 ) ;
56+ pub const NOTIFICATIONS_CHAN_LEN : usize = 1000 ;
5657
5758pub async fn ws_route_handler (
5859 ws : WebSocketUpgrade ,
@@ -64,19 +65,14 @@ pub async fn ws_route_handler(
6465async fn websocket_handler ( stream : WebSocket , state : super :: State ) {
6566 let ws_state = state. ws . clone ( ) ;
6667 let id = ws_state. subscriber_counter . fetch_add ( 1 , Ordering :: SeqCst ) ;
67-
68- let ( sender, receiver) = stream. split ( ) ;
69-
70- // TODO: Use a configured value for the buffer size or make it const static
71- // TODO: Use redis stream to source the updates instead of a channel
72- let ( tx, rx) = mpsc:: channel :: < ( ) > ( 1000 ) ;
73-
74- ws_state. subscribers . insert ( id, tx) ;
75-
7668 log:: debug!( "New websocket connection, assigning id: {}" , id) ;
7769
78- let mut subscriber = Subscriber :: new ( id, state. store . clone ( ) , rx, receiver, sender) ;
70+ let ( notify_sender, notify_receiver) = mpsc:: channel :: < ( ) > ( NOTIFICATIONS_CHAN_LEN ) ;
71+ let ( sender, receiver) = stream. split ( ) ;
72+ let mut subscriber =
73+ Subscriber :: new ( id, state. store . clone ( ) , notify_receiver, receiver, sender) ;
7974
75+ ws_state. subscribers . insert ( id, notify_sender) ;
8076 subscriber. run ( ) . await ;
8177}
8278
@@ -88,7 +84,7 @@ pub struct Subscriber {
8884 id : SubscriberId ,
8985 closed : bool ,
9086 store : Arc < Store > ,
91- update_rx : mpsc:: Receiver < ( ) > ,
87+ notify_receiver : mpsc:: Receiver < ( ) > ,
9288 receiver : SplitStream < WebSocket > ,
9389 sender : SplitSink < WebSocket , Message > ,
9490 price_feeds_with_config : HashMap < PriceIdentifier , PriceFeedClientConfig > ,
@@ -100,15 +96,15 @@ impl Subscriber {
10096 pub fn new (
10197 id : SubscriberId ,
10298 store : Arc < Store > ,
103- update_rx : mpsc:: Receiver < ( ) > ,
99+ notify_receiver : mpsc:: Receiver < ( ) > ,
104100 receiver : SplitStream < WebSocket > ,
105101 sender : SplitSink < WebSocket , Message > ,
106102 ) -> Self {
107103 Self {
108104 id,
109105 closed : false ,
110106 store,
111- update_rx ,
107+ notify_receiver ,
112108 receiver,
113109 sender,
114110 price_feeds_with_config : HashMap :: new ( ) ,
@@ -128,7 +124,7 @@ impl Subscriber {
128124
129125 async fn handle_next ( & mut self ) -> Result < ( ) > {
130126 tokio:: select! {
131- maybe_update_feeds = self . update_rx . recv( ) => {
127+ maybe_update_feeds = self . notify_receiver . recv( ) => {
132128 if maybe_update_feeds. is_none( ) {
133129 return Err ( anyhow!( "Update channel closed. This should never happen. Closing connection." ) ) ;
134130 } ;
@@ -257,9 +253,7 @@ impl Subscriber {
257253 }
258254}
259255
260- pub async fn dispatch_updates ( state : super :: State ) {
261- let ws_state = state. ws . clone ( ) ;
262-
256+ pub async fn notify_updates ( ws_state : Arc < WsState > ) {
263257 let closed_subscribers: Vec < Option < SubscriberId > > = join_all (
264258 ws_state
265259 . subscribers
0 commit comments