From 733e0623dfbd9d2a3c4be5432ed40e89f7f2e8c1 Mon Sep 17 00:00:00 2001 From: Prashant Date: Fri, 2 May 2025 16:50:54 -0700 Subject: [PATCH 01/12] Add retries --- .../relational/jdbc/DatasourceOperations.java | 205 +++++++++++++++--- 1 file changed, 169 insertions(+), 36 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index a8a854ad4e..e6d4f4e8a7 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -35,11 +35,19 @@ import java.util.stream.Stream; import javax.sql.DataSource; import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DatasourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class); + private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505"; + // POSTGRES RETRYABLE + private static final String DEADLOCK_SQL_CODE = "40P01"; + private static final String SERIALIZATION_FAILURE_SQL_CODE = "40001"; + private final DataSource datasource; public DatasourceOperations(DataSource datasource) { @@ -121,21 +129,70 @@ public void executeSelectOverStream( @Nonnull Converter converterInstance, @Nonnull Consumer> consumer) throws SQLException { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(query)) { - ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); - consumer.accept(iterator.toStream()); - } catch (SQLException e) { - throw e; - } catch (RuntimeException e) { - if (e.getCause() instanceof SQLException) { - throw (SQLException) e.getCause(); - } else { - throw e; + int maxRetries = 3; + int retryCount = 0; + long initialDelay = 100; // milliseconds + boolean success = false; + while (retryCount < maxRetries && !success) { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + success = true; + ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); + consumer.accept(iterator.toStream()); + } catch (SQLException e) { + if (isRetryable(e)) { + retryCount++; + long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff + LOGGER.info( + ("Transient error occurred, retrying in " + + delay + + "ms (attempt " + + retryCount + + "/" + + maxRetries + + "): " + + e.getMessage()), + e); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + } else { + throw e; + } + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException ex) { + if (isRetryable(ex) && retryCount + 1 < maxRetries) { + retryCount++; + long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff + LOGGER.info( + ("Transient error occurred, retrying in " + + delay + + "ms (attempt " + + retryCount + + "/" + + maxRetries + + "): " + + e.getMessage()), + e); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + } else { + throw ex; + } + } else { + throw e; + } + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException(e); } } @@ -147,16 +204,46 @@ public void executeSelectOverStream( * @throws SQLException : Exception during Query Execution. */ public int executeUpdate(String query) throws SQLException { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(true); - try { - return statement.executeUpdate(query); - } finally { - connection.setAutoCommit(autoCommit); + int maxRetries = 3; + int retryCount = 0; + long initialDelay = 100; // milliseconds + boolean success = false; + while (retryCount < maxRetries && !success) { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(true); + try { + return statement.executeUpdate(query); + } catch (SQLException e) { + if (isRetryable(e) && retryCount + 1 < maxRetries) { + retryCount++; + long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff + LOGGER.info( + ("Transient error occurred, retrying in " + + delay + + "ms (attempt " + + retryCount + + "/" + + maxRetries + + "): " + + e.getMessage()), + e); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + } else { + throw e; + } + } finally { + connection.setAutoCommit(autoCommit); + } } } + throw new RuntimeException("Error executing query: "); } /** @@ -166,23 +253,69 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. */ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - boolean success = false; - try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); - } - } finally { - if (success) { - connection.commit(); - } else { - connection.rollback(); + int maxRetries = 3; + int retryCount = 0; + long initialDelay = 100; // milliseconds + boolean success = false; + while (retryCount < maxRetries && !success) { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } catch (SQLException e) { + if (isRetryable(e) && retryCount + 1 < maxRetries) { + retryCount++; + long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff + LOGGER.info( + ("Transient error occurred, retrying in " + + delay + + "ms (attempt " + + retryCount + + "/" + + maxRetries + + "): " + + e.getMessage()), + e); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + } else { + throw e; + } + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } + connection.setAutoCommit(autoCommit); } - connection.setAutoCommit(autoCommit); } } + + if (!success) { + throw new RuntimeException("Failed to execute transaction"); + } + } + + private boolean isRetryable(SQLException e) { + String sqlState = e.getSQLState(); + + if (sqlState != null) { + return sqlState.equals(DEADLOCK_SQL_CODE) + || // Deadlock detected + sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + } + + // Additionally, one might check for specific error messages or other conditions + return e.getMessage().contains("connection refused") + || e.getMessage().contains("connection reset"); } // Interface for transaction callback From 307a2c2e0346a4f27220ed2e73e9f6870737ed2b Mon Sep 17 00:00:00 2001 From: Prashant Date: Fri, 2 May 2025 18:22:24 -0700 Subject: [PATCH 02/12] beautify code --- .../relational-jdbc/build.gradle.kts | 2 + .../relational/jdbc/DatasourceOperations.java | 239 +++++++----------- .../jdbc/JdbcMetaStoreManagerFactory.java | 4 +- .../jdbc/RelationalJdbcConfiguration.java | 34 +++ ...anagerWithJdbcBasePersistenceImplTest.java | 23 +- .../jdbc/DatasourceOperationsTest.java | 131 +++++++++- ...gresRelationalJdbcLifeCycleManagement.java | 2 + 7 files changed, 281 insertions(+), 154 deletions(-) create mode 100644 extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java diff --git a/extension/persistence/relational-jdbc/build.gradle.kts b/extension/persistence/relational-jdbc/build.gradle.kts index a8a61f60c7..beab76550d 100644 --- a/extension/persistence/relational-jdbc/build.gradle.kts +++ b/extension/persistence/relational-jdbc/build.gradle.kts @@ -31,7 +31,9 @@ dependencies { compileOnly("com.fasterxml.jackson.core:jackson-annotations") compileOnly(libs.jakarta.annotation.api) compileOnly(libs.jakarta.enterprise.cdi.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.smallrye.config.core) // @ConfigMapping implementation(libs.smallrye.common.annotation) // @Identifier diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index e6d4f4e8a7..4cbaacb190 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -28,9 +28,11 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.function.Consumer; import java.util.stream.Stream; import javax.sql.DataSource; @@ -44,14 +46,19 @@ public class DatasourceOperations { private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505"; - // POSTGRES RETRYABLE + // POSTGRES RETRYABLE EXCEPTIONS private static final String DEADLOCK_SQL_CODE = "40P01"; private static final String SERIALIZATION_FAILURE_SQL_CODE = "40001"; private final DataSource datasource; + private final RelationalJdbcConfiguration relationalJdbcConfiguration; - public DatasourceOperations(DataSource datasource) { + private final Random random = new Random(); + + public DatasourceOperations( + DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration) { this.datasource = datasource; + this.relationalJdbcConfiguration = relationalJdbcConfiguration; } /** @@ -129,71 +136,21 @@ public void executeSelectOverStream( @Nonnull Converter converterInstance, @Nonnull Consumer> consumer) throws SQLException { - int maxRetries = 3; - int retryCount = 0; - long initialDelay = 100; // milliseconds - boolean success = false; - while (retryCount < maxRetries && !success) { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(query)) { - success = true; - ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); - consumer.accept(iterator.toStream()); - } catch (SQLException e) { - if (isRetryable(e)) { - retryCount++; - long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff - LOGGER.info( - ("Transient error occurred, retrying in " - + delay - + "ms (attempt " - + retryCount - + "/" - + maxRetries - + "): " - + e.getMessage()), - e); - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Retry interrupted", ie); - } - } else { - throw e; - } - } catch (RuntimeException e) { - if (e.getCause() instanceof SQLException ex) { - if (isRetryable(ex) && retryCount + 1 < maxRetries) { - retryCount++; - long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff - LOGGER.info( - ("Transient error occurred, retrying in " - + delay - + "ms (attempt " - + retryCount - + "/" - + maxRetries - + "): " - + e.getMessage()), - e); - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Retry interrupted", ie); + withRetries( + () -> { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); + consumer.accept(iterator.toStream()); + return null; + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException ex) { + throw ex; } - } else { - throw ex; + throw e; } - } else { - throw e; - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } + }); } /** @@ -204,46 +161,19 @@ public void executeSelectOverStream( * @throws SQLException : Exception during Query Execution. */ public int executeUpdate(String query) throws SQLException { - int maxRetries = 3; - int retryCount = 0; - long initialDelay = 100; // milliseconds - boolean success = false; - while (retryCount < maxRetries && !success) { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(true); - try { - return statement.executeUpdate(query); - } catch (SQLException e) { - if (isRetryable(e) && retryCount + 1 < maxRetries) { - retryCount++; - long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff - LOGGER.info( - ("Transient error occurred, retrying in " - + delay - + "ms (attempt " - + retryCount - + "/" - + maxRetries - + "): " - + e.getMessage()), - e); + return withRetries( + () -> { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(true); try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Retry interrupted", ie); + return statement.executeUpdate(query); + } finally { + connection.setAutoCommit(autoCommit); } - } else { - throw e; } - } finally { - connection.setAutoCommit(autoCommit); - } - } - } - throw new RuntimeException("Error executing query: "); + }); } /** @@ -253,55 +183,27 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. */ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - int maxRetries = 3; - int retryCount = 0; - long initialDelay = 100; // milliseconds - boolean success = false; - while (retryCount < maxRetries && !success) { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); - } catch (SQLException e) { - if (isRetryable(e) && retryCount + 1 < maxRetries) { - retryCount++; - long delay = initialDelay * (long) Math.pow(2, retryCount - 1); // Exponential backoff - LOGGER.info( - ("Transient error occurred, retrying in " - + delay - + "ms (attempt " - + retryCount - + "/" - + maxRetries - + "): " - + e.getMessage()), - e); - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Retry interrupted", ie); + withRetries( + () -> { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); } - } else { - throw e; + connection.setAutoCommit(autoCommit); } } - } finally { - if (success) { - connection.commit(); - } else { - connection.rollback(); - } - connection.setAutoCommit(autoCommit); - } - } - } - - if (!success) { - throw new RuntimeException("Failed to execute transaction"); - } + return null; + }); } private boolean isRetryable(SQLException e) { @@ -318,6 +220,47 @@ private boolean isRetryable(SQLException e) { || e.getMessage().contains("connection reset"); } + public T withRetries(Operation operation) throws SQLException { + int attempts = 0; + // maximum number of retries. + int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); + // How long we should try, since the first attempt. + long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(100L); + // How long to wait before first failure. + long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); + + // maximum time we will retry till. + long maxRetryTime = Instant.now().toEpochMilli() + maxDuration; + + while (attempts < maxAttempts) { + try { + return operation.execute(); + } catch (SQLException e) { + attempts++; + long timeLeft = Math.max((maxRetryTime - Instant.now().toEpochMilli()), 0L); + if (attempts >= maxAttempts || !isRetryable(e) || timeLeft == 0) { + throw e; + } + // Add jitter + long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextDouble() * 0.2 * delay)); + LOGGER.debug("Retrying {} after {} attempts on {}", operation, attempts, e.getMessage(), e); + try { + Thread.sleep(timeToSleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + delay *= 2; // Exponential backoff + } + } + // This should never be reached + return null; + } + + public interface Operation { + T execute() throws SQLException; + } + // Interface for transaction callback public interface TransactionCallback { boolean execute(Statement statement) throws SQLException; diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index f19194f33d..2fc1d4c5f1 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -74,6 +74,7 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; @Inject Instance dataSource; + @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; protected JdbcMetaStoreManagerFactory() {} @@ -108,7 +109,8 @@ private void initializeForRealm( } private DatasourceOperations getDatasourceOperations(boolean isBootstrap) { - DatasourceOperations databaseOperations = new DatasourceOperations(dataSource.get()); + DatasourceOperations databaseOperations = + new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); if (isBootstrap) { try { DatabaseType databaseType; diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java new file mode 100644 index 0000000000..565ff20466 --- /dev/null +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import io.smallrye.config.ConfigMapping; +import java.util.Optional; + +@ConfigMapping(prefix = "polaris.persistence.relational.jdbc") +public interface RelationalJdbcConfiguration { + // max retries before giving up + Optional maxRetries(); + + // max retry duration + Optional maxDurationInMs(); + + // initial delay + Optional initialDelayInMs(); +} diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index 92d31d8343..b757847554 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.time.ZoneId; +import java.util.Optional; import javax.sql.DataSource; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; @@ -33,6 +34,7 @@ import org.apache.polaris.extension.persistence.relational.jdbc.DatabaseType; import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; import org.apache.polaris.extension.persistence.relational.jdbc.JdbcBasePersistenceImpl; +import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; import org.h2.jdbcx.JdbcConnectionPool; import org.mockito.Mockito; @@ -46,7 +48,8 @@ public static DataSource createH2DataSource() { @Override protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); - DatasourceOperations datasourceOperations = new DatasourceOperations(createH2DataSource()); + DatasourceOperations datasourceOperations = + new DatasourceOperations(createH2DataSource(), new H2JdbcConfiguration()); try { datasourceOperations.executeScript( String.format("%s/schema-v1.sql", DatabaseType.H2.getDisplayName())); @@ -67,4 +70,22 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { new PolarisConfigurationStore() {}, timeSource.withZone(ZoneId.systemDefault()))); } + + private static class H2JdbcConfiguration implements RelationalJdbcConfiguration { + + @Override + public Optional maxRetries() { + return Optional.of(2); + } + + @Override + public Optional maxDurationInMs() { + return Optional.of(100L); + } + + @Override + public Optional initialDelayInMs() { + return Optional.of(100L); + } + } } diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java index c73cf3fd4b..bbc21d8365 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java @@ -24,8 +24,12 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.time.Instant; +import java.util.Optional; import javax.sql.DataSource; import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations.Operation; +import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,17 +45,21 @@ public class DatasourceOperationsTest { @Mock private Statement mockStatement; + @Mock private RelationalJdbcConfiguration relationalJdbcConfiguration; + + @Mock Operation mockOperation; + private DatasourceOperations datasourceOperations; @BeforeEach void setUp() throws Exception { - when(mockDataSource.getConnection()).thenReturn(mockConnection); - when(mockConnection.createStatement()).thenReturn(mockStatement); - datasourceOperations = new DatasourceOperations(mockDataSource); + datasourceOperations = new DatasourceOperations(mockDataSource, relationalJdbcConfiguration); } @Test void testExecuteUpdate_success() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); String query = "UPDATE users SET active = true"; when(mockStatement.executeUpdate(query)).thenReturn(1); @@ -63,6 +71,8 @@ void testExecuteUpdate_success() throws Exception { @Test void testExecuteUpdate_failure() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); String query = "INVALID SQL"; when(mockStatement.executeUpdate(query)).thenThrow(new SQLException("demo", "42P07")); @@ -71,8 +81,10 @@ void testExecuteUpdate_failure() throws Exception { @Test void testExecuteSelect_exception() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); String query = "SELECT * FROM users"; - when(mockStatement.executeQuery(query)).thenThrow(new SQLException()); + when(mockStatement.executeQuery(query)).thenThrow(new SQLException("demo", "42P07")); assertThrows( SQLException.class, () -> datasourceOperations.executeSelect(query, new ModelEntity())); @@ -80,6 +92,8 @@ void testExecuteSelect_exception() throws Exception { @Test void testRunWithinTransaction_commit() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); DatasourceOperations.TransactionCallback callback = statement -> true; when(mockConnection.getAutoCommit()).thenReturn(true); datasourceOperations.runWithinTransaction(callback); @@ -92,6 +106,8 @@ void testRunWithinTransaction_commit() throws Exception { @Test void testRunWithinTransaction_rollback() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); DatasourceOperations.TransactionCallback callback = statement -> false; datasourceOperations.runWithinTransaction(callback); @@ -101,6 +117,8 @@ void testRunWithinTransaction_rollback() throws Exception { @Test void testRunWithinTransaction_exceptionTriggersRollback() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); DatasourceOperations.TransactionCallback callback = statement -> { throw new SQLException("Boom"); @@ -110,4 +128,109 @@ void testRunWithinTransaction_exceptionTriggersRollback() throws Exception { verify(mockConnection).rollback(); } + + @Test + void testSuccessfulExecutionOnFirstAttempt() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()).thenReturn("Success!"); + + String result = datasourceOperations.withRetries(mockOperation); + assertEquals("Success!", result); + verify(mockOperation, times(1)).execute(); + } + + @Test + void testSuccessfulExecutionAfterOneRetry() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()) + .thenThrow( + new SQLException("Retryable error", "40001", new SQLException("Retryable error"))) + .thenReturn("Success!"); + + String result = datasourceOperations.withRetries(mockOperation); + assertEquals("Success!", result); + verify(mockOperation, times(2)).execute(); + } + + @Test + void testRetryAttemptsExceedMaxRetries() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(2)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()) + .thenThrow( + new SQLException("Retryable error", "40001", new SQLException("Retryable error"))); + + SQLException thrown = + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertEquals("Retryable error", thrown.getMessage()); + verify(mockOperation, times(2)).execute(); // Tried twice, then threw + } + + @Test + void testRetryAttemptsExceedMaxDuration() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(10)); + when(relationalJdbcConfiguration.maxDurationInMs()) + .thenReturn(Optional.of(250L)); // Short max duration + when(mockOperation.execute()) + .thenThrow( + new SQLException("Demo Exception", "40001", new SQLException("Retryable error"))); + + long startTime = Instant.now().toEpochMilli(); + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertTrue((Instant.now().toEpochMilli() - startTime) >= 250); + // The number of executions depends on the timing and jitter, but should be more than 1 + verify(mockOperation, atLeast(2)).execute(); + } + + @Test + void testNonRetryableSQLException() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()).thenThrow(new SQLException("NonRetryable error")); + + SQLException thrown = + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertEquals("NonRetryable error", thrown.getMessage()); + verify(mockOperation, times(1)).execute(); // Should not retry + } + + @Test + void testInterruptedExceptionDuringRetry() throws SQLException, InterruptedException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()) + .thenThrow( + new SQLException("Demo Exception", "40001", new SQLException("Retryable error"))); + + Thread.currentThread().interrupt(); // Simulate interruption + + RuntimeException thrown = + assertThrows(RuntimeException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertEquals("Retry interrupted", thrown.getMessage()); + assertTrue(Thread.currentThread().isInterrupted()); + verify(mockOperation, atMost(1)) + .execute(); // Might not even be called if interrupted very early + } + + @Test + void testDefaultConfigurationValues() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.empty()); // Defaults to 1 + when(relationalJdbcConfiguration.maxDurationInMs()) + .thenReturn(Optional.empty()); // Defaults to 100 + when(relationalJdbcConfiguration.initialDelayInMs()) + .thenReturn(Optional.empty()); // Defaults to 100 + when(mockOperation.execute()) + .thenThrow( + new SQLException("Demo Exception", "40001", new SQLException("Retryable error"))); + + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + verify(mockOperation, times(1)).execute(); + } } diff --git a/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java b/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java index 9dc230722f..b20c6fbdab 100644 --- a/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java +++ b/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java @@ -56,6 +56,8 @@ public Map start() { return Map.of( "polaris.persistence.type", "relational-jdbc", + "polaris.persistence.relational.jdbc.max-retries", + "2", "quarkus.datasource.db-kind", "pgsql", "quarkus.datasource.jdbc.url", From b6ded6249074201634ed15d5e222f1c9f2de08d3 Mon Sep 17 00:00:00 2001 From: Prashant Date: Mon, 5 May 2025 17:32:18 -0700 Subject: [PATCH 03/12] Address partial review feedback --- .../relational/jdbc/DatasourceOperations.java | 16 ++++++++++------ .../jdbc/DatasourceOperationsTest.java | 8 ++++++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index 4cbaacb190..ed5dc79faa 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -28,7 +28,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.time.Instant; +import java.time.Clock; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -225,24 +225,28 @@ public T withRetries(Operation operation) throws SQLException { // maximum number of retries. int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); // How long we should try, since the first attempt. - long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(100L); + long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(5000L); // How long to wait before first failure. long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); // maximum time we will retry till. - long maxRetryTime = Instant.now().toEpochMilli() + maxDuration; + long maxRetryTime = Clock.systemUTC().millis() + maxDuration; while (attempts < maxAttempts) { try { return operation.execute(); } catch (SQLException e) { attempts++; - long timeLeft = Math.max((maxRetryTime - Instant.now().toEpochMilli()), 0L); + long timeLeft = Math.max((maxRetryTime - Clock.systemUTC().millis()), 0L); if (attempts >= maxAttempts || !isRetryable(e) || timeLeft == 0) { - throw e; + String exceptionMessage = + String.format( + "Failed due to %s, after , %s attempts and %s milliseconds", + e.getMessage(), attempts, maxDuration); + throw new SQLException(exceptionMessage, e.getSQLState(), e.getErrorCode()); } // Add jitter - long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextDouble() * 0.2 * delay)); + long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay)); LOGGER.debug("Retrying {} after {} attempts on {}", operation, attempts, e.getMessage(), e); try { Thread.sleep(timeToSleep); diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java index bbc21d8365..32d956c695 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java @@ -167,7 +167,9 @@ void testRetryAttemptsExceedMaxRetries() throws SQLException { SQLException thrown = assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); - assertEquals("Retryable error", thrown.getMessage()); + assertEquals( + "Failed due to Retryable error, after , 2 attempts and 1000 milliseconds", + thrown.getMessage()); verify(mockOperation, times(2)).execute(); // Tried twice, then threw } @@ -196,7 +198,9 @@ void testNonRetryableSQLException() throws SQLException { SQLException thrown = assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); - assertEquals("NonRetryable error", thrown.getMessage()); + assertEquals( + "Failed due to NonRetryable error, after , 1 attempts and 1000 milliseconds", + thrown.getMessage()); verify(mockOperation, times(1)).execute(); // Should not retry } From 3c24f4e090ad0bbefe7667361a7e3571a7c52428 Mon Sep 17 00:00:00 2001 From: Prashant Date: Wed, 7 May 2025 14:33:52 -0700 Subject: [PATCH 04/12] Address review feedback --- .../relational/jdbc/DatasourceOperations.java | 49 +++++++++++++------ .../jdbc/JdbcMetaStoreManagerFactory.java | 4 +- ...anagerWithJdbcBasePersistenceImplTest.java | 4 +- .../jdbc/DatasourceOperationsTest.java | 4 +- 4 files changed, 42 insertions(+), 19 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index ed5dc79faa..45f9c1c459 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import java.io.BufferedReader; import java.io.IOException; @@ -36,6 +37,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; import javax.sql.DataSource; +import org.apache.polaris.core.persistence.EntityAlreadyExistsException; import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +49,19 @@ public class DatasourceOperations { private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505"; // POSTGRES RETRYABLE EXCEPTIONS - private static final String DEADLOCK_SQL_CODE = "40P01"; private static final String SERIALIZATION_FAILURE_SQL_CODE = "40001"; private final DataSource datasource; private final RelationalJdbcConfiguration relationalJdbcConfiguration; + private final Clock clock; private final Random random = new Random(); public DatasourceOperations( - DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration) { + DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration, Clock clock) { this.datasource = datasource; this.relationalJdbcConfiguration = relationalJdbcConfiguration; + this.clock = clock; } /** @@ -144,11 +147,6 @@ public void executeSelectOverStream( ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); consumer.accept(iterator.toStream()); return null; - } catch (RuntimeException e) { - if (e.getCause() instanceof SQLException ex) { - throw ex; - } - throw e; } }); } @@ -210,9 +208,7 @@ private boolean isRetryable(SQLException e) { String sqlState = e.getSQLState(); if (sqlState != null) { - return sqlState.equals(DEADLOCK_SQL_CODE) - || // Deadlock detected - sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + return sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure } // Additionally, one might check for specific error messages or other conditions @@ -220,6 +216,9 @@ private boolean isRetryable(SQLException e) { || e.getMessage().contains("connection reset"); } + // TODO: consider refactoring to use a retry library, inorder to have fair retries + // and more knobs for tuning retry pattern. + @VisibleForTesting public T withRetries(Operation operation) throws SQLException { int attempts = 0; // maximum number of retries. @@ -230,20 +229,38 @@ public T withRetries(Operation operation) throws SQLException { long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); // maximum time we will retry till. - long maxRetryTime = Clock.systemUTC().millis() + maxDuration; + long maxRetryTime = clock.millis() + maxDuration; while (attempts < maxAttempts) { try { return operation.execute(); - } catch (SQLException e) { + } catch (SQLException | RuntimeException e) { + SQLException sqlException; + if (e instanceof RuntimeException) { + // Handle Exceptions from ResultSet Iterator consumer, as it throws a RTE, ignore RTE from + // the transactions. + if (e.getCause() instanceof SQLException + && !(e instanceof EntityAlreadyExistsException)) { + sqlException = (SQLException) e.getCause(); + } else { + throw e; + } + } else { + sqlException = (SQLException) e; + } + attempts++; - long timeLeft = Math.max((maxRetryTime - Clock.systemUTC().millis()), 0L); - if (attempts >= maxAttempts || !isRetryable(e) || timeLeft == 0) { + long timeLeft = Math.max((maxRetryTime - clock.millis()), 0L); + if (attempts >= maxAttempts || !isRetryable(sqlException) || timeLeft == 0) { String exceptionMessage = String.format( "Failed due to %s, after , %s attempts and %s milliseconds", - e.getMessage(), attempts, maxDuration); - throw new SQLException(exceptionMessage, e.getSQLState(), e.getErrorCode()); + sqlException.getMessage(), attempts, maxDuration); + throw new SQLException( + exceptionMessage, + sqlException.getSQLState(), + sqlException.getErrorCode(), + sqlException); } // Add jitter long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay)); diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index 2fc1d4c5f1..b04938c115 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -25,6 +25,7 @@ import jakarta.inject.Inject; import java.sql.Connection; import java.sql.SQLException; +import java.time.Clock; import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -75,6 +76,7 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; @Inject Instance dataSource; @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; + @Inject Clock clock; protected JdbcMetaStoreManagerFactory() {} @@ -110,7 +112,7 @@ private void initializeForRealm( private DatasourceOperations getDatasourceOperations(boolean isBootstrap) { DatasourceOperations databaseOperations = - new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); + new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration, clock); if (isBootstrap) { try { DatabaseType databaseType; diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index b757847554..58103380ff 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -21,6 +21,7 @@ import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; import java.sql.SQLException; +import java.time.Clock; import java.time.ZoneId; import java.util.Optional; import javax.sql.DataSource; @@ -49,7 +50,8 @@ public static DataSource createH2DataSource() { protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); DatasourceOperations datasourceOperations = - new DatasourceOperations(createH2DataSource(), new H2JdbcConfiguration()); + new DatasourceOperations( + createH2DataSource(), new H2JdbcConfiguration(), Clock.systemUTC()); try { datasourceOperations.executeScript( String.format("%s/schema-v1.sql", DatabaseType.H2.getDisplayName())); diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java index 32d956c695..e25cd151e4 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java @@ -24,6 +24,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.time.Clock; import java.time.Instant; import java.util.Optional; import javax.sql.DataSource; @@ -53,7 +54,8 @@ public class DatasourceOperationsTest { @BeforeEach void setUp() throws Exception { - datasourceOperations = new DatasourceOperations(mockDataSource, relationalJdbcConfiguration); + datasourceOperations = + new DatasourceOperations(mockDataSource, relationalJdbcConfiguration, Clock.systemUTC()); } @Test From d7c278a5de8f419121c1c366879d05fb7c59b2c0 Mon Sep 17 00:00:00 2001 From: Prashant Date: Wed, 7 May 2025 17:06:12 -0700 Subject: [PATCH 05/12] Address review feedback --- .../relational/jdbc/DatasourceOperations.java | 31 +++++++++---------- .../jdbc/DatasourceOperationsTest.java | 13 ++++---- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index 45f9c1c459..e23982378a 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -30,10 +30,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Clock; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Random; +import java.util.*; import java.util.function.Consumer; import java.util.stream.Stream; import javax.sql.DataSource; @@ -188,15 +185,18 @@ public void runWithinTransaction(TransactionCallback callback) throws SQLExcepti boolean success = false; connection.setAutoCommit(false); try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } } } finally { - if (success) { - connection.commit(); - } else { - connection.rollback(); - } connection.setAutoCommit(autoCommit); } } @@ -212,8 +212,8 @@ private boolean isRetryable(SQLException e) { } // Additionally, one might check for specific error messages or other conditions - return e.getMessage().contains("connection refused") - || e.getMessage().contains("connection reset"); + return e.getMessage().toLowerCase(Locale.ROOT).contains("connection refused") + || e.getMessage().toLowerCase(Locale.ROOT).contains("connection reset"); } // TODO: consider refactoring to use a retry library, inorder to have fair retries @@ -257,10 +257,7 @@ public T withRetries(Operation operation) throws SQLException { "Failed due to %s, after , %s attempts and %s milliseconds", sqlException.getMessage(), attempts, maxDuration); throw new SQLException( - exceptionMessage, - sqlException.getSQLState(), - sqlException.getErrorCode(), - sqlException); + exceptionMessage, sqlException.getSQLState(), sqlException.getErrorCode(), e); } // Add jitter long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay)); diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java index e25cd151e4..097d08b9f2 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java @@ -145,17 +145,18 @@ void testSuccessfulExecutionOnFirstAttempt() throws SQLException { @Test void testSuccessfulExecutionAfterOneRetry() throws SQLException { - when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); - when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); - when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(4)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(2000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(0L)); when(mockOperation.execute()) - .thenThrow( - new SQLException("Retryable error", "40001", new SQLException("Retryable error"))) + .thenThrow(new SQLException("Retryable error", "40001")) + .thenThrow(new SQLException("connection refused")) + .thenThrow(new SQLException("connection reset")) .thenReturn("Success!"); String result = datasourceOperations.withRetries(mockOperation); assertEquals("Success!", result); - verify(mockOperation, times(2)).execute(); + verify(mockOperation, times(4)).execute(); } @Test From 44ff7800bd5705867b5e75735306280ae52e68b7 Mon Sep 17 00:00:00 2001 From: Prashant Date: Wed, 7 May 2025 17:07:43 -0700 Subject: [PATCH 06/12] Add a new module --- bom/build.gradle.kts | 1 + .../relational-jdbc/build.gradle.kts | 1 - .../jdbc/RelationalJdbcConfiguration.java | 2 - gradle/projects.main.properties | 1 + quarkus/admin/build.gradle.kts | 1 + quarkus/common/build.gradle.kts | 40 +++++++++++++++++++ .../QuarkusRelationalJdbcConfiguration.java | 27 +++++++++++++ quarkus/service/build.gradle.kts | 1 + 8 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 quarkus/common/build.gradle.kts create mode 100644 quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 4fc33d96c2..5b400f5150 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -46,6 +46,7 @@ dependencies { api(project(":polaris-jpa-model")) api(project(":polaris-quarkus-admin")) + api(project(":polaris-quarkus-common")) api(project(":polaris-quarkus-test-commons")) api(project(":polaris-quarkus-defaults")) api(project(":polaris-quarkus-server")) diff --git a/extension/persistence/relational-jdbc/build.gradle.kts b/extension/persistence/relational-jdbc/build.gradle.kts index beab76550d..c2ddafab66 100644 --- a/extension/persistence/relational-jdbc/build.gradle.kts +++ b/extension/persistence/relational-jdbc/build.gradle.kts @@ -33,7 +33,6 @@ dependencies { compileOnly(libs.jakarta.enterprise.cdi.api) compileOnly(libs.jakarta.inject.api) - compileOnly(libs.smallrye.config.core) // @ConfigMapping implementation(libs.smallrye.common.annotation) // @Identifier diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java index 565ff20466..f1cbddbcb5 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java @@ -18,10 +18,8 @@ */ package org.apache.polaris.extension.persistence.relational.jdbc; -import io.smallrye.config.ConfigMapping; import java.util.Optional; -@ConfigMapping(prefix = "polaris.persistence.relational.jdbc") public interface RelationalJdbcConfiguration { // max retries before giving up Optional maxRetries(); diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 6161eee47b..5c1a222d55 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -30,6 +30,7 @@ polaris-quarkus-service=quarkus/service polaris-quarkus-server=quarkus/server polaris-quarkus-spark-tests=quarkus/spark-tests polaris-quarkus-admin=quarkus/admin +polaris-quarkus-common=quarkus/common polaris-quarkus-test-commons=quarkus/test-commons polaris-quarkus-run-script=quarkus/run-script polaris-eclipselink=extension/persistence/eclipselink diff --git a/quarkus/admin/build.gradle.kts b/quarkus/admin/build.gradle.kts index 16560f2b3e..abb4008a4f 100644 --- a/quarkus/admin/build.gradle.kts +++ b/quarkus/admin/build.gradle.kts @@ -54,6 +54,7 @@ dependencies { implementation("io.quarkus:quarkus-picocli") implementation("io.quarkus:quarkus-container-image-docker") + implementation(project(":polaris-quarkus-common")) implementation("org.jboss.slf4j:slf4j-jboss-logmanager") testImplementation(project(":polaris-quarkus-test-commons")) diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts new file mode 100644 index 0000000000..f0511b7ed3 --- /dev/null +++ b/quarkus/common/build.gradle.kts @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + alias(libs.plugins.jandex) +} + +configurations.all { + exclude(group = "org.antlr", module = "antlr4-runtime") + exclude(group = "org.scala-lang", module = "scala-library") + exclude(group = "org.scala-lang", module = "scala-reflect") +} + +java { + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 +} + +dependencies { + implementation(project(":polaris-relational-jdbc")) + + compileOnly(libs.smallrye.config.core) // @ConfigMap + +} \ No newline at end of file diff --git a/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java new file mode 100644 index 0000000000..317c66a542 --- /dev/null +++ b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.quarkus.common; + +import io.smallrye.config.ConfigMapping; +import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; + +@ConfigMapping(prefix = "polaris.persistence.relational.jdbc") +public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration { +} \ No newline at end of file diff --git a/quarkus/service/build.gradle.kts b/quarkus/service/build.gradle.kts index f08d25d967..0c34075bfb 100644 --- a/quarkus/service/build.gradle.kts +++ b/quarkus/service/build.gradle.kts @@ -40,6 +40,7 @@ dependencies { implementation(platform(libs.opentelemetry.bom)) implementation(platform(libs.quarkus.bom)) + implementation(project(":polaris-quarkus-common")) implementation("io.quarkus:quarkus-logging-json") implementation("io.quarkus:quarkus-rest-jackson") implementation("io.quarkus:quarkus-reactive-routes") From 20767a563b7650b92b96b9d59d5489b0f3f6458c Mon Sep 17 00:00:00 2001 From: Prashant Date: Wed, 7 May 2025 19:40:40 -0700 Subject: [PATCH 07/12] Address review feedbacks --- .../persistence/relational-jdbc/build.gradle.kts | 1 - .../relational/jdbc/DatasourceOperations.java | 8 ++++++-- ...toreManagerWithJdbcBasePersistenceImplTest.java | 6 +----- .../relational/jdbc/DatasourceOperationsTest.java | 14 +++++++++----- .../relational/jdbc/QueryGeneratorTest.java | 3 +-- quarkus/common/build.gradle.kts | 10 +--------- .../jdbc}/QuarkusRelationalJdbcConfiguration.java | 4 ++-- 7 files changed, 20 insertions(+), 26 deletions(-) rename extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/{impl => }/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java (88%) rename extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/{impl => }/relational/jdbc/DatasourceOperationsTest.java (95%) rename extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/{impl => }/relational/jdbc/QueryGeneratorTest.java (98%) rename quarkus/common/src/main/java/org/apache/polaris/quarkus/common/{ => config/jdbc}/QuarkusRelationalJdbcConfiguration.java (95%) diff --git a/extension/persistence/relational-jdbc/build.gradle.kts b/extension/persistence/relational-jdbc/build.gradle.kts index c2ddafab66..a8a61f60c7 100644 --- a/extension/persistence/relational-jdbc/build.gradle.kts +++ b/extension/persistence/relational-jdbc/build.gradle.kts @@ -31,7 +31,6 @@ dependencies { compileOnly("com.fasterxml.jackson.core:jackson-annotations") compileOnly(libs.jakarta.annotation.api) compileOnly(libs.jakarta.enterprise.cdi.api) - compileOnly(libs.jakarta.inject.api) implementation(libs.smallrye.common.annotation) // @Identifier diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index e23982378a..895de322aa 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -30,7 +30,11 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Clock; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Random; import java.util.function.Consumer; import java.util.stream.Stream; import javax.sql.DataSource; @@ -219,7 +223,7 @@ private boolean isRetryable(SQLException e) { // TODO: consider refactoring to use a retry library, inorder to have fair retries // and more knobs for tuning retry pattern. @VisibleForTesting - public T withRetries(Operation operation) throws SQLException { + T withRetries(Operation operation) throws SQLException { int attempts = 0; // maximum number of retries. int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java similarity index 88% rename from extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java rename to extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index 58103380ff..27ce17081f 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.extension.persistence.impl.relational.jdbc; +package org.apache.polaris.extension.persistence.relational.jdbc; import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; @@ -32,10 +32,6 @@ import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager; import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest; import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; -import org.apache.polaris.extension.persistence.relational.jdbc.DatabaseType; -import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; -import org.apache.polaris.extension.persistence.relational.jdbc.JdbcBasePersistenceImpl; -import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; import org.h2.jdbcx.JdbcConnectionPool; import org.mockito.Mockito; diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java similarity index 95% rename from extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java rename to extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java index 097d08b9f2..758b85cc4f 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java @@ -16,10 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.extension.persistence.impl.relational.jdbc; +package org.apache.polaris.extension.persistence.relational.jdbc; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.sql.Connection; import java.sql.SQLException; @@ -28,9 +34,7 @@ import java.time.Instant; import java.util.Optional; import javax.sql.DataSource; -import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations.Operation; -import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/QueryGeneratorTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/QueryGeneratorTest.java similarity index 98% rename from extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/QueryGeneratorTest.java rename to extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/QueryGeneratorTest.java index 2f886ac69c..2c25844c62 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/QueryGeneratorTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/QueryGeneratorTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.extension.persistence.impl.relational.jdbc; +package org.apache.polaris.extension.persistence.relational.jdbc; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.polaris.core.entity.PolarisEntityCore; import org.apache.polaris.core.entity.PolarisEntityId; -import org.apache.polaris.extension.persistence.relational.jdbc.QueryGenerator; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts index f0511b7ed3..adc25d2bd7 100644 --- a/quarkus/common/build.gradle.kts +++ b/quarkus/common/build.gradle.kts @@ -21,20 +21,12 @@ plugins { alias(libs.plugins.jandex) } -configurations.all { - exclude(group = "org.antlr", module = "antlr4-runtime") - exclude(group = "org.scala-lang", module = "scala-library") - exclude(group = "org.scala-lang", module = "scala-reflect") -} - java { sourceCompatibility = JavaVersion.VERSION_21 targetCompatibility = JavaVersion.VERSION_21 } dependencies { + compileOnly(libs.smallrye.config.core) implementation(project(":polaris-relational-jdbc")) - - compileOnly(libs.smallrye.config.core) // @ConfigMap - } \ No newline at end of file diff --git a/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java similarity index 95% rename from quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java rename to quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java index 317c66a542..c3bae77aaf 100644 --- a/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/QuarkusRelationalJdbcConfiguration.java +++ b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java @@ -17,11 +17,11 @@ * under the License. */ -package org.apache.polaris.quarkus.common; +package org.apache.polaris.quarkus.common.config.jdbc; import io.smallrye.config.ConfigMapping; import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; @ConfigMapping(prefix = "polaris.persistence.relational.jdbc") public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration { -} \ No newline at end of file +} From 24a4ca48386354bf0e17fb73510794d532d1ae90 Mon Sep 17 00:00:00 2001 From: Prashant Date: Thu, 8 May 2025 15:18:02 -0700 Subject: [PATCH 08/12] Address review feedback --- .../relational/jdbc/DatasourceOperations.java | 9 +++------ .../relational/jdbc/JdbcMetaStoreManagerFactory.java | 4 +--- ...cMetastoreManagerWithJdbcBasePersistenceImplTest.java | 4 +--- .../relational/jdbc/DatasourceOperationsTest.java | 4 +--- quarkus/common/build.gradle.kts | 5 ----- quarkus/test-commons/build.gradle.kts | 5 ----- 6 files changed, 6 insertions(+), 25 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index 895de322aa..99064a581f 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -29,7 +29,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.time.Clock; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -54,15 +53,13 @@ public class DatasourceOperations { private final DataSource datasource; private final RelationalJdbcConfiguration relationalJdbcConfiguration; - private final Clock clock; private final Random random = new Random(); public DatasourceOperations( - DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration, Clock clock) { + DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration) { this.datasource = datasource; this.relationalJdbcConfiguration = relationalJdbcConfiguration; - this.clock = clock; } /** @@ -233,7 +230,7 @@ T withRetries(Operation operation) throws SQLException { long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); // maximum time we will retry till. - long maxRetryTime = clock.millis() + maxDuration; + long maxRetryTime = System.nanoTime() / 1000000 + maxDuration; while (attempts < maxAttempts) { try { @@ -254,7 +251,7 @@ T withRetries(Operation operation) throws SQLException { } attempts++; - long timeLeft = Math.max((maxRetryTime - clock.millis()), 0L); + long timeLeft = Math.max((maxRetryTime - (System.nanoTime() / 1000000)), 0L); if (attempts >= maxAttempts || !isRetryable(sqlException) || timeLeft == 0) { String exceptionMessage = String.format( diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index b04938c115..2fc1d4c5f1 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -25,7 +25,6 @@ import jakarta.inject.Inject; import java.sql.Connection; import java.sql.SQLException; -import java.time.Clock; import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -76,7 +75,6 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; @Inject Instance dataSource; @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; - @Inject Clock clock; protected JdbcMetaStoreManagerFactory() {} @@ -112,7 +110,7 @@ private void initializeForRealm( private DatasourceOperations getDatasourceOperations(boolean isBootstrap) { DatasourceOperations databaseOperations = - new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration, clock); + new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); if (isBootstrap) { try { DatabaseType databaseType; diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index 27ce17081f..1012aff02d 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -21,7 +21,6 @@ import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; import java.sql.SQLException; -import java.time.Clock; import java.time.ZoneId; import java.util.Optional; import javax.sql.DataSource; @@ -46,8 +45,7 @@ public static DataSource createH2DataSource() { protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); DatasourceOperations datasourceOperations = - new DatasourceOperations( - createH2DataSource(), new H2JdbcConfiguration(), Clock.systemUTC()); + new DatasourceOperations(createH2DataSource(), new H2JdbcConfiguration()); try { datasourceOperations.executeScript( String.format("%s/schema-v1.sql", DatabaseType.H2.getDisplayName())); diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java index 758b85cc4f..09600694dd 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java @@ -30,7 +30,6 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; -import java.time.Clock; import java.time.Instant; import java.util.Optional; import javax.sql.DataSource; @@ -58,8 +57,7 @@ public class DatasourceOperationsTest { @BeforeEach void setUp() throws Exception { - datasourceOperations = - new DatasourceOperations(mockDataSource, relationalJdbcConfiguration, Clock.systemUTC()); + datasourceOperations = new DatasourceOperations(mockDataSource, relationalJdbcConfiguration); } @Test diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts index adc25d2bd7..097cd1905e 100644 --- a/quarkus/common/build.gradle.kts +++ b/quarkus/common/build.gradle.kts @@ -21,11 +21,6 @@ plugins { alias(libs.plugins.jandex) } -java { - sourceCompatibility = JavaVersion.VERSION_21 - targetCompatibility = JavaVersion.VERSION_21 -} - dependencies { compileOnly(libs.smallrye.config.core) implementation(project(":polaris-relational-jdbc")) diff --git a/quarkus/test-commons/build.gradle.kts b/quarkus/test-commons/build.gradle.kts index b3813c90be..1ff46427ff 100644 --- a/quarkus/test-commons/build.gradle.kts +++ b/quarkus/test-commons/build.gradle.kts @@ -28,11 +28,6 @@ configurations.all { exclude(group = "org.scala-lang", module = "scala-reflect") } -java { - sourceCompatibility = JavaVersion.VERSION_21 - targetCompatibility = JavaVersion.VERSION_21 -} - dependencies { implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-junit5") From b78034c70228c6d8161c018f5e3e9c59164b4aea Mon Sep 17 00:00:00 2001 From: Prashant Date: Thu, 8 May 2025 16:54:53 -0700 Subject: [PATCH 09/12] Address review feedback --- .../relational/jdbc/DatasourceOperations.java | 17 +++++++++++++---- quarkus/common/build.gradle.kts | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index 99064a581f..337824882a 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -34,6 +34,7 @@ import java.util.Locale; import java.util.Objects; import java.util.Random; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Stream; import javax.sql.DataSource; @@ -230,7 +231,7 @@ T withRetries(Operation operation) throws SQLException { long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); // maximum time we will retry till. - long maxRetryTime = System.nanoTime() / 1000000 + maxDuration; + long maxRetryTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + maxDuration; while (attempts < maxAttempts) { try { @@ -251,8 +252,9 @@ T withRetries(Operation operation) throws SQLException { } attempts++; - long timeLeft = Math.max((maxRetryTime - (System.nanoTime() / 1000000)), 0L); - if (attempts >= maxAttempts || !isRetryable(sqlException) || timeLeft == 0) { + long timeLeft = + Math.max((maxRetryTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), 0L); + if (timeLeft == 0 || attempts >= maxAttempts || !isRetryable(sqlException)) { String exceptionMessage = String.format( "Failed due to %s, after , %s attempts and %s milliseconds", @@ -262,7 +264,14 @@ T withRetries(Operation operation) throws SQLException { } // Add jitter long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay)); - LOGGER.debug("Retrying {} after {} attempts on {}", operation, attempts, e.getMessage(), e); + LOGGER.debug( + "Sleeping {} ms before retrying {} on attempt {} / {}, reason {}", + timeToSleep, + operation, + attempts, + maxAttempts, + e.getMessage(), + e); try { Thread.sleep(timeToSleep); } catch (InterruptedException ie) { diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts index 097cd1905e..d53e863df4 100644 --- a/quarkus/common/build.gradle.kts +++ b/quarkus/common/build.gradle.kts @@ -24,4 +24,4 @@ plugins { dependencies { compileOnly(libs.smallrye.config.core) implementation(project(":polaris-relational-jdbc")) -} \ No newline at end of file +} From b7ac3d7cb6b5689a5304cba2fdfa6fe85530a196 Mon Sep 17 00:00:00 2001 From: Prashant Date: Thu, 8 May 2025 17:33:16 -0700 Subject: [PATCH 10/12] address review feedback and build failure --- quarkus/common/build.gradle.kts | 5 +++++ quarkus/test-commons/build.gradle.kts | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts index d53e863df4..353e50dfd5 100644 --- a/quarkus/common/build.gradle.kts +++ b/quarkus/common/build.gradle.kts @@ -21,6 +21,11 @@ plugins { alias(libs.plugins.jandex) } +java { + sourceCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) + targetCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) +} + dependencies { compileOnly(libs.smallrye.config.core) implementation(project(":polaris-relational-jdbc")) diff --git a/quarkus/test-commons/build.gradle.kts b/quarkus/test-commons/build.gradle.kts index 1ff46427ff..3b78f62148 100644 --- a/quarkus/test-commons/build.gradle.kts +++ b/quarkus/test-commons/build.gradle.kts @@ -28,6 +28,11 @@ configurations.all { exclude(group = "org.scala-lang", module = "scala-reflect") } +java { + sourceCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) + targetCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) +} + dependencies { implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-junit5") From 13af81c8ca27442b296ae877b3575e6bfc19dfb2 Mon Sep 17 00:00:00 2001 From: Prashant Date: Sun, 11 May 2025 04:47:00 -0700 Subject: [PATCH 11/12] remove java version req from build --- quarkus/common/build.gradle.kts | 13 +++++-------- .../jdbc/QuarkusRelationalJdbcConfiguration.java | 4 +--- quarkus/test-commons/build.gradle.kts | 8 ++------ .../PostgresRelationalJdbcLifeCycleManagement.java | 1 - 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts index 353e50dfd5..07d94328e2 100644 --- a/quarkus/common/build.gradle.kts +++ b/quarkus/common/build.gradle.kts @@ -18,15 +18,12 @@ */ plugins { - alias(libs.plugins.jandex) -} - -java { - sourceCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) - targetCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) + alias(libs.plugins.quarkus) + alias(libs.plugins.jandex) + id("polaris-quarkus") } dependencies { - compileOnly(libs.smallrye.config.core) - implementation(project(":polaris-relational-jdbc")) + compileOnly(libs.smallrye.config.core) + implementation(project(":polaris-relational-jdbc")) } diff --git a/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java index c3bae77aaf..0dbdecf6bf 100644 --- a/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java +++ b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java @@ -16,12 +16,10 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.polaris.quarkus.common.config.jdbc; import io.smallrye.config.ConfigMapping; import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; @ConfigMapping(prefix = "polaris.persistence.relational.jdbc") -public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration { -} +public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration {} diff --git a/quarkus/test-commons/build.gradle.kts b/quarkus/test-commons/build.gradle.kts index 3b78f62148..61d26d54cd 100644 --- a/quarkus/test-commons/build.gradle.kts +++ b/quarkus/test-commons/build.gradle.kts @@ -18,8 +18,9 @@ */ plugins { + alias(libs.plugins.quarkus) alias(libs.plugins.jandex) - id("java-test-fixtures") + id("polaris-quarkus") } configurations.all { @@ -28,11 +29,6 @@ configurations.all { exclude(group = "org.scala-lang", module = "scala-reflect") } -java { - sourceCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) - targetCompatibility = JavaVersion.toVersion(findProperty("javaVersion") as String? ?: JavaVersion.VERSION_21) -} - dependencies { implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-junit5") diff --git a/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java b/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java index b20c6fbdab..5f6564609e 100644 --- a/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java +++ b/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.polaris.test.commons; import io.quarkus.test.common.DevServicesContext; From 0f5cc3360b4e7d677ee10da9c752e557ef192742 Mon Sep 17 00:00:00 2001 From: Prashant Date: Mon, 12 May 2025 09:44:08 -0700 Subject: [PATCH 12/12] rebase --- getting-started/jdbc/docker-compose.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/getting-started/jdbc/docker-compose.yml b/getting-started/jdbc/docker-compose.yml index 8b2a2dd769..fbfd427ee2 100644 --- a/getting-started/jdbc/docker-compose.yml +++ b/getting-started/jdbc/docker-compose.yml @@ -32,6 +32,9 @@ services: - JAVA_DEBUG=true - JAVA_DEBUG_PORT=*:5005 - POLARIS_PERSISTENCE_TYPE=relational-jdbc + - POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES=5 + - POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS=100 + - POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DELAY_IN_MS=5000 - QUARKUS_DATASOURCE_DB_KIND=pgsql - QUARKUS_DATASOURCE_JDBC_URL=${QUARKUS_DATASOURCE_JDBC_URL} - QUARKUS_DATASOURCE_USERNAME=${QUARKUS_DATASOURCE_USERNAME}