Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 85 additions & 22 deletions pc/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ using namespace pc;
#define PC_RPC_HTTP_PORT 8899
#define PC_RECONNECT_TIMEOUT (120L*1000000000L)
#define PC_BLOCKHASH_TIMEOUT 3
#define PC_PUB_INTERVAL (227L*PC_NSECS_IN_MSEC)
#define PC_PUB_INTERVAL (400L*PC_NSECS_IN_MSEC)
#define PC_RPC_HOST "localhost"
#define PC_MAX_BATCH 8
// Flush partial batches if not completed within 400 ms.
Expand Down Expand Up @@ -76,7 +76,9 @@ manager::manager()
is_pub_( false ),
cmt_( commitment::e_confirmed ),
max_batch_( PC_MAX_BATCH ),
sreq_{ { commitment::e_processed } }
sreq_{ { commitment::e_processed } },
secondary_{ nullptr },
is_secondary_( false )
{
tconn_.set_sub( this );
breq_->set_sub( this );
Expand All @@ -98,6 +100,9 @@ manager::~manager()
delete ptr;
}
svec_.clear();
if ( has_secondary() ) {
delete secondary_;
}
}

bool manager::tx_parser::parse( const char *, size_t len, size_t& res )
Expand All @@ -116,7 +121,7 @@ void manager::del_map_sub()
{
if ( --num_sub_ <= 0 && !has_status( PC_PYTH_HAS_MAPPING ) ) {
set_status( PC_PYTH_HAS_MAPPING );
PC_LOG_INF( "completed_mapping_init" ).end();
PC_LOG_INF( "completed_mapping_init" ).add( "secondary", get_is_secondary() ).end();
// notify user that initialization is complete
if ( sub_ ) {
sub_->on_init( this );
Expand Down Expand Up @@ -266,7 +271,7 @@ uint64_t manager::get_slot() const

void manager::teardown()
{
PC_LOG_INF( "pythd_teardown" ).end();
PC_LOG_INF( "pythd_teardown" ).add( "secondary", get_is_secondary() ).end();

// shutdown listener
lsvr_.close();
Expand All @@ -287,6 +292,11 @@ void manager::teardown()
wconn_ = nullptr;
clnt_.set_ws_conn( nullptr );
}

// Shutdown secondary messenger
if ( has_secondary() ) {
get_secondary()->teardown();
}
}

bool manager::init()
Expand All @@ -299,15 +309,15 @@ bool manager::init()
// log import key names
key_pair *kp = get_publish_key_pair();
if ( kp ) {
PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).end();
PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).add( "secondary", get_is_secondary() ).end();
}
pub_key *mpub = get_mapping_pub_key();
if ( mpub ) {
PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).end();
PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).add( "secondary", get_is_secondary() ).end();
}
pub_key *gpub = get_program_pub_key();
if ( gpub ) {
PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).end();
PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).add( "secondary", get_is_secondary() ).end();
}

// initialize capture
Expand Down Expand Up @@ -365,10 +375,12 @@ bool manager::init()
return set_err_msg( lsvr_.get_err_msg() );
}
PC_LOG_INF("listening").add("port",lsvr_.get_port())
.add( "secondary", get_is_secondary() )
.add( "content_dir", get_content_dir() )
.end();
}
PC_LOG_INF( "initialized" )
.add( "secondary", get_is_secondary() )
.add( "version", PC_VERSION )
.add( "rpc_host", get_rpc_host() )
.add( "tx_host", get_tx_host() )
Expand All @@ -377,9 +389,48 @@ bool manager::init()
.add( "publish_interval(ms)", get_publish_interval() )
.end();

// Initialize secondary network manager
if ( has_secondary() ) {
PC_LOG_INF("initializing secondary manager").end();
secondary_->init();
PC_LOG_INF("initialized secondary manager").end();
}

return true;
}

void manager::add_secondary( const std::string& rpc_host, const std::string& key_dir )
{

manager *mgr = new manager;
mgr->set_dir( key_dir );
mgr->set_rpc_host( rpc_host );
mgr->set_tx_host( thost_ );
mgr->set_do_tx( do_tx_ );
mgr->set_do_ws( do_ws_ );
mgr->set_commitment( cmt_ );
mgr->set_is_secondary( true );

secondary_ = mgr;

}

bool manager::has_secondary() const {
return secondary_ != nullptr;
}

void manager::set_is_secondary(bool is_secondary) {
is_secondary_ = is_secondary;
}

bool manager::get_is_secondary() const {
return is_secondary_;
}

manager *manager::get_secondary() {
return secondary_;
}

bool manager::get_is_tx_send() const
{
return tconn_.get_is_send();
Expand Down Expand Up @@ -527,6 +578,9 @@ void manager::poll( bool do_wait )
}
}

// request quotes from the publishers
poll_schedule();

// try to (re)connect to tx proxy
if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) {
tconn_.reconnect();
Expand All @@ -535,17 +589,28 @@ void manager::poll( bool do_wait )
if ( has_status( PC_PYTH_RPC_CONNECTED ) &&
!hconn_.get_is_err() &&
( !wconn_ || !wconn_->get_is_err() ) ) {
// request product quotes from pythd's clients while connected
poll_schedule();

send_pending_ups();
} else {
reconnect_rpc();
}

// Call the secondary manager's poll loop if necessary
if ( has_secondary() ) {
secondary_->poll();
}
}

void manager::poll_schedule()
{
// Enable publishing mode if enough time has elapsed since last time.
int64_t time_since_last_stagger = curr_ts_ - pub_ts_;
if ( !is_pub_ && ( time_since_last_stagger > pub_int_ ) ) {
is_pub_ = true;
kidx_ = 0;
pub_ts_ = curr_ts_;
}

// Schedule the price_sched requests in a staggered fashion.
while ( is_pub_ && kidx_ < kvec_.size() ) {
price_sched *kptr = kvec_[kidx_];
int64_t pub_ts = pub_ts_ + static_cast< int64_t >(
Expand Down Expand Up @@ -578,15 +643,12 @@ void manager::reconnect_rpc()

// check for successful (re)connect
if ( !hconn_.get_is_err() && ( !wconn_ || !wconn_->get_is_err() ) ) {
PC_LOG_INF( "rpc_connected" ).end();
PC_LOG_INF( "rpc_connected" ).add( "secondary", get_is_secondary() ).end();
set_status( PC_PYTH_RPC_CONNECTED );

// reset state
wait_conn_ = false;
is_pub_ = false;
kidx_ = 0;
ctimeout_ = PC_NSECS_IN_SEC;
pub_ts_ = 0L;
slot_ = 0L;
slot_cnt_ = 0UL;
slot_ts_ = 0L;
Expand Down Expand Up @@ -685,6 +747,7 @@ void manager::log_disconnect()
{
if ( hconn_.get_is_err() ) {
PC_LOG_ERR( "rpc_http_reset")
.add( "secondary", get_is_secondary() )
.add( "error", hconn_.get_err_msg() )
.add( "host", rhost_ )
.add( "port", hconn_.get_port() )
Expand All @@ -693,6 +756,7 @@ void manager::log_disconnect()
}
if ( wconn_ && wconn_->get_is_err() ) {
PC_LOG_ERR( "rpc_websocket_reset" )
.add( "secondary", get_is_secondary() )
.add( "error", wconn_->get_err_msg() )
.add( "host", rhost_ )
.add( "port", wconn_->get_port() )
Expand Down Expand Up @@ -770,20 +834,14 @@ void manager::on_response( rpc::get_slot *res )
PC_LOG_DBG( "received get_slot" )
.add( "slot", slot_ )
.add( "round_trip_time(ms)", 1e-6*ack_ts )
.add( "secondary", get_is_secondary() )
.end();

// submit block hash every N slots
if ( slot_cnt_++ % PC_BLOCKHASH_TIMEOUT == 0 ) {
clnt_.send( breq_ );
}

// reset submit
if ( !is_pub_ ) {
kidx_ = 0;
pub_ts_ = ts;
is_pub_ = true;
}

// flush capture
if ( do_cap_ ) {
cap_.flush();
Expand Down Expand Up @@ -812,6 +870,7 @@ void manager::on_response( rpc::get_recent_block_hash *m )
// set initialized status for block hash
set_status( PC_PYTH_HAS_BLOCK_HASH );
PC_LOG_INF( "received_recent_block_hash" )
.add( "secondary", get_is_secondary() )
.add( "curr_slot", slot_ )
.add( "hash_slot", m->get_slot() )
.add( "round_trip_time(ms)", 1e-6*ack_ts )
Expand All @@ -830,13 +889,15 @@ void manager::on_response( rpc::account_update *m )
if ( m->get_is_http() ) {
int64_t ack_ts = m->get_recv_time() - m->get_sent_time();
PC_LOG_DBG( "received account_update" )
.add( "secondary", get_is_secondary() )
.add( "account", *m->get_account() )
.add( "slot", slot_ )
.add( "round_trip_time(ms)", 1e-6*ack_ts )
.end();
}
else {
PC_LOG_DBG( "received account_update" )
.add( "secondary", get_is_secondary() )
.add( "account", *m->get_account() )
.add( "slot", slot_ )
.end();
Expand Down Expand Up @@ -883,12 +944,14 @@ bool manager::submit_poll( request *req )
}
if ( req->get_is_err() ) {
PC_LOG_ERR( "request error")
.add( "secondary", get_is_secondary() )
.add( "error", req->get_err_msg() )
.end();
return false;
}
if ( get_is_err() ) {
PC_LOG_ERR( "request error")
.add( "secondary", get_is_secondary() )
.add( "error", get_err_msg() )
.end();
return false;
Expand All @@ -908,7 +971,7 @@ void manager::on_connect()
void manager::on_disconnect()
{
// callback user with connection status
PC_LOG_INF( "pyth_tx_reset" ).end();
PC_LOG_INF( "pyth_tx_reset" ).add( "secondary", get_is_secondary() ).end();
if ( sub_ ) {
sub_->on_tx_disconnect( this );
}
Expand Down
11 changes: 11 additions & 0 deletions pc/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ namespace pc
// accept new pyth client apps
void accept( int fd ) override;

// add secondary network manager
void add_secondary( const std::string& rpc_host, const std::string& key_dir );

// shut-down server
void teardown();

Expand All @@ -189,6 +192,11 @@ namespace pc
get_mapping *get_last_mapping() const;
bool get_is_rpc_send() const;

bool has_secondary() const;
void set_is_secondary(bool is_secondary);
bool get_is_secondary() const;
manager *get_secondary();

private:

struct trait_account {
Expand Down Expand Up @@ -279,6 +287,9 @@ namespace pc

// Timestamp of the last batch
int64_t last_upd_ts_= 0;

manager *secondary_; // manager for secondary network
bool is_secondary_; // flag tracking whether we are a secondary manager
};

inline bool manager::get_is_tx_connect() const
Expand Down
Loading