@@ -24,6 +24,9 @@ use tokio::{
2424 time:: interval,
2525} ;
2626use tracing:: error;
27+ use ttl_cache:: TtlCache ;
28+
29+ const DEDUP_CACHE_SIZE : usize = 100_000 ;
2730
2831#[ derive( Clone , Debug ) ]
2932pub struct LazerPublisher {
@@ -88,6 +91,7 @@ impl LazerPublisher {
8891 pending_updates : Vec :: new ( ) ,
8992 relayer_sender,
9093 signing_key,
94+ ttl_cache : TtlCache :: new ( DEDUP_CACHE_SIZE ) ,
9195 } ;
9296 tokio:: spawn ( async move { task. run ( ) . await } ) ;
9397 Self {
@@ -109,6 +113,7 @@ struct LazerPublisherTask {
109113 pending_updates : Vec < FeedUpdate > ,
110114 relayer_sender : broadcast:: Sender < SignedLazerTransaction > ,
111115 signing_key : SigningKey ,
116+ ttl_cache : TtlCache < u32 , FeedUpdate > ,
112117}
113118
114119impl LazerPublisherTask {
@@ -136,7 +141,16 @@ impl LazerPublisherTask {
136141 let mut updates: Vec < FeedUpdate > = self . pending_updates . drain ( ..) . collect ( ) ;
137142 updates. sort_by_key ( |u| u. source_timestamp . as_ref ( ) . map ( |t| ( t. seconds , t. nanos ) ) ) ;
138143 if self . config . enable_update_deduplication {
139- updates = deduplicate_feed_updates ( & updates) ?;
144+ updates = deduplicate_feed_updates_in_tx ( & updates) ?;
145+ deduplicate_feed_updates (
146+ & mut updates,
147+ & mut self . ttl_cache ,
148+ self . config . update_deduplication_ttl ,
149+ ) ;
150+ }
151+
152+ if updates. is_empty ( ) {
153+ return Ok ( ( ) ) ;
140154 }
141155
142156 let publisher_update = PublisherUpdate {
@@ -182,7 +196,9 @@ impl LazerPublisherTask {
182196
183197/// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp.
184198/// Assumes the input is sorted by timestamp ascending.
185- fn deduplicate_feed_updates ( sorted_feed_updates : & Vec < FeedUpdate > ) -> Result < Vec < FeedUpdate > > {
199+ fn deduplicate_feed_updates_in_tx (
200+ sorted_feed_updates : & Vec < FeedUpdate > ,
201+ ) -> Result < Vec < FeedUpdate > > {
186202 let mut deduped_feed_updates = HashMap :: new ( ) ;
187203 for update in sorted_feed_updates {
188204 let entry = deduped_feed_updates. entry ( update. feed_id ) . or_insert ( update) ;
@@ -193,10 +209,35 @@ fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec
193209 Ok ( deduped_feed_updates. into_values ( ) . cloned ( ) . collect ( ) )
194210}
195211
212+ fn deduplicate_feed_updates (
213+ sorted_feed_updates : & mut Vec < FeedUpdate > ,
214+ ttl_cache : & mut TtlCache < u32 , FeedUpdate > ,
215+ ttl : std:: time:: Duration ,
216+ ) {
217+ sorted_feed_updates. retain ( |update| {
218+ let feed_id = match update. feed_id {
219+ Some ( id) => id,
220+ None => return false , // drop updates without feed_id
221+ } ;
222+
223+ if let Some ( cached_feed) = ttl_cache. get ( & feed_id) {
224+ if cached_feed. update == update. update {
225+ // drop if the same update is already in the cache
226+ return false ;
227+ }
228+ }
229+
230+ ttl_cache. insert ( feed_id, update. clone ( ) , ttl) ;
231+ true
232+ } ) ;
233+ }
234+
196235#[ cfg( test) ]
197236mod tests {
198237 use crate :: config:: { CHANNEL_CAPACITY , Config } ;
199- use crate :: lazer_publisher:: { LazerPublisherTask , deduplicate_feed_updates} ;
238+ use crate :: lazer_publisher:: {
239+ DEDUP_CACHE_SIZE , LazerPublisherTask , deduplicate_feed_updates_in_tx,
240+ } ;
200241 use ed25519_dalek:: SigningKey ;
201242 use protobuf:: well_known_types:: timestamp:: Timestamp ;
202243 use protobuf:: { Message , MessageField } ;
@@ -210,6 +251,7 @@ mod tests {
210251 use tempfile:: NamedTempFile ;
211252 use tokio:: sync:: broadcast:: error:: TryRecvError ;
212253 use tokio:: sync:: { broadcast, mpsc} ;
254+ use ttl_cache:: TtlCache ;
213255 use url:: Url ;
214256
215257 fn get_private_key ( ) -> SigningKey {
@@ -258,6 +300,7 @@ mod tests {
258300 publish_interval_duration : Duration :: from_millis ( 25 ) ,
259301 history_service_url : None ,
260302 enable_update_deduplication : false ,
303+ update_deduplication_ttl : Default :: default ( ) ,
261304 } ;
262305
263306 let ( relayer_sender, mut relayer_receiver) = broadcast:: channel ( CHANNEL_CAPACITY ) ;
@@ -268,6 +311,7 @@ mod tests {
268311 pending_updates : Vec :: new ( ) ,
269312 relayer_sender,
270313 signing_key,
314+ ttl_cache : TtlCache :: new ( DEDUP_CACHE_SIZE ) ,
271315 } ;
272316 tokio:: spawn ( async move { task. run ( ) . await } ) ;
273317
@@ -337,7 +381,7 @@ mod tests {
337381 10 ,
338382 ) ] ;
339383
340- let deduped_updates = deduplicate_feed_updates ( updates) . unwrap ( ) ;
384+ let deduped_updates = deduplicate_feed_updates_in_tx ( updates) . unwrap ( ) ;
341385 assert_eq ! ( deduped_updates, expected_updates) ;
342386 }
343387
@@ -357,7 +401,7 @@ mod tests {
357401 test_feed_update( 2 , TimestampUs :: from_millis( 6 ) . unwrap( ) , 10 ) ,
358402 ] ;
359403
360- let mut deduped_updates = deduplicate_feed_updates ( updates) . unwrap ( ) ;
404+ let mut deduped_updates = deduplicate_feed_updates_in_tx ( updates) . unwrap ( ) ;
361405 deduped_updates. sort_by_key ( |u| u. feed_id ) ;
362406 assert_eq ! ( deduped_updates, expected_updates) ;
363407 }
@@ -384,7 +428,7 @@ mod tests {
384428 test_feed_update( 2 , TimestampUs :: from_millis( 12 ) . unwrap( ) , 10 ) ,
385429 ] ;
386430
387- let mut deduped_updates = deduplicate_feed_updates ( updates) . unwrap ( ) ;
431+ let mut deduped_updates = deduplicate_feed_updates_in_tx ( updates) . unwrap ( ) ;
388432 deduped_updates. sort_by_key ( |u| u. feed_id ) ;
389433 assert_eq ! ( deduped_updates, expected_updates) ;
390434 }
0 commit comments