From d090f7291f094a7fce3b49d479729dbf891c2772 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 09:39:45 +0000 Subject: [PATCH 01/13] Add secondary network manager --- pc/manager.cpp | 46 +++++++++++++++++++++++++++++++++++++++++++++- pc/manager.hpp | 8 ++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index 4749e7027..40917e5ad 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -76,7 +76,8 @@ manager::manager() is_pub_( false ), cmt_( commitment::e_confirmed ), max_batch_( PC_MAX_BATCH ), - sreq_{ { commitment::e_processed } } + sreq_{ { commitment::e_processed } }, + secondary_{ nullptr } { tconn_.set_sub( this ); breq_->set_sub( this ); @@ -98,6 +99,9 @@ manager::~manager() delete ptr; } svec_.clear(); + if ( has_secondary() ) { + delete secondary_; + } } bool manager::tx_parser::parse( const char *, size_t len, size_t& res ) @@ -287,6 +291,11 @@ void manager::teardown() wconn_ = nullptr; clnt_.set_ws_conn( nullptr ); } + + // Shutdown secondary messenger + if ( has_secondary() ) { + get_secondary()->teardown(); + } } bool manager::init() @@ -377,9 +386,39 @@ bool manager::init() .add( "publish_interval(ms)", get_publish_interval() ) .end(); + // Initialize secondary network manager + if ( has_secondary() ) { + PC_LOG_INF("initializing secondary manager").end(); + secondary_->init(); + PC_LOG_INF("initialized secondary manager").end(); + } + return true; } +void manager::add_secondary( const std::string& rpc_host, const std::string& key_dir ) +{ + + manager *mgr = new manager; + mgr->set_dir( key_dir ); + mgr->set_rpc_host( rpc_host ); + mgr->set_tx_host( thost_ ); + mgr->set_do_tx( do_tx_ ); + mgr->set_do_ws( do_ws_ ); + mgr->set_commitment( cmt_ ); + + secondary_ = mgr; + +} + +bool manager::has_secondary() const { + return secondary_ != nullptr; +} + +manager *manager::get_secondary() { + return secondary_; +} + bool manager::get_is_tx_send() const { return tconn_.get_is_send(); @@ -542,6 +581,11 @@ void manager::poll( bool do_wait ) } else { reconnect_rpc(); } + + // Call the secondary manager's poll loop if necessary + if ( has_secondary() ) { + secondary_->poll(); + } } void manager::poll_schedule() diff --git a/pc/manager.hpp b/pc/manager.hpp index 871dbf0c9..9937f0b3b 100644 --- a/pc/manager.hpp +++ b/pc/manager.hpp @@ -164,6 +164,9 @@ namespace pc // accept new pyth client apps void accept( int fd ) override; + // add secondary network manager + void add_secondary( const std::string& rpc_host, const std::string& key_dir ); + // shut-down server void teardown(); @@ -189,6 +192,9 @@ namespace pc get_mapping *get_last_mapping() const; bool get_is_rpc_send() const; + bool has_secondary() const; + manager *get_secondary(); + private: struct trait_account { @@ -279,6 +285,8 @@ namespace pc // Timestamp of the last batch int64_t last_upd_ts_= 0; + + manager *secondary_; // manager for secondary network }; inline bool manager::get_is_tx_connect() const From b5a63542d0de1c10f7f23b01edbf46e003eae0b5 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Mon, 20 Jun 2022 08:50:12 +0000 Subject: [PATCH 02/13] Add secondary field to log lines --- pc/manager.cpp | 28 +++++++++++++++++++++------- pc/manager.hpp | 1 + pc/request.cpp | 16 +++++++++++++++- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index 40917e5ad..dbb5a088d 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -120,7 +120,7 @@ void manager::del_map_sub() { if ( --num_sub_ <= 0 && !has_status( PC_PYTH_HAS_MAPPING ) ) { set_status( PC_PYTH_HAS_MAPPING ); - PC_LOG_INF( "completed_mapping_init" ).end(); + PC_LOG_INF( "completed_mapping_init" ).add( "secondary", is_secondary() ).end(); // notify user that initialization is complete if ( sub_ ) { sub_->on_init( this ); @@ -270,7 +270,7 @@ uint64_t manager::get_slot() const void manager::teardown() { - PC_LOG_INF( "pythd_teardown" ).end(); + PC_LOG_INF( "pythd_teardown" ).add( "secondary", is_secondary() ).end(); // shutdown listener lsvr_.close(); @@ -308,15 +308,15 @@ bool manager::init() // log import key names key_pair *kp = get_publish_key_pair(); if ( kp ) { - PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).end(); + PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).add( "secondary", is_secondary() ).end(); } pub_key *mpub = get_mapping_pub_key(); if ( mpub ) { - PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).end(); + PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).add( "secondary", is_secondary() ).end(); } pub_key *gpub = get_program_pub_key(); if ( gpub ) { - PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).end(); + PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).add( "secondary", is_secondary() ).end(); } // initialize capture @@ -374,10 +374,12 @@ bool manager::init() return set_err_msg( lsvr_.get_err_msg() ); } PC_LOG_INF("listening").add("port",lsvr_.get_port()) + .add( "secondary", is_secondary() ) .add( "content_dir", get_content_dir() ) .end(); } PC_LOG_INF( "initialized" ) + .add( "secondary", is_secondary() ) .add( "version", PC_VERSION ) .add( "rpc_host", get_rpc_host() ) .add( "tx_host", get_tx_host() ) @@ -415,6 +417,10 @@ bool manager::has_secondary() const { return secondary_ != nullptr; } +bool manager::is_secondary() const { + return !has_secondary(); +} + manager *manager::get_secondary() { return secondary_; } @@ -622,7 +628,7 @@ void manager::reconnect_rpc() // check for successful (re)connect if ( !hconn_.get_is_err() && ( !wconn_ || !wconn_->get_is_err() ) ) { - PC_LOG_INF( "rpc_connected" ).end(); + PC_LOG_INF( "rpc_connected" ).add( "secondary", is_secondary() ).end(); set_status( PC_PYTH_RPC_CONNECTED ); // reset state @@ -729,6 +735,7 @@ void manager::log_disconnect() { if ( hconn_.get_is_err() ) { PC_LOG_ERR( "rpc_http_reset") + .add( "secondary", is_secondary() ) .add( "error", hconn_.get_err_msg() ) .add( "host", rhost_ ) .add( "port", hconn_.get_port() ) @@ -737,6 +744,7 @@ void manager::log_disconnect() } if ( wconn_ && wconn_->get_is_err() ) { PC_LOG_ERR( "rpc_websocket_reset" ) + .add( "secondary", is_secondary() ) .add( "error", wconn_->get_err_msg() ) .add( "host", rhost_ ) .add( "port", wconn_->get_port() ) @@ -814,6 +822,7 @@ void manager::on_response( rpc::get_slot *res ) PC_LOG_DBG( "received get_slot" ) .add( "slot", slot_ ) .add( "round_trip_time(ms)", 1e-6*ack_ts ) + .add( "secondary", is_secondary() ) .end(); // submit block hash every N slots @@ -856,6 +865,7 @@ void manager::on_response( rpc::get_recent_block_hash *m ) // set initialized status for block hash set_status( PC_PYTH_HAS_BLOCK_HASH ); PC_LOG_INF( "received_recent_block_hash" ) + .add( "secondary", is_secondary() ) .add( "curr_slot", slot_ ) .add( "hash_slot", m->get_slot() ) .add( "round_trip_time(ms)", 1e-6*ack_ts ) @@ -874,6 +884,7 @@ void manager::on_response( rpc::account_update *m ) if ( m->get_is_http() ) { int64_t ack_ts = m->get_recv_time() - m->get_sent_time(); PC_LOG_DBG( "received account_update" ) + .add( "secondary", is_secondary() ) .add( "account", *m->get_account() ) .add( "slot", slot_ ) .add( "round_trip_time(ms)", 1e-6*ack_ts ) @@ -881,6 +892,7 @@ void manager::on_response( rpc::account_update *m ) } else { PC_LOG_DBG( "received account_update" ) + .add( "secondary", is_secondary() ) .add( "account", *m->get_account() ) .add( "slot", slot_ ) .end(); @@ -927,12 +939,14 @@ bool manager::submit_poll( request *req ) } if ( req->get_is_err() ) { PC_LOG_ERR( "request error") + .add( "secondary", is_secondary() ) .add( "error", req->get_err_msg() ) .end(); return false; } if ( get_is_err() ) { PC_LOG_ERR( "request error") + .add( "secondary", is_secondary() ) .add( "error", get_err_msg() ) .end(); return false; @@ -952,7 +966,7 @@ void manager::on_connect() void manager::on_disconnect() { // callback user with connection status - PC_LOG_INF( "pyth_tx_reset" ).end(); + PC_LOG_INF( "pyth_tx_reset" ).add( "secondary", is_secondary() ).end(); if ( sub_ ) { sub_->on_tx_disconnect( this ); } diff --git a/pc/manager.hpp b/pc/manager.hpp index 9937f0b3b..7e72780e3 100644 --- a/pc/manager.hpp +++ b/pc/manager.hpp @@ -193,6 +193,7 @@ namespace pc bool get_is_rpc_send() const; bool has_secondary() const; + bool is_secondary() const; manager *get_secondary(); private: diff --git a/pc/request.cpp b/pc/request.cpp index 440a6bf45..45de6309e 100644 --- a/pc/request.cpp +++ b/pc/request.cpp @@ -235,6 +235,7 @@ void get_mapping::update( T *res ) // check and get any new product accounts in mapping table num_sym_ = tab->num_; PC_LOG_INF( "add_mapping" ) + .add( "secondary", cptr->is_secondary() ) .add( "account", mkey_ ) .add( "num_products", num_sym_ ) .end(); @@ -381,6 +382,7 @@ void product::update( T *res ) net_buf *jhd, *jtl; wtr.detach( jhd, jtl ); PC_LOG_INF( st_ != e_done ? "add_product" : "upd_product" ) + .add( "secondary", cptr->is_secondary() ) .add( "account", acc_ ) .add( "attr", str( jhd->buf_, jhd->size_) ) .end(); @@ -691,6 +693,7 @@ bool price::update( tvec_.emplace_back( std::string( 100, '\0' ), preq_->get_sent_time() ); preq_->get_signature()->enc_base58( tvec_.back().first ); PC_LOG_DBG( "sent price update transaction" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *get_account() ) .add( "product_account", *prod_->get_account() ) .add( "symbol", get_symbol() ) @@ -700,6 +703,7 @@ bool price::update( .end(); if ( PC_UNLIKELY( tvec_.size() >= 100 ) ) { PC_LOG_WRN( "too many unacked price update transactions" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *get_account() ) .add( "product_account", *prod_->get_account() ) .add( "symbol", get_symbol() ) @@ -732,8 +736,10 @@ bool price::send( price *prices[], const unsigned n ) // Build an upd_price rpc request for every price for ( unsigned i = 0, j = 0; i < n; ++i ) { price *const p = prices[ i ]; + manager *const mgr = p->get_manager(); if ( PC_UNLIKELY( ! p->init_ && ! p->init_publish() ) ) { PC_LOG_ERR( "failed to initialize publisher" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -742,6 +748,7 @@ bool price::send( price *prices[], const unsigned n ) } if ( PC_UNLIKELY( ! p->has_publisher() ) ) { PC_LOG_ERR( "missing publish permission" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -750,18 +757,20 @@ bool price::send( price *prices[], const unsigned n ) } if ( PC_UNLIKELY( ! p->get_is_ready_publish() ) ) { PC_LOG_ERR( "not ready to publish - check rpc / pyth_tx connection" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); continue; } - manager *const mgr = p->get_manager(); + if ( ! mgr1 ) { mgr1 = mgr; } else if ( mgr != mgr1 ) { PC_LOG_ERR( "unexpected manager" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -786,6 +795,7 @@ bool price::send( price *prices[], const unsigned n ) } else { PC_LOG_ERR( "failed to build msg" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -801,6 +811,7 @@ bool price::send( price *prices[], const unsigned n ) ); p1->preq_->get_signature()->enc_base58( p1->tvec_.back().first ); PC_LOG_DBG( "sent price update" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p1->get_account() ) .add( "product_account", *p1->prod_->get_account() ) .add( "symbol", p1->get_symbol() ) @@ -810,6 +821,7 @@ bool price::send( price *prices[], const unsigned n ) .end(); if ( PC_UNLIKELY( p1->tvec_.size() >= 100 ) ) { PC_LOG_WRN( "too many unacked price update transactions" ) + .add( "secondary", mgr->is_secondary() ) .add( "price_account", *p1->get_account() ) .add( "product_account", *p1->prod_->get_account() ) .add( "symbol", p1->get_symbol() ) @@ -860,6 +872,7 @@ void price::on_response( rpc::upd_price *res ) const int64_t ack_dur = res->get_recv_time() - it->second; tvec_.erase( it ); PC_LOG_DBG( "received price update transaction ack" ) + .add( "secondary", get_manager()->is_secondary() ) .add( "price_account", *get_account() ) .add( "product_account", *prod_->get_account() ) .add( "symbol", get_symbol() ) @@ -885,6 +898,7 @@ void price::on_response( rpc::account_update *res ) void price::log_update( const char *title ) { PC_LOG_INF( title ) + .add( "secondary", get_manager()->is_secondary() ) .add( "account", *get_account() ) .add( "product", *prod_->get_account() ) .add( "symbol", get_symbol() ) From 472f0e684a83d8a9fb32986a38efb0483aff6464 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 09:40:49 +0000 Subject: [PATCH 03/13] Add support for secondary network to pythd --- pcapps/pythd.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pcapps/pythd.cpp b/pcapps/pythd.cpp index f5cf1f092..6af2b77e4 100644 --- a/pcapps/pythd.cpp +++ b/pcapps/pythd.cpp @@ -97,15 +97,17 @@ int main(int argc, char **argv) commitment cmt = commitment::e_confirmed; std::string cnt_dir, cap_file, log_file; std::string rpc_host = get_rpc_host(); + std::string secondary_rpc_host = ""; std::string key_dir = get_key_store(); std::string tx_host = get_tx_host(); int pyth_port = get_port(); int opt = 0; unsigned max_batch_size = 0; bool do_wait = true, do_tx = true, do_ws = true, do_debug = false; - while( (opt = ::getopt(argc,argv, "r:t:p:k:w:c:l:m:b:dnxhz" )) != -1 ) { + while( (opt = ::getopt(argc,argv, "r:s:t:p:k:w:c:l:m:b:dnxhz" )) != -1 ) { switch(opt) { case 'r': rpc_host = optarg; break; + case 's': secondary_rpc_host = optarg; break; case 't': tx_host = optarg; break; case 'p': pyth_port = ::atoi(optarg); break; case 'k': key_dir = optarg; break; @@ -147,6 +149,11 @@ int main(int argc, char **argv) mgr.set_do_ws( do_ws ); mgr.set_do_capture( !cap_file.empty() ); mgr.set_commitment( cmt ); + + bool do_secondary = !secondary_rpc_host.empty(); + if ( do_secondary ) { + mgr.add_secondary( secondary_rpc_host, key_dir ); + } if ( !mgr.init() ) { std::cerr << "pythd: " << mgr.get_err_msg() << std::endl; return 1; From f353b2dd9a12cc6aa0fb97dcceb55bbcb9bf3a22 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 09:41:10 +0000 Subject: [PATCH 04/13] Publish prices to both primary and secondary networks if possible. --- pc/user.cpp | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index f0afbf0f6..a1224c4fd 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -198,7 +198,14 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok ) pub_key pkey; pkey.init_from_text( jp_.get_str( ntok ) ); price *sptr = sptr_->get_price( pkey ); - if ( PC_UNLIKELY( !sptr ) ) { add_unknown_symbol(itok); return; } + price *sptr_secondary = nullptr; + if ( sptr_->has_secondary() ) { + sptr_secondary = sptr_->get_secondary()->get_price( pkey ); + } + + // Bail if we cannot find the price in either manager. + if ( PC_UNLIKELY( !sptr && !sptr_secondary ) ) { add_unknown_symbol(itok); return; } + if ( 0 == (ntok = jp_.find_val( ptok, "price" ) ) ) break; int64_t price = jp_.get_int( ntok ); if ( 0 == (ntok = jp_.find_val( ptok, "conf" ) ) ) break; @@ -206,11 +213,16 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok ) if ( 0 == (ntok = jp_.find_val( ptok, "status" ) ) ) break; symbol_status stype = str_to_symbol_status( jp_.get_str( ntok ) ); - // Add the updated price to the pending updates - sptr->update_no_send( price, conf, stype, false ); - - // pass the updated price to manager - sptr_->add_dirty_price(sptr); + // Add the price to both the managers pending updates, so that it will + // be published to both networks if possible. + if ( sptr ) { + sptr->update_no_send( price, conf, stype, false ); + sptr_->add_dirty_price( sptr ); + } + if ( sptr_secondary ) { + sptr_secondary->update_no_send( price, conf, stype, false ); + sptr_->get_secondary()->add_dirty_price( sptr_secondary ); + } // Send the result back add_header(); From 96e26ac4d4a1bd773cf8e8d237b0bc24b322cae7 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 09:41:45 +0000 Subject: [PATCH 05/13] Populate get_product_list from the secondary network if the primary is unavailable. --- pc/user.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index a1224c4fd..8afafffe5 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -294,8 +294,16 @@ void user::parse_get_product_list( uint32_t itok ) { add_header(); jw_.add_key( "result", json_wtr::e_arr ); - for( unsigned i=0; i != sptr_->get_num_product(); ++i ) { - product *prod = sptr_->get_product( i ); + + // If the primary manager has no products, pull them from the secondary + // manager instead. + pc::manager *mgr = sptr_; + if ( sptr_->get_num_product() == 0 && sptr_->has_secondary() ) { + mgr = sptr_->get_secondary(); + } + + for( unsigned i=0; i != mgr->get_num_product(); ++i ) { + product *prod = mgr->get_product( i ); jw_.add_val( json_wtr::e_obj ); jw_.add_key( "account", *prod->get_account() ); jw_.add_key( "attr_dict", json_wtr::e_obj ); From 8a4a2e4bfbb2bafc7e8a700b6bc44d1de3a297fe Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 09:41:57 +0000 Subject: [PATCH 06/13] Populate get_product RPC response from the secondary network if the primary is unavailable. --- pc/user.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pc/user.cpp b/pc/user.cpp index 8afafffe5..e8760c904 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -339,6 +339,13 @@ void user::parse_get_product( uint32_t tok, uint32_t itok ) pub_key pkey; pkey.init_from_text( jp_.get_str( ntok ) ); product *prod = sptr_->get_product( pkey ); + + // If the product is not present in the primary manager's mapping, + // attempt to use the one in the secondary manager's mapping instead. + if ( PC_UNLIKELY( !prod && sptr_->has_secondary() ) ) { + prod = sptr_->get_secondary()->get_product( pkey ); + } + if ( PC_UNLIKELY( !prod ) ) return add_unknown_symbol( itok ); From 8efd0a9b4e2217c1a023502f258abecb7b43edc0 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 09:42:12 +0000 Subject: [PATCH 07/13] Populate get_all_products RPC response from secondary manager if necessary. --- pc/user.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index e8760c904..bc8d24703 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -361,8 +361,16 @@ void user::parse_get_all_products( uint32_t itok ) { add_header(); jw_.add_key( "result", json_wtr::e_arr ); - for( unsigned i=0; i != sptr_->get_num_product(); ++i ) { - product *prod = sptr_->get_product( i ); + + // If the primary manager has no products, pull them from the secondary + // manager instead. + pc::manager *mgr = sptr_; + if ( sptr_->get_num_product() == 0 && sptr_->has_secondary() ) { + mgr = sptr_->get_secondary(); + } + + for( unsigned i=0; i != mgr->get_num_product(); ++i ) { + product *prod = mgr->get_product( i ); jw_.add_val( json_wtr::e_obj ); prod->dump_json( jw_ ); jw_.pop(); From e653b7772c51d2ec814471535955acfc21fca3d8 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Tue, 21 Jun 2022 16:15:30 +0000 Subject: [PATCH 08/13] Add staleness comment --- pc/user.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pc/user.cpp b/pc/user.cpp index bc8d24703..4e0622661 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -297,6 +297,11 @@ void user::parse_get_product_list( uint32_t itok ) // If the primary manager has no products, pull them from the secondary // manager instead. + // + // Warning: if the primary network is disconnected but the products still + // exist in the primary manager's mapping (i.e. pythd hasn't restarted), the prices + // returned from this endpoint will therefore be stale and will only be updated + // when the primary network reconnects. pc::manager *mgr = sptr_; if ( sptr_->get_num_product() == 0 && sptr_->has_secondary() ) { mgr = sptr_->get_secondary(); From 07b0ff562d5ae1cb0f264f725a88f1365a2cbe39 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Wed, 22 Jun 2022 08:48:31 +0000 Subject: [PATCH 09/13] Track is_secondary using an explicit boolean flag --- pc/manager.cpp | 46 ++++++++++++++++++++++++++-------------------- pc/manager.hpp | 6 ++++-- pc/request.cpp | 26 +++++++++++++------------- 3 files changed, 43 insertions(+), 35 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index dbb5a088d..4c9b23442 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -77,7 +77,8 @@ manager::manager() cmt_( commitment::e_confirmed ), max_batch_( PC_MAX_BATCH ), sreq_{ { commitment::e_processed } }, - secondary_{ nullptr } + secondary_{ nullptr }, + is_secondary_( false ) { tconn_.set_sub( this ); breq_->set_sub( this ); @@ -120,7 +121,7 @@ void manager::del_map_sub() { if ( --num_sub_ <= 0 && !has_status( PC_PYTH_HAS_MAPPING ) ) { set_status( PC_PYTH_HAS_MAPPING ); - PC_LOG_INF( "completed_mapping_init" ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "completed_mapping_init" ).add( "secondary", get_is_secondary() ).end(); // notify user that initialization is complete if ( sub_ ) { sub_->on_init( this ); @@ -270,7 +271,7 @@ uint64_t manager::get_slot() const void manager::teardown() { - PC_LOG_INF( "pythd_teardown" ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "pythd_teardown" ).add( "secondary", get_is_secondary() ).end(); // shutdown listener lsvr_.close(); @@ -308,15 +309,15 @@ bool manager::init() // log import key names key_pair *kp = get_publish_key_pair(); if ( kp ) { - PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).add( "secondary", get_is_secondary() ).end(); } pub_key *mpub = get_mapping_pub_key(); if ( mpub ) { - PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).add( "secondary", get_is_secondary() ).end(); } pub_key *gpub = get_program_pub_key(); if ( gpub ) { - PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).add( "secondary", get_is_secondary() ).end(); } // initialize capture @@ -374,12 +375,12 @@ bool manager::init() return set_err_msg( lsvr_.get_err_msg() ); } PC_LOG_INF("listening").add("port",lsvr_.get_port()) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "content_dir", get_content_dir() ) .end(); } PC_LOG_INF( "initialized" ) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "version", PC_VERSION ) .add( "rpc_host", get_rpc_host() ) .add( "tx_host", get_tx_host() ) @@ -408,6 +409,7 @@ void manager::add_secondary( const std::string& rpc_host, const std::string& key mgr->set_do_tx( do_tx_ ); mgr->set_do_ws( do_ws_ ); mgr->set_commitment( cmt_ ); + mgr->set_is_secondary( true ); secondary_ = mgr; @@ -417,8 +419,12 @@ bool manager::has_secondary() const { return secondary_ != nullptr; } -bool manager::is_secondary() const { - return !has_secondary(); +void manager::set_is_secondary(bool is_secondary) { + is_secondary_ = is_secondary; +} + +bool manager::get_is_secondary() const { + return is_secondary_; } manager *manager::get_secondary() { @@ -628,7 +634,7 @@ void manager::reconnect_rpc() // check for successful (re)connect if ( !hconn_.get_is_err() && ( !wconn_ || !wconn_->get_is_err() ) ) { - PC_LOG_INF( "rpc_connected" ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "rpc_connected" ).add( "secondary", get_is_secondary() ).end(); set_status( PC_PYTH_RPC_CONNECTED ); // reset state @@ -735,7 +741,7 @@ void manager::log_disconnect() { if ( hconn_.get_is_err() ) { PC_LOG_ERR( "rpc_http_reset") - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "error", hconn_.get_err_msg() ) .add( "host", rhost_ ) .add( "port", hconn_.get_port() ) @@ -744,7 +750,7 @@ void manager::log_disconnect() } if ( wconn_ && wconn_->get_is_err() ) { PC_LOG_ERR( "rpc_websocket_reset" ) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "error", wconn_->get_err_msg() ) .add( "host", rhost_ ) .add( "port", wconn_->get_port() ) @@ -822,7 +828,7 @@ void manager::on_response( rpc::get_slot *res ) PC_LOG_DBG( "received get_slot" ) .add( "slot", slot_ ) .add( "round_trip_time(ms)", 1e-6*ack_ts ) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .end(); // submit block hash every N slots @@ -865,7 +871,7 @@ void manager::on_response( rpc::get_recent_block_hash *m ) // set initialized status for block hash set_status( PC_PYTH_HAS_BLOCK_HASH ); PC_LOG_INF( "received_recent_block_hash" ) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "curr_slot", slot_ ) .add( "hash_slot", m->get_slot() ) .add( "round_trip_time(ms)", 1e-6*ack_ts ) @@ -884,7 +890,7 @@ void manager::on_response( rpc::account_update *m ) if ( m->get_is_http() ) { int64_t ack_ts = m->get_recv_time() - m->get_sent_time(); PC_LOG_DBG( "received account_update" ) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "account", *m->get_account() ) .add( "slot", slot_ ) .add( "round_trip_time(ms)", 1e-6*ack_ts ) @@ -892,7 +898,7 @@ void manager::on_response( rpc::account_update *m ) } else { PC_LOG_DBG( "received account_update" ) - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "account", *m->get_account() ) .add( "slot", slot_ ) .end(); @@ -939,14 +945,14 @@ bool manager::submit_poll( request *req ) } if ( req->get_is_err() ) { PC_LOG_ERR( "request error") - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "error", req->get_err_msg() ) .end(); return false; } if ( get_is_err() ) { PC_LOG_ERR( "request error") - .add( "secondary", is_secondary() ) + .add( "secondary", get_is_secondary() ) .add( "error", get_err_msg() ) .end(); return false; @@ -966,7 +972,7 @@ void manager::on_connect() void manager::on_disconnect() { // callback user with connection status - PC_LOG_INF( "pyth_tx_reset" ).add( "secondary", is_secondary() ).end(); + PC_LOG_INF( "pyth_tx_reset" ).add( "secondary", get_is_secondary() ).end(); if ( sub_ ) { sub_->on_tx_disconnect( this ); } diff --git a/pc/manager.hpp b/pc/manager.hpp index 7e72780e3..689e90616 100644 --- a/pc/manager.hpp +++ b/pc/manager.hpp @@ -193,7 +193,8 @@ namespace pc bool get_is_rpc_send() const; bool has_secondary() const; - bool is_secondary() const; + void set_is_secondary(bool is_secondary); + bool get_is_secondary() const; manager *get_secondary(); private: @@ -287,7 +288,8 @@ namespace pc // Timestamp of the last batch int64_t last_upd_ts_= 0; - manager *secondary_; // manager for secondary network + manager *secondary_; // manager for secondary network + bool is_secondary_; // flag tracking whether we are a secondary manager }; inline bool manager::get_is_tx_connect() const diff --git a/pc/request.cpp b/pc/request.cpp index 45de6309e..f5fef20a9 100644 --- a/pc/request.cpp +++ b/pc/request.cpp @@ -235,7 +235,7 @@ void get_mapping::update( T *res ) // check and get any new product accounts in mapping table num_sym_ = tab->num_; PC_LOG_INF( "add_mapping" ) - .add( "secondary", cptr->is_secondary() ) + .add( "secondary", cptr->get_is_secondary() ) .add( "account", mkey_ ) .add( "num_products", num_sym_ ) .end(); @@ -382,7 +382,7 @@ void product::update( T *res ) net_buf *jhd, *jtl; wtr.detach( jhd, jtl ); PC_LOG_INF( st_ != e_done ? "add_product" : "upd_product" ) - .add( "secondary", cptr->is_secondary() ) + .add( "secondary", cptr->get_is_secondary() ) .add( "account", acc_ ) .add( "attr", str( jhd->buf_, jhd->size_) ) .end(); @@ -693,7 +693,7 @@ bool price::update( tvec_.emplace_back( std::string( 100, '\0' ), preq_->get_sent_time() ); preq_->get_signature()->enc_base58( tvec_.back().first ); PC_LOG_DBG( "sent price update transaction" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *get_account() ) .add( "product_account", *prod_->get_account() ) .add( "symbol", get_symbol() ) @@ -703,7 +703,7 @@ bool price::update( .end(); if ( PC_UNLIKELY( tvec_.size() >= 100 ) ) { PC_LOG_WRN( "too many unacked price update transactions" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *get_account() ) .add( "product_account", *prod_->get_account() ) .add( "symbol", get_symbol() ) @@ -739,7 +739,7 @@ bool price::send( price *prices[], const unsigned n ) manager *const mgr = p->get_manager(); if ( PC_UNLIKELY( ! p->init_ && ! p->init_publish() ) ) { PC_LOG_ERR( "failed to initialize publisher" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -748,7 +748,7 @@ bool price::send( price *prices[], const unsigned n ) } if ( PC_UNLIKELY( ! p->has_publisher() ) ) { PC_LOG_ERR( "missing publish permission" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -757,7 +757,7 @@ bool price::send( price *prices[], const unsigned n ) } if ( PC_UNLIKELY( ! p->get_is_ready_publish() ) ) { PC_LOG_ERR( "not ready to publish - check rpc / pyth_tx connection" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -770,7 +770,7 @@ bool price::send( price *prices[], const unsigned n ) } else if ( mgr != mgr1 ) { PC_LOG_ERR( "unexpected manager" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -795,7 +795,7 @@ bool price::send( price *prices[], const unsigned n ) } else { PC_LOG_ERR( "failed to build msg" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p->get_account() ) .add( "product_account", *p->prod_->get_account() ) .add( "symbol", p->get_symbol() ) @@ -811,7 +811,7 @@ bool price::send( price *prices[], const unsigned n ) ); p1->preq_->get_signature()->enc_base58( p1->tvec_.back().first ); PC_LOG_DBG( "sent price update" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p1->get_account() ) .add( "product_account", *p1->prod_->get_account() ) .add( "symbol", p1->get_symbol() ) @@ -821,7 +821,7 @@ bool price::send( price *prices[], const unsigned n ) .end(); if ( PC_UNLIKELY( p1->tvec_.size() >= 100 ) ) { PC_LOG_WRN( "too many unacked price update transactions" ) - .add( "secondary", mgr->is_secondary() ) + .add( "secondary", mgr->get_is_secondary() ) .add( "price_account", *p1->get_account() ) .add( "product_account", *p1->prod_->get_account() ) .add( "symbol", p1->get_symbol() ) @@ -872,7 +872,7 @@ void price::on_response( rpc::upd_price *res ) const int64_t ack_dur = res->get_recv_time() - it->second; tvec_.erase( it ); PC_LOG_DBG( "received price update transaction ack" ) - .add( "secondary", get_manager()->is_secondary() ) + .add( "secondary", get_manager()->get_is_secondary() ) .add( "price_account", *get_account() ) .add( "product_account", *prod_->get_account() ) .add( "symbol", get_symbol() ) @@ -898,7 +898,7 @@ void price::on_response( rpc::account_update *res ) void price::log_update( const char *title ) { PC_LOG_INF( title ) - .add( "secondary", get_manager()->is_secondary() ) + .add( "secondary", get_manager()->get_is_secondary() ) .add( "account", *get_account() ) .add( "product", *prod_->get_account() ) .add( "symbol", get_symbol() ) From f0c4bce112ab998383338b7c71ec9103ae21dccb Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Thu, 23 Jun 2022 08:18:09 +0000 Subject: [PATCH 10/13] Start staggering price_sched requests at fixed intervals instead of on a new slot --- pc/manager.cpp | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index 4c9b23442..e6ddee0ff 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -602,6 +602,15 @@ void manager::poll( bool do_wait ) void manager::poll_schedule() { + // Enable publishing mode if enough time has elapsed since last time. + int64_t time_since_last_stagger = curr_ts_ - pub_ts_; + if ( !is_pub_ && ( time_since_last_stagger > pub_int_ ) ) { + is_pub_ = true; + kidx_ = 0; + pub_ts_ = curr_ts_; + } + + // Schedule the price_sched requests in a staggered fashion. while ( is_pub_ && kidx_ < kvec_.size() ) { price_sched *kptr = kvec_[kidx_]; int64_t pub_ts = pub_ts_ + static_cast< int64_t >( @@ -639,10 +648,7 @@ void manager::reconnect_rpc() // reset state wait_conn_ = false; - is_pub_ = false; - kidx_ = 0; ctimeout_ = PC_NSECS_IN_SEC; - pub_ts_ = 0L; slot_ = 0L; slot_cnt_ = 0UL; slot_ts_ = 0L; @@ -836,13 +842,6 @@ void manager::on_response( rpc::get_slot *res ) clnt_.send( breq_ ); } - // reset submit - if ( !is_pub_ ) { - kidx_ = 0; - pub_ts_ = ts; - is_pub_ = true; - } - // flush capture if ( do_cap_ ) { cap_.flush(); From 4d302474e68de61afb733906a7fd8c2a63b1ad39 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Thu, 23 Jun 2022 08:18:56 +0000 Subject: [PATCH 11/13] Send notify_price_sched RPC messages even if manager isn't connected --- pc/manager.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index e6ddee0ff..af267227d 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -578,6 +578,9 @@ void manager::poll( bool do_wait ) } } + // request quotes from the publishers + poll_schedule(); + // try to (re)connect to tx proxy if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) { tconn_.reconnect(); @@ -586,9 +589,6 @@ void manager::poll( bool do_wait ) if ( has_status( PC_PYTH_RPC_CONNECTED ) && !hconn_.get_is_err() && ( !wconn_ || !wconn_->get_is_err() ) ) { - // request product quotes from pythd's clients while connected - poll_schedule(); - send_pending_ups(); } else { reconnect_rpc(); From 9f53099a9d0d04b2fc1315adb66636a88789efcd Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Thu, 23 Jun 2022 08:34:35 +0000 Subject: [PATCH 12/13] Use slot when upd_price message was received as publish slot for price --- pc/request.cpp | 5 ++--- pc/rpc_client.cpp | 4 ++++ pc/rpc_client.hpp | 2 ++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pc/request.cpp b/pc/request.cpp index f5fef20a9..6f70b2c75 100644 --- a/pc/request.cpp +++ b/pc/request.cpp @@ -722,6 +722,7 @@ void price::update_no_send( , const symbol_status st, const bool is_agg ) { + preq_->set_slot( get_manager()->get_slot() ); preq_->set_price( price, conf, st, is_agg ); } @@ -777,8 +778,6 @@ bool price::send( price *prices[], const unsigned n ) .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); continue; } - const uint64_t slot = mgr->get_slot(); - p->preq_->set_slot( slot ); p->preq_->set_block_hash( mgr->get_recent_block_hash() ); upds_.emplace_back( p->preq_ ); @@ -817,7 +816,7 @@ bool price::send( price *prices[], const unsigned n ) .add( "symbol", p1->get_symbol() ) .add( "price_type", price_type_to_str( p1->get_price_type() ) ) .add( "sig", p1->tvec_.back().first ) - .add( "pub_slot", slot ) + .add( "pub_slot", p1->preq_->get_slot() ) .end(); if ( PC_UNLIKELY( p1->tvec_.size() >= 100 ) ) { PC_LOG_WRN( "too many unacked price update transactions" ) diff --git a/pc/rpc_client.cpp b/pc/rpc_client.cpp index 59e57c0c5..843b0e421 100644 --- a/pc/rpc_client.cpp +++ b/pc/rpc_client.cpp @@ -859,6 +859,10 @@ void rpc::upd_price::set_slot( const uint64_t pub_slot ) pub_slot_ = pub_slot; } +uint64_t rpc::upd_price::get_slot() { + return pub_slot_; +} + signature *rpc::upd_price::get_signature() { return &sig_; diff --git a/pc/rpc_client.hpp b/pc/rpc_client.hpp index 48472f529..e087f7f85 100644 --- a/pc/rpc_client.hpp +++ b/pc/rpc_client.hpp @@ -414,6 +414,8 @@ namespace pc bool is_aggregate ); void set_slot( uint64_t ); + uint64_t get_slot(); + // results signature *get_signature(); str get_ack_signature() const; From 33277e5de8f965b56b3a947e9d51eb8cfd4d0964 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Thu, 23 Jun 2022 08:41:07 +0000 Subject: [PATCH 13/13] Increase PC_PUB_INTERVAL to match slot time --- pc/manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pc/manager.cpp b/pc/manager.cpp index af267227d..00dc81dd9 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -9,7 +9,7 @@ using namespace pc; #define PC_RPC_HTTP_PORT 8899 #define PC_RECONNECT_TIMEOUT (120L*1000000000L) #define PC_BLOCKHASH_TIMEOUT 3 -#define PC_PUB_INTERVAL (227L*PC_NSECS_IN_MSEC) +#define PC_PUB_INTERVAL (400L*PC_NSECS_IN_MSEC) #define PC_RPC_HOST "localhost" #define PC_MAX_BATCH 8 // Flush partial batches if not completed within 400 ms.