diff --git a/src/admin/delete_crate.rs b/src/admin/delete_crate.rs index f710d4b6c81..81805452b8e 100644 --- a/src/admin/delete_crate.rs +++ b/src/admin/delete_crate.rs @@ -14,7 +14,7 @@ pub struct Opts { } pub fn run(opts: Opts) { - let conn = db::connect_now().unwrap(); + let conn = db::oneoff_connection().unwrap(); conn.transaction::<_, diesel::result::Error, _>(|| { delete(opts, &conn); Ok(()) diff --git a/src/admin/delete_version.rs b/src/admin/delete_version.rs index 1f3fd188541..e6c0a0cf227 100644 --- a/src/admin/delete_version.rs +++ b/src/admin/delete_version.rs @@ -21,7 +21,7 @@ pub struct Opts { } pub fn run(opts: Opts) { - let conn = db::connect_now().unwrap(); + let conn = db::oneoff_connection().unwrap(); conn.transaction::<_, diesel::result::Error, _>(|| { delete(opts, &conn); Ok(()) diff --git a/src/admin/migrate.rs b/src/admin/migrate.rs index bf3932714a6..808caa58d99 100644 --- a/src/admin/migrate.rs +++ b/src/admin/migrate.rs @@ -11,13 +11,15 @@ diesel_migrations::embed_migrations!("./migrations"); pub struct Opts; pub fn run(_opts: Opts) -> Result<(), Error> { - let db_config = crate::config::DatabasePools::full_from_environment(); + let config = crate::config::DatabasePools::full_from_environment( + &crate::config::Base::from_environment(), + ); // TODO: Refactor logic so that we can also check things from App::new() here. // If the app will panic due to bad configuration, it is better to error in the release phase // to avoid launching dynos that will fail. - if db_config.are_all_read_only() { + if config.are_all_read_only() { // TODO: Check `any_pending_migrations()` with a read-only connection and error if true. // It looks like this requires changes upstream to make this pub in `migration_macros`. @@ -29,13 +31,14 @@ pub fn run(_opts: Opts) -> Result<(), Error> { return Ok(()); } - println!("==> migrating the database"); // The primary is online, access directly via `DATABASE_URL`. 
- let conn = crate::db::connect_now()?; + let conn = crate::db::oneoff_connection_with_config(&config)?; + + println!("==> migrating the database"); embedded_migrations::run_with_output(&conn, &mut std::io::stdout())?; println!("==> synchronizing crate categories"); - crate::boot::categories::sync(CATEGORIES_TOML).unwrap(); + crate::boot::categories::sync_with_connection(CATEGORIES_TOML, &conn).unwrap(); Ok(()) } diff --git a/src/admin/populate.rs b/src/admin/populate.rs index cb1f8afb2dc..eca1bf548dd 100644 --- a/src/admin/populate.rs +++ b/src/admin/populate.rs @@ -14,7 +14,7 @@ pub struct Opts { } pub fn run(opts: Opts) { - let conn = db::connect_now().unwrap(); + let conn = db::oneoff_connection().unwrap(); conn.transaction(|| update(opts, &conn)).unwrap(); } diff --git a/src/admin/render_readmes.rs b/src/admin/render_readmes.rs index 9fabe742888..7b5899b0761 100644 --- a/src/admin/render_readmes.rs +++ b/src/admin/render_readmes.rs @@ -40,7 +40,7 @@ pub struct Opts { pub fn run(opts: Opts) -> anyhow::Result<()> { let base_config = Arc::new(config::Base::from_environment()); - let conn = db::connect_now().unwrap(); + let conn = db::oneoff_connection().unwrap(); let start_time = Utc::now(); diff --git a/src/admin/transfer_crates.rs b/src/admin/transfer_crates.rs index 2ba1edc772e..cf3bf6436cd 100644 --- a/src/admin/transfer_crates.rs +++ b/src/admin/transfer_crates.rs @@ -21,7 +21,7 @@ pub struct Opts { } pub fn run(opts: Opts) { - let conn = db::connect_now().unwrap(); + let conn = db::oneoff_connection().unwrap(); conn.transaction::<_, diesel::result::Error, _>(|| { transfer(opts, &conn); Ok(()) diff --git a/src/admin/verify_token.rs b/src/admin/verify_token.rs index 4a393c906eb..8595b75b7f3 100644 --- a/src/admin/verify_token.rs +++ b/src/admin/verify_token.rs @@ -13,7 +13,7 @@ pub struct Opts { } pub fn run(opts: Opts) -> AppResult<()> { - let conn = db::connect_now()?; + let conn = db::oneoff_connection()?; let user = User::find_by_api_token(&conn, 
&opts.api_token)?; println!("The token belongs to user {}", user.gh_login); Ok(()) diff --git a/src/app.rs b/src/app.rs index 3955363c559..eabc74bdf62 100644 --- a/src/app.rs +++ b/src/app.rs @@ -100,7 +100,7 @@ impl App { let thread_pool = Arc::new(ScheduledThreadPool::new(db_helper_threads)); let primary_database = if config.use_test_database_pool { - DieselPool::new_test(&config.db.primary.url) + DieselPool::new_test(&config.db, &config.db.primary.url) } else { let primary_db_connection_config = ConnectionConfig { statement_timeout: db_connection_timeout, @@ -116,6 +116,7 @@ impl App { DieselPool::new( &config.db.primary.url, + &config.db, primary_db_config, instance_metrics .database_time_to_obtain_connection @@ -126,7 +127,7 @@ impl App { let replica_database = if let Some(pool_config) = config.db.replica.as_ref() { if config.use_test_database_pool { - Some(DieselPool::new_test(&pool_config.url)) + Some(DieselPool::new_test(&config.db, &pool_config.url)) } else { let replica_db_connection_config = ConnectionConfig { statement_timeout: db_connection_timeout, @@ -143,6 +144,7 @@ impl App { Some( DieselPool::new( &pool_config.url, + &config.db, replica_db_config, instance_metrics .database_time_to_obtain_connection diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 80111d66296..4bf286267fb 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -24,11 +24,10 @@ use std::time::Duration; fn main() { println!("Booting runner"); - let db_config = config::DatabasePools::full_from_environment(); - let base_config = config::Base::from_environment(); - let uploader = base_config.uploader(); + let config = config::Server::default(); + let uploader = config.base.uploader(); - if db_config.are_all_read_only() { + if config.db.are_all_read_only() { loop { println!( "Cannot run background jobs with a read-only pool. 
Please scale background_worker \ @@ -38,7 +37,7 @@ fn main() { } } - let db_url = db::connection_url(&db_config.primary.url); + let db_url = db::connection_url(&config.db, &config.db.primary.url); let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT") .unwrap_or_else(|_| "30".into()) diff --git a/src/bin/enqueue-job.rs b/src/bin/enqueue-job.rs index 3c9c80ff765..b10c96a7174 100644 --- a/src/bin/enqueue-job.rs +++ b/src/bin/enqueue-job.rs @@ -7,7 +7,7 @@ use swirl::schema::background_jobs::dsl::*; use swirl::Job; fn main() -> Result<()> { - let conn = db::connect_now()?; + let conn = db::oneoff_connection()?; let mut args = std::env::args().skip(1); let job = args.next().unwrap_or_default(); diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index 7cfc2874f63..68136d7098d 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -11,7 +11,7 @@ use cargo_registry::{admin::on_call, db, schema::*}; use diesel::prelude::*; fn main() -> Result<()> { - let conn = db::connect_now()?; + let conn = db::oneoff_connection()?; check_failing_background_jobs(&conn)?; check_stalled_update_downloads(&conn)?; diff --git a/src/boot/categories.rs b/src/boot/categories.rs index 632a585aeb6..41f45b9302d 100644 --- a/src/boot/categories.rs +++ b/src/boot/categories.rs @@ -1,8 +1,6 @@ // Sync available crate categories from `src/categories.toml`. // Runs when the server is started. 
-use crate::db; - use anyhow::{Context, Result}; use diesel::prelude::*; @@ -77,11 +75,6 @@ fn categories_from_toml( Ok(result) } -pub fn sync(toml_str: &str) -> Result<()> { - let conn = db::connect_now()?; - sync_with_connection(toml_str, &conn) -} - pub fn sync_with_connection(toml_str: &str, conn: &PgConnection) -> Result<()> { use crate::schema::categories::dsl::*; use diesel::dsl::all; diff --git a/src/config.rs b/src/config.rs index 02dd28dc90d..9353f488b23 100644 --- a/src/config.rs +++ b/src/config.rs @@ -83,9 +83,10 @@ impl Default for Server { Some(s) if s.is_empty() => vec![], Some(s) => s.split(',').map(String::from).collect(), }; + let base = Base::from_environment(); Server { - db: DatabasePools::full_from_environment(), - base: Base::from_environment(), + db: DatabasePools::full_from_environment(&base), + base, session_key: env("SESSION_KEY"), gh_client_id: env("GH_CLIENT_ID"), gh_client_secret: env("GH_CLIENT_SECRET"), diff --git a/src/config/database_pools.rs b/src/config/database_pools.rs index 8d2efca5307..a28b0cb8b82 100644 --- a/src/config/database_pools.rs +++ b/src/config/database_pools.rs @@ -9,8 +9,10 @@ //! - `DB_OFFLINE`: If set to `leader` then use the read-only follower as if it was the leader. //! If set to `follower` then act as if `READ_ONLY_REPLICA_URL` was unset. //! - `READ_ONLY_MODE`: If defined (even as empty) then force all connections to be read-only. +//! - `DB_TCP_TIMEOUT_MS`: TCP timeout in milliseconds. See the doc comment for more details. -use crate::env; +use crate::config::Base; +use crate::{env, Env}; pub struct DatabasePools { /// Settings for the primary database. This is usually writeable, but will be read-only in @@ -18,6 +20,14 @@ pub struct DatabasePools { pub primary: DbPoolConfig, /// An optional follower database. Always read-only. pub replica: Option<DbPoolConfig>, + /// Number of seconds to wait for unacknowledged TCP packets before treating the connection as + /// broken.
This value will determine how long crates.io stays unavailable in case of full + /// packet loss between the application and the database: setting it too high will result in an + /// unnecessarily long outage (before the unhealthy database logic kicks in), while setting it + /// too low might result in healthy connections being dropped. + pub tcp_timeout_ms: u64, + /// Whether to enforce that all the database connections are encrypted with TLS. + pub enforce_tls: bool, } #[derive(Debug)] @@ -42,7 +52,7 @@ impl DatabasePools { /// # Panics /// /// This function panics if `DB_OFFLINE=leader` but `READ_ONLY_REPLICA_URL` is unset. - pub fn full_from_environment() -> Self { + pub fn full_from_environment(base: &Base) -> Self { let leader_url = env("DATABASE_URL"); let follower_url = dotenv::var("READ_ONLY_REPLICA_URL").ok(); let read_only_mode = dotenv::var("READ_ONLY_MODE").is_ok(); @@ -67,6 +77,13 @@ impl DatabasePools { _ => None, }; + let tcp_timeout_ms = match dotenv::var("DB_TCP_TIMEOUT_MS") { + Ok(num) => num.parse().expect("couldn't parse DB_TCP_TIMEOUT_MS"), + Err(_) => 15 * 1000, // 15 seconds + }; + + let enforce_tls = base.env == Env::Production; + match dotenv::var("DB_OFFLINE").as_deref() { // The actual leader is down, use the follower in read-only mode as the primary and // don't configure a replica. @@ -79,6 +96,8 @@ impl DatabasePools { min_idle: primary_min_idle, }, replica: None, + tcp_timeout_ms, + enforce_tls, }, // The follower is down, don't configure the replica. 
Ok("follower") => Self { @@ -89,6 +108,8 @@ min_idle: primary_min_idle, }, replica: None, + tcp_timeout_ms, + enforce_tls, }, _ => Self { primary: DbPoolConfig { @@ -106,6 +127,8 @@ pool_size: replica_pool_size, min_idle: replica_min_idle, }), + tcp_timeout_ms, + enforce_tls, }, } } @@ -119,6 +142,8 @@ min_idle: None, }, replica: None, + tcp_timeout_ms: 1000, // 1 second + enforce_tls: false, } } } diff --git a/src/db.rs b/src/db.rs index 68f9c049b72..06242a53fa7 100644 --- a/src/db.rs +++ b/src/db.rs @@ -8,6 +8,7 @@ use std::{ops::Deref, time::Duration}; use thiserror::Error; use url::Url; +use crate::config; use crate::middleware::app::RequestApp; #[derive(Clone)] @@ -22,10 +23,11 @@ impl DieselPool { pub(crate) fn new( url: &str, - config: r2d2::Builder<ConnectionManager<PgConnection>>, + config: &config::DatabasePools, + r2d2_config: r2d2::Builder<ConnectionManager<PgConnection>>, time_to_obtain_connection_metric: Histogram, ) -> Result<DieselPool, PoolError> { - let manager = ConnectionManager::new(connection_url(url)); + let manager = ConnectionManager::new(connection_url(config, url)); // For crates.io we want the behavior of creating a database pool to be slightly different // than the defaults of R2D2: the library's build() method assumes its consumers always @@ -39,7 +41,7 @@ impl DieselPool { // establish any connection continue booting up the application. The database pool will // automatically be marked as unhealthy and the rest of the application will adapt.
let pool = DieselPool::Pool { - pool: config.build_unchecked(manager), + pool: r2d2_config.build_unchecked(manager), time_to_obtain_connection_metric, }; match pool.wait_until_healthy(Duration::from_secs(5)) { @@ -51,9 +53,9 @@ impl DieselPool { Ok(pool) } - pub(crate) fn new_test(url: &str) -> DieselPool { - let conn = - PgConnection::establish(&connection_url(url)).expect("failed to establish connection"); + pub(crate) fn new_test(config: &config::DatabasePools, url: &str) -> DieselPool { + let conn = PgConnection::establish(&connection_url(config, url)) + .expect("failed to establish connection"); conn.begin_test_transaction() .expect("failed to begin test transaction"); DieselPool::Test(Arc::new(ReentrantMutex::new(conn))) @@ -131,19 +133,42 @@ impl Deref for DieselPooledConn<'_> { } } -pub fn connect_now() -> ConnectionResult<PgConnection> { - let url = connection_url(&crate::env("DATABASE_URL")); +pub fn oneoff_connection_with_config( + config: &config::DatabasePools, +) -> ConnectionResult<PgConnection> { + let url = connection_url(config, &config.primary.url); PgConnection::establish(&url) } -pub fn connection_url(url: &str) -> String { +pub fn oneoff_connection() -> ConnectionResult<PgConnection> { + let config = config::DatabasePools::full_from_environment(&config::Base::from_environment()); + oneoff_connection_with_config(&config) +} + +pub fn connection_url(config: &config::DatabasePools, url: &str) -> String { let mut url = Url::parse(url).expect("Invalid database URL"); - if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") { - url.query_pairs_mut().append_pair("sslmode", "require"); + + if config.enforce_tls { + maybe_append_url_param(&mut url, "sslmode", "require"); } + + // Configure the time it takes for diesel to return an error when there is full packet loss + // between the application and the database.
+ maybe_append_url_param( + &mut url, + "tcp_user_timeout", + &config.tcp_timeout_ms.to_string(), + ); + url.into() } +fn maybe_append_url_param(url: &mut Url, key: &str, value: &str) { + if !url.query_pairs().any(|(k, _)| k == key) { + url.query_pairs_mut().append_pair(key, value); + } +} + pub trait RequestTransaction { /// Obtain a read/write database connection from the primary pool fn db_conn(&self) -> Result<DieselPooledConn<'_>, PoolError>;