Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 155 additions & 78 deletions src/collectors/github_rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use prometheus::{core::Collector, IntGauge, Opts};

use crate::Config;
use anyhow::{Context, Error, Result};
use log::{debug, error};
use reqwest::header::{ACCEPT, AUTHORIZATION, USER_AGENT};
use reqwest::{Client, Method, Request};
use crate::{http, Config};
use anyhow::{bail, Context, Error, Result};
use log::{debug, error, warn};
use prometheus::core::Desc;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::time::Duration;

#[derive(Clone)]
Expand All @@ -18,34 +18,17 @@ struct User {
}

const GH_API_USER_ENDPOINT: &str = "https://api.github.com/user";
const GH_API_RATE_LIMIT_ENDPOINT: &str = "https://api.github.com/rate_limit";

enum GithubReqBuilder {
User,
RateLimit,
}

impl GithubReqBuilder {
fn build_request(&self, client: &Client, token: &str) -> Result<Request, Error> {
let rb = match self {
Self::User => client.request(Method::GET, GH_API_USER_ENDPOINT),
Self::RateLimit => client.request(Method::GET, GH_API_RATE_LIMIT_ENDPOINT),
};

rb.header(
USER_AGENT,
"https://github.com/rust-lang/monitorbot ([email protected])",
)
.header(AUTHORIZATION, format!("{} {}", "token", token))
.header(ACCEPT, "application/vnd.github.v3+json")
.build()
.map_err(Error::from)
}
}

#[derive(Clone)]
pub struct GitHubRateLimit {
users: Vec<User>,
// uninitialized tokens
uninit_tokens: Vec<String>,
// metric namespace
ns: String,
// metrics collection
metrics: Arc<RwLock<Vec<User>>>,
// default metric description
desc: Desc,
}

impl GitHubRateLimit {
Expand All @@ -56,11 +39,23 @@ impl GitHubRateLimit {
.map(|v| v.trim().to_string())
.collect();

let users = Self::get_users_for_tokens(tokens)
let ns = String::from("github_rate_limit");
let users = Self::get_users_for_tokens(ns.clone(), tokens)
.await
.context("Unable to get usernames for rate limit stats")?;

let rv = Self { users };
let rv = Self {
uninit_tokens: users.1,
ns: ns.clone(),
metrics: Arc::new(RwLock::new(users.0)),
desc: Desc::new(
ns,
"GH rate limit stats".to_string(),
Vec::new(),
HashMap::new(),
)
.unwrap(),
};

let refresh_rate = config.gh_rate_limit_stats_cache_refresh;
let mut rv2 = rv.clone();
Expand All @@ -74,15 +69,67 @@ impl GitHubRateLimit {
}
});

// start task to catch up with uninit tokens
if !rv.uninit_tokens.is_empty() {
let refresh_rate = config.gh_rate_limit_stats_cache_refresh + 10;
let mut rv2 = rv.clone();
tokio::spawn(async move {
loop {
tokio::time::delay_for(Duration::from_secs(refresh_rate)).await;
match Self::get_users_for_tokens(rv2.ns.clone(), rv2.uninit_tokens.clone())
.await
{
Err(e) => {
error!("{:#?}", e);
}
Ok(data) => {
if data.0.is_empty() {
continue;
}

if let Ok(mut guard) = rv2.metrics.try_write() {
for u in data.0 {
guard.push(u)
}
}

// drop the task, all tokens are under monitoring
if data.1.is_empty() {
rv2.uninit_tokens.clear();
return;
}

// update data for next iteration
rv2.uninit_tokens = data.1;
}
};
}
});
}

Ok(rv)
}

async fn get_users_for_tokens(tokens: Vec<String>) -> Result<Vec<User>, Error> {
let ns = String::from("monitorbot_github_rate_limit");
let mut rv: Vec<User> = Vec::new();
async fn get_users_for_tokens(
ns: String,
tokens: Vec<String>,
) -> Result<(Vec<User>, Vec<String>), Error> {
let mut uninit_tokens: Vec<String> = Vec::with_capacity(tokens.len());
let mut metrics: Vec<User> = Vec::new();
for token in tokens.into_iter() {
let username = match GitHubRateLimit::get_github_api_username(&token).await {
Ok(login) => login,
Err(e) => {
warn!(
"unable to get token's '{}' username at this time\n{:?}",
token, e
);
uninit_tokens.push(token);
continue;
}
};

let ns2 = ns.clone();
let username = GitHubRateLimit::get_github_api_username(&token).await?;
let user_future = tokio::task::spawn_blocking(move || {
let rate_limit = IntGauge::with_opts(
Opts::new("limit", "Rate limit.")
Expand All @@ -100,7 +147,7 @@ impl GitHubRateLimit {

let rate_reset = IntGauge::with_opts(
Opts::new("reset", "Rate reset.")
.namespace(ns2.clone())
.namespace(ns2)
.const_label("username", username.clone()),
)
.unwrap();
Expand All @@ -119,10 +166,10 @@ impl GitHubRateLimit {
_ => panic!("We need to decide if we wanna panic or keep going"),
};

rv.push(user);
metrics.push(user);
}

Ok(rv)
Ok((metrics, uninit_tokens))
}

async fn get_github_api_username(token: &str) -> Result<String, Error> {
Expand All @@ -131,65 +178,95 @@ impl GitHubRateLimit {
pub login: String,
}

let client = reqwest::Client::new();
let req = GithubReqBuilder::User.build_request(&client, &token)?;
let u = client.execute(req).await?.json::<GithubUser>().await?;

Ok(u.login)
match http::is_token_flagged(token).await {
Err(e) => bail!("checking if token is flagged: {:?}", e),
Ok(v) => {
if v {
bail!("Token is flagged. unable to get username")
} else {
Ok(http::get(&token, GH_API_USER_ENDPOINT)
.send()
.await?
.json::<GithubUser>()
.await?
.login)
}
}
}
}

async fn update_stats(&mut self) -> Result<(), Error> {
debug!("Updating rate limit stats");

#[derive(Debug, serde::Deserialize)]
struct GithubRateLimit {
pub rate: HashMap<String, usize>,
}

let client = reqwest::Client::new();
for u in self.users.iter_mut() {
let req = GithubReqBuilder::RateLimit
.build_request(&client, &u.token)
.context("Unable to build request to update stats")?;

let response = client
.execute(req)
.await
.context("Unable to execute request to update stats")?;
// lock and read what to query for
let tokens = {
self.metrics.read().map_or_else(
|_| Vec::new(),
|lock| {
lock.iter().fold(Vec::new(), |mut acc, item| {
acc.push(item.token.clone());
acc
})
},
)
};

let mut data = response
.json::<GithubRateLimit>()
.await
.context("Unable to deserialize rate limit stats")?;
// querying new updated data
let mut new_stats = Vec::<(String, i64, i64, i64)>::with_capacity(tokens.len());
for token in tokens.into_iter() {
let mut data = http::get_token_rate_limit_stats(&token).await?;

let remaining = data.rate.remove("remaining").unwrap_or(0);
let limit = data.rate.remove("limit").unwrap_or(0);
let reset = data.rate.remove("reset").unwrap_or(0);

u.remaining.set(remaining as i64);
u.reset.set(reset as i64);
u.limit.set(limit as i64);
new_stats.push((
token.to_owned(),
remaining as i64,
limit as i64,
reset as i64,
));
}

// lock and write
if let Ok(guard) = self.metrics.try_write() {
for new_data in new_stats.into_iter() {
guard
.iter()
.find(|item| item.token == new_data.0)
.map(|item| {
item.remaining.set(new_data.1);
item.limit.set(new_data.2);
item.reset.set(new_data.3);
})
.unwrap();
}
}

Ok(())
}
}

impl Collector for GitHubRateLimit {
fn desc(&self) -> std::vec::Vec<&prometheus::core::Desc> {
// descriptions are being defined in the initialization of the metrics options
Vec::default()
fn desc(&self) -> Vec<&Desc> {
vec![&self.desc]
}

fn collect(&self) -> std::vec::Vec<prometheus::proto::MetricFamily> {
// collect MetricFamilys.
let mut mfs = Vec::new();
for user in self.users.iter() {
mfs.extend(user.limit.collect());
mfs.extend(user.remaining.collect());
mfs.extend(user.reset.collect());
}

mfs
self.metrics.read().map_or_else(
|e| {
error!("Unable to collect: {:#?}", e);
Vec::new()
},
|guard| {
guard.iter().fold(Vec::new(), |mut acc, item| {
acc.extend(item.limit.collect());
acc.extend(item.remaining.collect());
acc.extend(item.reset.collect());

acc
})
},
)
}
}
Loading