diff --git a/Cargo.lock b/Cargo.lock index 4224e37..c4a2dae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -595,6 +595,7 @@ dependencies = [ "futures", "hyper", "log", + "parse_link_header", "prometheus", "reqwest", "serde", @@ -706,6 +707,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parse_link_header" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f88f9547da526c2670628a21a1749599b79b65ffc2b8609c874fe012f58a0" +dependencies = [ + "http", + "regex", +] + [[package]] name = "percent-encoding" version = "2.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8d5705d..36aa587 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ futures = "0.3" anyhow = "1.0" log = "0.4" env_logger = { version = "0.8", features = ["termcolor", "humantime"] } +parse_link_header = "0.2" diff --git a/src/collectors/github_rate_limit.rs b/src/collectors/github_rate_limit.rs index 3aaafbe..703cca4 100644 --- a/src/collectors/github_rate_limit.rs +++ b/src/collectors/github_rate_limit.rs @@ -184,7 +184,7 @@ impl ProductMetrics { let gauge = |name, help| -> IntGauge { IntGauge::with_opts( Opts::new(name, help) - .namespace("monitorbot_github_rate_limit") + .namespace("github_rate_limit") .const_label("username", user) .const_label("product", product), ) diff --git a/src/collectors/github_runners.rs b/src/collectors/github_runners.rs new file mode 100644 index 0000000..e1ee2f8 --- /dev/null +++ b/src/collectors/github_runners.rs @@ -0,0 +1,205 @@ +use super::default_headers; +use crate::Config; +use anyhow::{Context, Result}; +use log::{debug, error}; +use prometheus::core::AtomicI64; +use prometheus::core::{Desc, GenericGauge}; +use prometheus::proto::MetricFamily; +use prometheus::{core::Collector, IntGauge, Opts}; +use reqwest::header::{HeaderValue, LINK}; +use reqwest::{Client, Response}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tokio::time::Duration; + +const GH_RUNNERS_ENDPOINT: &str = + "https://api.github.com/repos/{owner_repo}/actions/runners?per_page=100"; + +#[derive(Debug, serde::Deserialize)] +struct ApiResponse { + total_count: usize, + runners: Vec, +} + +#[derive(Debug, serde::Deserialize)] +struct Runner { + id: usize, + name: String, + os: String, + status: String, + busy: bool, +} + +#[derive(Clone)] +pub struct GithubRunners { + //api token to use + token: String, + // repos to track gha runners + repos: Vec, + // actual metrics + metrics: Arc>>, + // default metric description + desc: Desc, + http: Client, +} + +impl GithubRunners { + pub async fn new(config: &Config, http: Client) -> Result { + let token = config.github_token.to_string(); + let repos: Vec = config + .gha_runners_repos + .split(',') + .map(|v| v.trim().to_string()) + .collect(); + + let rv = Self { + token, + repos, + http, + metrics: Arc::new(RwLock::new(Vec::new())), + desc: Desc::new( + String::from("gha_runner"), + String::from("GHA runner's status"), + Vec::new(), + HashMap::new(), + ) + .unwrap(), + }; + + let refresh_rate = config.gha_runners_cache_refresh; + let mut rv2 = rv.clone(); + tokio::spawn(async move { + loop { + if let Err(e) = rv2.update_stats().await { + error!("{:#?}", e); + } + + tokio::time::delay_for(Duration::from_secs(refresh_rate)).await; + } + }); + + Ok(rv) + } + + async fn update_stats(&mut self) -> Result<()> { + let mut gauges = Vec::with_capacity(self.repos.len() * 2); + for repo in self.repos.iter() { + let mut url: Option = String::from(GH_RUNNERS_ENDPOINT) + .replace("{owner_repo}", repo) + .into(); + + debug!("Updating runner's stats"); + + while let Some(endpoint) = url.take() { + let response = self + .http + .get(&endpoint) + .headers(default_headers(&self.token)) + .send() + .await?; + + url = guard_rate_limited(&response)? + .error_for_status_ref() + .map(|res| next_uri(res.headers().get(LINK)))?; + + let resp = response.json::().await?; + + for runner in resp.runners.iter() { + let online = metric_factory( + "online", + "runner is online", + &self.desc.fq_name, + &repo, + &runner.name, + ); + online.set(if runner.status == "online" { 1 } else { 0 }); + gauges.push(online); + + let busy = metric_factory( + "busy", + "runner is busy", + &self.desc.fq_name, + &repo, + &runner.name, + ); + busy.set(if runner.busy { 1 } else { 0 }); + gauges.push(busy); + } + } + } + + // lock and replace old data + let mut guard = self.metrics.write().unwrap(); + *guard = gauges; + + Ok(()) + } +} + +impl Collector for GithubRunners { + fn desc(&self) -> Vec<&Desc> { + vec![&self.desc] + } + + fn collect(&self) -> Vec { + self.metrics.read().map_or_else( + |e| { + error!("Unable to collect: {:#?}", e); + Vec::with_capacity(0) + }, + |guard| { + guard.iter().fold(Vec::new(), |mut acc, item| { + acc.extend(item.collect()); + acc + }) + }, + ) + } +} + +fn guard_rate_limited(response: &Response) -> Result<&Response> { + let rate_limited = match response.headers().get("x-ratelimit-remaining") { + Some(rl) => rl.to_str()?.parse::()? == 0, + None => unreachable!(), + }; + + if rate_limited { + return response + .error_for_status_ref() + .context("We've hit the rate limit"); + } + + Ok(response) +} + +fn next_uri(header: Option<&HeaderValue>) -> Option { + if let Some(header) = header { + return match header.to_str() { + Ok(header_str) => match parse_link_header::parse(header_str) { + Ok(links) => links + .get(&Some("next".to_string())) + .map(|next| next.uri.to_string()), + _ => None, + }, + _ => None, + }; + } + + None +} + +fn metric_factory>( + name: S, + help: S, + ns: S, + repo: S, + runner: S, +) -> GenericGauge { + IntGauge::with_opts( + Opts::new(name, help) + .namespace(ns) + .const_label("repo", repo) + .const_label("runner", runner), + ) + .unwrap() +} diff --git a/src/collectors/mod.rs b/src/collectors/mod.rs index f6ba43c..358f583 100644 --- a/src/collectors/mod.rs +++ b/src/collectors/mod.rs @@ -1,18 +1,43 @@ mod github_rate_limit; +mod github_runners; pub use crate::collectors::github_rate_limit::GitHubRateLimit; +pub use crate::collectors::github_runners::GithubRunners; use crate::MetricProvider; use anyhow::{Error, Result}; use futures::TryFutureExt; use log::info; +use reqwest::header::{HeaderMap, ACCEPT, AUTHORIZATION}; +use reqwest::ClientBuilder; // register collectors for metrics gathering pub async fn register_collectors(p: &MetricProvider) -> Result<(), Error> { + let http = ClientBuilder::new() + .user_agent("https://github.com/rust-lang/monitorbot (infra@rust-lang.org)") + .build()?; + GitHubRateLimit::new(&p.config) .and_then(|rl| async { info!("Registering GitHubRateLimit collector"); p.register_collector(rl) }) + .await?; + + GithubRunners::new(&p.config, http) + .and_then(|gr| async { + info!("Registering GitHubActionsRunners collector"); + p.register_collector(gr) + }) .await } + +fn default_headers(token: &str) -> HeaderMap { + let mut headers = HeaderMap::new(); + headers.insert( + AUTHORIZATION, + format!("{} {}", "token", token).parse().unwrap(), + ); + headers.insert(ACCEPT, "application/vnd.github.v3+json".parse().unwrap()); + headers +} diff --git a/src/config.rs b/src/config.rs index fed220e..c405889 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,6 +14,14 @@ pub struct Config { pub gh_rate_limit_tokens: String, // github rate limit stats data cache refresh rate frequency (in seconds) pub gh_rate_limit_stats_cache_refresh: u64, + // github api token to be used when querying for gha runner's status + // note: token must have (repo scope) authorization + pub github_token: String, + // gh runner's repos to track they status. multiple repos are allowed + // ex. "rust,cargo,docs.rs" + pub gha_runners_repos: String, + // gha runner's status refresh rate frequency (in seconds) + pub gha_runners_cache_refresh: u64, } impl Config { @@ -23,6 +31,9 @@ impl Config { port: default_env("PORT", 3001)?, gh_rate_limit_tokens: require_env("RATE_LIMIT_TOKENS")?, gh_rate_limit_stats_cache_refresh: default_env("GH_RATE_LIMIT_STATS_REFRESH", 120)?, + github_token: require_env("GITHUB_TOKEN")?, + gha_runners_repos: require_env("RUNNERS_REPOS")?, + gha_runners_cache_refresh: default_env("GHA_RUNNERS_REFRESH", 120)?, }) } } diff --git a/src/lib.rs b/src/lib.rs index a0860be..208ee87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,8 @@ pub struct MetricProvider { impl MetricProvider { pub fn new(config: Config) -> Self { - let register = Registry::new_custom(None, None).expect("Unable to build Registry"); + let register = Registry::new_custom(Some("monitorbot".to_string()), None) + .expect("Unable to build Registry"); Self { register, config } }