Flush batches early #167
Conversation
```cpp
// Flush any pending complete batches of price updates by submitting solana TXs.
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
  if ( uptr->num_pending_upds() >= get_max_batch_size() ) {
    uptr->send_pending_upds( get_max_batch_size() );
  }
}
```
It might be simpler to instead add a partial boolean parameter to user::send_pending_upds, and change the semantics of user::send_pending_upds to only send partial batches if partial is true.
You could then call send_pending_upds(false) on every iteration of the event loop, and send_pending_upds(true) when a slot is received.
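A minimal sketch of the suggested interface, under stated assumptions: `pending_`, `max_batch_size_`, and the `send_batch` helper are illustrative stand-ins, not the actual pyth-client members, and each call sends at most one complete batch (matching the staggering goal discussed further down).

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative sketch of the suggested `partial` parameter; names are
// hypothetical and simplified relative to the real pc/user.hpp.
class user {
public:
  // Send pending updates. A complete batch is always eligible to be sent;
  // a smaller-than-batch-size remainder is sent only when partial is true.
  void send_pending_upds( bool partial ) {
    if ( pending_.size() >= max_batch_size_ ) {
      send_batch( max_batch_size_ );  // at most one complete batch per call
    } else if ( partial && !pending_.empty() ) {
      send_batch( pending_.size() );  // flush the incomplete remainder
    }
  }
  void queue_upd( uint64_t upd ) { pending_.push_back( upd ); }
  size_t num_sent() const { return n_sent_; }
private:
  void send_batch( size_t n ) {
    // stand-in for building and submitting a solana TX for n updates
    pending_.erase( pending_.begin(), pending_.begin() + n );
    n_sent_ += n;
  }
  std::vector<uint64_t> pending_;
  size_t max_batch_size_ = 8;
  size_t n_sent_ = 0;
};
```

With this shape, the event loop would call `send_pending_upds(false)` each iteration, and the slot handler would call `send_pending_upds(true)` to flush whatever is left.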
pc/user.cpp
```cpp
    n_sent = pending_vec_.size();
  }

  if ( !price::send( pending_vec_.data(), n_sent ) ) {
```
As price::send is effectively the API boundary for publishers that link against the C++ bindings, this change won't benefit those publishers.
pc/user.cpp
```cpp
  return pending_vec_.size();
}

void user::send_pending_upds( uint32_t n )
```
Regarding testing: we don't have an easy way to reliably test when transactions are sent. To test if partial batches and full batches are sent, you could use something like test_update_price.py.
pc/manager.cpp
```cpp
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
  uptr->send_pending_upds( uptr->num_pending_upds() );
}
```
It might be nice to factor this loop out into a manager::send_pending_upds function.
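A sketch of that refactoring, with simplified stand-in types (the real `user`, intrusive list, and `manager` members in pyth-client differ; only the loop itself follows the quoted diff):

```cpp
#include <cstddef>
#include <vector>

// Simplified stand-ins for the pyth-client types, for illustration only.
struct user {
  std::vector<int> pending;
  user            *next = nullptr;
  size_t           sent = 0;

  size_t num_pending_upds() const { return pending.size(); }
  void   send_pending_upds( size_t n ) {
    // stand-in for submitting a TX carrying n updates
    sent += n;
    pending.erase( pending.begin(), pending.begin() + n );
  }
  user *get_next() const { return next; }
};

struct user_list {
  user *head = nullptr;
  user *first() const { return head; }
};

struct manager {
  user_list olist_;

  // Factored-out flush: walk the user list once and flush every
  // user's pending updates, as suggested above.
  void send_pending_upds() {
    for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
      uptr->send_pending_upds( uptr->num_pending_upds() );
    }
  }
};
```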
pc/user.cpp
```cpp
void user::send_pending_upds()
{
  if ( pending_vec_.empty() ) {
    uint32_t n_sent = 0;
```
I think to_send might be a clearer variable name: to me, n_sent implies that it's counting the number of updates that have already been sent.
```cpp
void on_response( price_sched *, uint64_t ) override;

// send all pending updates
// send a batch of pending price updates. This function eagerly sends any complete batches.
```
Might be worth clarifying the semantics:
```cpp
// If there are multiple complete batches, only one will be sent.
```
I assume those are the desired semantics, as the aim of this is to stagger transactions.
At the moment, pythd sends all of the price update transactions at roughly the same time. I think this is causing the later transactions to fail (though I'm not 100% sure).
Before we added batching, pythd staggered price updates by staggering requests for prices. When the client responded to this request, pythd immediately sent a price update transaction. This approach had the effect of staggering the update transactions (because the staggered update request schedule is directly reflected in the TXs). We lost this staggering when we added batching, because the client responses were queued and then flushed at the start of the next slot.
This change adds back staggering by eagerly sending transactions for any complete batches during the slot. It also explicitly flushes any pending updates at the end of the price request schedule; this flushing handles the case where there aren't enough prices to complete a single batch.
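The behavior described above can be modeled with a small toy (this is not pythd code; `batcher`, `on_client_response`, and `on_schedule_end` are hypothetical names): updates queue as client responses arrive, a TX is sent eagerly whenever a complete batch accumulates during the slot, and any incomplete remainder is flushed at the end of the price request schedule.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy model of the staggered-batching behavior described above.
struct batcher {
  size_t                batch_size;
  std::vector<uint64_t> pending;
  std::vector<size_t>   tx_sizes;  // size of each "transaction" sent

  void on_client_response( uint64_t upd ) {
    pending.push_back( upd );
    if ( pending.size() >= batch_size ) {
      // Eager send: complete batches go out as responses arrive, so TXs
      // inherit the stagger of the update request schedule.
      send( batch_size );
    }
  }
  void on_schedule_end() {
    if ( !pending.empty() ) {
      // Explicit flush covers the case where there aren't enough prices
      // to complete a final batch.
      send( pending.size() );
    }
  }
  void send( size_t n ) {
    tx_sizes.push_back( n );
    pending.erase( pending.begin(), pending.begin() + n );
  }
};
```

With a batch size of 4 and 10 staggered responses, this produces three TXs spread across the slot rather than one burst at the next slot boundary.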