Skip to content

Commit cbd862c

Browse files
committed
fix: remove adapter <> api deadlock
Adapter sends price update notifications to the subscriber API connection actor but at the same time the subscriber actor can get blcoked on receiving messages (e.g. product info) from Adapter. If the channel from Adapter to the subscriber gets full the Adapter can get blocked and this results in a deadlock. This situation propagates and queues messages in other actors and WS messages from the clients will grow in an unbounded queue that results in a constant memory growth (leak). This change simply makes the Adaptor to subscriber API connection nonblocking by dropping messages if the channels are full.
1 parent d19023b commit cbd862c

File tree

4 files changed

+25
-24
lines changed

4 files changed

+25
-24
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/agent/pythd/adapter.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ impl Adapter {
214214
Some(message) = self.message_rx.recv() => {
215215
if let Err(err) = self.handle_message(message).await {
216216
error!(self.logger, "{}", err);
217-
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
217+
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
218218
}
219219
}
220220
_ = self.shutdown_rx.recv() => {
@@ -224,7 +224,7 @@ impl Adapter {
224224
_ = self.notify_price_sched_interval.tick() => {
225225
if let Err(err) = self.send_notify_price_sched().await {
226226
error!(self.logger, "{}", err);
227-
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
227+
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
228228
}
229229
}
230230
}
@@ -508,12 +508,15 @@ impl Adapter {
508508

509509
async fn send_notify_price_sched(&self) -> Result<()> {
510510
for subscription in self.notify_price_sched_subscriptions.values().flatten() {
511+
// Send the notify price sched update without awaiting. This results
512+
// in raising errors if the channel is full which normally should not
513+
// happen. This is because we do not want to
514+
// block the adapter if the channel is full.
511515
subscription
512516
.notify_price_sched_tx
513-
.send(NotifyPriceSched {
517+
.try_send(NotifyPriceSched {
514518
subscription: subscription.subscription_id,
515-
})
516-
.await?;
519+
})?;
517520
}
518521

519522
Ok(())
@@ -580,19 +583,19 @@ impl Adapter {
580583

581584
// Send the Notify Price update to each subscription
582585
for subscription in subscriptions {
583-
subscription
584-
.notify_price_tx
585-
.send(NotifyPrice {
586-
subscription: subscription.subscription_id,
587-
result: PriceUpdate {
588-
price,
589-
conf,
590-
status: Self::price_status_to_str(status),
591-
valid_slot,
592-
pub_slot,
593-
},
594-
})
595-
.await?;
586+
// Send the notify price update without awaiting. This results in raising errors if the
587+
// channel is full which normally should not happen. This is because we do not want to
588+
// block the adapter if the channel is full.
589+
subscription.notify_price_tx.try_send(NotifyPrice {
590+
subscription: subscription.subscription_id,
591+
result: PriceUpdate {
592+
price,
593+
conf,
594+
status: Self::price_status_to_str(status),
595+
valid_slot,
596+
pub_slot,
597+
},
598+
})?;
596599
}
597600

598601
Ok(())

src/agent/remote_keypair_loader.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,7 @@ async fn handle_key_requests(
314314
match response_tx.send(copied_keypair) {
315315
Ok(()) => {}
316316
Err(_e) => {
317-
warn!(logger, "remote_keypair_loader: Could not send back secondary keypair to channel";
318-
);
317+
warn!(logger, "remote_keypair_loader: Could not send back secondary keypair to channel");
319318
}
320319
}
321320
}

src/agent/solana/exporter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ impl Exporter {
462462
self.update_our_prices(&publish_keypair.pubkey());
463463

464464
debug!(self.logger, "Exporter: filtering prices permissioned to us";
465-
"our_prices" => format!("{:?}", self.our_prices),
465+
"our_prices" => format!("{:?}", self.our_prices.keys()),
466466
"publish_pubkey" => publish_keypair.pubkey().to_string(),
467467
);
468468

@@ -594,7 +594,6 @@ impl Exporter {
594594
trace!(
595595
self.logger,
596596
"Exporter: No more permissioned price accounts in channel, using cached value";
597-
"cached_value" => format!("{:?}", self.our_prices),
598597
);
599598
break;
600599
}

0 commit comments

Comments
 (0)