Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ futures = "0.3"
anyhow = "1.0"
log = "0.4"
env_logger = { version = "0.8", features = ["termcolor", "humantime"] }
parse_link_header = "0.2"
2 changes: 1 addition & 1 deletion src/collectors/github_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl ProductMetrics {
let gauge = |name, help| -> IntGauge {
IntGauge::with_opts(
Opts::new(name, help)
.namespace("monitorbot_github_rate_limit")
.namespace("github_rate_limit")
.const_label("username", user)
.const_label("product", product),
)
Expand Down
205 changes: 205 additions & 0 deletions src/collectors/github_runners.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use super::default_headers;
use crate::Config;
use anyhow::{Context, Result};
use log::{debug, error};
use prometheus::core::AtomicI64;
use prometheus::core::{Desc, GenericGauge};
use prometheus::proto::MetricFamily;
use prometheus::{core::Collector, IntGauge, Opts};
use reqwest::header::{HeaderValue, LINK};
use reqwest::{Client, Response};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::time::Duration;

/// URL template for listing the GitHub Actions runners of one repository.
///
/// `{owner_repo}` is a placeholder that `update_stats` replaces verbatim with
/// each configured repository entry before sending the request. The
/// `per_page=100` query parameter asks the API for pages of 100 entries.
const GH_RUNNERS_ENDPOINT: &str =
"https://api.github.com/repos/{owner_repo}/actions/runners?per_page=100";

/// One page of the runners listing, as deserialized from the JSON response
/// body in `update_stats`.
#[derive(Debug, serde::Deserialize)]
struct ApiResponse {
// deserialized from the response; not read by the code in this module
total_count: usize,
// runners contained in this page of the listing
runners: Vec<Runner>,
}

/// A single runner entry of an `ApiResponse` page.
#[derive(Debug, serde::Deserialize)]
struct Runner {
// deserialized from the response; not read by the code in this module
id: usize,
// used as the value of the `runner` label on the exported gauges
name: String,
// deserialized from the response; not read by the code in this module
os: String,
// compared against "online" to derive the `online` gauge value
status: String,
// drives the `busy` gauge value (1 when true, 0 otherwise)
busy: bool,
}

/// Prometheus collector exposing the status of GitHub Actions runners.
///
/// The gauges are kept behind a shared `Arc<RwLock<..>>`, so every clone of
/// this struct (including the one moved into the background refresh task
/// spawned by `new`) reads and writes the same cached metrics.
#[derive(Clone)]
pub struct GithubRunners {
/// API token sent with every request (see `default_headers`).
token: String,
/// Repositories whose runners are tracked; each entry is substituted into
/// `GH_RUNNERS_ENDPOINT`.
repos: Vec<String>,
/// Gauges produced by the most recent successful `update_stats` run.
metrics: Arc<RwLock<Vec<IntGauge>>>,
/// Descriptor reported by `Collector::desc`; its `fq_name` is also used as
/// the namespace of every per-runner gauge.
desc: Desc,
/// HTTP client used to query the API.
http: Client,
}

impl GithubRunners {
pub async fn new(config: &Config, http: Client) -> Result<Self> {
let token = config.github_token.to_string();
let repos: Vec<String> = config
.gha_runners_repos
.split(',')
.map(|v| v.trim().to_string())
.collect();

let rv = Self {
token,
repos,
http,
metrics: Arc::new(RwLock::new(Vec::new())),
desc: Desc::new(
String::from("gha_runner"),
String::from("GHA runner's status"),
Vec::new(),
HashMap::new(),
)
.unwrap(),
};

let refresh_rate = config.gha_runners_cache_refresh;
let mut rv2 = rv.clone();
tokio::spawn(async move {
loop {
if let Err(e) = rv2.update_stats().await {
error!("{:#?}", e);
}

tokio::time::delay_for(Duration::from_secs(refresh_rate)).await;
}
});

Ok(rv)
}

async fn update_stats(&mut self) -> Result<()> {
let mut gauges = Vec::with_capacity(self.repos.len() * 2);
for repo in self.repos.iter() {
let mut url: Option<String> = String::from(GH_RUNNERS_ENDPOINT)
.replace("{owner_repo}", repo)
.into();

debug!("Updating runner's stats");

while let Some(endpoint) = url.take() {
let response = self
.http
.get(&endpoint)
.headers(default_headers(&self.token))
.send()
.await?;

url = guard_rate_limited(&response)?
.error_for_status_ref()
.map(|res| next_uri(res.headers().get(LINK)))?;

let resp = response.json::<ApiResponse>().await?;

for runner in resp.runners.iter() {
let online = metric_factory(
"online",
"runner is online",
&self.desc.fq_name,
&repo,
&runner.name,
);
online.set(if runner.status == "online" { 1 } else { 0 });
gauges.push(online);

let busy = metric_factory(
"busy",
"runner is busy",
&self.desc.fq_name,
&repo,
&runner.name,
);
busy.set(if runner.busy { 1 } else { 0 });
gauges.push(busy);
}
}
}

// lock and replace old data
let mut guard = self.metrics.write().unwrap();
*guard = gauges;

Ok(())
}
}

impl Collector for GithubRunners {
    /// Reports the single descriptor this collector was created with.
    fn desc(&self) -> Vec<&Desc> {
        vec![&self.desc]
    }

    /// Gathers the metric families of every cached gauge.
    ///
    /// Only the gauges stored by the last refresh are read; no request is
    /// made here. If the lock cannot be acquired the failure is logged and
    /// an empty list is returned.
    fn collect(&self) -> Vec<MetricFamily> {
        match self.metrics.read() {
            Ok(gauges) => gauges.iter().flat_map(|gauge| gauge.collect()).collect(),
            Err(e) => {
                error!("Unable to collect: {:#?}", e);
                Vec::new()
            }
        }
    }
}

/// Rejects responses that failed because the API rate limit is exhausted.
///
/// Reads the `x-ratelimit-remaining` header. When it is `0` and the response
/// carries an error status, that status error is returned with a rate-limit
/// context message. In every other case the same response is handed back so
/// the caller can keep chaining on it.
///
/// A response without the header is treated as not rate limited rather than
/// panicking: this function runs inside a detached background task, where a
/// panic would silently stop all further refreshes.
///
/// # Errors
///
/// Returns an error when the header value is not valid text or not an
/// unsigned integer, or when the rate limit was hit as described above.
fn guard_rate_limited(response: &Response) -> Result<&Response> {
    let remaining = match response.headers().get("x-ratelimit-remaining") {
        Some(value) => value.to_str()?.parse::<usize>()?,
        None => return Ok(response),
    };

    if remaining == 0 {
        // A successful response that merely used up the last request still
        // passes: `error_for_status_ref` only fails on error statuses.
        return response
            .error_for_status_ref()
            .context("We've hit the rate limit");
    }

    Ok(response)
}

/// Extracts the URI of the `next` relation from a `Link` header value.
///
/// Returns `None` when the header is absent, is not valid text, cannot be
/// parsed as a link header, or has no `next` relation.
fn next_uri(header: Option<&HeaderValue>) -> Option<String> {
    let raw = header?.to_str().ok()?;
    let links = parse_link_header::parse(raw).ok()?;

    links
        .get(&Some("next".to_string()))
        .map(|link| link.uri.to_string())
}

/// Creates an integer gauge for one runner.
///
/// The gauge is named `name` with help text `help` inside namespace `ns`,
/// and carries the constant labels `repo` and `runner`.
///
/// # Panics
///
/// Panics if the gauge cannot be built from these options.
fn metric_factory<S: Into<String>>(
    name: S,
    help: S,
    ns: S,
    repo: S,
    runner: S,
) -> GenericGauge<AtomicI64> {
    let opts = Opts::new(name, help)
        .namespace(ns)
        .const_label("repo", repo)
        .const_label("runner", runner);

    IntGauge::with_opts(opts).unwrap()
}
25 changes: 25 additions & 0 deletions src/collectors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,43 @@
mod github_rate_limit;
mod github_runners;

pub use crate::collectors::github_rate_limit::GitHubRateLimit;
pub use crate::collectors::github_runners::GithubRunners;

use crate::MetricProvider;
use anyhow::{Error, Result};
use futures::TryFutureExt;
use log::info;
use reqwest::header::{HeaderMap, ACCEPT, AUTHORIZATION};
use reqwest::ClientBuilder;

/// Registers the collectors used for metrics gathering.
///
/// Builds the shared HTTP client, then creates and registers the rate limit
/// collector followed by the runners collector. The first failure (client
/// build, collector construction or registration) is returned.
pub async fn register_collectors(p: &MetricProvider) -> Result<(), Error> {
    let http = ClientBuilder::new()
        .user_agent("https://github.com/rust-lang/monitorbot ([email protected])")
        .build()?;

    let rate_limit = GitHubRateLimit::new(&p.config).and_then(|collector| async move {
        info!("Registering GitHubRateLimit collector");
        p.register_collector(collector)
    });
    rate_limit.await?;

    let runners = GithubRunners::new(&p.config, http).and_then(|collector| async move {
        info!("Registering GitHubActionsRunners collector");
        p.register_collector(collector)
    });
    runners.await
}

/// Builds the headers sent with every GitHub API request.
///
/// Sets `Authorization: token <token>` and an `Accept` header selecting the
/// `application/vnd.github.v3+json` media type.
///
/// # Panics
///
/// Panics if `token` contains characters that are not valid in a header
/// value.
fn default_headers(token: &str) -> HeaderMap {
    let authorization = format!("{} {}", "token", token).parse().unwrap();
    let accept = "application/vnd.github.v3+json".parse().unwrap();

    let mut map = HeaderMap::new();
    map.insert(AUTHORIZATION, authorization);
    map.insert(ACCEPT, accept);
    map
}
11 changes: 11 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ pub struct Config {
pub gh_rate_limit_tokens: String,
// github rate limit stats data cache refresh rate frequency (in seconds)
pub gh_rate_limit_stats_cache_refresh: u64,
// github api token to be used when querying for gha runner's status
// note: token must have (repo scope) authorization
pub github_token: String,
// gha runners' repos whose status is tracked. multiple comma-separated
// "owner/repo" entries are allowed, ex. "rust-lang/rust,rust-lang/cargo"
pub gha_runners_repos: String,
// gha runner's status refresh rate frequency (in seconds)
pub gha_runners_cache_refresh: u64,
}

impl Config {
Expand All @@ -23,6 +31,9 @@ impl Config {
port: default_env("PORT", 3001)?,
gh_rate_limit_tokens: require_env("RATE_LIMIT_TOKENS")?,
gh_rate_limit_stats_cache_refresh: default_env("GH_RATE_LIMIT_STATS_REFRESH", 120)?,
github_token: require_env("GITHUB_TOKEN")?,
gha_runners_repos: require_env("RUNNERS_REPOS")?,
gha_runners_cache_refresh: default_env("GHA_RUNNERS_REFRESH", 120)?,
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ pub struct MetricProvider {

impl MetricProvider {
pub fn new(config: Config) -> Self {
let register = Registry::new_custom(None, None).expect("Unable to build Registry");
let register = Registry::new_custom(Some("monitorbot".to_string()), None)
.expect("Unable to build Registry");
Self { register, config }
}

Expand Down