Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pc/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ using namespace pc;
#define PC_MAX_BATCH 8
// Flush partial batches if not completed within 400 ms.
#define PC_FLUSH_INTERVAL (400L*PC_NSECS_IN_MSEC)
// Compute units requested per price update instruction
// The biggest instruction appears to be about ~10300 CUs, so we overestimate by 100%.
#define PC_UPD_PRICE_COMPUTE_UNITS 20000

///////////////////////////////////////////////////////////////////////////
// manager_sub
Expand Down Expand Up @@ -76,6 +79,8 @@ manager::manager()
is_pub_( false ),
cmt_( commitment::e_confirmed ),
max_batch_( PC_MAX_BATCH ),
requested_upd_price_cu_units_( PC_UPD_PRICE_COMPUTE_UNITS ),
requested_upd_price_cu_price_( 0UL ),
sreq_{ { commitment::e_processed } },
secondary_{ nullptr },
is_secondary_( false )
Expand Down Expand Up @@ -189,6 +194,22 @@ int64_t manager::get_publish_interval() const
return pub_int_ / PC_NSECS_IN_MSEC;
}

void manager::set_requested_upd_price_cu_units( unsigned cu_units ) {
requested_upd_price_cu_units_ = cu_units;
}

unsigned manager::get_requested_upd_price_cu_units() const {
return requested_upd_price_cu_units_;
}

void manager::set_requested_upd_price_cu_price( unsigned cu_price ) {
requested_upd_price_cu_price_ = cu_price;
}

unsigned manager::get_requested_upd_price_cu_price() const {
return requested_upd_price_cu_price_;
}

void manager::set_max_batch_size( unsigned batch_size )
{
max_batch_ = batch_size;
Expand Down
10 changes: 10 additions & 0 deletions pc/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ namespace pc
void set_publish_interval( int64_t mill_secs );
int64_t get_publish_interval() const;

// override the amount of requested CU units per upd_price transaction
void set_requested_upd_price_cu_units( unsigned cu_units );
unsigned get_requested_upd_price_cu_units() const;

// override the price per CU for upd_price transaction
void set_requested_upd_price_cu_price( unsigned cu_price );
unsigned get_requested_upd_price_cu_price() const;

// override the default maximum number of price updates to send in a batch
void set_max_batch_size( unsigned batch_size );
unsigned get_max_batch_size() const;
Expand Down Expand Up @@ -275,6 +283,8 @@ namespace pc
tx_parser txp_; // handle unexpected errors
commitment cmt_; // account get/subscribe commitment
unsigned max_batch_;// maximum number of price updates that can be sent in a single batch
unsigned requested_upd_price_cu_units_; // amount of requested CU units per upd_price transaction
unsigned requested_upd_price_cu_price_; // price per CU for upd_price transaction

// requests
rpc::get_slot sreq_[1]; // slot subscription
Expand Down
4 changes: 2 additions & 2 deletions pc/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ bool price::send( price *prices[], const unsigned n )
) {
if ( mgr->get_do_tx() ) {
net_wtr msg;
if ( rpc::upd_price::build( msg, &upds_[ 0 ], upds_.size() ) ) {
if ( rpc::upd_price::build( msg, &upds_[ 0 ], upds_.size(), mgr->get_requested_upd_price_cu_units(), mgr->get_requested_upd_price_cu_units() ) ) {
mgr->submit( msg );
}
else {
Expand All @@ -814,7 +814,7 @@ bool price::send( price *prices[], const unsigned n )
}
}
else {
p->get_rpc_client()->send( &upds_[ 0 ], upds_.size() );
p->get_rpc_client()->send( &upds_[ 0 ], upds_.size(), mgr->get_requested_upd_price_cu_units(), mgr->get_requested_upd_price_cu_price() );
for ( unsigned k = j; k <= i; ++k ) {
price *const p1 = prices[ k ];
p1->tvec_.emplace_back(
Expand Down
73 changes: 50 additions & 23 deletions pc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void rpc_client::send( rpc_request *rptr )
}
}

void rpc_client::send( rpc::upd_price *upds[], const unsigned n )
void rpc_client::send( rpc::upd_price *upds[], const unsigned n, unsigned cu_units, unsigned cu_price )
{
if ( ! n ) {
return;
Expand Down Expand Up @@ -212,7 +212,7 @@ void rpc_client::send( rpc::upd_price *upds[], const unsigned n )
jw.add_val( json_wtr::e_obj );
jw.add_key( "jsonrpc", "2.0" );
jw.add_key( "id", id );
rpc::upd_price::request( jw, upds, n );
rpc::upd_price::request( jw, upds, n, cu_units, cu_price );
jw.pop();
// jw.print();
if ( upds[ 0 ]->get_is_http() ) {
Expand Down Expand Up @@ -888,8 +888,16 @@ class tx_wtr : public net_wtr
}
};

// Populates the given tx with the given upd_price requests. This function allows
// specifying the number of requested cu units, and a price per cu unit, to enable
// priority fees. If these parameters are emitted these are left as unspecified in
// the transaction.
bool rpc::upd_price::build_tx(
bincode& tx, upd_price* upds[], const unsigned n
bincode& tx,
upd_price* upds[],
const unsigned n,
unsigned cu_units,
unsigned cu_price
)
{
if ( ! n ) {
Expand Down Expand Up @@ -923,28 +931,34 @@ bool rpc::upd_price::build_tx(
tx.add( *first.bhash_ ); // recent block hash

// instructions section
tx.add_len( n + 1 + 1 ); // 1 compute limit instruction, 1 compute unit price instruction, n upd_price instruction(s)
unsigned instruction_count = n; // n upd_price instruction(s)
if ( cu_units > 0 ) {
instruction_count += 1; // Extra instruction for specifying number of cus per instructions
}
if ( cu_price > 0 ) {
instruction_count += 1; // Extra instruction for specifying compute unit price
}
tx.add_len( instruction_count ); // 1 compute limit instruction, 1 compute unit price instruction, n upd_price instruction(s)

// Set compute limit
tx.add( (uint8_t)( n + 3 ) ); // compute budget program id index in accounts list
tx.add_len<0>(); // no accounts
// compute limit instruction parameters
tx.add_len<sizeof(uint8_t) + sizeof(uint32_t)>(); // uint8_t enum variant + uint32_t requested compute units
tx.add( (uint8_t) 2 ); // SetComputeLimit enum variant
tx.add( (uint32_t) CU_BUDGET_PER_IX * n ); // the budget (scaled for number of instructions)

// Requested price per compute unit, in micro lamports
// We want to pay 5000 lamports in total, so ((5000/20000) / 8) * (10^6)
// assuming upper bound of 20000 CUs and 8 instructions.
const uint64_t cu_price_micro_lamports = 31250;
if ( cu_units > 0 ) {
tx.add( (uint8_t)( n + 3 ) ); // compute budget program id index in accounts list
tx.add_len<0>(); // no accounts
// compute limit instruction parameters
tx.add_len<sizeof(uint8_t) + sizeof(uint32_t)>(); // uint8_t enum variant + uint32_t requested compute units
tx.add( (uint8_t) 2 ); // SetComputeLimit enum variant
tx.add( (uint32_t) ( cu_units * n ) ); // the budget (scaled for number of instructions)
}

// Set compute unit price
tx.add( (uint8_t)( n + 3 ) );
tx.add_len<0>(); // no accounts
// compute unit price instruction parameters
tx.add_len<sizeof(uint8_t) + sizeof(uint64_t)>(); // uint8_t enum variant + uint62_t compute price
tx.add( (uint8_t) 3 ); // SetComputePrice enum variant
tx.add( (uint64_t) cu_price_micro_lamports ); // price we are willing to pay per compute unit in Micro Lamports
if ( cu_price > 0 ) {
tx.add( (uint8_t)( n + 3 ) );
tx.add_len<0>(); // no accounts
// compute unit price instruction parameters
tx.add_len<sizeof(uint8_t) + sizeof(uint64_t)>(); // uint8_t enum variant + uint62_t compute price
tx.add( (uint8_t) 3 ); // SetComputePrice enum variant
tx.add( (uint64_t) cu_price ); // price we are willing to pay per compute unit in Micro Lamports
}

for ( unsigned i = 0; i < n; ++i ) {
tx.add( (uint8_t)( n + 2 ) ); // program_id index
Expand Down Expand Up @@ -989,10 +1003,17 @@ void rpc::upd_price::request( json_wtr& msg )
bool rpc::upd_price::build(
net_wtr& wtr, upd_price* upds[], const unsigned n
)
{
return build( wtr, upds, n, 0, 0 );
}

bool rpc::upd_price::build(
net_wtr& wtr, upd_price* upds[], const unsigned n, unsigned cu_units, unsigned cu_price
)
{
bincode tx;
static_cast< tx_wtr& >( wtr ).init( tx );
if ( ! build_tx( tx, upds, n ) ) {
if ( ! build_tx( tx, upds, n, cu_units, cu_price ) ) {
return false;
}
static_cast< tx_wtr& >( wtr ).commit( tx );
Expand All @@ -1001,12 +1022,18 @@ bool rpc::upd_price::build(

bool rpc::upd_price::request(
json_wtr& msg, upd_price* upds[], const unsigned n
) {
return request( msg, upds, n, 0, 0 );
}

bool rpc::upd_price::request(
json_wtr& msg, upd_price* upds[], const unsigned n, unsigned cu_units, unsigned cu_price
)
{
// construct binary transaction
net_buf *bptr = net_buf::alloc();
bincode tx( bptr->buf_ );
if ( ! build_tx( tx, upds, n ) ) {
if ( ! build_tx( tx, upds, n, cu_units, cu_price ) ) {
return false;
}

Expand Down
7 changes: 5 additions & 2 deletions pc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ namespace pc

// submit rpc request (and bundled callback)
void send( rpc_request * );
void send( rpc::upd_price *[], unsigned n );
void send( rpc::upd_price *[], unsigned n, unsigned cu_units, unsigned cu_price );

public:

Expand Down Expand Up @@ -428,8 +428,11 @@ namespace pc
static bool build( net_wtr&, upd_price*[], unsigned n );
static bool request( json_wtr&, upd_price*[], const unsigned n );

static bool build( net_wtr&, upd_price*[], unsigned n, unsigned cu_units, unsigned cu_price );
static bool request( json_wtr&, upd_price*[], const unsigned n, unsigned cu_units, unsigned cu_price );

private:
static bool build_tx( bincode&, upd_price*[], unsigned n );
static bool build_tx( bincode&, upd_price*[], unsigned n, unsigned cu_units, unsigned cu_price );

hash *bhash_;
key_pair *pkey_;
Expand Down
10 changes: 10 additions & 0 deletions pcapps/pythd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ int usage()
std::cerr << " -d" << std::endl;
std::cerr << " Turn on debug logging. Can also toggle this on/off via "
"kill -s SIGUSR1 <pid>\n" << std::endl;
std::cerr << " -u" << std::endl;
std::cerr << " Number of compute units requested by each upd_price transaction (default 20000)" << std::endl;
std::cerr << " -v" << std::endl;
std::cerr << " Price per compute unit for each upd_price transaction, in micro lamports (the default is not to specify a specific price)" << std::endl;
return 1;
}

Expand Down Expand Up @@ -104,6 +108,8 @@ int main(int argc, char **argv)
int pyth_port = get_port();
int opt = 0;
int pub_int = 1000;
unsigned cu_units = 20000;
unsigned cu_price = 0;
unsigned max_batch_size = 0;
bool do_wait = true, do_tx = true, do_ws = true, do_debug = false;
while( (opt = ::getopt(argc,argv, "r:s:t:p:i:k:w:c:l:m:b:dnxhz" )) != -1 ) {
Expand All @@ -123,6 +129,8 @@ int main(int argc, char **argv)
case 'x': do_tx = false; break;
case 'z': do_ws = false; break;
case 'd': do_debug = true; break;
case 'u': cu_units = strtoul(optarg, NULL, 0); break;
case 'v': cu_price = strtoul(optarg, NULL, 0); break;
default: return usage();
}
}
Expand Down Expand Up @@ -153,6 +161,8 @@ int main(int argc, char **argv)
mgr.set_do_capture( !cap_file.empty() );
mgr.set_commitment( cmt );
mgr.set_publish_interval( pub_int );
mgr.set_requested_upd_price_cu_units( cu_units );
mgr.set_requested_upd_price_cu_price( cu_price );

bool do_secondary = !secondary_rpc_host.empty();
if ( do_secondary ) {
Expand Down