Skip to content

Commit 1122c89

Browse files
committed
Simplify notify_price_sched logic.
This commit updates the price_sched logic to schedule all price_sched requests in both managers at a fixed interval. The reason for this is: Previously, the notify_price_sched messages were staggered over the slot interval. However, as the updates are now batched there is no need to do this. Therefore, we now send notify_price_sched messages at a fixed interval. When subscribe_price_sched messages come in, the associated price_sched requests are added to the manager which contains the price information. This is necessary, as the manager associated with the price_sched request needs to be the same manager as that of the price object. This means that the price_sched requests the user cares about could be distributed between both managers. Therefore, at any time, all price_sched requests in either manager need to be sent at a constant interval, regardless of their connectivity status. This ensures that price updates are always being submitted to pythd. If a network isn't online, its price updates will simply fail to be sent.
1 parent e653b77 commit 1122c89

File tree

5 files changed

+36
-34
lines changed

5 files changed

+36
-34
lines changed

pc/manager.cpp

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,17 @@ manager::manager()
6060
sub_( nullptr ),
6161
status_( 0 ),
6262
num_sub_( 0 ),
63-
kidx_( (unsigned)-1 ),
6463
cts_( 0L ),
6564
ctimeout_( PC_NSECS_IN_SEC ),
6665
slot_( 0UL ),
6766
slot_cnt_( 0UL ),
6867
slot_ts_{ 0UL },
6968
curr_ts_( 0L ),
70-
pub_ts_( 0L ),
7169
pub_int_( PC_PUB_INTERVAL ),
7270
wait_conn_( false ),
7371
do_cap_( false ),
7472
do_ws_( true ),
7573
do_tx_( true ),
76-
is_pub_( false ),
7774
cmt_( commitment::e_confirmed ),
7875
max_batch_( PC_MAX_BATCH ),
7976
sreq_{ { commitment::e_processed } },
@@ -572,6 +569,9 @@ void manager::poll( bool do_wait )
572569
}
573570
}
574571

572+
// request product quotes from publishers
573+
poll_schedule();
574+
575575
// try to (re)connect to tx proxy
576576
if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) {
577577
tconn_.reconnect();
@@ -580,9 +580,6 @@ void manager::poll( bool do_wait )
580580
if ( has_status( PC_PYTH_RPC_CONNECTED ) &&
581581
!hconn_.get_is_err() &&
582582
( !wconn_ || !wconn_->get_is_err() ) ) {
583-
// request product quotes from pythd's clients while connected
584-
poll_schedule();
585-
586583
send_pending_ups();
587584
} else {
588585
reconnect_rpc();
@@ -596,19 +593,10 @@ void manager::poll( bool do_wait )
596593

597594
void manager::poll_schedule()
598595
{
599-
while ( is_pub_ && kidx_ < kvec_.size() ) {
600-
price_sched *kptr = kvec_[kidx_];
601-
int64_t pub_ts = pub_ts_ + static_cast< int64_t >(
602-
( static_cast< uint64_t >( pub_int_ ) * kptr->get_hash() )
603-
/ price_sched::fraction
604-
);
605-
if ( curr_ts_ > pub_ts ) {
596+
for ( uint32_t i = 0; i < kvec_.size(); i++ ) {
597+
price_sched *kptr = kvec_[i];
598+
if ( ( curr_ts_ - kptr->get_last_scheduled_time() ) > pub_int_ ) {
606599
kptr->schedule();
607-
if ( ++kidx_ >= kvec_.size() ) {
608-
is_pub_ = false;
609-
}
610-
} else {
611-
break;
612600
}
613601
}
614602
}
@@ -633,10 +621,7 @@ void manager::reconnect_rpc()
633621

634622
// reset state
635623
wait_conn_ = false;
636-
is_pub_ = false;
637-
kidx_ = 0;
638624
ctimeout_ = PC_NSECS_IN_SEC;
639-
pub_ts_ = 0L;
640625
slot_ = 0L;
641626
slot_cnt_ = 0UL;
642627
slot_ts_ = 0L;
@@ -830,13 +815,6 @@ void manager::on_response( rpc::get_slot *res )
830815
clnt_.send( breq_ );
831816
}
832817

833-
// reset submit
834-
if ( !is_pub_ ) {
835-
kidx_ = 0;
836-
pub_ts_ = ts;
837-
is_pub_ = true;
838-
}
839-
840818
// flush capture
841819
if ( do_cap_ ) {
842820
cap_.flush();

pc/manager.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,12 @@ namespace pc
255255
manager_sub *sub_; // subscription callback
256256
int status_; // status bitmap
257257
int num_sub_; // number of in-flight mapping subscriptions
258-
uint32_t kidx_; // schedule index
259258
int64_t cts_; // (re)connect timestamp
260259
int64_t ctimeout_; // connection timeout
261260
uint64_t slot_; // current slot
262261
uint64_t slot_cnt_; // slot count
263262
int64_t slot_ts_; // current slot time
264263
int64_t curr_ts_; // current time
265-
int64_t pub_ts_; // start publish time
266264
int64_t pub_int_; // publish interval
267265
kpx_vec_t kvec_; // symbol price scheduling
268266
bool wait_conn_;// waiting on connection

pc/request.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,8 @@ void price::dump_json( json_wtr& wtr ) const
10841084

10851085
price_sched::price_sched( price *ptr )
10861086
: ptr_( ptr ),
1087-
shash_( 0UL )
1087+
shash_( 0UL ),
1088+
last_scheduled_time_( 0L )
10881089
{
10891090
}
10901091

@@ -1101,9 +1102,25 @@ uint64_t price_sched::get_hash() const
11011102
bool price_sched::get_is_ready()
11021103
{
11031104
manager *cptr = get_manager();
1104-
return cptr->has_status( PC_PYTH_RPC_CONNECTED |
1105-
PC_PYTH_HAS_BLOCK_HASH |
1106-
PC_PYTH_HAS_MAPPING );
1105+
1106+
bool primary_ready = cptr->has_status(
1107+
PC_PYTH_RPC_CONNECTED |
1108+
PC_PYTH_HAS_BLOCK_HASH |
1109+
PC_PYTH_HAS_MAPPING
1110+
);
1111+
1112+
bool secondary_ready = cptr->has_secondary() &&
1113+
cptr->get_secondary()->has_status(
1114+
PC_PYTH_RPC_CONNECTED |
1115+
PC_PYTH_HAS_BLOCK_HASH |
1116+
PC_PYTH_HAS_MAPPING
1117+
);
1118+
1119+
return primary_ready || secondary_ready;
1120+
}
1121+
1122+
int64_t price_sched::get_last_scheduled_time() {
1123+
return last_scheduled_time_;
11071124
}
11081125

11091126
void price_sched::submit()
@@ -1116,6 +1133,7 @@ void price_sched::submit()
11161133

11171134
void price_sched::schedule()
11181135
{
1136+
last_scheduled_time_ = get_now();
11191137
on_response_sub( this );
11201138
}
11211139

pc/request.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@ namespace pc
203203
// get associated symbol price
204204
price *get_price() const;
205205

206+
// last time this price submision schedule was scheduled
207+
int64_t get_last_scheduled_time();
208+
206209
public:
207210
static const uint64_t fraction = 997UL;
208211

@@ -214,6 +217,7 @@ namespace pc
214217
private:
215218
price *ptr_;
216219
uint64_t shash_;
220+
int64_t last_scheduled_time_;
217221
};
218222

219223
// price account re-initialized

pc/user.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,10 @@ void user::parse_sub_price_sched( uint32_t tok, uint32_t itok )
274274
pub_key pkey;
275275
pkey.init_from_text( jp_.get_str( ntok ) );
276276
price *sptr = sptr_->get_price( pkey );
277+
if ( PC_UNLIKELY( !sptr && sptr_->has_secondary() ) ) {
278+
sptr = sptr_->get_secondary()->get_price( pkey );
279+
}
280+
277281
if ( PC_UNLIKELY( !sptr ) ) { add_unknown_symbol(itok); return; }
278282

279283
// add subscription

0 commit comments

Comments
 (0)