File tree Expand file tree Collapse file tree 3 files changed +25
-17
lines changed Expand file tree Collapse file tree 3 files changed +25
-17
lines changed Original file line number Diff line number Diff line change @@ -251,7 +251,7 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
251251
252252 // Once the accumulator reaches a complete state for a specific slot
253253 // we can build the message states
254- let message_states = build_message_states ( accumulator_messages, wormhole_merkle_state) . await ?;
254+ let message_states = build_message_states ( accumulator_messages, wormhole_merkle_state) ?;
255255
256256 let message_state_keys = message_states
257257 . iter ( )
@@ -306,7 +306,7 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
306306}
307307
308308#[ tracing:: instrument( skip( accumulator_messages, wormhole_merkle_state) ) ]
309- async fn build_message_states (
309+ fn build_message_states (
310310 accumulator_messages : AccumulatorMessages ,
311311 wormhole_merkle_state : WormholeMerkleState ,
312312) -> Result < Vec < MessageState > > {
Original file line number Diff line number Diff line change @@ -362,20 +362,27 @@ impl Subscriber {
362362 Err ( _) => {
363363 // The error can only happen when a price feed was available
364364 // and is no longer there as we check the price feed ids upon
365- // subscription. In this case we send an error message and
366- // close the connection.
367- self . sender
368- . send (
369- serde_json:: to_string ( & ServerResponseMessage :: Err {
370- error : "Some of the subscribed price feeds have been removed. Closing connection."
371- . to_string ( ) ,
372- } ) ?
373- . into ( ) ,
374- )
375- . await ?;
376- self . sender . close ( ) . await ?;
377- self . closed = true ;
378- return Ok ( ( ) ) ;
365+ // subscription. In this case we just remove the non-existing
366+ // price feed from the list and will keep sending updates for
367+ // the rest.
368+ let available_price_feed_ids =
369+ crate :: aggregate:: get_price_feed_ids ( & * self . store ) . await ;
370+
371+ self . price_feeds_with_config
372+ . retain ( |price_feed_id, _| available_price_feed_ids. contains ( price_feed_id) ) ;
373+
374+ let price_feed_ids = self
375+ . price_feeds_with_config
376+ . keys ( )
377+ . cloned ( )
378+ . collect :: < Vec < _ > > ( ) ;
379+
380+ crate :: aggregate:: get_price_feeds_with_update_data (
381+ & * self . store ,
382+ & price_feed_ids,
383+ RequestTime :: AtSlot ( event. slot ( ) ) ,
384+ )
385+ . await ?
379386 }
380387 } ;
381388
Original file line number Diff line number Diff line change @@ -228,7 +228,7 @@ impl AggregateCache for crate::state::State {
228228 async fn prune_removed_keys ( & self , current_keys : HashSet < MessageStateKey > ) {
229229 let mut message_cache = self . cache . message_cache . write ( ) . await ;
230230
231- // Sometiems , some keys are removed from the accumulator. We track which keys are not
231+ // Sometimes , some keys are removed from the accumulator. We track which keys are not
232232 // present in the message states and remove them from the cache.
233233 let keys_in_cache = message_cache
234234 . iter ( )
@@ -237,6 +237,7 @@ impl AggregateCache for crate::state::State {
237237
238238 for key in keys_in_cache {
239239 if !current_keys. contains ( & key) {
240+ tracing:: info!( "Feed {:?} seems to be removed. Removing it from cache" , key) ;
240241 message_cache. remove ( & key) ;
241242 }
242243 }
You can’t perform that action at this time.
0 commit comments