@@ -132,8 +132,13 @@ impl LazerPublisherTask {
132132 return Ok ( ( ) ) ;
133133 }
134134
135+ let mut updates = self . pending_updates . drain ( ..) . collect ( ) ;
136+ if self . config . enable_update_deduplication {
137+ deduplicate_feed_updates ( & mut updates) ;
138+ }
139+
135140 let publisher_update = PublisherUpdate {
136- updates : self . pending_updates . drain ( .. ) . collect ( ) ,
141+ updates,
137142 publisher_timestamp : MessageField :: some ( Timestamp :: now ( ) ) ,
138143 special_fields : Default :: default ( ) ,
139144 } ;
@@ -173,13 +178,19 @@ impl LazerPublisherTask {
173178 }
174179}
175180
181+ fn deduplicate_feed_updates ( feed_updates : & mut Vec < FeedUpdate > ) {
182+ // assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
183+ feed_updates. dedup_by_key ( |feed_update| ( feed_update. feed_id , feed_update. update . clone ( ) ) ) ;
184+ }
185+
176186#[ cfg( test) ]
177187mod tests {
178188 use crate :: config:: { CHANNEL_CAPACITY , Config } ;
179- use crate :: lazer_publisher:: LazerPublisherTask ;
189+ use crate :: lazer_publisher:: { LazerPublisherTask , deduplicate_feed_updates } ;
180190 use ed25519_dalek:: SigningKey ;
181191 use protobuf:: well_known_types:: timestamp:: Timestamp ;
182192 use protobuf:: { Message , MessageField } ;
193+ use pyth_lazer_protocol:: time:: TimestampUs ;
183194 use pyth_lazer_publisher_sdk:: publisher_update:: feed_update:: Update ;
184195 use pyth_lazer_publisher_sdk:: publisher_update:: { FeedUpdate , PriceUpdate } ;
185196 use pyth_lazer_publisher_sdk:: transaction:: { LazerTransaction , lazer_transaction} ;
@@ -212,6 +223,18 @@ mod tests {
212223 temp_file
213224 }
214225
226+ fn test_feed_update ( feed_id : u32 , timestamp : TimestampUs , price : i64 ) -> FeedUpdate {
227+ FeedUpdate {
228+ feed_id : Some ( feed_id) ,
229+ source_timestamp : MessageField :: some ( timestamp. into ( ) ) ,
230+ update : Some ( Update :: PriceUpdate ( PriceUpdate {
231+ price : Some ( price) ,
232+ ..PriceUpdate :: default ( )
233+ } ) ) ,
234+ special_fields : Default :: default ( ) ,
235+ }
236+ }
237+
215238 #[ tokio:: test]
216239 async fn test_lazer_exporter_task ( ) {
217240 let signing_key_file = get_private_key_file ( ) ;
@@ -224,6 +247,7 @@ mod tests {
224247 publish_keypair_path : PathBuf :: from ( signing_key_file. path ( ) ) ,
225248 publish_interval_duration : Duration :: from_millis ( 25 ) ,
226249 history_service_url : None ,
250+ enable_update_deduplication : false ,
227251 } ;
228252
229253 let ( relayer_sender, mut relayer_receiver) = broadcast:: channel ( CHANNEL_CAPACITY ) ;
@@ -274,4 +298,55 @@ mod tests {
274298 _ => panic ! ( "channel should have a transaction waiting" ) ,
275299 }
276300 }
301+
302+ #[ test]
303+ fn test_deduplicate_feed_updates ( ) {
304+ // let's consider a batch containing updates for a single feed. the updates are (ts, price):
305+ // - (1, 10)
306+ // - (2, 10)
307+ // - (3, 10)
308+ // - (4, 15)
309+ // - (5, 15)
310+ // - (6, 10)
311+ // we should only return (1, 10), (4, 15), (6, 10)
312+
313+ let updates = & mut vec ! [
314+ test_feed_update( 1 , TimestampUs :: from_millis( 1 ) . unwrap( ) , 10 ) ,
315+ test_feed_update( 1 , TimestampUs :: from_millis( 2 ) . unwrap( ) , 10 ) ,
316+ test_feed_update( 1 , TimestampUs :: from_millis( 3 ) . unwrap( ) , 10 ) ,
317+ test_feed_update( 1 , TimestampUs :: from_millis( 4 ) . unwrap( ) , 15 ) ,
318+ test_feed_update( 1 , TimestampUs :: from_millis( 5 ) . unwrap( ) , 15 ) ,
319+ test_feed_update( 1 , TimestampUs :: from_millis( 6 ) . unwrap( ) , 10 ) ,
320+ ] ;
321+
322+ let expected_updates = vec ! [
323+ test_feed_update( 1 , TimestampUs :: from_millis( 1 ) . unwrap( ) , 10 ) ,
324+ test_feed_update( 1 , TimestampUs :: from_millis( 4 ) . unwrap( ) , 15 ) ,
325+ test_feed_update( 1 , TimestampUs :: from_millis( 6 ) . unwrap( ) , 10 ) ,
326+ ] ;
327+
328+ deduplicate_feed_updates ( updates) ;
329+ assert_eq ! ( updates. to_vec( ) , expected_updates) ;
330+ }
331+
332+ #[ test]
333+ fn test_deduplicate_feed_updates_multiple_feeds ( ) {
334+ let updates = & mut vec ! [
335+ test_feed_update( 1 , TimestampUs :: from_millis( 1 ) . unwrap( ) , 10 ) ,
336+ test_feed_update( 1 , TimestampUs :: from_millis( 2 ) . unwrap( ) , 10 ) ,
337+ test_feed_update( 1 , TimestampUs :: from_millis( 3 ) . unwrap( ) , 10 ) ,
338+ test_feed_update( 2 , TimestampUs :: from_millis( 4 ) . unwrap( ) , 15 ) ,
339+ test_feed_update( 2 , TimestampUs :: from_millis( 5 ) . unwrap( ) , 15 ) ,
340+ test_feed_update( 2 , TimestampUs :: from_millis( 6 ) . unwrap( ) , 10 ) ,
341+ ] ;
342+
343+ let expected_updates = vec ! [
344+ test_feed_update( 1 , TimestampUs :: from_millis( 1 ) . unwrap( ) , 10 ) ,
345+ test_feed_update( 2 , TimestampUs :: from_millis( 4 ) . unwrap( ) , 15 ) ,
346+ test_feed_update( 2 , TimestampUs :: from_millis( 6 ) . unwrap( ) , 10 ) ,
347+ ] ;
348+
349+ deduplicate_feed_updates ( updates) ;
350+ assert_eq ! ( updates. to_vec( ) , expected_updates) ;
351+ }
277352}
0 commit comments