Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ hyper = "0.13"
prometheus = "0.10"
futures = "0.3"
anyhow = "1.0"
log = "0.4"
env_logger = { version = "0.8", features = ["termcolor", "humantime"] }
69 changes: 35 additions & 34 deletions src/collectors/github_rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use prometheus::{core::Collector, IntGauge, Opts};

use crate::Config;
use anyhow::{Context, Error, Result};
use log::{debug, error};
use reqwest::header::{ACCEPT, AUTHORIZATION, USER_AGENT};
use reqwest::{Client, Method, Request};
use std::collections::HashMap;
Expand All @@ -24,7 +26,7 @@ enum GithubReqBuilder {
}

impl GithubReqBuilder {
fn build_request(&self, client: &Client, token: &str) -> Result<Request, reqwest::Error> {
fn build_request(&self, client: &Client, token: &str) -> Result<Request, Error> {
let rb = match self {
Self::User => client.request(Method::GET, GH_API_USER_ENDPOINT),
Self::RateLimit => client.request(Method::GET, GH_API_RATE_LIMIT_ENDPOINT),
Expand All @@ -37,49 +39,50 @@ impl GithubReqBuilder {
.header(AUTHORIZATION, format!("{} {}", "token", token))
.header(ACCEPT, "application/vnd.github.v3+json")
.build()
.map_err(Error::from)
}
}

#[derive(Clone)]
pub struct GitHubRateLimit {
descriptions: Vec<prometheus::core::Desc>,
users: Vec<User>,
}

impl GitHubRateLimit {
pub async fn new(config: &Config) -> Self {
pub async fn new(config: &Config) -> Result<Self, Error> {
let tokens: Vec<String> = config
.gh_rate_limit_tokens
.split(',')
.map(|v| v.trim().to_string())
.collect();

let users = Self::get_users_for_tokens(tokens).await;
let descriptions = Vec::new();
let users = Self::get_users_for_tokens(tokens)
.await
.context("Unable to get usernames for rate limit stats")?;

let rv = Self {
users,
descriptions,
};
let rv = Self { users };

let refresh_rate = config.gh_rate_limit_stats_cache_refresh;
let mut rv2 = rv.clone();
tokio::spawn(async move {
loop {
rv2.update_stats().await;
if let Err(e) = rv2.update_stats().await {
error!("{:#?}", e);
}

tokio::time::delay_for(Duration::from_secs(refresh_rate)).await;
}
});

rv
Ok(rv)
}

async fn get_users_for_tokens(tokens: Vec<String>) -> Vec<User> {
async fn get_users_for_tokens(tokens: Vec<String>) -> Result<Vec<User>, Error> {
let ns = String::from("monitorbot_github_rate_limit");
let mut rv: Vec<User> = Vec::new();
for token in tokens.into_iter() {
let ns2 = ns.clone();
let username = GitHubRateLimit::get_github_api_username(&token).await;
let username = GitHubRateLimit::get_github_api_username(&token).await?;
let user_future = tokio::task::spawn_blocking(move || {
let rate_limit = IntGauge::with_opts(
Opts::new("limit", "Rate limit.")
Expand Down Expand Up @@ -119,50 +122,45 @@ impl GitHubRateLimit {
rv.push(user);
}

rv
Ok(rv)
}

async fn get_github_api_username(token: &str) -> String {
async fn get_github_api_username(token: &str) -> Result<String, Error> {
#[derive(serde::Deserialize)]
struct GithubUser {
pub login: String,
}

let client = reqwest::Client::new();
let req = GithubReqBuilder::User
.build_request(&client, &token)
.unwrap();
let u = client
.execute(req)
.await
.unwrap()
.json::<GithubUser>()
.await
.unwrap();
let req = GithubReqBuilder::User.build_request(&client, &token)?;
let u = client.execute(req).await?.json::<GithubUser>().await?;

u.login
Ok(u.login)
}

async fn update_stats(&mut self) {
async fn update_stats(&mut self) -> Result<(), Error> {
debug!("Updating rate limit stats");

#[derive(Debug, serde::Deserialize)]
struct GithubRateLimit {
pub rate: HashMap<String, usize>,
}

let client = reqwest::Client::new();

//FIXME: we will (might?) need a RWLock on users structure
for u in self.users.iter_mut() {
let req = GithubReqBuilder::RateLimit
.build_request(&client, &u.token)
.unwrap();
let mut data = client
.context("Unable to build request to update stats")?;

let response = client
.execute(req)
.await
.unwrap()
.context("Unable to execute request to update stats")?;

let mut data = response
.json::<GithubRateLimit>()
.await
.unwrap();
.context("Unable to deserialize rate limit stats")?;

let remaining = data.rate.remove("remaining").unwrap_or(0);
let limit = data.rate.remove("limit").unwrap_or(0);
Expand All @@ -172,12 +170,15 @@ impl GitHubRateLimit {
u.reset.set(reset as i64);
u.limit.set(limit as i64);
}

Ok(())
}
}

impl Collector for GitHubRateLimit {
fn desc(&self) -> std::vec::Vec<&prometheus::core::Desc> {
self.descriptions.iter().collect()
// descriptions are being defined in the initialization of the metrics options
Vec::default()
}

fn collect(&self) -> std::vec::Vec<prometheus::proto::MetricFamily> {
Expand Down
12 changes: 9 additions & 3 deletions src/collectors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
mod github_rate_limit;

pub use crate::collectors::github_rate_limit::GitHubRateLimit;

use crate::MetricProvider;
use futures::FutureExt;
use anyhow::{Error, Result};
use futures::TryFutureExt;
use log::info;

// register collectors for metrics gathering
pub async fn register_collectors(p: &MetricProvider) -> Result<(), prometheus::Error> {
pub async fn register_collectors(p: &MetricProvider) -> Result<(), Error> {
GitHubRateLimit::new(&p.config)
.map(|rl| p.register_collector(rl))
.and_then(|rl| async {
info!("Registering GitHubRateLimit collector");
p.register_collector(rl)
})
.await
}
42 changes: 29 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

pub mod collectors;
mod config;

pub use config::Config;

use prometheus::core::Collector;
use prometheus::{Encoder, Registry};

use anyhow::{Error, Result};
use futures::future;
use futures::task::{Context, Poll};
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, StatusCode};
use log::{debug, error};

#[derive(Clone, Debug)]
pub struct MetricProvider {
Expand All @@ -24,15 +27,19 @@ impl MetricProvider {
Self { register, config }
}

fn register_collector(
&self,
collector: impl Collector + 'static,
) -> Result<(), prometheus::Error> {
self.register.register(Box::new(collector))
fn register_collector(&self, collector: impl Collector + 'static) -> Result<(), Error> {
self.register
.register(Box::new(collector))
.map_err(Error::from)
}

fn gather_with_encoder<BUF: std::io::Write>(&self, encoder: impl Encoder, buf: &mut BUF) {
encoder.encode(&self.register.gather(), buf).unwrap();
fn gather_with_encoder<BUF>(&self, encoder: impl Encoder, buf: &mut BUF) -> Result<(), Error>
where
BUF: std::io::Write,
{
encoder
.encode(&self.register.gather(), buf)
.map_err(Error::from)
}

pub fn into_service(self) -> MetricProviderFactory {
Expand All @@ -50,17 +57,26 @@ impl Service<Request<Body>> for MetricProvider {
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
debug!("New Request to endpoint {}", req.uri().path());

let output = match (req.method(), req.uri().path()) {
// Metrics handler
(&Method::GET, "/metrics") => {
let encoder = prometheus::TextEncoder::new();
let mut buffer = Vec::<u8>::new();
self.gather_with_encoder(encoder, &mut buffer);

Response::builder()
.status(StatusCode::OK)
.body(Body::from(buffer))
.unwrap()
match self.gather_with_encoder(encoder, &mut buffer) {
Ok(_) => Response::builder()
.status(StatusCode::OK)
.body(Body::from(buffer))
.unwrap(),
Err(e) => {
error!("{:?}", e);
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap()
}
}
}
// All other paths and methods
_ => Response::builder()
Expand Down
12 changes: 7 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
use anyhow::{bail, Error, Result};
use hyper::Server;
use log::info;
use monitorbot::Config;
use monitorbot::{collectors::register_collectors, MetricProvider};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Error> {
dotenv::dotenv().ok();
env_logger::init();

let config = Config::from_env()?;
let port = config.port;
let addr = SocketAddr::from(([0, 0, 0, 0], port));

let provider = MetricProvider::new(config);
if let Err(e) = register_collectors(&provider).await {
eprintln!("Unable to register collectors: {:#?}", e);
return Ok(());
bail!("(Registering collectors) {}", e)
}

let server = Server::bind(&addr).serve(provider.into_service());
println!("Server listening on port {}", port);
info!("Server listening on port: {}", port);

if let Err(e) = server.await {
eprintln!("Server error: {}", e);
bail!("(Hyper server error) {}", e);
}

Ok(())
Expand Down