From 192a09878e94049861bd07c8d01b17607ec43d92 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 11:40:52 -0700 Subject: [PATCH 1/2] Add retry logic to transaction --- Cargo.lock | 76 ++++++- Cargo.toml | 2 + crates/iceberg/Cargo.toml | 2 + crates/iceberg/src/catalog/mod.rs | 2 + crates/iceberg/src/error.rs | 5 + crates/iceberg/src/spec/table_metadata.rs | 20 ++ crates/iceberg/src/transaction/mod.rs | 237 +++++++++++++++++++++- 7 files changed, 339 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 552fcb0892..6a56ce374f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1032,9 +1032,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.3.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" dependencies = [ "fastrand", "gloo-timers", @@ -2638,6 +2638,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dunce" version = "1.0.5" @@ -2896,6 +2902,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" + [[package]] name = "fs-err" version = "3.1.0" @@ -3500,6 +3512,7 @@ dependencies = [ "as-any", "async-std", "async-trait", + "backon", "base64 0.22.1", "bimap", "bytes", @@ -3511,6 +3524,7 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "mockall", "moka", "murmur3", "num-bigint", @@ -4434,6 +4448,32 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "mockito" version = "1.6.1" @@ -5249,6 +5289,32 @@ dependencies = [ "zerocopy 0.7.35", ] +[[package]] +name = "predicates" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" + +[[package]] +name = "predicates-tree" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_assertions" version = "1.4.1" @@ -6883,6 +6949,12 @@ dependencies = [ "unic-segment", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + [[package]] name = "thiserror" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 9c6d22de4e..3176cfdb79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ async-std = "1.12" async-trait = "0.1.88" aws-config = "1.6.1" aws-sdk-glue = "1.39" +backon = "1.5.1" base64 = "0.22.1" bimap = "0.6" bytes = "1.10" @@ -82,6 +83,7 @@ itertools = "0.13" linkedbytes = "0.1.8" metainfo = "0.7.14" mimalloc = "0.1.46" +mockall = "0.13.1" mockito = "1" motore-macros = "0.4.3" murmur3 = "0.5.2" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index ac56c0b3dd..6ae17d21fd 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -57,6 +57,7 @@ arrow-string = { workspace = true } as-any = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } async-trait = { workspace = true } +backon = { workspace = true } base64 = { workspace = true } bimap = { workspace = true } bytes = { workspace = true } @@ -66,6 +67,7 @@ expect-test = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +mockall = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 409a80ee40..a9b92d47e9 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -29,6 +29,7 @@ use std::sync::Arc; use _serde::deserialize_snapshot; use async_trait::async_trait; pub use memory::MemoryCatalog; +use mockall::automock; use serde_derive::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -43,6 +44,7 @@ use crate::{Error, ErrorKind, Result}; /// The catalog API for Iceberg Rust. #[async_trait] +#[automock] pub trait Catalog: Debug + Sync + Send { /// List namespaces inside the catalog. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 9f299fb6a9..067329d524 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -320,6 +320,11 @@ impl Error { self.kind } + /// Return error's retryable status + pub fn retryable(&self) -> bool { + self.retryable + } + /// Return error's message. #[inline] pub fn message(&self) -> &str { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index c3156e56a4..3789e7ed59 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -98,6 +98,26 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [ PROPERTY_DEFAULT_SORT_ORDER, ]; +/// Property key for number of commit retries. +pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; +/// Default value for number of commit retries. +pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; + +/// Property key for minimum wait time (ms) between retries. +pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; +/// Default value for minimum wait time (ms) between retries. +pub const COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; + +/// Property key for maximum wait time (ms) between retries. +pub const COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; +/// Default value for maximum wait time (ms) between retries. +pub const COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute + +/// Property key for total maximum retry time (ms). +pub const COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; +/// Default value for total maximum retry time (ms). +pub const COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index f96a15b73c..47e8f3d55e 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -51,6 +51,9 @@ /// The `ApplyTransactionAction` trait provides an `apply` method /// that allows users to apply a transaction action to a `Transaction`. mod action; + +use std::collections::HashMap; + pub use action::*; mod append; mod snapshot; @@ -61,8 +64,16 @@ mod update_statistics; mod upgrade_format_version; use std::sync::Arc; +use std::time::Duration; + +use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext}; use crate::error::Result; +use crate::spec::{ + COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_MIN_RETRY_WAIT_MS, + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, +}; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; @@ -152,13 +163,59 @@ impl Transaction { } /// Commit transaction. - pub async fn commit(mut self, catalog: &dyn Catalog) -> Result { + pub async fn commit(self, catalog: &dyn Catalog) -> Result
{ if self.actions.is_empty() { // nothing to commit return Ok(self.table); } - self.do_commit(catalog).await + let backoff = Self::build_backoff(self.table.metadata().properties()); + let tx = self; + + (|mut tx: Transaction| async { + let result = tx.do_commit(catalog).await; + (tx, result) + }) + .retry(backoff) + .sleep(tokio::time::sleep) + .context(tx) + .when(|e| e.retryable()) + .await + .1 + } + + fn build_backoff(props: &HashMap) -> ExponentialBackoff { + ExponentialBuilder::new() + .with_min_delay(Duration::from_millis( + props + .get(COMMIT_MIN_RETRY_WAIT_MS) + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(COMMIT_MIN_RETRY_WAIT_MS_DEFAULT)) + .expect("Invalid value for commit.retry.min-wait-ms"), + )) + .with_max_delay(Duration::from_millis( + props + .get(COMMIT_MAX_RETRY_WAIT_MS) + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(COMMIT_MAX_RETRY_WAIT_MS_DEFAULT)) + .expect("Invalid value for commit.retry.max-wait-ms"), + )) + .with_total_delay(Some(Duration::from_millis( + props + .get(COMMIT_TOTAL_RETRY_TIME_MS) + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .expect("Invalid value for commit.retry.total-timeout-ms"), + ))) + .with_max_times( + props + .get(COMMIT_NUM_RETRIES) + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(COMMIT_NUM_RETRIES_DEFAULT)) + .expect("Invalid value for commit.retry.num-retries"), + ) + .with_factor(2.0) + .build() } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{ @@ -198,13 +255,17 @@ impl Transaction { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::fs::File; use std::io::BufReader; + use std::sync::Arc; - use crate::TableIdent; + use crate::catalog::MockCatalog; use crate::io::FileIOBuilder; use crate::spec::TableMetadata; use crate::table::Table; + use crate::transaction::{ApplyTransactionAction, Transaction}; + use crate::{Error, ErrorKind, TableIdent}; pub fn make_v1_table() -> Table { let file = File::open(format!( @@ -262,4 +323,174 @@ mod tests { .build() .unwrap() } + + /// Helper function to create a test table with retry properties + fn setup_test_table(num_retries: &str) -> Table { + let table = make_v2_table(); + + // Set retry properties + let mut props = HashMap::new(); + props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string()); + props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string()); + props.insert( + "commit.retry.total-timeout-ms".to_string(), + "1000".to_string(), + ); + props.insert( + "commit.retry.num-retries".to_string(), + num_retries.to_string(), + ); + + // Update table properties + let metadata = table + .metadata() + .clone() + .into_builder(None) + .set_properties(props) + .unwrap() + .build() + .unwrap() + .metadata; + + table.with_metadata(Arc::new(metadata)) + } + + /// Helper function to create a transaction with a simple update action + fn create_test_transaction(table: &Table) -> Transaction { + let tx = Transaction::new(table); + tx.update_table_properties() + .set("test.key".to_string(), "test.value".to_string()) + .apply(tx) + .unwrap() + } + + /// Helper function to set up a mock catalog with retryable errors + fn setup_mock_catalog_with_retryable_errors( + success_after_attempts: Option, + expected_calls: usize, + ) -> MockCatalog { + let mut mock_catalog = MockCatalog::new(); + + mock_catalog + .expect_load_table() + .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) })); + + mock_catalog + .expect_update_table() + .times(expected_calls) + .returning_st(move |_| { + if let Some(attempts) = success_after_attempts { + static mut ATTEMPTS: u32 = 0; + unsafe { + ATTEMPTS += 1; + if ATTEMPTS <= attempts { + Box::pin(async move { + Err(Error::new( + ErrorKind::CatalogCommitConflicts, + "Commit conflict", + ) + .with_retryable(true)) + }) + } else { + Box::pin(async move { Ok(make_v2_table()) }) + } + } + } else { + // Always fail with retryable error + Box::pin(async move { + Err( + Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict") + .with_retryable(true), + ) + }) + } + }); + + mock_catalog + } + + /// Helper function to set up a mock catalog with non-retryable error + fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog { + let mut mock_catalog = MockCatalog::new(); + + mock_catalog + .expect_load_table() + .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) })); + + mock_catalog + .expect_update_table() + .times(1) // Should only be called once since error is not retryable + .returning_st(move |_| { + Box::pin(async move { + Err(Error::new(ErrorKind::Unexpected, "Non-retryable error") + .with_retryable(false)) + }) + }); + + mock_catalog + } + + #[tokio::test] + async fn test_commit_retryable_error() { + // Create a test table with retry properties + let table = setup_test_table("3"); + + // Create a transaction with a simple update action + let tx = create_test_transaction(&table); + + // Create a mock catalog that fails twice then succeeds + let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3); + + // Commit the transaction + let result = tx.commit(&mock_catalog).await; + + // Verify the result + assert!(result.is_ok(), "Transaction should eventually succeed"); + } + + #[tokio::test] + async fn test_commit_non_retryable_error() { + // Create a test table with retry properties + let table = setup_test_table("3"); + + // Create a transaction with a simple update action + let tx = create_test_transaction(&table); + + // Create a mock catalog that fails with non-retryable error + let mock_catalog = setup_mock_catalog_with_non_retryable_error(); + + // Commit the transaction + let result = tx.commit(&mock_catalog).await; + + // Verify the result + assert!(result.is_err(), "Transaction should fail immediately"); + if let Err(err) = result { + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert_eq!(err.message(), "Non-retryable error"); + assert!(!err.retryable(), "Error should not be retryable"); + } + } + + #[tokio::test] + async fn test_commit_max_retries_exceeded() { + // Create a test table with retry properties (only allow 2 retries) + let table = setup_test_table("2"); + + // Create a transaction with a simple update action + let tx = create_test_transaction(&table); + + // Create a mock catalog that always fails with retryable error + let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); // Initial attempt + 2 retries = 3 total attempts + + // Commit the transaction + let result = tx.commit(&mock_catalog).await; + + // Verify the result + assert!(result.is_err(), "Transaction should fail after max retries"); + if let Err(err) = result { + assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts); + assert_eq!(err.message(), "Commit conflict"); + assert!(err.retryable(), "Error should be retryable"); + } + } } From 845f654b6e965dab8c795abfe01ec5cd16987c78 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sun, 13 Jul 2025 21:55:34 -0700 Subject: [PATCH 2/2] change property names, safer tests --- crates/iceberg/src/spec/table_metadata.rs | 16 +-- crates/iceberg/src/transaction/mod.rs | 118 ++++++++++++---------- 2 files changed, 75 insertions(+), 59 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 3789e7ed59..2604eac03d 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -99,24 +99,24 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [ ]; /// Property key for number of commit retries. -pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; +pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; /// Default value for number of commit retries. -pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; +pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4; /// Property key for minimum wait time (ms) between retries. -pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; +pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; /// Default value for minimum wait time (ms) between retries. -pub const COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; +pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; /// Property key for maximum wait time (ms) between retries. -pub const COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; +pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; /// Default value for maximum wait time (ms) between retries. -pub const COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute +pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute /// Property key for total maximum retry time (ms). -pub const COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; +pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; /// Default value for total maximum retry time (ms). -pub const COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes +pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 47e8f3d55e..06549a95c5 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -70,9 +70,10 @@ use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWi use crate::error::Result; use crate::spec::{ - COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_MIN_RETRY_WAIT_MS, - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT, + PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, }; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; @@ -82,7 +83,7 @@ use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; use crate::transaction::update_statistics::UpdateStatisticsAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; -use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; +use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. #[derive(Clone)] @@ -169,7 +170,7 @@ impl Transaction { return Ok(self.table); } - let backoff = Self::build_backoff(self.table.metadata().properties()); + let backoff = Self::build_backoff(self.table.metadata().properties())?; let tx = self; (|mut tx: Transaction| async { @@ -184,38 +185,55 @@ impl Transaction { .1 } - fn build_backoff(props: &HashMap) -> ExponentialBackoff { - ExponentialBuilder::new() - .with_min_delay(Duration::from_millis( - props - .get(COMMIT_MIN_RETRY_WAIT_MS) - .map(|s| s.parse()) - .unwrap_or_else(|| Ok(COMMIT_MIN_RETRY_WAIT_MS_DEFAULT)) - .expect("Invalid value for commit.retry.min-wait-ms"), - )) - .with_max_delay(Duration::from_millis( - props - .get(COMMIT_MAX_RETRY_WAIT_MS) - .map(|s| s.parse()) - .unwrap_or_else(|| Ok(COMMIT_MAX_RETRY_WAIT_MS_DEFAULT)) - .expect("Invalid value for commit.retry.max-wait-ms"), - )) - .with_total_delay(Some(Duration::from_millis( - props - .get(COMMIT_TOTAL_RETRY_TIME_MS) - .map(|s| s.parse()) - .unwrap_or_else(|| Ok(COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) - .expect("Invalid value for commit.retry.total-timeout-ms"), - ))) - .with_max_times( - props - .get(COMMIT_NUM_RETRIES) - .map(|s| s.parse()) - .unwrap_or_else(|| Ok(COMMIT_NUM_RETRIES_DEFAULT)) - .expect("Invalid value for commit.retry.num-retries"), - ) + fn build_backoff(props: &HashMap) -> Result { + let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) { + Some(value_str) => value_str.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for commit.retry.min-wait-ms", + ) + .with_source(e) + })?, + None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + }; + let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) { + Some(value_str) => value_str.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for commit.retry.max-wait-ms", + ) + .with_source(e) + })?, + None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + }; + let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) { + Some(value_str) => value_str.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for commit.retry.total-timeout-ms", + ) + .with_source(e) + })?, + None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + }; + let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) { + Some(value_str) => value_str.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for commit.retry.num-retries", + ) + .with_source(e) + })?, + None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT, + }; + + Ok(ExponentialBuilder::new() + .with_min_delay(Duration::from_millis(min_delay)) + .with_max_delay(Duration::from_millis(max_delay)) + .with_total_delay(Some(Duration::from_millis(total_delay))) + .with_max_times(max_times) .with_factor(2.0) - .build() + .build()) } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{ @@ -259,6 +277,7 @@ mod tests { use std::fs::File; use std::io::BufReader; use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; use crate::catalog::MockCatalog; use crate::io::FileIOBuilder; @@ -375,25 +394,22 @@ mod tests { .expect_load_table() .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) })); + let attempts = AtomicU32::new(0); mock_catalog .expect_update_table() .times(expected_calls) .returning_st(move |_| { - if let Some(attempts) = success_after_attempts { - static mut ATTEMPTS: u32 = 0; - unsafe { - ATTEMPTS += 1; - if ATTEMPTS <= attempts { - Box::pin(async move { - Err(Error::new( - ErrorKind::CatalogCommitConflicts, - "Commit conflict", - ) - .with_retryable(true)) - }) - } else { - Box::pin(async move { Ok(make_v2_table()) }) - } + if let Some(success_after_attempts) = success_after_attempts { + attempts.fetch_add(1, Ordering::SeqCst); + if attempts.load(Ordering::SeqCst) <= success_after_attempts { + Box::pin(async move { + Err( + Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict") + .with_retryable(true), + ) + }) + } else { + Box::pin(async move { Ok(make_v2_table()) }) } } else { // Always fail with retryable error