1+ use std:: time:: Duration ;
2+
13use base64:: Engine ;
2- use futures_util:: StreamExt ;
3- use pyth_lazer_client:: { AnyResponse , LazerClient } ;
4+ use pyth_lazer_client:: backoff:: PythLazerExponentialBackoffBuilder ;
5+ use pyth_lazer_client:: client:: PythLazerClientBuilder ;
6+ use pyth_lazer_client:: ws_connection:: AnyResponse ;
47use pyth_lazer_protocol:: message:: {
58 EvmMessage , LeEcdsaMessage , LeUnsignedMessage , Message , SolanaMessage ,
69} ;
@@ -9,8 +12,10 @@ use pyth_lazer_protocol::router::{
912 Channel , DeliveryFormat , FixedRate , Format , JsonBinaryEncoding , PriceFeedId , PriceFeedProperty ,
1013 SubscriptionParams , SubscriptionParamsRepr ,
1114} ;
12- use pyth_lazer_protocol:: subscription:: { Request , Response , SubscribeRequest , SubscriptionId } ;
15+ use pyth_lazer_protocol:: subscription:: { Response , SubscribeRequest , SubscriptionId } ;
1316use tokio:: pin;
17+ use tracing:: level_filters:: LevelFilter ;
18+ use tracing_subscriber:: EnvFilter ;
1419
1520fn get_lazer_access_token ( ) -> String {
1621 // Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
@@ -20,11 +25,32 @@ fn get_lazer_access_token() -> String {
2025
2126#[ tokio:: main]
2227async fn main ( ) -> anyhow:: Result < ( ) > {
28+ tracing_subscriber:: fmt ( )
29+ . with_env_filter (
30+ EnvFilter :: builder ( )
31+ . with_default_directive ( LevelFilter :: INFO . into ( ) )
32+ . from_env ( ) ?,
33+ )
34+ . json ( )
35+ . init ( ) ;
36+
2337 // Create and start the client
24- let mut client = LazerClient :: new (
25- "wss://pyth-lazer.dourolabs.app/v1/stream" ,
26- & get_lazer_access_token ( ) ,
27- ) ?;
38+ let mut client = PythLazerClientBuilder :: new ( get_lazer_access_token ( ) )
39+ // Optionally override the default endpoints
40+ . with_endpoints ( vec ! [
41+ "wss://pyth-lazer-0.dourolabs.app/v1/stream" . parse( ) ?,
42+ "wss://pyth-lazer-1.dourolabs.app/v1/stream" . parse( ) ?,
43+ ] )
44+ // Optionally set the number of connections
45+ . with_num_connections ( 4 )
46+ // Optionally set the backoff strategy
47+ . with_backoff ( PythLazerExponentialBackoffBuilder :: default ( ) . build ( ) )
48+ // Optionally set the timeout for each connection
49+ . with_timeout ( Duration :: from_secs ( 5 ) )
50+ // Optionally set the channel capacity for responses
51+ . with_channel_capacity ( 1000 )
52+ . build ( ) ?;
53+
2854 let stream = client. start ( ) . await ?;
2955 pin ! ( stream) ;
3056
@@ -72,16 +98,16 @@ async fn main() -> anyhow::Result<()> {
7298 ] ;
7399
74100 for req in subscription_requests {
75- client. subscribe ( Request :: Subscribe ( req) ) . await ?;
101+ client. subscribe ( req) . await ?;
76102 }
77103
78104 println ! ( "Subscribed to price feeds. Waiting for updates..." ) ;
79105
80106 // Process the first few updates
81107 let mut count = 0 ;
82- while let Some ( msg) = stream. next ( ) . await {
108+ while let Some ( msg) = stream. recv ( ) . await {
83109 // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
84- match msg? {
110+ match msg {
85111 AnyResponse :: Json ( msg) => match msg {
86112 Response :: StreamUpdated ( update) => {
87113 println ! ( "Received a JSON update for {:?}" , update. subscription_id) ;
@@ -189,8 +215,6 @@ async fn main() -> anyhow::Result<()> {
189215 println ! ( "Unsubscribed from {sub_id:?}" ) ;
190216 }
191217
192- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 500 ) ) . await ;
193- client. close ( ) . await ?;
194218 Ok ( ( ) )
195219}
196220
0 commit comments