From 049f5c0b430ff8a40dd6ab6bfc191d3fce757b92 Mon Sep 17 00:00:00 2001 From: Stan Drozd Date: Wed, 26 Jul 2023 16:52:30 +0200 Subject: [PATCH 1/6] Make error!() logs shorter, label log location, fix warnings --- src/agent.rs | 3 ++- src/agent/pythd/adapter.rs | 6 ++++-- src/agent/pythd/api.rs | 6 ++++-- src/agent/solana.rs | 11 ++--------- src/agent/solana/exporter.rs | 11 +++++++---- src/agent/solana/oracle.rs | 14 ++++++++++---- src/agent/store/global.rs | 4 ++-- src/agent/store/local.rs | 3 ++- src/bin/agent.rs | 4 +++- 9 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index afc9439e..442b9878 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -94,7 +94,8 @@ impl Agent { pub async fn start(&self, logger: Logger) { info!(logger, "starting agent"; "config" => format!("{:?}", self.config)); if let Err(err) = self.spawn(logger.clone()).await { - error!(logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(logger, "{}", err); + debug!(logger, "error context"; "context" => format!("{:?}", err)); }; } diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index c74dd676..a64f7416 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -210,7 +210,8 @@ impl Adapter { tokio::select! { Some(message) = self.message_rx.recv() => { if let Err(err) = self.handle_message(message).await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)) + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } _ = self.shutdown_rx.recv() => { @@ -219,7 +220,8 @@ impl Adapter { } _ = self.notify_price_sched_interval.tick() => { if let Err(err) = self.send_notify_price_sched().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)) + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } diff --git a/src/agent/pythd/api.rs b/src/agent/pythd/api.rs index f75b58bb..571b1686 100644 --- a/src/agent/pythd/api.rs +++ b/src/agent/pythd/api.rs @@ -259,7 +259,8 @@ pub mod rpc { return; } - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)) + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } @@ -617,7 +618,8 @@ pub mod rpc { pub async fn run(&self, shutdown_rx: broadcast::Receiver<()>) { if let Err(err) = self.serve(shutdown_rx).await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)) + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 86ed9ff7..d1702513 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -113,10 +113,7 @@ pub mod network { /// The key_store module is responsible for parsing the pythd key store. mod key_store { use { - anyhow::{ - Context, - Result, - }, + anyhow::Result, serde::{ de::Error, Deserialize, @@ -131,11 +128,7 @@ mod key_store { signer::keypair, }, std::{ - fs, - path::{ - Path, - PathBuf, - }, + path::PathBuf, str::FromStr, }, }; diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index e5a94c27..0ced8058 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -270,7 +270,8 @@ impl Exporter { loop { self.publish_interval.tick().await; if let Err(err) = self.publish_updates().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } @@ -367,7 +368,7 @@ impl Exporter { let permissioned_updates = fresh_updates .into_iter() .filter(|(id, _data)| { - let key_from_id = Pubkey::new(id.clone().to_bytes().as_slice()); + let key_from_id = Pubkey::new((*id).clone().to_bytes().as_slice()); if self.our_prices.contains(&key_from_id) { true } else { @@ -751,7 +752,8 @@ impl NetworkStateQuerier { loop { self.query_interval.tick().await; if let Err(err) = self.query_network_state().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } @@ -867,7 +869,8 @@ mod transaction_monitor { pub async fn run(&mut self) { loop { if let Err(err) = self.handle_next().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 86dcdb20..96a2963d 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -196,7 +196,8 @@ impl Oracle { pub async fn run(&mut self) { loop { if let Err(err) = self.handle_next().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } @@ -400,7 +401,8 @@ impl Poller { self.poll_interval.tick().await; info!(self.logger, "fetching all pyth account data"); if let Err(err) = self.poll_and_send().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } @@ -661,14 +663,18 @@ mod subscriber { pub async fn run(&self) { match self.start_shadow().await { Ok(mut shadow_rx) => self.forward_updates(&mut shadow_rx).await, - Err(err) => error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)), + Err(err) => { + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + } } } async fn forward_updates(&self, shadow_rx: &mut broadcast::Receiver<(Pubkey, Account)>) { loop { if let Err(err) = self.forward_update(shadow_rx).await { - error!(self.logger, "error forwarding updates: {:#}", err; "error" => format!("{:?}", err)) + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } diff --git a/src/agent/store/global.rs b/src/agent/store/global.rs index c7c4f847..f8f715ec 100644 --- a/src/agent/store/global.rs +++ b/src/agent/store/global.rs @@ -74,7 +74,6 @@ impl From for ProductAccountMetadata { } } - /// PriceAccountMetadata contains the metadata for a price account. #[derive(Debug, Clone)] pub struct PriceAccountMetadata { @@ -185,7 +184,8 @@ impl Store { pub async fn run(&mut self) { loop { if let Err(err) = self.handle_next().await { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } diff --git a/src/agent/store/local.rs b/src/agent/store/local.rs index 930f6794..fe6b3192 100644 --- a/src/agent/store/local.rs +++ b/src/agent/store/local.rs @@ -84,7 +84,8 @@ impl Store { pub async fn run(&mut self) { while let Some(message) = self.rx.recv().await { if let Err(err) = self.handle(message) { - error!(self.logger, "{:#}", err; "error" => format!("{:?}", err)) + error!(self.logger, "{}", err); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } diff --git a/src/bin/agent.rs b/src/bin/agent.rs index d5833db6..112a8703 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -49,6 +49,7 @@ async fn main() -> Result<()> { // A plain slog drain that sits inside an async drain instance let inner_drain = LogBuilder::new( slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build()) + .use_file_location() .build() .fuse(), // Yell loud on logger internal errors ) @@ -67,7 +68,8 @@ async fn main() -> Result<()> { debug!(&logger, "Current working directory"; "cwd" => cwd.display()); if let Err(err) = start(config, logger.clone()).await { - error!(logger, "{:#}", err; "error" => format!("{:?}", err)); + error!(logger, "{}", err); + debug!(logger, "error context"; "context" => format!("{:?}", err)); return Err(err); } From 5988ad3e3f71741396308e8efcaeb3a8e13a31f8 Mon Sep 17 00:00:00 2001 From: Stan Drozd Date: Fri, 4 Aug 2023 14:00:31 +0200 Subject: [PATCH 2/6] Add JSON log support via slog-bunyan; Add code location flag --- Cargo.lock | 30 +++++++++++++++++ Cargo.toml | 1 + src/bin/agent.rs | 86 ++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 99 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c578c4ef..d2d338ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,17 @@ dependencies = [ "hmac 0.8.1", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "htmlescape" version = "0.3.1" @@ -2007,6 +2018,12 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matches" version = "0.1.9" @@ -2746,6 +2763,7 @@ dependencies = [ "serde_json", "slog", "slog-async", + "slog-bunyan", "slog-envlogger", "slog-extlog", "slog-term", @@ -3696,6 +3714,18 @@ dependencies = [ "thread_local", ] +[[package]] +name = "slog-bunyan" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440fd32d0423c31e4f98d76c0b62ebdb847f905aa07357197e9b41ac620af97d" +dependencies = [ + "hostname", + "slog", + "slog-json", + "time 0.3.14", +] + [[package]] name = "slog-envlogger" version = "2.2.0" diff --git a/Cargo.toml b/Cargo.toml index 7e1645bf..cd26ea2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ humantime = "2.1.0" prometheus-client = "0.19.0" lazy_static = "1.4.0" toml_edit = "0.19.13" +slog-bunyan = "2.4.0" [dev-dependencies] tokio-util = { version = "0.7.0", features = ["full"] } diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 112a8703..74fcb055 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -4,7 +4,10 @@ use { Context, Result, }, - clap::Parser, + clap::{ + Parser, + ValueEnum, + }, pyth_agent::agent::{ config::Config, Agent, @@ -15,6 +18,8 @@ use { o, Drain, Logger, + PushFnValue, + Record, }, slog_async::Async, slog_envlogger::LogBuilder, @@ -30,7 +35,22 @@ use { struct Arguments { #[clap(short, long, default_value = "config/config.toml")] /// Path to configuration file - config: PathBuf, + config: PathBuf, + #[clap(short, long, default_value = "json", value_enum)] + /// Log flavor to use + log_flavor: LogFlavor, + + #[clap(short = 'L', long)] + /// Whether to print file:line info for each log statement + log_locations: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +enum LogFlavor { + /// Standard human-readable output + Plain, + /// Structured JSON output + Json, } #[tokio::main] @@ -46,22 +66,52 @@ async fn main() -> Result<()> { // Parse config early for logging channel capacity let config = Config::new(args.config).context("Could not parse config")?; - // A plain slog drain that sits inside an async drain instance - let inner_drain = LogBuilder::new( - slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build()) - .use_file_location() - .build() - .fuse(), // Yell loud on logger internal errors - ) - .parse(&env::var("RUST_LOG").unwrap_or("info".to_string())) - .build(); - - // The top level async drain - let async_drain = Async::new(inner_drain) - .chan_size(config.channel_capacities.logger_buffer) - .build() - .fuse(); - let logger = slog::Logger::root(async_drain, o!()); + let log_level = env::var("RUST_LOG").unwrap_or("info".to_string()); + + // Build an async drain with a different inner drain depending on + // log flavor choice in CLI + let async_drain = match args.log_flavor { + LogFlavor::Json => { + // JSON output using slog-bunyan + let inner_drain = LogBuilder::new(slog_bunyan::new(std::io::stdout()).build().fuse()) + .parse(&log_level) + .build(); + + Async::new(inner_drain) + .chan_size(config.channel_capacities.logger_buffer) + .build() + .fuse() + } + LogFlavor::Plain => { + // Plain, colored output usind slog-term + let inner_drain = LogBuilder::new( + slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build()) + .build() + .fuse(), + ) + .parse(&log_level) + .build(); + + Async::new(inner_drain) + .chan_size(config.channel_capacities.logger_buffer) + .build() + .fuse() + } + }; + + let mut logger = slog::Logger::root(async_drain, o!()); + + // Add location information to each log statement if enabled + if args.log_locations { + logger = logger.new(o!( + "loc" => PushFnValue( + move |r: &Record, ser| { + ser.emit(format!("{}:{}", r.file(), r.line())) + } + ), + )); + } + let cwd = std::env::current_dir()?; From 7ee85316efc325a61f8dbe704a67633ce3780a39 Mon Sep 17 00:00:00 2001 From: Stan Drozd Date: Fri, 4 Aug 2023 14:10:49 +0200 Subject: [PATCH 3/6] README: Document logging options, agent.rs: make plain logs default --- README.md | 9 ++++++++- src/bin/agent.rs | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7ed28d4b..16e78f4f 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,16 @@ The agent takes a single `--config` CLI option, pointing at there, containing a minimal set of mandatory options and documentation comments for optional settings. **The config file must exist.** +### Logging The logging level can be configured at runtime through the `RUST_LOG` environment variable using the standard -`error|warn|info|debug|trace` levels. +`error|warn|info|debug|trace`. + +#### Plain/JSON logging +By default, pyth-agent will print plaintext log statements. This can be switched to structured JSON output with `-l json`. + +#### Code location in logs +For debugging purposes, you can specify `-L` to print file/line information with each log statement. This option is disabled by default. ### Key Store Config Migration [v1.x.x LEGACY] Pyth agent v2.0.0 introduces a simplified program and mapping key configuration. This breaking change alters how you define program/mapping key options in your agent config: diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 74fcb055..b281d3bb 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -36,7 +36,7 @@ struct Arguments { #[clap(short, long, default_value = "config/config.toml")] /// Path to configuration file config: PathBuf, - #[clap(short, long, default_value = "json", value_enum)] + #[clap(short, long, default_value = "plain", value_enum)] /// Log flavor to use log_flavor: LogFlavor, From 4f779ed31b58d129c937eea319bf082d10581c8b Mon Sep 17 00:00:00 2001 From: Stan Drozd Date: Fri, 4 Aug 2023 14:25:24 +0200 Subject: [PATCH 4/6] bin/agent.rs: Fill bunyan "name" field, print version before startup --- src/bin/agent.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/bin/agent.rs b/src/bin/agent.rs index b281d3bb..5840dc45 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -73,9 +73,13 @@ async fn main() -> Result<()> { let async_drain = match args.log_flavor { LogFlavor::Json => { // JSON output using slog-bunyan - let inner_drain = LogBuilder::new(slog_bunyan::new(std::io::stdout()).build().fuse()) - .parse(&log_level) - .build(); + let inner_drain = LogBuilder::new( + slog_bunyan::with_name(env!("CARGO_PKG_NAME"), std::io::stdout()) + .build() + .fuse(), + ) + .parse(&log_level) + .build(); Async::new(inner_drain) .chan_size(config.channel_capacities.logger_buffer) @@ -113,9 +117,10 @@ async fn main() -> Result<()> { } - let cwd = std::env::current_dir()?; - - debug!(&logger, "Current working directory"; "cwd" => cwd.display()); + debug!(&logger, "Starting {}", env!("CARGO_PKG_NAME"); + "version" => env!("CARGO_PKG_VERSION"), + "cwd" => std::env::current_dir()?.display() + ); if let Err(err) = start(config, logger.clone()).await { error!(logger, "{}", err); From 8cde5bce49c94b9f1e1d906d69faa88a821bee9c Mon Sep 17 00:00:00 2001 From: Stan Drozd Date: Fri, 4 Aug 2023 14:58:40 +0200 Subject: [PATCH 5/6] Move version/cwd information to src/agent.rs --- src/agent.rs | 7 ++++++- src/bin/agent.rs | 5 ----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index 442b9878..def7e4af 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -92,7 +92,12 @@ impl Agent { } pub async fn start(&self, logger: Logger) { - info!(logger, "starting agent"; "config" => format!("{:?}", self.config)); + info!(logger, "Starting {}", env!("CARGO_PKG_NAME"); + "config" => format!("{:?}", &self.config), + "version" => env!("CARGO_PKG_VERSION"), + "cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("".to_owned()) + ); + if let Err(err) = self.spawn(logger.clone()).await { error!(logger, "{}", err); debug!(logger, "error context"; "context" => format!("{:?}", err)); diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 5840dc45..beb9a4a4 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -117,11 +117,6 @@ async fn main() -> Result<()> { } - debug!(&logger, "Starting {}", env!("CARGO_PKG_NAME"); - "version" => env!("CARGO_PKG_VERSION"), - "cwd" => std::env::current_dir()?.display() - ); - if let Err(err) = start(config, logger.clone()).await { error!(logger, "{}", err); debug!(logger, "error context"; "context" => format!("{:?}", err)); From 59ecd57cc46a37f2f5c4bb35adc188cdf1e83f6f Mon Sep 17 00:00:00 2001 From: Stan Drozd Date: Mon, 7 Aug 2023 13:35:16 +0200 Subject: [PATCH 6/6] Cargo.toml: Bump version for v2.1.0 release --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2d338ea..356711da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2738,7 +2738,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.0.1" +version = "2.1.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index cd26ea2b..ced29827 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.0.1" +version = "2.1.0" edition = "2021" [[bin]]