From 7eb1c8decc858288f145b64082ac7e8ffbfc5c7f Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 28 Apr 2022 10:25:33 -0700 Subject: [PATCH 01/11] add code to flush batches early --- pc/manager.cpp | 21 +++++++++++++++------ pc/user.cpp | 16 +++++++++++++--- pc/user.hpp | 4 ++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index bd92c2ec2..9aa50d9b2 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -500,11 +500,18 @@ void manager::poll( bool do_wait ) tconn_.reconnect(); } - // submit new quotes while connected if ( has_status( PC_PYTH_RPC_CONNECTED ) && !hconn_.get_is_err() && ( !wconn_ || !wconn_->get_is_err() ) ) { + // request product quotes from pythd's clients while connected poll_schedule(); + + // send any pending complete price update batches to solana + for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { + if (uptr->num_pending_upds() >= get_max_batch_size()) { + uptr->send_pending_upds(get_max_batch_size()); + } + } } else { reconnect_rpc(); } @@ -743,6 +750,13 @@ void manager::on_response( rpc::get_slot *res ) clnt_.send( breq_ ); } + if (has_status( PC_PYTH_RPC_CONNECTED ) && !is_pub_) { + // Flush any partial batches of updates, as we've reached the end of the list of products. + for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { + uptr->send_pending_upds(); + } + } + // reset submit if ( !is_pub_ ) { kidx_ = 0; @@ -758,11 +772,6 @@ void manager::on_response( rpc::get_slot *res ) if ( has_status( PC_PYTH_RPC_CONNECTED ) ) { - // New slot received, so flush all pending updates for all active users - for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { - uptr->send_pending_upds(); - } - if ( sub_ ) { sub_->on_slot_publish( this ); } diff --git a/pc/user.cpp b/pc/user.cpp index 92792b1ca..e652835aa 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -330,17 +330,27 @@ void user::parse_get_product( uint32_t tok, uint32_t itok ) add_tail( itok ); } -void user::send_pending_upds() +uint32_t user::num_pending_upds() +{ + return pending_vec_.size(); +} + +void user::send_pending_upds(uint32_t n) { if ( pending_vec_.empty() ) { return; } - if ( !price::send( pending_vec_.data(), pending_vec_.size()) ) { + uint32_t n_sent = n; + if (pending_vec_.size() < n) { + n_sent = pending_vec_.size(); + } + + if ( !price::send( pending_vec_.data(), n_sent) ) { add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" ); } - pending_vec_.clear(); + pending_vec_.erase(0, n_sent); } void user::parse_get_all_products( uint32_t itok ) diff --git a/pc/user.hpp b/pc/user.hpp index c5e50ff26..a146faa45 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -43,7 +43,11 @@ namespace pc // symbol price schedule callback void on_response( price_sched *, uint64_t ) override; + // number of pending price updates that are queued to be sent. + uint32_t num_pending_upds(); + // send all pending updates + // TODO: rename void send_pending_upds(); private: From b4c566f5ca35802827bf75537c0b6e28c4abaff3 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 28 Apr 2022 10:41:22 -0700 Subject: [PATCH 02/11] fix build --- pc/user.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pc/user.hpp b/pc/user.hpp index a146faa45..d9b882b66 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -48,7 +48,7 @@ namespace pc // send all pending updates // TODO: rename - void send_pending_upds(); + void send_pending_upds(uint32_t n); private: From 260a54f7e6922241c78be662233d36575356cfa0 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 28 Apr 2022 10:42:04 -0700 Subject: [PATCH 03/11] fix build --- pc/manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index 9aa50d9b2..d5428a8ef 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -753,7 +753,7 @@ void manager::on_response( rpc::get_slot *res ) if (has_status( PC_PYTH_RPC_CONNECTED ) && !is_pub_) { // Flush any partial batches of updates, as we've reached the end of the list of products. for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { - uptr->send_pending_upds(); + uptr->send_pending_upds(uptr->num_pending_upds()); } } From 22ecb141a43ff11bc26008177a40d72c744dc436 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 28 Apr 2022 10:46:26 -0700 Subject: [PATCH 04/11] fix build --- pc/user.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pc/user.cpp b/pc/user.cpp index e652835aa..d7f6c2c3d 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -350,7 +350,7 @@ void user::send_pending_upds(uint32_t n) add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" ); } - pending_vec_.erase(0, n_sent); + pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_sent); } void user::parse_get_all_products( uint32_t itok ) From 99b42f6f594b3f305ba1d8c465aa9f35fd32167e Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 28 Apr 2022 10:59:54 -0700 Subject: [PATCH 05/11] comments --- pc/manager.cpp | 2 +- pc/user.hpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index d5428a8ef..eb269aca7 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -506,7 +506,7 @@ void manager::poll( bool do_wait ) // request product quotes from pythd's clients while connected poll_schedule(); - // send any pending complete price update batches to solana + // Flush any pending complete batches of price updates by submitting solana TXs. for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { if (uptr->num_pending_upds() >= get_max_batch_size()) { uptr->send_pending_upds(get_max_batch_size()); diff --git a/pc/user.hpp b/pc/user.hpp index d9b882b66..0d8d5c28e 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -43,11 +43,11 @@ namespace pc // symbol price schedule callback void on_response( price_sched *, uint64_t ) override; - // number of pending price updates that are queued to be sent. + // Get the number of pending price updates that are queued to be sent. uint32_t num_pending_upds(); - // send all pending updates - // TODO: rename + // send up to n pending price updates. If n > the max batch size, + // this will split the price updates into multiple transactions. void send_pending_upds(uint32_t n); private: From 889a7ace437088da5f9534bed2bda2cc5667d20c Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 29 Apr 2022 08:23:45 -0700 Subject: [PATCH 06/11] fix test? --- pc/manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index eb269aca7..819559be2 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -750,7 +750,7 @@ void manager::on_response( rpc::get_slot *res ) clnt_.send( breq_ ); } - if (has_status( PC_PYTH_RPC_CONNECTED ) && !is_pub_) { + if (has_status( PC_PYTH_RPC_CONNECTED )) { // Flush any partial batches of updates, as we've reached the end of the list of products. for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { uptr->send_pending_upds(uptr->num_pending_upds()); From 727bf17691c52e46479205a0c4bb778e7d6640f6 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 29 Apr 2022 09:34:11 -0700 Subject: [PATCH 07/11] better approach --- pc/manager.cpp | 6 ++---- pc/user.cpp | 26 ++++++++++++++------------ pc/user.hpp | 10 ++++------ 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index 819559be2..34232422e 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -507,10 +507,8 @@ void manager::poll( bool do_wait ) poll_schedule(); // Flush any pending complete batches of price updates by submitting solana TXs. - for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { - if (uptr->num_pending_upds() >= get_max_batch_size()) { - uptr->send_pending_upds(get_max_batch_size()); - } + for ( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { + uptr->send_pending_upds(); } } else { reconnect_rpc(); diff --git a/pc/user.cpp b/pc/user.cpp index d7f6c2c3d..f5b72ead1 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -13,6 +13,8 @@ #define PC_JSON_MISSING_PERMS -32001 #define PC_JSON_NOT_READY -32002 #define PC_BATCH_SEND_FAILED -32010 +// Flush partial batches if not completed within 400 ms. +#define PC_FLUSH_INTERVAL (400L*PC_NSECS_IN_MSEC) using namespace pc; @@ -27,7 +29,8 @@ void user::user_http::parse_content( const char *txt, size_t len ) user::user() : rptr_( nullptr ), sptr_( nullptr ), - psub_( this ) + psub_( this ), + last_update_ts_(0) { // setup the plumbing hsvr_.ptr_ = this; @@ -330,20 +333,18 @@ void user::parse_get_product( uint32_t tok, uint32_t itok ) add_tail( itok ); } -uint32_t user::num_pending_upds() +void user::send_pending_upds() { - return pending_vec_.size(); -} - -void user::send_pending_upds(uint32_t n) -{ - if ( pending_vec_.empty() ) { - return; + uint32_t n_sent = 0; + int64_t curr_ts = get_now(); + if (curr_ts_ - price_upd_ts_ > PC_FLUSH_INTERVAL) { + n_sent = pending_vec_.size(); + } else if (pending_vec_.size() > max_batch_size_) { + n_sent = max_batch_size_; } - uint32_t n_sent = n; - if (pending_vec_.size() < n) { - n_sent = pending_vec_.size(); + if (n_sent == 0) { + return; } if ( !price::send( pending_vec_.data(), n_sent) ) { @@ -351,6 +352,7 @@ void user::send_pending_upds(uint32_t n) } pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_sent); + price_upd_ts_ = curr_ts_; } void user::parse_get_all_products( uint32_t itok ) diff --git a/pc/user.hpp b/pc/user.hpp index 0d8d5c28e..34adff604 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -43,12 +43,9 @@ namespace pc // symbol price schedule callback void on_response( price_sched *, uint64_t ) override; - // Get the number of pending price updates that are queued to be sent. - uint32_t num_pending_upds(); - - // send up to n pending price updates. If n > the max batch size, - // this will split the price updates into multiple transactions. - void send_pending_upds(uint32_t n); + // send a batch of pending price updates. This function eagerly sends any complete batches. + // It also sends partial batches that have not been completed within a short interval of time. + void send_pending_upds(); private: @@ -89,6 +86,7 @@ namespace pc def_vec_t dvec_; // deferred subscriptions request_sub_set psub_; // price subscriptions pending_vec_t pending_vec_; // prices with pending updates + int64_t last_upd_ts_; // timestamp of last price update transaction }; } From fa56b716c26ddbc2f8466820fe3ab5e6f50b3b0a Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 29 Apr 2022 09:39:26 -0700 Subject: [PATCH 08/11] fix --- pc/manager.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index 34232422e..52ec00a60 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -748,13 +748,6 @@ void manager::on_response( rpc::get_slot *res ) clnt_.send( breq_ ); } - if (has_status( PC_PYTH_RPC_CONNECTED )) { - // Flush any partial batches of updates, as we've reached the end of the list of products. - for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { - uptr->send_pending_upds(uptr->num_pending_upds()); - } - } - // reset submit if ( !is_pub_ ) { kidx_ = 0; From 0ed8c46c2f670fd52a6da69fb99bd01cb5916a6a Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 29 Apr 2022 09:41:48 -0700 Subject: [PATCH 09/11] fix build --- pc/user.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index f5b72ead1..323d8ca50 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -29,14 +29,14 @@ void user::user_http::parse_content( const char *txt, size_t len ) user::user() : rptr_( nullptr ), sptr_( nullptr ), - psub_( this ), - last_update_ts_(0) + psub_( this ) { // setup the plumbing hsvr_.ptr_ = this; hsvr_.set_net_connect( this ); hsvr_.set_ws_parser( this ); set_net_parser( &hsvr_ ); + last_upd_ts_ = get_now(); } void user::set_rpc_client( rpc_client *rptr ) @@ -337,7 +337,7 @@ void user::send_pending_upds() { uint32_t n_sent = 0; int64_t curr_ts = get_now(); - if (curr_ts_ - price_upd_ts_ > PC_FLUSH_INTERVAL) { + if (curr_ts - last_upd_ts_ > PC_FLUSH_INTERVAL) { n_sent = pending_vec_.size(); } else if (pending_vec_.size() > max_batch_size_) { n_sent = max_batch_size_; @@ -352,7 +352,7 @@ void user::send_pending_upds() } pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_sent); - price_upd_ts_ = curr_ts_; + last_upd_ts_ = curr_ts; } void user::parse_get_all_products( uint32_t itok ) From e682913cd61699ee10a93ad92e504f6a9c82e97f Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 29 Apr 2022 09:43:27 -0700 Subject: [PATCH 10/11] fix --- pc/user.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index 323d8ca50..a3135ad9e 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -339,8 +339,8 @@ void user::send_pending_upds() int64_t curr_ts = get_now(); if (curr_ts - last_upd_ts_ > PC_FLUSH_INTERVAL) { n_sent = pending_vec_.size(); - } else if (pending_vec_.size() > max_batch_size_) { - n_sent = max_batch_size_; + } else if (pending_vec_.size() >= sptr_->get_max_batch_size()) { + n_sent = sptr_->get_max_batch_size(); } if (n_sent == 0) { From acc2314ab2e5a9150528d02aeeaf0547869793ff Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Tue, 3 May 2022 19:40:18 -0700 Subject: [PATCH 11/11] pr comments --- pc/user.cpp | 12 ++++++------ pc/user.hpp | 2 ++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index a3135ad9e..5da92f1b8 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -335,23 +335,23 @@ void user::parse_get_product( uint32_t tok, uint32_t itok ) void user::send_pending_upds() { - uint32_t n_sent = 0; + uint32_t n_to_send = 0; int64_t curr_ts = get_now(); if (curr_ts - last_upd_ts_ > PC_FLUSH_INTERVAL) { - n_sent = pending_vec_.size(); + n_to_send = pending_vec_.size(); } else if (pending_vec_.size() >= sptr_->get_max_batch_size()) { - n_sent = sptr_->get_max_batch_size(); + n_to_send = sptr_->get_max_batch_size(); } - if (n_sent == 0) { + if (n_to_send == 0) { return; } - if ( !price::send( pending_vec_.data(), n_sent) ) { + if ( !price::send( pending_vec_.data(), n_to_send) ) { add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" ); } - pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_sent); + pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_to_send); last_upd_ts_ = curr_ts; } diff --git a/pc/user.hpp b/pc/user.hpp index 34adff604..09b1c98d8 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -45,6 +45,8 @@ namespace pc // send a batch of pending price updates. This function eagerly sends any complete batches. // It also sends partial batches that have not been completed within a short interval of time. + // At most one complete batch will be sent. Additional price updates remain queued until the next + // time this function is invoked. void send_pending_upds(); private: