Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ RUN apt-get install -qq \
libzstd1 \
libzstd-dev \
python3-pytest \
python3-pytest-asyncio \
python3-websockets \
sudo \
zlib1g \
Expand Down
6 changes: 6 additions & 0 deletions pc/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,12 @@ void manager::on_response( rpc::get_slot *res )
if (
has_status( PC_PYTH_RPC_CONNECTED )
) {

// New slot received, so flush all pending updates for all active users
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
uptr->send_pending_upds();
}

if ( sub_ ) {
sub_->on_slot_publish( this );
}
Expand Down
29 changes: 26 additions & 3 deletions pc/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,20 +708,39 @@ bool price::send( price *prices[], const unsigned n )
for ( unsigned i = 0, j = 0; i < n; ++i ) {
price *const p = prices[ i ];
if ( PC_UNLIKELY( ! p->init_ && ! p->init_publish() ) ) {
PC_LOG_ERR( "failed to initialize publisher" )
.add( "price_account", *p->get_account() )
.add( "product_account", *p->prod_->get_account() )
.add( "symbol", p->get_symbol() )
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
continue;
}
if ( PC_UNLIKELY( ! p->has_publisher() ) ) {
PC_LOG_ERR( "missing publish permission" )
.add( "price_account", *p->get_account() )
.add( "product_account", *p->prod_->get_account() )
.add( "symbol", p->get_symbol() )
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
continue;
}
if ( PC_UNLIKELY( ! p->get_is_ready_publish() ) ) {
PC_LOG_ERR( "not ready to publish - check rpc / pyth_tx connection" )
.add( "price_account", *p->get_account() )
.add( "product_account", *p->prod_->get_account() )
.add( "symbol", p->get_symbol() )
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
continue;
}
manager *const mgr = p->get_manager();
if ( ! mgr1 ) {
mgr1 = mgr;
}
else if ( mgr != mgr1 ) {
PC_LOG_ERR( "unexpected manager" ).end();
PC_LOG_ERR( "unexpected manager" )
.add( "price_account", *p->get_account() )
.add( "product_account", *p->prod_->get_account() )
.add( "symbol", p->get_symbol() )
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
continue;
}
const uint64_t slot = mgr->get_slot();
Expand All @@ -739,7 +758,11 @@ bool price::send( price *prices[], const unsigned n )
mgr->submit( msg );
}
else {
PC_LOG_ERR( "failed to build msg" );
PC_LOG_ERR( "failed to build msg" )
.add( "price_account", *p->get_account() )
.add( "product_account", *p->prod_->get_account() )
.add( "symbol", p->get_symbol() )
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
}
}
else {
Expand All @@ -750,7 +773,7 @@ bool price::send( price *prices[], const unsigned n )
std::string( 100, '\0' ), p1->preq_->get_sent_time()
);
p1->preq_->get_signature()->enc_base58( p1->tvec_.back().first );
PC_LOG_DBG( "sent price update transaction" )
PC_LOG_DBG( "sent price update" )
.add( "price_account", *p1->get_account() )
.add( "product_account", *p1->prod_->get_account() )
.add( "symbol", p1->get_symbol() )
Expand Down
40 changes: 25 additions & 15 deletions pc/user.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "manager.hpp"
#include "log.hpp"
#include "mem_map.hpp"
#include <algorithm>

#define PC_JSON_RPC_VER "2.0"
#define PC_JSON_PARSE_ERROR -32700
Expand All @@ -11,6 +12,7 @@
#define PC_JSON_UNKNOWN_SYMBOL -32000
#define PC_JSON_MISSING_PERMS -32001
#define PC_JSON_NOT_READY -32002
#define PC_BATCH_SEND_FAILED -32010

using namespace pc;

Expand Down Expand Up @@ -204,22 +206,17 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok )
if ( 0 == (ntok = jp_.find_val( ptok, "status" ) ) ) break;
symbol_status stype = str_to_symbol_status( jp_.get_str( ntok ) );

// submit new price
if ( sptr->update( price, conf, stype ) ) {
// create result
add_header();
jw_.add_key( "result", 0UL );
add_tail( itok );
} else if ( !sptr->get_is_ready_publish() ) {
add_error( itok, PC_JSON_NOT_READY,
"not ready to publish - check rpc / pyth_tx connection" );
} else if ( !sptr->has_publisher() ) {
add_error( itok, PC_JSON_MISSING_PERMS, "missing publish permission" );
} else if ( sptr->get_is_err() ) {
add_error( itok, PC_JSON_INVALID_REQUEST, sptr->get_err_msg() );
} else {
add_error( itok, PC_JSON_INVALID_REQUEST, "unknown error" );
// Add the updated price to the pending updates
sptr->update_no_send( price, conf, stype, false );
if( std::find(pending_vec_.begin(), pending_vec_.end(), sptr) == pending_vec_.end() ) {
pending_vec_.emplace_back( sptr );
}

// Send the result back
add_header();
jw_.add_key( "result", 0UL );
add_tail( itok );

return;
} while( 0 );
add_invalid_params( itok );
Expand Down Expand Up @@ -333,6 +330,19 @@ void user::parse_get_product( uint32_t tok, uint32_t itok )
add_tail( itok );
}

void user::send_pending_upds()
{
if ( pending_vec_.empty() ) {
return;
}

if ( !price::send( pending_vec_.data(), pending_vec_.size()) ) {
add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" );
}

pending_vec_.clear();
}

void user::parse_get_all_products( uint32_t itok )
{
add_header();
Expand Down
19 changes: 12 additions & 7 deletions pc/user.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ namespace pc
// symbol price schedule callback
void on_response( price_sched *, uint64_t ) override;

// send all pending updates
void send_pending_upds();

private:

// http-only request parsing
Expand All @@ -57,6 +60,7 @@ namespace pc
};

typedef std::vector<deferred_sub> def_vec_t;
typedef std::vector<price*> pending_vec_t;

void parse_request( uint32_t );
void parse_get_product_list( uint32_t );
Expand All @@ -73,13 +77,14 @@ namespace pc
void add_unknown_symbol( uint32_t id );
void add_error( uint32_t id, int err, str );

rpc_client *rptr_; // rpc manager api
manager *sptr_; // manager collection
user_http hsvr_; // http parser
jtree jp_; // json parser
json_wtr jw_; // json writer
def_vec_t dvec_; // deferred subscriptions
request_sub_set psub_; // price subscriptions
rpc_client *rptr_; // rpc manager api
manager *sptr_; // manager collection
user_http hsvr_; // http parser
jtree jp_; // json parser
json_wtr jw_; // json writer
def_vec_t dvec_; // deferred subscriptions
request_sub_set psub_; // price subscriptions
pending_vec_t pending_vec_; // prices with pending updates
};

}
4 changes: 2 additions & 2 deletions pyth/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,12 @@ def pythd(solana_test_validator, pyth_dir):
'-x',
'-m', 'finalized',
'-d',
'-l', 'pyth_logs.txt',
]
kwargs = {
'stdin': DEVNULL,
'stdout': DEVNULL,
'stderr': DEVNULL,
}

with Popen(cmd, **kwargs) as p:
time.sleep(3)
yield
Expand Down
118 changes: 118 additions & 0 deletions pyth/tests/test_update_price.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import json
from subprocess import check_output
import pytest
import websockets
import time
import itertools
import random

from pyth.tests.conftest import PRODUCTS

@pytest.mark.asyncio
async def test_batch_update_price(solana_test_validator, pythd, pyth_dir, pyth_init_product, pyth_init_price):

messageIds = itertools.count()

# Use a single websocket connection for the entire test, as batching is done per-user
async with websockets.connect('ws://localhost:8910/') as ws:

async def update_price(account, price, conf, status):
msg = jrpc_req(
method='update_price',
params={
'account': account,
'price': price,
'conf': conf,
'status': status,
})

await send(msg)
resp = await recv()
assert resp['result'] == 0

async def get_product(account):
output = check_output([
'pyth', 'get_product',
account,
'-r', 'localhost',
'-k', pyth_dir,
'-c', 'finalized',
'-j',
]).decode('ascii')
result = json.loads(output)

return result

def jrpc_req(method=None, params=None):
return {
'jsonrpc': '2.0',
'method': method,
'params': params,
'id': next(messageIds)
}

async def send(msg):
print("--- sending message ---")
print(msg)
await ws.send(json.dumps(msg))

async def recv():
data = await ws.recv()
msg = json.loads(data)
print("----- received message -----")
print(msg)
return msg

def get_publisher_acc(product_acc):
assert len(product_acc['price_accounts']) == 1
price_acc = product_acc['price_accounts'][0]

assert len(price_acc['publisher_accounts']) == 1
return price_acc['publisher_accounts'][0]

# Check that the prices are 0 initially
for product in PRODUCTS.keys():
product_acc = await get_product(pyth_init_product[product])
publisher_acc = get_publisher_acc(product_acc)

assert publisher_acc['price'] == 0
assert publisher_acc['conf'] == 0
assert publisher_acc['status'] == 'unknown'

# Generate new values for this test
new_values = {
product: {
'price':random.randint(1, 150),
'conf': random.randint(1, 20),
'status': 'trading',
} for product in PRODUCTS.keys()
}

# Update the values of the products
for product in PRODUCTS.keys():
await update_price(
pyth_init_price[product],
new_values[product]['price'],
new_values[product]['conf'],
new_values[product]['status'])

time.sleep(80)

# Crank the products
for product in PRODUCTS.keys():
await update_price(
pyth_init_price[product],
1,
1,
'trading')

time.sleep(80)

# Check that the price has been updated
for product in PRODUCTS.keys():
product_acc = await get_product(pyth_init_product[product])
publisher_acc = get_publisher_acc(product_acc)

assert publisher_acc['price'] == new_values[product]['price']
assert publisher_acc['conf'] == new_values[product]['conf']
assert publisher_acc['status'] == new_values[product]['status']