diff --git a/CMakeLists.txt b/CMakeLists.txt index df5d49129..d03c7e37e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,7 +16,8 @@ include_directories( program/src/ ) # gcc compiler/linker flags add_compile_options( -ggdb -Wall -Wextra -Wsign-conversion -Werror -Wno-deprecated-declarations -m64 ) -set( CMAKE_CXX_FLAGS -std=c++11 ) +set( CMAKE_CXX_STANDARD 14) +set( CMAKE_CXX_FLAGS -std=c++14 ) set( CMAKE_C_FLAGS -std=c99 ) set( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -pthread") @@ -101,6 +102,14 @@ add_executable( test_net pctest/test_net.cpp ) target_link_libraries( test_net ${PC_DEP} ) add_executable( test_publish pctest/test_publish.cpp ) target_link_libraries( test_publish ${PC_DEP} ) + +# This doesn't build on the bullseye base image, due to a packaging bug +# in the newer version of libqt5websockets5-dev for Debian. The below build instructions +# are left in for completeness, and work with libqt5websockets5-dev=5.11.3-5 (buster). +# find_package(Qt5 COMPONENTS Core Network WebSockets) +# add_executable( test_publish_websocket pctest/test_publish_websocket.cpp ) +# target_link_libraries( test_publish_websocket Qt5::Core Qt5::Network Qt5::WebSockets /usr/local/lib/libjcon.so) + add_executable( test_qset pctest/test_qset.cpp ) target_link_libraries( test_qset ${PC_DEP} ) add_executable( test_twap pctest/test_twap.cpp ) diff --git a/docker/Dockerfile b/docker/Dockerfile index d054c8f2a..116ccd966 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -20,7 +20,15 @@ RUN apt-get install -qq \ python3-websockets \ sudo \ zlib1g \ - zlib1g-dev + zlib1g-dev \ + qtbase5-dev \ + qtchooser \ + qt5-qmake \ + qtbase5-dev-tools \ + libqt5websockets5-dev + +# Install jcon-cpp library +RUN git clone https://github.com/joncol/jcon-cpp.git /jcon-cpp && cd /jcon-cpp && git checkout 2235654e39c7af505d7158bf996e47e37a23d6e3 && mkdir build && cd build && cmake .. && make -j4 && make install # Grant sudo access to pyth user RUN echo "pyth ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers diff --git a/pctest/test_publish_websocket.cpp b/pctest/test_publish_websocket.cpp new file mode 100644 index 000000000..75eb1e4cc --- /dev/null +++ b/pctest/test_publish_websocket.cpp @@ -0,0 +1,317 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +/* +This file demonstrates how to use publish price updates to the Pyth Network using +the pythd websocket API described at https://docs.pyth.network/publish-data/pyth-client-websocket-api. + +High-level flow: +- Call get_product_list_and_subscribe to fetch the product metadata, enabling us to associate the price account public + keys with the symbols we want to publish for. +- For each price account public key, call subscribe_price_sched to subscribe to the price update schedule + for that price account. This will return a subscription ID. +- When we receive a notify_price_sched message, we should: + - Look up the price account associated with the subscription ID. + - If we have any new price information for that price account, send an update_price message to submit this + to the network. + +This example uses the jcon-cpp library, which depends on qt, but you can use any JSON-RPC protocol that +suits your stack. +*/ + +// This class is responsible for the communication with pythd over the websocket protocol +// described at https://docs.pyth.network/publish-data/pyth-client-websocket-api +// +// It abstracts away any solana-specific details, such as public keys. +class pythd_websocket +{ + public: + // The status of a given symbol. Either "trading" or "halted". + typedef std::string status_t; + // The symbol of the product. For example "Crypto.BTC/USD" or "Equity.US.GOOG/USD". + typedef std::string symbol_t; + + pythd_websocket( QObject* parent, std::string pythd_websocket_endpoint ); + + // Submit a new price update for the given symbol, which will be sent to + // pythd next time a notify_price_sched message for that symbol is received. + void add_price_update( symbol_t symbol, int64_t price, uint64_t conf, status_t status ); + + private: + + typedef struct { + int64_t price; + uint64_t conf; + std::string status; + time_t timestamp; + } update_t; + + typedef struct { + int64_t exponent; + } account_metadata_t; + + typedef int64_t subscription_id_t; + typedef std::string account_pubkey_t; + + // Mapping of account public keys to account metadata + std::map account_to_metadata_; + // Mapping of product symbols to price account public keys. + std::map symbol_to_account_; + // Mapping of pythd subscription identifiers to price account public keys. + std::map subscription_to_account_; + // Mapping of price account public key to pythd subscription identifiers + std::map account_to_subscription_; + // Mapping of price account public keys to the updates we will send to pythd + // next time we receive a notify_price_sched message with a subscription identifier + // corresponding to the price account public key. + std::map pending_updates_; + + // Websocket client to handle connection to pythd + std::string pythd_websocket_endpoint_; + jcon::JsonRpcWebSocketClient *rpc_client_; + void connect( ); + + // The duration between the price update being submitted and notify_price_sched being received + // after which the update will be dropped. + long update_staleness_threshold_secs_ = 3; + + // The pythd websocket API calls + + // Fetch the product list, and update the internal mapping of symbols to accounts. + void get_product_list_and_subscribe( ); + + // Send an update_price websocket message for the given price account. + void update_price( account_pubkey_t account, int price, uint conf, status_t status ); + + // Subscribe to the price update schedule for the given price account. + void subscribe_price_sched( account_pubkey_t account ); + + // Handler for notify_price_sched messages, indicating that any pending updates + // for the price account associated with the given subscription id should be sent. + void on_notify_price_sched( subscription_id_t subscription_id ); + +}; + +pythd_websocket::pythd_websocket( QObject* parent, std::string pythd_websocket_endpoint ) +{ + this->pythd_websocket_endpoint_ = pythd_websocket_endpoint; + this->rpc_client_ = new jcon::JsonRpcWebSocketClient(parent); + + // Set up the handler for notify_price_sched + this->rpc_client_->enableReceiveNotification(true); + QObject::connect(rpc_client_, &jcon::JsonRpcClient::notificationReceived, parent, [this](const QString& key, const QVariant& value){ + if (key == "notify_price_sched") { + this->on_notify_price_sched( value.toMap()["subscription"].toInt() ); + } + }); + + // Continually check the connection and reconnect if dropped + QTimer *timer = new QTimer( parent ); + QObject::connect(timer, &QTimer::timeout, parent, [this](){ + if ( !this->rpc_client_->isConnected() ) { + // Reset the subscription state + this->subscription_to_account_.clear(); + this->account_to_subscription_.clear(); + + // Reconnect + this->connect(); + this->get_product_list_and_subscribe(); + } + }); + timer->start(1000); +} + +void pythd_websocket::connect() +{ + this->rpc_client_->connectToServer(QUrl(QString::fromStdString(pythd_websocket_endpoint_))); +} + +void pythd_websocket::get_product_list_and_subscribe( ) +{ + auto req = this->rpc_client_->callAsync("get_product_list"); + + req->connect(req.get(), &jcon::JsonRpcRequest::result, [this](const QVariant& result){ + // Loop over all the products + auto products = result.toList(); + for (int i = 0; i < products.length(); i++) { + + auto product = products[i].toMap(); + + // Extract the symbol, price account and exponent + auto attr_dict = product["attr_dict"].toMap(); + symbol_t symbol = attr_dict["symbol"].toString().toStdString(); + auto price_account = product["price"].toList()[0].toMap(); + account_pubkey_t account = price_account["account"].toString().toStdString(); + int64_t exponent = price_account["price_exponent"].toInt(); + + // If this is a new symbol, associate the symbol with the account + if (this->symbol_to_account_.find(account) == this->symbol_to_account_.end() || this->symbol_to_account_[symbol] != account) { + this->symbol_to_account_[symbol] = account; + } + + // Update the account metadata + this->account_to_metadata_[account] = account_metadata_t{ + exponent = exponent, + }; + + // If we don't already have a subscription for this account, subscribe to it + if (account_to_subscription_.find(account) == account_to_subscription_.end()) { + this->subscribe_price_sched(account); + } + } + }); + + req->connect(req.get(), &jcon::JsonRpcRequest::error, [](int code, const QString& message) { + std::cout << "error sending get_product_list (" << code << ") " << message.toStdString() << std::endl; + }); +} + +void pythd_websocket::subscribe_price_sched( account_pubkey_t account ) +{ + auto req = this->rpc_client_->callAsyncNamedParams("subscribe_price_sched", + QVariantMap{ + {"account", QString::fromStdString(account)}, + }); + + req->connect(req.get(), &jcon::JsonRpcRequest::error, [](int code, const QString& message) { + std::cout << "error sending subscribe_price_sched (" << code << ") " << message.toStdString() << std::endl; + }); + + req->connect(req.get(), &jcon::JsonRpcRequest::result, [this, account](const QVariant& result){ + auto subscription_id = result.toMap()["subscription"].toInt(); + subscription_to_account_[subscription_id] = account; + account_to_subscription_[account] = subscription_id; + std::cout << "received subscription id " << subscription_id << " for account " << account << std::endl; + }); +} + +void pythd_websocket::update_price( account_pubkey_t account, int price, uint conf, status_t status ) +{ + // Scale the price and confidence by the exponent + int64_t exponent = (-1) * this->account_to_metadata_[account].exponent; + double scaled_price = price * pow(10, exponent); + double scaled_conf = conf * pow(10, exponent); + + auto req = this->rpc_client_->callAsyncNamedParams("update_price", + QVariantMap{ + {"account", QString::fromStdString(account)}, + {"price", scaled_price}, + {"conf", scaled_conf}, + {"status", QString::fromStdString(status)} + }); + + req->connect(req.get(), &jcon::JsonRpcRequest::error, [account](int code, const QString& message) { + std::cout << "error sending update_price (" << code << ") " << message.toStdString() << std::endl; + }); +} + +void pythd_websocket::on_notify_price_sched( subscription_id_t subscription_id ) +{ + // Fetch the account associated with the subscription + if (this->subscription_to_account_.find(subscription_id) == this->subscription_to_account_.end()) { + return; + } + account_pubkey_t account = subscription_to_account_[subscription_id]; + + // Fetch any price update we have for this account + if (this->pending_updates_.find(account) == this->pending_updates_.end()) { + return; + } + update_t update = pending_updates_[account]; + this->pending_updates_.erase(account); + + // Check that the update is not stale + if ( (std::time(nullptr) - update.timestamp) > update_staleness_threshold_secs_) { + return; + } + + // Send the price update + update_price( account, update.price, update.conf, update.status ); +} + +void pythd_websocket::add_price_update( symbol_t symbol, int64_t price, uint64_t conf, status_t status ) { + if (this->symbol_to_account_.find(symbol) == this->symbol_to_account_.end()) { + return; + } + account_pubkey_t account = symbol_to_account_[symbol]; + + pending_updates_[account] = update_t{ + price: price, + conf: conf, + status: status, + timestamp: std::time(nullptr), + }; +} + +class test_publish; + +// The test_publish class is responsible for computing the next price and confidence values +// for the symbols and submitting these to its pythd_websocket publisher. +class test_publish +{ + public: + test_publish( QObject* parent, std::vector symbols_to_publish, std::string pythd_websocket_endpoint ); + + private: + pythd_websocket *pythd_websocket_; + std::vector symbols_to_publish_; + + void update_symbols(); +}; + +test_publish::test_publish( + QObject* parent, + std::vector symbols_to_publish, + std::string pythd_websocket_endpoint ) +{ + pythd_websocket_ = new pythd_websocket( parent, pythd_websocket_endpoint ); + this->symbols_to_publish_.insert(this->symbols_to_publish_.end(), symbols_to_publish.begin(), symbols_to_publish.end()); + + // Continually generate new values for the symbols + QTimer *timer = new QTimer( parent ); + QObject::connect(timer, &QTimer::timeout, parent, [this](){ + this->update_symbols(); + }); + timer->start(1000); +} + +void test_publish::update_symbols() { + + // Send a random price update for each symbol + for (size_t i = 0; i < symbols_to_publish_.size(); i++) { + int64_t next_price = std::rand(); + uint64_t next_conf = (uint64_t) std::rand(); + + this->pythd_websocket_->add_price_update( + this->symbols_to_publish_[i], next_price, next_conf, "trading" ); + } + +} + +int main( int argc, char* argv[] ) +{ + std::string pythd_websocket_endpoint = "ws://127.0.0.1:8910"; + + QCoreApplication app(argc, argv); + std::vector symbols_to_publish = { "Crypto.BTC/USD", "Crypto.ETH/USD" }; + new test_publish( &app, symbols_to_publish, pythd_websocket_endpoint ); + + app.exec(); +}