Skip to content

Commit 727bf17

Browse files
author
Jayant Krishnamurthy
committed
better approach
1 parent 889a7ac commit 727bf17

File tree

3 files changed

+20
-22
lines changed

3 files changed

+20
-22
lines changed

pc/manager.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -507,10 +507,8 @@ void manager::poll( bool do_wait )
507507
poll_schedule();
508508

509509
// Flush any pending complete batches of price updates by submitting solana TXs.
510-
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
511-
if (uptr->num_pending_upds() >= get_max_batch_size()) {
512-
uptr->send_pending_upds(get_max_batch_size());
513-
}
510+
for ( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
511+
uptr->send_pending_upds();
514512
}
515513
} else {
516514
reconnect_rpc();

pc/user.cpp

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#define PC_JSON_MISSING_PERMS -32001
1414
#define PC_JSON_NOT_READY -32002
1515
#define PC_BATCH_SEND_FAILED -32010
16+
// Flush partial batches if not completed within 400 ms.
17+
#define PC_FLUSH_INTERVAL (400L*PC_NSECS_IN_MSEC)
1618

1719
using namespace pc;
1820

@@ -27,7 +29,8 @@ void user::user_http::parse_content( const char *txt, size_t len )
2729
user::user()
2830
: rptr_( nullptr ),
2931
sptr_( nullptr ),
30-
psub_( this )
32+
psub_( this ),
33+
last_update_ts_(0)
3134
{
3235
// setup the plumbing
3336
hsvr_.ptr_ = this;
@@ -330,27 +333,26 @@ void user::parse_get_product( uint32_t tok, uint32_t itok )
330333
add_tail( itok );
331334
}
332335

333-
uint32_t user::num_pending_upds()
336+
void user::send_pending_upds()
334337
{
335-
return pending_vec_.size();
336-
}
337-
338-
void user::send_pending_upds(uint32_t n)
339-
{
340-
if ( pending_vec_.empty() ) {
341-
return;
338+
uint32_t n_sent = 0;
339+
int64_t curr_ts = get_now();
340+
if (curr_ts_ - price_upd_ts_ > PC_FLUSH_INTERVAL) {
341+
n_sent = pending_vec_.size();
342+
} else if (pending_vec_.size() > max_batch_size_) {
343+
n_sent = max_batch_size_;
342344
}
343345

344-
uint32_t n_sent = n;
345-
if (pending_vec_.size() < n) {
346-
n_sent = pending_vec_.size();
346+
if (n_sent == 0) {
347+
return;
347348
}
348349

349350
if ( !price::send( pending_vec_.data(), n_sent) ) {
350351
add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" );
351352
}
352353

353354
pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_sent);
355+
price_upd_ts_ = curr_ts_;
354356
}
355357

356358
void user::parse_get_all_products( uint32_t itok )

pc/user.hpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,9 @@ namespace pc
4343
// symbol price schedule callback
4444
void on_response( price_sched *, uint64_t ) override;
4545

46-
// Get the number of pending price updates that are queued to be sent.
47-
uint32_t num_pending_upds();
48-
49-
// send up to n pending price updates. If n > the max batch size,
50-
// this will split the price updates into multiple transactions.
51-
void send_pending_upds(uint32_t n);
46+
// send a batch of pending price updates. This function eagerly sends any complete batches.
47+
// It also sends partial batches that have not been completed within a short interval of time.
48+
void send_pending_upds();
5249

5350
private:
5451

@@ -89,6 +86,7 @@ namespace pc
8986
def_vec_t dvec_; // deferred subscriptions
9087
request_sub_set psub_; // price subscriptions
9188
pending_vec_t pending_vec_; // prices with pending updates
89+
int64_t last_upd_ts_; // timestamp of last price update transaction
9290
};
9391

9492
}

0 commit comments

Comments
 (0)