Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/admin/delete_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct Opts {
}

pub fn run(opts: Opts) {
let conn = db::connect_now().unwrap();
let conn = db::oneoff_connection().unwrap();
conn.transaction::<_, diesel::result::Error, _>(|| {
delete(opts, &conn);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Opts {
}

pub fn run(opts: Opts) {
let conn = db::connect_now().unwrap();
let conn = db::oneoff_connection().unwrap();
conn.transaction::<_, diesel::result::Error, _>(|| {
delete(opts, &conn);
Ok(())
Expand Down
13 changes: 8 additions & 5 deletions src/admin/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ diesel_migrations::embed_migrations!("./migrations");
pub struct Opts;

pub fn run(_opts: Opts) -> Result<(), Error> {
let db_config = crate::config::DatabasePools::full_from_environment();
let config = crate::config::DatabasePools::full_from_environment(
&crate::config::Base::from_environment(),
);

// TODO: Refactor logic so that we can also check things from App::new() here.
// If the app will panic due to bad configuration, it is better to error in the release phase
// to avoid launching dynos that will fail.

if db_config.are_all_read_only() {
if config.are_all_read_only() {
// TODO: Check `any_pending_migrations()` with a read-only connection and error if true.
// It looks like this requires changes upstream to make this pub in `migration_macros`.

Expand All @@ -29,13 +31,14 @@ pub fn run(_opts: Opts) -> Result<(), Error> {
return Ok(());
}

println!("==> migrating the database");
// The primary is online, access directly via `DATABASE_URL`.
let conn = crate::db::connect_now()?;
let conn = crate::db::oneoff_connection_with_config(&config)?;

println!("==> migrating the database");
embedded_migrations::run_with_output(&conn, &mut std::io::stdout())?;

println!("==> synchronizing crate categories");
crate::boot::categories::sync(CATEGORIES_TOML).unwrap();
crate::boot::categories::sync_with_connection(CATEGORIES_TOML, &conn).unwrap();

Ok(())
}
2 changes: 1 addition & 1 deletion src/admin/populate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct Opts {
}

pub fn run(opts: Opts) {
let conn = db::connect_now().unwrap();
let conn = db::oneoff_connection().unwrap();
conn.transaction(|| update(opts, &conn)).unwrap();
}

Expand Down
2 changes: 1 addition & 1 deletion src/admin/render_readmes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Opts {

pub fn run(opts: Opts) -> anyhow::Result<()> {
let base_config = Arc::new(config::Base::from_environment());
let conn = db::connect_now().unwrap();
let conn = db::oneoff_connection().unwrap();

let start_time = Utc::now();

Expand Down
2 changes: 1 addition & 1 deletion src/admin/transfer_crates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Opts {
}

pub fn run(opts: Opts) {
let conn = db::connect_now().unwrap();
let conn = db::oneoff_connection().unwrap();
conn.transaction::<_, diesel::result::Error, _>(|| {
transfer(opts, &conn);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/admin/verify_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct Opts {
}

pub fn run(opts: Opts) -> AppResult<()> {
let conn = db::connect_now()?;
let conn = db::oneoff_connection()?;
let user = User::find_by_api_token(&conn, &opts.api_token)?;
println!("The token belongs to user {}", user.gh_login);
Ok(())
Expand Down
6 changes: 4 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl App {
let thread_pool = Arc::new(ScheduledThreadPool::new(db_helper_threads));

let primary_database = if config.use_test_database_pool {
DieselPool::new_test(&config.db.primary.url)
DieselPool::new_test(&config.db, &config.db.primary.url)
} else {
let primary_db_connection_config = ConnectionConfig {
statement_timeout: db_connection_timeout,
Expand All @@ -116,6 +116,7 @@ impl App {

DieselPool::new(
&config.db.primary.url,
&config.db,
primary_db_config,
instance_metrics
.database_time_to_obtain_connection
Expand All @@ -126,7 +127,7 @@ impl App {

let replica_database = if let Some(pool_config) = config.db.replica.as_ref() {
if config.use_test_database_pool {
Some(DieselPool::new_test(&pool_config.url))
Some(DieselPool::new_test(&config.db, &pool_config.url))
} else {
let replica_db_connection_config = ConnectionConfig {
statement_timeout: db_connection_timeout,
Expand All @@ -143,6 +144,7 @@ impl App {
Some(
DieselPool::new(
&pool_config.url,
&config.db,
replica_db_config,
instance_metrics
.database_time_to_obtain_connection
Expand Down
9 changes: 4 additions & 5 deletions src/bin/background-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ use std::time::Duration;
fn main() {
println!("Booting runner");

let db_config = config::DatabasePools::full_from_environment();
let base_config = config::Base::from_environment();
let uploader = base_config.uploader();
let config = config::Server::default();
let uploader = config.base.uploader();

if db_config.are_all_read_only() {
if config.db.are_all_read_only() {
loop {
println!(
"Cannot run background jobs with a read-only pool. Please scale background_worker \
Expand All @@ -38,7 +37,7 @@ fn main() {
}
}

let db_url = db::connection_url(&db_config.primary.url);
let db_url = db::connection_url(&config.db, &config.db.primary.url);

let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT")
.unwrap_or_else(|_| "30".into())
Expand Down
2 changes: 1 addition & 1 deletion src/bin/enqueue-job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use swirl::schema::background_jobs::dsl::*;
use swirl::Job;

fn main() -> Result<()> {
let conn = db::connect_now()?;
let conn = db::oneoff_connection()?;
let mut args = std::env::args().skip(1);

let job = args.next().unwrap_or_default();
Expand Down
2 changes: 1 addition & 1 deletion src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use cargo_registry::{admin::on_call, db, schema::*};
use diesel::prelude::*;

fn main() -> Result<()> {
let conn = db::connect_now()?;
let conn = db::oneoff_connection()?;

check_failing_background_jobs(&conn)?;
check_stalled_update_downloads(&conn)?;
Expand Down
7 changes: 0 additions & 7 deletions src/boot/categories.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Sync available crate categories from `src/categories.toml`.
// Runs when the server is started.

use crate::db;

use anyhow::{Context, Result};
use diesel::prelude::*;

Expand Down Expand Up @@ -77,11 +75,6 @@ fn categories_from_toml(
Ok(result)
}

pub fn sync(toml_str: &str) -> Result<()> {
let conn = db::connect_now()?;
sync_with_connection(toml_str, &conn)
}

pub fn sync_with_connection(toml_str: &str, conn: &PgConnection) -> Result<()> {
use crate::schema::categories::dsl::*;
use diesel::dsl::all;
Expand Down
5 changes: 3 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ impl Default for Server {
Some(s) if s.is_empty() => vec![],
Some(s) => s.split(',').map(String::from).collect(),
};
let base = Base::from_environment();
Server {
db: DatabasePools::full_from_environment(),
base: Base::from_environment(),
db: DatabasePools::full_from_environment(&base),
base,
session_key: env("SESSION_KEY"),
gh_client_id: env("GH_CLIENT_ID"),
gh_client_secret: env("GH_CLIENT_SECRET"),
Expand Down
29 changes: 27 additions & 2 deletions src/config/database_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,25 @@
//! - `DB_OFFLINE`: If set to `leader` then use the read-only follower as if it was the leader.
//! If set to `follower` then act as if `READ_ONLY_REPLICA_URL` was unset.
//! - `READ_ONLY_MODE`: If defined (even as empty) then force all connections to be read-only.
//! - `DB_TCP_TIMEOUT_MS`: TCP timeout in milliseconds. See the doc comment for more details.

use crate::env;
use crate::config::Base;
use crate::{env, Env};

pub struct DatabasePools {
/// Settings for the primary database. This is usually writeable, but will be read-only in
/// some configurations.
pub primary: DbPoolConfig,
/// An optional follower database. Always read-only.
pub replica: Option<DbPoolConfig>,
/// Number of seconds to wait for unacknowledged TCP packets before treating the connection as
/// broken. This value will determine how long crates.io stays unavailable in case of full
/// packet loss between the application and the database: setting it too high will result in an
/// unnecessarily long outage (before the unhealthy database logic kicks in), while setting it
/// too low might result in healthy connections being dropped.
pub tcp_timeout_ms: u64,
/// Whether to enforce that all the database connections are encrypted with TLS.
pub enforce_tls: bool,
}

#[derive(Debug)]
Expand All @@ -42,7 +52,7 @@ impl DatabasePools {
/// # Panics
///
/// This function panics if `DB_OFFLINE=leader` but `READ_ONLY_REPLICA_URL` is unset.
pub fn full_from_environment() -> Self {
pub fn full_from_environment(base: &Base) -> Self {
let leader_url = env("DATABASE_URL");
let follower_url = dotenv::var("READ_ONLY_REPLICA_URL").ok();
let read_only_mode = dotenv::var("READ_ONLY_MODE").is_ok();
Expand All @@ -67,6 +77,13 @@ impl DatabasePools {
_ => None,
};

let tcp_timeout_ms = match dotenv::var("DB_TCP_TIMEOUT_MS") {
Ok(num) => num.parse().expect("couldn't parse DB_TCP_TIMEOUT_MS"),
Err(_) => 15 * 1000, // 15 seconds
};

let enforce_tls = base.env == Env::Production;

match dotenv::var("DB_OFFLINE").as_deref() {
// The actual leader is down, use the follower in read-only mode as the primary and
// don't configure a replica.
Expand All @@ -79,6 +96,8 @@ impl DatabasePools {
min_idle: primary_min_idle,
},
replica: None,
tcp_timeout_ms,
enforce_tls,
},
// The follower is down, don't configure the replica.
Ok("follower") => Self {
Expand All @@ -89,6 +108,8 @@ impl DatabasePools {
min_idle: primary_min_idle,
},
replica: None,
tcp_timeout_ms,
enforce_tls,
},
_ => Self {
primary: DbPoolConfig {
Expand All @@ -106,6 +127,8 @@ impl DatabasePools {
pool_size: replica_pool_size,
min_idle: replica_min_idle,
}),
tcp_timeout_ms,
enforce_tls,
},
}
}
Expand All @@ -119,6 +142,8 @@ impl DatabasePools {
min_idle: None,
},
replica: None,
tcp_timeout_ms: 1000, // 1 second
enforce_tls: false,
}
}
}
47 changes: 36 additions & 11 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{ops::Deref, time::Duration};
use thiserror::Error;
use url::Url;

use crate::config;
use crate::middleware::app::RequestApp;

#[derive(Clone)]
Expand All @@ -22,10 +23,11 @@ pub enum DieselPool {
impl DieselPool {
pub(crate) fn new(
url: &str,
config: r2d2::Builder<ConnectionManager<PgConnection>>,
config: &config::DatabasePools,
r2d2_config: r2d2::Builder<ConnectionManager<PgConnection>>,
time_to_obtain_connection_metric: Histogram,
) -> Result<DieselPool, PoolError> {
let manager = ConnectionManager::new(connection_url(url));
let manager = ConnectionManager::new(connection_url(config, url));

// For crates.io we want the behavior of creating a database pool to be slightly different
// than the defaults of R2D2: the library's build() method assumes its consumers always
Expand All @@ -39,7 +41,7 @@ impl DieselPool {
// establish any connection continue booting up the application. The database pool will
// automatically be marked as unhealthy and the rest of the application will adapt.
let pool = DieselPool::Pool {
pool: config.build_unchecked(manager),
pool: r2d2_config.build_unchecked(manager),
time_to_obtain_connection_metric,
};
match pool.wait_until_healthy(Duration::from_secs(5)) {
Expand All @@ -51,9 +53,9 @@ impl DieselPool {
Ok(pool)
}

pub(crate) fn new_test(url: &str) -> DieselPool {
let conn =
PgConnection::establish(&connection_url(url)).expect("failed to establish connection");
pub(crate) fn new_test(config: &config::DatabasePools, url: &str) -> DieselPool {
let conn = PgConnection::establish(&connection_url(config, url))
.expect("failed to establish connection");
conn.begin_test_transaction()
.expect("failed to begin test transaction");
DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
Expand Down Expand Up @@ -131,19 +133,42 @@ impl Deref for DieselPooledConn<'_> {
}
}

pub fn connect_now() -> ConnectionResult<PgConnection> {
let url = connection_url(&crate::env("DATABASE_URL"));
pub fn oneoff_connection_with_config(
config: &config::DatabasePools,
) -> ConnectionResult<PgConnection> {
let url = connection_url(config, &config.primary.url);
PgConnection::establish(&url)
}

pub fn connection_url(url: &str) -> String {
pub fn oneoff_connection() -> ConnectionResult<PgConnection> {
let config = config::DatabasePools::full_from_environment(&config::Base::from_environment());
oneoff_connection_with_config(&config)
}

pub fn connection_url(config: &config::DatabasePools, url: &str) -> String {
let mut url = Url::parse(url).expect("Invalid database URL");
if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") {
url.query_pairs_mut().append_pair("sslmode", "require");

if config.enforce_tls {
maybe_append_url_param(&mut url, "sslmode", "require");
}

// Configure the time it takes for diesel to return an error when there is full packet loss
// between the application and the database.
maybe_append_url_param(
&mut url,
"tcp_user_timeout",
&config.tcp_timeout_ms.to_string(),
);

url.into()
}

fn maybe_append_url_param(url: &mut Url, key: &str, value: &str) {
if !url.query_pairs().any(|(k, _)| k == key) {
url.query_pairs_mut().append_pair(key, value);
}
}

pub trait RequestTransaction {
/// Obtain a read/write database connection from the primary pool
fn db_conn(&self) -> Result<DieselPooledConn<'_>, PoolError>;
Expand Down