From 3ebbdd97d21da9cabc63f0db8881f5ca85209b93 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Mon, 24 Jan 2022 05:13:17 -0600 Subject: [PATCH 1/5] Change semantics of update_price JRPC call to buffer updates internally, instead of immediately sending them --- pc/user.cpp | 25 +++++++++---------------- pc/user.hpp | 16 +++++++++------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/pc/user.cpp b/pc/user.cpp index 7f7711b58..69db62c62 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -204,22 +204,15 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok ) if ( 0 == (ntok = jp_.find_val( ptok, "status" ) ) ) break; symbol_status stype = str_to_symbol_status( jp_.get_str( ntok ) ); - // submit new price - if ( sptr->update( price, conf, stype ) ) { - // create result - add_header(); - jw_.add_key( "result", 0UL ); - add_tail( itok ); - } else if ( !sptr->get_is_ready_publish() ) { - add_error( itok, PC_JSON_NOT_READY, - "not ready to publish - check rpc / pyth_tx connection" ); - } else if ( !sptr->has_publisher() ) { - add_error( itok, PC_JSON_MISSING_PERMS, "missing publish permission" ); - } else if ( sptr->get_is_err() ) { - add_error( itok, PC_JSON_INVALID_REQUEST, sptr->get_err_msg() ); - } else { - add_error( itok, PC_JSON_INVALID_REQUEST, "unknown error" ); - } + // Add the updated price to the pending updates + sptr->update_no_send( price, conf, stype, false ); + pending_vec_.emplace_back( sptr ); + + // Send the result back + add_header(); + jw_.add_key( "result", 0UL ); + add_tail( itok ); + return; } while( 0 ); add_invalid_params( itok ); diff --git a/pc/user.hpp b/pc/user.hpp index c9c7fc6e9..aa981a943 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -57,6 +57,7 @@ namespace pc }; typedef std::vector def_vec_t; + typedef std::vector pending_vec_t; void parse_request( uint32_t ); void parse_get_product_list( uint32_t ); @@ -73,13 +74,14 @@ namespace pc void add_unknown_symbol( uint32_t id ); void add_error( uint32_t id, int err, str ); - rpc_client *rptr_; // rpc manager api - manager *sptr_; // manager collection - user_http hsvr_; // http parser - jtree jp_; // json parser - json_wtr jw_; // json writer - def_vec_t dvec_; // deferred subscriptions - request_sub_set psub_; // price subscriptions + rpc_client *rptr_; // rpc manager api + manager *sptr_; // manager collection + user_http hsvr_; // http parser + jtree jp_; // json parser + json_wtr jw_; // json writer + def_vec_t dvec_; // deferred subscriptions + request_sub_set psub_; // price subscriptions + pending_vec_t pending_vec_; // prices with pending updates }; } From 85b37583504f35c1c206aa4121d8d7e4471c4399 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Mon, 24 Jan 2022 05:14:17 -0600 Subject: [PATCH 2/5] Send pending updates when a new slot is received --- pc/manager.cpp | 6 ++++++ pc/user.cpp | 14 ++++++++++++++ pc/user.hpp | 3 +++ 3 files changed, 23 insertions(+) diff --git a/pc/manager.cpp b/pc/manager.cpp index e980b1952..d32f1789b 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -746,6 +746,12 @@ void manager::on_response( rpc::get_slot *res ) if ( has_status( PC_PYTH_RPC_CONNECTED ) ) { + + // New slot received, so flush all pending updates for all active users + for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) { + uptr->send_pending_upds(); + } + if ( sub_ ) { sub_->on_slot_publish( this ); } diff --git a/pc/user.cpp b/pc/user.cpp index 69db62c62..e053bce75 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -11,6 +11,7 @@ #define PC_JSON_UNKNOWN_SYMBOL -32000 #define PC_JSON_MISSING_PERMS -32001 #define PC_JSON_NOT_READY -32002 +#define PC_BATCH_SEND_FAILED -32010 using namespace pc; @@ -326,6 +327,19 @@ void user::parse_get_product( uint32_t tok, uint32_t itok ) add_tail( itok ); } +void user::send_pending_upds() +{ + if ( pending_vec_.empty() ) { + return; + } + + if ( !price::send( pending_vec_.data(), pending_vec_.size()) ) { + add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" ); + } + + pending_vec_.clear(); +} + void user::parse_get_all_products( uint32_t itok ) { add_header(); diff --git a/pc/user.hpp b/pc/user.hpp index aa981a943..c5e50ff26 100644 --- a/pc/user.hpp +++ b/pc/user.hpp @@ -43,6 +43,9 @@ namespace pc // symbol price schedule callback void on_response( price_sched *, uint64_t ) override; + // send all pending updates + void send_pending_upds(); + private: // http-only request parsing From d22783d2c4e334c30c77700f58e6a2f1fc497e55 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Mon, 24 Jan 2022 05:14:41 -0600 Subject: [PATCH 3/5] Add additional logging to batch sending --- pc/request.cpp | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/pc/request.cpp b/pc/request.cpp index b96825596..b64ecec82 100644 --- a/pc/request.cpp +++ b/pc/request.cpp @@ -708,12 +708,27 @@ bool price::send( price *prices[], const unsigned n ) for ( unsigned i = 0, j = 0; i < n; ++i ) { price *const p = prices[ i ]; if ( PC_UNLIKELY( ! p->init_ && ! p->init_publish() ) ) { + PC_LOG_ERR( "failed to initialize publisher" ) + .add( "price_account", *p->get_account() ) + .add( "product_account", *p->prod_->get_account() ) + .add( "symbol", p->get_symbol() ) + .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); continue; } if ( PC_UNLIKELY( ! p->has_publisher() ) ) { + PC_LOG_ERR( "missing publish permission" ) + .add( "price_account", *p->get_account() ) + .add( "product_account", *p->prod_->get_account() ) + .add( "symbol", p->get_symbol() ) + .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); continue; } if ( PC_UNLIKELY( ! p->get_is_ready_publish() ) ) { + PC_LOG_ERR( "not ready to publish - check rpc / pyth_tx connection" ) + .add( "price_account", *p->get_account() ) + .add( "product_account", *p->prod_->get_account() ) + .add( "symbol", p->get_symbol() ) + .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); continue; } manager *const mgr = p->get_manager(); @@ -721,7 +736,11 @@ bool price::send( price *prices[], const unsigned n ) mgr1 = mgr; } else if ( mgr != mgr1 ) { - PC_LOG_ERR( "unexpected manager" ).end(); + PC_LOG_ERR( "unexpected manager" ) + .add( "price_account", *p->get_account() ) + .add( "product_account", *p->prod_->get_account() ) + .add( "symbol", p->get_symbol() ) + .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); continue; } const uint64_t slot = mgr->get_slot(); @@ -739,7 +758,11 @@ bool price::send( price *prices[], const unsigned n ) mgr->submit( msg ); } else { - PC_LOG_ERR( "failed to build msg" ); + PC_LOG_ERR( "failed to build msg" ) + .add( "price_account", *p->get_account() ) + .add( "product_account", *p->prod_->get_account() ) + .add( "symbol", p->get_symbol() ) + .add( "price_type", price_type_to_str( p->get_price_type() ) ).end(); } } else { From 62a3111e8a80f5322fcb74a6b73fc269f0460e37 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Mon, 24 Jan 2022 08:08:04 -0600 Subject: [PATCH 4/5] Don't add a price to the list of pending updates if it is already present. --- pc/user.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pc/user.cpp b/pc/user.cpp index e053bce75..92792b1ca 100644 --- a/pc/user.cpp +++ b/pc/user.cpp @@ -2,6 +2,7 @@ #include "manager.hpp" #include "log.hpp" #include "mem_map.hpp" +#include #define PC_JSON_RPC_VER "2.0" #define PC_JSON_PARSE_ERROR -32700 @@ -207,7 +208,9 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok ) // Add the updated price to the pending updates sptr->update_no_send( price, conf, stype, false ); - pending_vec_.emplace_back( sptr ); + if( std::find(pending_vec_.begin(), pending_vec_.end(), sptr) == pending_vec_.end() ) { + pending_vec_.emplace_back( sptr ); + } // Send the result back add_header(); From 566a90453102fe77895f30717801a9db205c8671 Mon Sep 17 00:00:00 2001 From: Tom Pointon Date: Thu, 27 Jan 2022 18:00:57 -0600 Subject: [PATCH 5/5] Add test for update_price JPRC call --- docker/Dockerfile | 1 + pc/request.cpp | 2 +- pyth/tests/conftest.py | 4 +- pyth/tests/test_update_price.py | 118 ++++++++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 pyth/tests/test_update_price.py diff --git a/docker/Dockerfile b/docker/Dockerfile index f9a103653..d054c8f2a 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,6 +16,7 @@ RUN apt-get install -qq \ libzstd1 \ libzstd-dev \ python3-pytest \ + python3-pytest-asyncio \ python3-websockets \ sudo \ zlib1g \ diff --git a/pc/request.cpp b/pc/request.cpp index b64ecec82..ab187ce84 100644 --- a/pc/request.cpp +++ b/pc/request.cpp @@ -773,7 +773,7 @@ bool price::send( price *prices[], const unsigned n ) std::string( 100, '\0' ), p1->preq_->get_sent_time() ); p1->preq_->get_signature()->enc_base58( p1->tvec_.back().first ); - PC_LOG_DBG( "sent price update transaction" ) + PC_LOG_DBG( "sent price update" ) .add( "price_account", *p1->get_account() ) .add( "product_account", *p1->prod_->get_account() ) .add( "symbol", p1->get_symbol() ) diff --git a/pyth/tests/conftest.py b/pyth/tests/conftest.py index 45c05ee6a..f8e620589 100644 --- a/pyth/tests/conftest.py +++ b/pyth/tests/conftest.py @@ -347,12 +347,12 @@ def pythd(solana_test_validator, pyth_dir): '-x', '-m', 'finalized', '-d', + '-l', 'pyth_logs.txt', ] kwargs = { 'stdin': DEVNULL, - 'stdout': DEVNULL, - 'stderr': DEVNULL, } + with Popen(cmd, **kwargs) as p: time.sleep(3) yield diff --git a/pyth/tests/test_update_price.py b/pyth/tests/test_update_price.py new file mode 100644 index 000000000..a6e8d89b7 --- /dev/null +++ b/pyth/tests/test_update_price.py @@ -0,0 +1,118 @@ +import json +from subprocess import check_output +import pytest +import websockets +import time +import itertools +import random + +from pyth.tests.conftest import PRODUCTS + +@pytest.mark.asyncio +async def test_batch_update_price(solana_test_validator, pythd, pyth_dir, pyth_init_product, pyth_init_price): + + messageIds = itertools.count() + + # Use a single websocket connection for the entire test, as batching is done per-user + async with websockets.connect('ws://localhost:8910/') as ws: + + async def update_price(account, price, conf, status): + msg = jrpc_req( + method='update_price', + params={ + 'account': account, + 'price': price, + 'conf': conf, + 'status': status, + }) + + await send(msg) + resp = await recv() + assert resp['result'] == 0 + + async def get_product(account): + output = check_output([ + 'pyth', 'get_product', + account, + '-r', 'localhost', + '-k', pyth_dir, + '-c', 'finalized', + '-j', + ]).decode('ascii') + result = json.loads(output) + + return result + + def jrpc_req(method=None, params=None): + return { + 'jsonrpc': '2.0', + 'method': method, + 'params': params, + 'id': next(messageIds) + } + + async def send(msg): + print("--- sending message ---") + print(msg) + await ws.send(json.dumps(msg)) + + async def recv(): + data = await ws.recv() + msg = json.loads(data) + print("----- received message -----") + print(msg) + return msg + + def get_publisher_acc(product_acc): + assert len(product_acc['price_accounts']) == 1 + price_acc = product_acc['price_accounts'][0] + + assert len(price_acc['publisher_accounts']) == 1 + return price_acc['publisher_accounts'][0] + + # Check that the prices are 0 initially + for product in PRODUCTS.keys(): + product_acc = await get_product(pyth_init_product[product]) + publisher_acc = get_publisher_acc(product_acc) + + assert publisher_acc['price'] == 0 + assert publisher_acc['conf'] == 0 + assert publisher_acc['status'] == 'unknown' + + # Generate new values for this test + new_values = { + product: { + 'price':random.randint(1, 150), + 'conf': random.randint(1, 20), + 'status': 'trading', + } for product in PRODUCTS.keys() + } + + # Update the values of the products + for product in PRODUCTS.keys(): + await update_price( + pyth_init_price[product], + new_values[product]['price'], + new_values[product]['conf'], + new_values[product]['status']) + + time.sleep(80) + + # Crank the products + for product in PRODUCTS.keys(): + await update_price( + pyth_init_price[product], + 1, + 1, + 'trading') + + time.sleep(80) + + # Check that the price has been updated + for product in PRODUCTS.keys(): + product_acc = await get_product(pyth_init_product[product]) + publisher_acc = get_publisher_acc(product_acc) + + assert publisher_acc['price'] == new_values[product]['price'] + assert publisher_acc['conf'] == new_values[product]['conf'] + assert publisher_acc['status'] == new_values[product]['status']