11use {
22 super :: {
3- super :: store:: {
4- local,
5- PriceIdentifier ,
6- } ,
3+ super :: store:: PriceIdentifier ,
74 api:: {
85 NotifyPrice ,
96 NotifyPriceSched ,
2926
3027pub mod api;
3128pub mod global;
29+ pub mod local;
3230pub use api:: {
3331 notifier,
3432 AdapterApi ,
@@ -68,14 +66,14 @@ pub struct Adapter {
6866 /// The fixed interval at which Notify Price Sched notifications are sent
6967 notify_price_sched_interval_duration : Duration ,
7068
71- /// Channel on which to communicate with the local store
72- local_store_tx : mpsc:: Sender < local:: Message > ,
73-
7469 /// The logger
7570 logger : Logger ,
7671
7772 /// Global store for managing the unified state of Pyth-on-Solana networks.
7873 global_store : global:: Store ,
74+
75+ /// Local store for managing the unpushed state.
76+ local_store : local:: Store ,
7977}
8078
8179/// Represents a single Notify Price Sched subscription
@@ -95,19 +93,15 @@ struct NotifyPriceSubscription {
9593}
9694
9795impl Adapter {
98- pub async fn new (
99- config : Config ,
100- local_store_tx : mpsc:: Sender < local:: Message > ,
101- logger : Logger ,
102- ) -> Self {
96+ pub async fn new ( config : Config , logger : Logger ) -> Self {
10397 let registry = & mut * PROMETHEUS_REGISTRY . lock ( ) . await ;
10498 Adapter {
10599 global_store : global:: Store :: new ( logger. clone ( ) , registry) ,
100+ local_store : local:: Store :: new ( logger. clone ( ) , registry) ,
106101 subscription_id_seq : 1 . into ( ) ,
107102 notify_price_sched_subscriptions : RwLock :: new ( HashMap :: new ( ) ) ,
108103 notify_price_subscriptions : RwLock :: new ( HashMap :: new ( ) ) ,
109104 notify_price_sched_interval_duration : config. notify_price_sched_interval_duration ,
110- local_store_tx,
111105 logger,
112106 }
113107 }
@@ -128,8 +122,9 @@ mod tests {
128122 } ,
129123 crate :: agent:: {
130124 pythd:: {
131- api ,
125+ adapter :: local :: LocalStore ,
132126 api:: {
127+ self ,
133128 NotifyPrice ,
134129 NotifyPriceSched ,
135130 PriceAccountMetadata ,
@@ -140,7 +135,6 @@ mod tests {
140135 } ,
141136 } ,
142137 solana,
143- store:: local,
144138 } ,
145139 iobuffer:: IoBuffer ,
146140 pyth_sdk:: Identifier ,
@@ -172,29 +166,26 @@ mod tests {
172166 } ;
173167
174168 struct TestAdapter {
175- adapter : Arc < Adapter > ,
176- local_store_rx : mpsc:: Receiver < local:: Message > ,
177- shutdown_tx : broadcast:: Sender < ( ) > ,
178- jh : JoinHandle < ( ) > ,
169+ adapter : Arc < Adapter > ,
170+ shutdown_tx : broadcast:: Sender < ( ) > ,
171+ jh : JoinHandle < ( ) > ,
179172 }
180173
181174 async fn setup ( ) -> TestAdapter {
182175 // Create and spawn an adapter
183- let ( local_store_tx, local_store_rx) = mpsc:: channel ( 1000 ) ;
184176 let notify_price_sched_interval_duration = Duration :: from_nanos ( 10 ) ;
185177 let logger = slog_test:: new_test_logger ( IoBuffer :: new ( ) ) ;
186178 let config = Config {
187179 notify_price_sched_interval_duration,
188180 } ;
189- let adapter = Arc :: new ( Adapter :: new ( config, local_store_tx , logger) . await ) ;
181+ let adapter = Arc :: new ( Adapter :: new ( config, logger) . await ) ;
190182 let ( shutdown_tx, _) = broadcast:: channel ( 1 ) ;
191183
192184 // Spawn Price Notifier
193185 let jh = tokio:: spawn ( notifier ( adapter. clone ( ) , shutdown_tx. subscribe ( ) ) ) ;
194186
195187 TestAdapter {
196188 adapter,
197- local_store_rx,
198189 shutdown_tx,
199190 jh,
200191 }
@@ -1379,7 +1370,7 @@ mod tests {
13791370 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
13801371 async fn test_update_price ( ) {
13811372 // Start the test adapter
1382- let mut test_adapter = setup ( ) . await ;
1373+ let test_adapter = setup ( ) . await ;
13831374
13841375 // Send an Update Price message
13851376 let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t"
@@ -1394,18 +1385,14 @@ mod tests {
13941385 . unwrap ( ) ;
13951386
13961387 // Check that the local store indeed received the correct update
1397- match test_adapter. local_store_rx . recv ( ) . await . unwrap ( ) {
1398- local:: Message :: Update {
1399- price_identifier,
1400- price_info,
1401- } => {
1402- assert_eq ! ( price_identifier, Identifier :: new( account. to_bytes( ) ) ) ;
1403- assert_eq ! ( price_info. price, price) ;
1404- assert_eq ! ( price_info. conf, conf) ;
1405- assert_eq ! ( price_info. status, PriceStatus :: Trading ) ;
1406- }
1407- _ => panic ! ( "Uexpected message received by local store from adapter" ) ,
1408- } ;
1388+ let price_infos = LocalStore :: get_all_price_infos ( & * test_adapter. adapter ) . await ;
1389+ let price_info = price_infos
1390+ . get ( & Identifier :: new ( account. to_bytes ( ) ) )
1391+ . unwrap ( ) ;
1392+
1393+ assert_eq ! ( price_info. price, price) ;
1394+ assert_eq ! ( price_info. conf, conf) ;
1395+ assert_eq ! ( price_info. status, PriceStatus :: Trading ) ;
14091396
14101397 let _ = test_adapter. shutdown_tx . send ( ( ) ) ;
14111398 test_adapter. jh . abort ( ) ;
0 commit comments