Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.0.1"
version = "2.1.0"
edition = "2021"

[[bin]]
Expand Down Expand Up @@ -51,6 +51,7 @@ humantime = "2.1.0"
prometheus-client = "0.19.0"
lazy_static = "1.4.0"
toml_edit = "0.19.13"
slog-bunyan = "2.4.0"

[dev-dependencies]
tokio-util = { version = "0.7.0", features = ["full"] }
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@ The agent takes a single `--config` CLI option, pointing at
there, containing a minimal set of mandatory options and documentation
comments for optional settings. **The config file must exist.**

### Logging
The logging level can be configured at runtime
through the `RUST_LOG` environment variable using the standard
`error|warn|info|debug|trace` levels.
`error|warn|info|debug|trace`.

#### Plain/JSON logging
By default, pyth-agent will print plaintext log statements. This can be switched to structured JSON output with `-l json`.

#### Code location in logs
For debugging purposes, you can specify `-L` to print file/line information with each log statement. This option is disabled by default.

### Key Store Config Migration [v1.x.x LEGACY]
Pyth agent v2.0.0 introduces a simplified program and mapping key configuration. This breaking change alters how you define program/mapping key options in your agent config:
Expand Down
10 changes: 8 additions & 2 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ impl Agent {
}

pub async fn start(&self, logger: Logger) {
info!(logger, "starting agent"; "config" => format!("{:?}", self.config));
info!(logger, "Starting {}", env!("CARGO_PKG_NAME");
"config" => format!("{:?}", &self.config),
"version" => env!("CARGO_PKG_VERSION"),
"cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("<could not get current directory>".to_owned())
);

if let Err(err) = self.spawn(logger.clone()).await {
error!(logger, "{:#}", err; "error" => format!("{:?}", err));
error!(logger, "{}", err);
debug!(logger, "error context"; "context" => format!("{:?}", err));
};
}

Expand Down
6 changes: 4 additions & 2 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl Adapter {
tokio::select! {
Some(message) = self.message_rx.recv() => {
if let Err(err) = self.handle_message(message).await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err))
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
_ = self.shutdown_rx.recv() => {
Expand All @@ -219,7 +220,8 @@ impl Adapter {
}
_ = self.notify_price_sched_interval.tick() => {
if let Err(err) = self.send_notify_price_sched().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err))
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/agent/pythd/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ pub mod rpc {
return;
}

error!(self.logger, "{:#}", err; "error" => format!("{:?}", err))
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down Expand Up @@ -617,7 +618,8 @@ pub mod rpc {

pub async fn run(&self, shutdown_rx: broadcast::Receiver<()>) {
if let Err(err) = self.serve(shutdown_rx).await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err))
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/agent/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ pub mod network {
/// The key_store module is responsible for parsing the pythd key store.
mod key_store {
use {
anyhow::{
Context,
Result,
},
anyhow::Result,
serde::{
de::Error,
Deserialize,
Expand All @@ -131,11 +128,7 @@ mod key_store {
signer::keypair,
},
std::{
fs,
path::{
Path,
PathBuf,
},
path::PathBuf,
str::FromStr,
},
};
Expand Down
11 changes: 7 additions & 4 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ impl Exporter {
loop {
self.publish_interval.tick().await;
if let Err(err) = self.publish_updates().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err));
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down Expand Up @@ -367,7 +368,7 @@ impl Exporter {
let permissioned_updates = fresh_updates
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::new(id.clone().to_bytes().as_slice());
let key_from_id = Pubkey::new((*id).clone().to_bytes().as_slice());
if self.our_prices.contains(&key_from_id) {
true
} else {
Expand Down Expand Up @@ -751,7 +752,8 @@ impl NetworkStateQuerier {
loop {
self.query_interval.tick().await;
if let Err(err) = self.query_network_state().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err));
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down Expand Up @@ -867,7 +869,8 @@ mod transaction_monitor {
pub async fn run(&mut self) {
loop {
if let Err(err) = self.handle_next().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err));
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ impl Oracle {
pub async fn run(&mut self) {
loop {
if let Err(err) = self.handle_next().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err));
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down Expand Up @@ -400,7 +401,8 @@ impl Poller {
self.poll_interval.tick().await;
info!(self.logger, "fetching all pyth account data");
if let Err(err) = self.poll_and_send().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err));
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down Expand Up @@ -661,14 +663,18 @@ mod subscriber {
pub async fn run(&self) {
match self.start_shadow().await {
Ok(mut shadow_rx) => self.forward_updates(&mut shadow_rx).await,
Err(err) => error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)),
Err(err) => {
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}

async fn forward_updates(&self, shadow_rx: &mut broadcast::Receiver<(Pubkey, Account)>) {
loop {
if let Err(err) = self.forward_update(shadow_rx).await {
error!(self.logger, "error forwarding updates: {:#}", err; "error" => format!("{:?}", err))
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/agent/store/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl From<oracle::ProductEntry> for ProductAccountMetadata {
}
}


/// PriceAccountMetadata contains the metadata for a price account.
#[derive(Debug, Clone)]
pub struct PriceAccountMetadata {
Expand Down Expand Up @@ -185,7 +184,8 @@ impl Store {
pub async fn run(&mut self) {
loop {
if let Err(err) = self.handle_next().await {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err));
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/agent/store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ impl Store {
pub async fn run(&mut self) {
while let Some(message) = self.rx.recv().await {
if let Err(err) = self.handle(message) {
error!(self.logger, "{:#}", err; "error" => format!("{:?}", err))
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down
Loading