Skip to content

Commit 4987eb6

Browse files
committed
Reset subscriptions after reconnect
1 parent 06212b7 commit 4987eb6

File tree

1 file changed

+23
-12
lines changed

1 file changed

+23
-12
lines changed

pctest/test_publish_websocket.cpp

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ class pythd_websocket
6363
typedef std::string account_pubkey_t;
6464

6565
// Mapping of product symbols to price account public keys.
66-
std::map<symbol_t, account_pubkey_t> accounts_;
66+
std::map<symbol_t, account_pubkey_t> symbol_to_account_;
6767
// Mapping of pythd subscription identifiers to price account public keys.
68-
std::map<int64_t, account_pubkey_t> subscriptions_;
68+
std::map<subscription_id_t, account_pubkey_t> subscription_to_account_;
69+
// Mapping of price account public key to pythd subscription identifiers
70+
std::map<account_pubkey_t, subscription_id_t> account_to_subscription_;
6971
// Mapping of price account public keys to the updates we will send to pythd
7072
// next time we receive a notify_price_sched message with a subscription identifier
7173
// corresponding to the price account public key.
@@ -114,6 +116,11 @@ pythd_websocket::pythd_websocket( QObject* parent, std::string pythd_websocket_e
114116
QTimer *timer = new QTimer( parent );
115117
QObject::connect(timer, &QTimer::timeout, parent, [this](){
116118
if ( !rpc_client_->isConnected() ) {
119+
// Reset the subscription state
120+
subscription_to_account_.clear();
121+
account_to_subscription_.clear();
122+
123+
// Reconnect
117124
connect();
118125
get_product_list_and_subscribe();
119126
}
@@ -141,13 +148,16 @@ void pythd_websocket::get_product_list_and_subscribe( )
141148
account_pubkey_t account = product["price"].toList()[0].toMap()["account"].toString().toStdString();
142149
auto attr_dict = product["attr_dict"].toMap();
143150
symbol_t symbol = attr_dict["symbol"].toString().toStdString();
144-
151+
145152
// If this is a new symbol, associate the symbol with the account
146-
// and subscribe to updates from this symbol.
147-
if (accounts_.find(account) == accounts_.end() || accounts_[symbol] != account) {
148-
accounts_[symbol] = account;
149-
subscribe_price_sched(account);
153+
if (symbol_to_account_.find(account) == symbol_to_account_.end() || symbol_to_account_[symbol] != account) {
154+
symbol_to_account_[symbol] = account;
150155
}
156+
157+
// If we don't already have a subscription for this account, subscribe to it
158+
if (account_to_subscription_.find(account) == account_to_subscription_.end()) {
159+
subscribe_price_sched(account);
160+
}
151161
}
152162
});
153163

@@ -169,7 +179,8 @@ void pythd_websocket::subscribe_price_sched( account_pubkey_t account )
169179

170180
req->connect(req.get(), &jcon::JsonRpcRequest::result, [this, account](const QVariant& result){
171181
auto subscription_id = result.toMap()["subscription"].toInt();
172-
subscriptions_[subscription_id] = account;
182+
subscription_to_account_[subscription_id] = account;
183+
account_to_subscription_[account] = subscription_id;
173184
std::cout << "received subscription id " << subscription_id << " for account " << account << std::endl;
174185
});
175186
}
@@ -192,10 +203,10 @@ void pythd_websocket::update_price( account_pubkey_t account, int price, uint co
192203
void pythd_websocket::on_notify_price_sched( subscription_id_t subscription_id )
193204
{
194205
// Fetch the account associated with the subscription
195-
if (subscriptions_.find(subscription_id) == subscriptions_.end()) {
206+
if (subscription_to_account_.find(subscription_id) == subscription_to_account_.end()) {
196207
return;
197208
}
198-
account_pubkey_t account = subscriptions_[subscription_id];
209+
account_pubkey_t account = subscription_to_account_[subscription_id];
199210

200211
// Fetch any price update we have for this account
201212
if (pending_updates_.find(account) == pending_updates_.end()) {
@@ -214,10 +225,10 @@ void pythd_websocket::on_notify_price_sched( subscription_id_t subscription_id )
214225
}
215226

216227
void pythd_websocket::add_price_update( symbol_t symbol, int64_t price, uint64_t conf, status_t status ) {
217-
if (accounts_.find(symbol) == accounts_.end()) {
228+
if (symbol_to_account_.find(symbol) == symbol_to_account_.end()) {
218229
return;
219230
}
220-
account_pubkey_t account = accounts_[symbol];
231+
account_pubkey_t account = symbol_to_account_[symbol];
221232

222233
pending_updates_[account] = update_t{
223234
price: price,

0 commit comments

Comments
 (0)