diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 4fc33d96c2..5b400f5150 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -46,6 +46,7 @@ dependencies { api(project(":polaris-jpa-model")) api(project(":polaris-quarkus-admin")) + api(project(":polaris-quarkus-common")) api(project(":polaris-quarkus-test-commons")) api(project(":polaris-quarkus-defaults")) api(project(":polaris-quarkus-server")) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index a8a854ad4e..337824882a 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import java.io.BufferedReader; import java.io.IOException; @@ -30,20 +31,36 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Objects; +import java.util.Random; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Stream; import javax.sql.DataSource; +import org.apache.polaris.core.persistence.EntityAlreadyExistsException; import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DatasourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class); + private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505"; + // POSTGRES RETRYABLE EXCEPTIONS + private static final String SERIALIZATION_FAILURE_SQL_CODE = "40001"; + private final DataSource datasource; + private final RelationalJdbcConfiguration relationalJdbcConfiguration; + + private final Random random = new Random(); - public DatasourceOperations(DataSource datasource) { + public DatasourceOperations( + DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration) { this.datasource = datasource; + this.relationalJdbcConfiguration = relationalJdbcConfiguration; } /** @@ -121,22 +138,16 @@ public void executeSelectOverStream( @Nonnull Converter converterInstance, @Nonnull Consumer> consumer) throws SQLException { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(query)) { - ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); - consumer.accept(iterator.toStream()); - } catch (SQLException e) { - throw e; - } catch (RuntimeException e) { - if (e.getCause() instanceof SQLException) { - throw (SQLException) e.getCause(); - } else { - throw e; - } - } catch (Exception e) { - throw new RuntimeException(e); - } + withRetries( + () -> { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); + consumer.accept(iterator.toStream()); + return null; + } + }); } /** @@ -147,16 +158,19 @@ public void executeSelectOverStream( * @throws SQLException : Exception during Query Execution. */ public int executeUpdate(String query) throws SQLException { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(true); - try { - return statement.executeUpdate(query); - } finally { - connection.setAutoCommit(autoCommit); - } - } + return withRetries( + () -> { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(true); + try { + return statement.executeUpdate(query); + } finally { + connection.setAutoCommit(autoCommit); + } + } + }); } /** @@ -166,23 +180,113 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. */ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - boolean success = false; + withRetries( + () -> { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + try { + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } + } + } finally { + connection.setAutoCommit(autoCommit); + } + } + return null; + }); + } + + private boolean isRetryable(SQLException e) { + String sqlState = e.getSQLState(); + + if (sqlState != null) { + return sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + } + + // Additionally, one might check for specific error messages or other conditions + return e.getMessage().toLowerCase(Locale.ROOT).contains("connection refused") + || e.getMessage().toLowerCase(Locale.ROOT).contains("connection reset"); + } + + // TODO: consider refactoring to use a retry library, inorder to have fair retries + // and more knobs for tuning retry pattern. + @VisibleForTesting + T withRetries(Operation operation) throws SQLException { + int attempts = 0; + // maximum number of retries. + int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); + // How long we should try, since the first attempt. + long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(5000L); + // How long to wait before first failure. + long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); + + // maximum time we will retry till. + long maxRetryTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + maxDuration; + + while (attempts < maxAttempts) { try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); - } - } finally { - if (success) { - connection.commit(); + return operation.execute(); + } catch (SQLException | RuntimeException e) { + SQLException sqlException; + if (e instanceof RuntimeException) { + // Handle Exceptions from ResultSet Iterator consumer, as it throws a RTE, ignore RTE from + // the transactions. + if (e.getCause() instanceof SQLException + && !(e instanceof EntityAlreadyExistsException)) { + sqlException = (SQLException) e.getCause(); + } else { + throw e; + } } else { - connection.rollback(); + sqlException = (SQLException) e; } - connection.setAutoCommit(autoCommit); + + attempts++; + long timeLeft = + Math.max((maxRetryTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), 0L); + if (timeLeft == 0 || attempts >= maxAttempts || !isRetryable(sqlException)) { + String exceptionMessage = + String.format( + "Failed due to %s, after , %s attempts and %s milliseconds", + sqlException.getMessage(), attempts, maxDuration); + throw new SQLException( + exceptionMessage, sqlException.getSQLState(), sqlException.getErrorCode(), e); + } + // Add jitter + long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay)); + LOGGER.debug( + "Sleeping {} ms before retrying {} on attempt {} / {}, reason {}", + timeToSleep, + operation, + attempts, + maxAttempts, + e.getMessage(), + e); + try { + Thread.sleep(timeToSleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + delay *= 2; // Exponential backoff } } + // This should never be reached + return null; + } + + public interface Operation { + T execute() throws SQLException; } // Interface for transaction callback diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index f19194f33d..2fc1d4c5f1 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -74,6 +74,7 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; @Inject Instance dataSource; + @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; protected JdbcMetaStoreManagerFactory() {} @@ -108,7 +109,8 @@ private void initializeForRealm( } private DatasourceOperations getDatasourceOperations(boolean isBootstrap) { - DatasourceOperations databaseOperations = new DatasourceOperations(dataSource.get()); + DatasourceOperations databaseOperations = + new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); if (isBootstrap) { try { DatabaseType databaseType; diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java new file mode 100644 index 0000000000..f1cbddbcb5 --- /dev/null +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/RelationalJdbcConfiguration.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import java.util.Optional; + +public interface RelationalJdbcConfiguration { + // max retries before giving up + Optional maxRetries(); + + // max retry duration + Optional maxDurationInMs(); + + // initial delay + Optional initialDelayInMs(); +} diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java deleted file mode 100644 index c73cf3fd4b..0000000000 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.extension.persistence.impl.relational.jdbc; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import javax.sql.DataSource; -import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; -import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class DatasourceOperationsTest { - @Mock private DataSource mockDataSource; - - @Mock private Connection mockConnection; - - @Mock private Statement mockStatement; - - private DatasourceOperations datasourceOperations; - - @BeforeEach - void setUp() throws Exception { - when(mockDataSource.getConnection()).thenReturn(mockConnection); - when(mockConnection.createStatement()).thenReturn(mockStatement); - datasourceOperations = new DatasourceOperations(mockDataSource); - } - - @Test - void testExecuteUpdate_success() throws Exception { - String query = "UPDATE users SET active = true"; - when(mockStatement.executeUpdate(query)).thenReturn(1); - - int result = datasourceOperations.executeUpdate(query); - - assertEquals(1, result); - verify(mockStatement).executeUpdate(query); - } - - @Test - void testExecuteUpdate_failure() throws Exception { - String query = "INVALID SQL"; - when(mockStatement.executeUpdate(query)).thenThrow(new SQLException("demo", "42P07")); - - assertThrows(SQLException.class, () -> datasourceOperations.executeUpdate(query)); - } - - @Test - void testExecuteSelect_exception() throws Exception { - String query = "SELECT * FROM users"; - when(mockStatement.executeQuery(query)).thenThrow(new SQLException()); - - assertThrows( - SQLException.class, () -> datasourceOperations.executeSelect(query, new ModelEntity())); - } - - @Test - void testRunWithinTransaction_commit() throws Exception { - DatasourceOperations.TransactionCallback callback = statement -> true; - when(mockConnection.getAutoCommit()).thenReturn(true); - datasourceOperations.runWithinTransaction(callback); - verify(mockConnection).setAutoCommit(true); - verify(mockConnection).setAutoCommit(false); - verify(mockConnection).commit(); - verify(mockConnection).setAutoCommit(true); - verify(mockConnection).close(); - } - - @Test - void testRunWithinTransaction_rollback() throws Exception { - DatasourceOperations.TransactionCallback callback = statement -> false; - - datasourceOperations.runWithinTransaction(callback); - - verify(mockConnection).rollback(); - } - - @Test - void testRunWithinTransaction_exceptionTriggersRollback() throws Exception { - DatasourceOperations.TransactionCallback callback = - statement -> { - throw new SQLException("Boom"); - }; - - assertThrows(SQLException.class, () -> datasourceOperations.runWithinTransaction(callback)); - - verify(mockConnection).rollback(); - } -} diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java similarity index 81% rename from extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java rename to extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index 92d31d8343..1012aff02d 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.extension.persistence.impl.relational.jdbc; +package org.apache.polaris.extension.persistence.relational.jdbc; import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; import java.sql.SQLException; import java.time.ZoneId; +import java.util.Optional; import javax.sql.DataSource; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; @@ -30,9 +31,6 @@ import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager; import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest; import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; -import org.apache.polaris.extension.persistence.relational.jdbc.DatabaseType; -import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; -import org.apache.polaris.extension.persistence.relational.jdbc.JdbcBasePersistenceImpl; import org.h2.jdbcx.JdbcConnectionPool; import org.mockito.Mockito; @@ -46,7 +44,8 @@ public static DataSource createH2DataSource() { @Override protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); - DatasourceOperations datasourceOperations = new DatasourceOperations(createH2DataSource()); + DatasourceOperations datasourceOperations = + new DatasourceOperations(createH2DataSource(), new H2JdbcConfiguration()); try { datasourceOperations.executeScript( String.format("%s/schema-v1.sql", DatabaseType.H2.getDisplayName())); @@ -67,4 +66,22 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { new PolarisConfigurationStore() {}, timeSource.withZone(ZoneId.systemDefault()))); } + + private static class H2JdbcConfiguration implements RelationalJdbcConfiguration { + + @Override + public Optional maxRetries() { + return Optional.of(2); + } + + @Override + public Optional maxDurationInMs() { + return Optional.of(100L); + } + + @Override + public Optional initialDelayInMs() { + return Optional.of(100L); + } + } } diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java new file mode 100644 index 0000000000..09600694dd --- /dev/null +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperationsTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Instant; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations.Operation; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class DatasourceOperationsTest { + @Mock private DataSource mockDataSource; + + @Mock private Connection mockConnection; + + @Mock private Statement mockStatement; + + @Mock private RelationalJdbcConfiguration relationalJdbcConfiguration; + + @Mock Operation mockOperation; + + private DatasourceOperations datasourceOperations; + + @BeforeEach + void setUp() throws Exception { + datasourceOperations = new DatasourceOperations(mockDataSource, relationalJdbcConfiguration); + } + + @Test + void testExecuteUpdate_success() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); + String query = "UPDATE users SET active = true"; + when(mockStatement.executeUpdate(query)).thenReturn(1); + + int result = datasourceOperations.executeUpdate(query); + + assertEquals(1, result); + verify(mockStatement).executeUpdate(query); + } + + @Test + void testExecuteUpdate_failure() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); + String query = "INVALID SQL"; + when(mockStatement.executeUpdate(query)).thenThrow(new SQLException("demo", "42P07")); + + assertThrows(SQLException.class, () -> datasourceOperations.executeUpdate(query)); + } + + @Test + void testExecuteSelect_exception() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); + String query = "SELECT * FROM users"; + when(mockStatement.executeQuery(query)).thenThrow(new SQLException("demo", "42P07")); + + assertThrows( + SQLException.class, () -> datasourceOperations.executeSelect(query, new ModelEntity())); + } + + @Test + void testRunWithinTransaction_commit() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); + DatasourceOperations.TransactionCallback callback = statement -> true; + when(mockConnection.getAutoCommit()).thenReturn(true); + datasourceOperations.runWithinTransaction(callback); + verify(mockConnection).setAutoCommit(true); + verify(mockConnection).setAutoCommit(false); + verify(mockConnection).commit(); + verify(mockConnection).setAutoCommit(true); + verify(mockConnection).close(); + } + + @Test + void testRunWithinTransaction_rollback() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); + DatasourceOperations.TransactionCallback callback = statement -> false; + + datasourceOperations.runWithinTransaction(callback); + + verify(mockConnection).rollback(); + } + + @Test + void testRunWithinTransaction_exceptionTriggersRollback() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.createStatement()).thenReturn(mockStatement); + DatasourceOperations.TransactionCallback callback = + statement -> { + throw new SQLException("Boom"); + }; + + assertThrows(SQLException.class, () -> datasourceOperations.runWithinTransaction(callback)); + + verify(mockConnection).rollback(); + } + + @Test + void testSuccessfulExecutionOnFirstAttempt() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()).thenReturn("Success!"); + + String result = datasourceOperations.withRetries(mockOperation); + assertEquals("Success!", result); + verify(mockOperation, times(1)).execute(); + } + + @Test + void testSuccessfulExecutionAfterOneRetry() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(4)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(2000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(0L)); + when(mockOperation.execute()) + .thenThrow(new SQLException("Retryable error", "40001")) + .thenThrow(new SQLException("connection refused")) + .thenThrow(new SQLException("connection reset")) + .thenReturn("Success!"); + + String result = datasourceOperations.withRetries(mockOperation); + assertEquals("Success!", result); + verify(mockOperation, times(4)).execute(); + } + + @Test + void testRetryAttemptsExceedMaxRetries() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(2)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()) + .thenThrow( + new SQLException("Retryable error", "40001", new SQLException("Retryable error"))); + + SQLException thrown = + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertEquals( + "Failed due to Retryable error, after , 2 attempts and 1000 milliseconds", + thrown.getMessage()); + verify(mockOperation, times(2)).execute(); // Tried twice, then threw + } + + @Test + void testRetryAttemptsExceedMaxDuration() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(10)); + when(relationalJdbcConfiguration.maxDurationInMs()) + .thenReturn(Optional.of(250L)); // Short max duration + when(mockOperation.execute()) + .thenThrow( + new SQLException("Demo Exception", "40001", new SQLException("Retryable error"))); + + long startTime = Instant.now().toEpochMilli(); + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertTrue((Instant.now().toEpochMilli() - startTime) >= 250); + // The number of executions depends on the timing and jitter, but should be more than 1 + verify(mockOperation, atLeast(2)).execute(); + } + + @Test + void testNonRetryableSQLException() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()).thenThrow(new SQLException("NonRetryable error")); + + SQLException thrown = + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertEquals( + "Failed due to NonRetryable error, after , 1 attempts and 1000 milliseconds", + thrown.getMessage()); + verify(mockOperation, times(1)).execute(); // Should not retry + } + + @Test + void testInterruptedExceptionDuringRetry() throws SQLException, InterruptedException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.of(3)); + when(relationalJdbcConfiguration.maxDurationInMs()).thenReturn(Optional.of(1000L)); + when(relationalJdbcConfiguration.initialDelayInMs()).thenReturn(Optional.of(100L)); + when(mockOperation.execute()) + .thenThrow( + new SQLException("Demo Exception", "40001", new SQLException("Retryable error"))); + + Thread.currentThread().interrupt(); // Simulate interruption + + RuntimeException thrown = + assertThrows(RuntimeException.class, () -> datasourceOperations.withRetries(mockOperation)); + assertEquals("Retry interrupted", thrown.getMessage()); + assertTrue(Thread.currentThread().isInterrupted()); + verify(mockOperation, atMost(1)) + .execute(); // Might not even be called if interrupted very early + } + + @Test + void testDefaultConfigurationValues() throws SQLException { + when(relationalJdbcConfiguration.maxRetries()).thenReturn(Optional.empty()); // Defaults to 1 + when(relationalJdbcConfiguration.maxDurationInMs()) + .thenReturn(Optional.empty()); // Defaults to 100 + when(relationalJdbcConfiguration.initialDelayInMs()) + .thenReturn(Optional.empty()); // Defaults to 100 + when(mockOperation.execute()) + .thenThrow( + new SQLException("Demo Exception", "40001", new SQLException("Retryable error"))); + + assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); + verify(mockOperation, times(1)).execute(); + } +} diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/QueryGeneratorTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/QueryGeneratorTest.java similarity index 98% rename from extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/QueryGeneratorTest.java rename to extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/QueryGeneratorTest.java index 2f886ac69c..2c25844c62 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/QueryGeneratorTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/relational/jdbc/QueryGeneratorTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.extension.persistence.impl.relational.jdbc; +package org.apache.polaris.extension.persistence.relational.jdbc; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.polaris.core.entity.PolarisEntityCore; import org.apache.polaris.core.entity.PolarisEntityId; -import org.apache.polaris.extension.persistence.relational.jdbc.QueryGenerator; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; diff --git a/getting-started/jdbc/docker-compose.yml b/getting-started/jdbc/docker-compose.yml index 8b2a2dd769..fbfd427ee2 100644 --- a/getting-started/jdbc/docker-compose.yml +++ b/getting-started/jdbc/docker-compose.yml @@ -32,6 +32,9 @@ services: - JAVA_DEBUG=true - JAVA_DEBUG_PORT=*:5005 - POLARIS_PERSISTENCE_TYPE=relational-jdbc + - POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES=5 + - POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS=100 + - POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DELAY_IN_MS=5000 - QUARKUS_DATASOURCE_DB_KIND=pgsql - QUARKUS_DATASOURCE_JDBC_URL=${QUARKUS_DATASOURCE_JDBC_URL} - QUARKUS_DATASOURCE_USERNAME=${QUARKUS_DATASOURCE_USERNAME} diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 6161eee47b..5c1a222d55 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -30,6 +30,7 @@ polaris-quarkus-service=quarkus/service polaris-quarkus-server=quarkus/server polaris-quarkus-spark-tests=quarkus/spark-tests polaris-quarkus-admin=quarkus/admin +polaris-quarkus-common=quarkus/common polaris-quarkus-test-commons=quarkus/test-commons polaris-quarkus-run-script=quarkus/run-script polaris-eclipselink=extension/persistence/eclipselink diff --git a/quarkus/admin/build.gradle.kts b/quarkus/admin/build.gradle.kts index 16560f2b3e..abb4008a4f 100644 --- a/quarkus/admin/build.gradle.kts +++ b/quarkus/admin/build.gradle.kts @@ -54,6 +54,7 @@ dependencies { implementation("io.quarkus:quarkus-picocli") implementation("io.quarkus:quarkus-container-image-docker") + implementation(project(":polaris-quarkus-common")) implementation("org.jboss.slf4j:slf4j-jboss-logmanager") testImplementation(project(":polaris-quarkus-test-commons")) diff --git a/quarkus/common/build.gradle.kts b/quarkus/common/build.gradle.kts new file mode 100644 index 0000000000..07d94328e2 --- /dev/null +++ b/quarkus/common/build.gradle.kts @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + alias(libs.plugins.quarkus) + alias(libs.plugins.jandex) + id("polaris-quarkus") +} + +dependencies { + compileOnly(libs.smallrye.config.core) + implementation(project(":polaris-relational-jdbc")) +} diff --git a/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java new file mode 100644 index 0000000000..0dbdecf6bf --- /dev/null +++ b/quarkus/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.quarkus.common.config.jdbc; + +import io.smallrye.config.ConfigMapping; +import org.apache.polaris.extension.persistence.relational.jdbc.RelationalJdbcConfiguration; + +@ConfigMapping(prefix = "polaris.persistence.relational.jdbc") +public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration {} diff --git a/quarkus/service/build.gradle.kts b/quarkus/service/build.gradle.kts index f08d25d967..0c34075bfb 100644 --- a/quarkus/service/build.gradle.kts +++ b/quarkus/service/build.gradle.kts @@ -40,6 +40,7 @@ dependencies { implementation(platform(libs.opentelemetry.bom)) implementation(platform(libs.quarkus.bom)) + implementation(project(":polaris-quarkus-common")) implementation("io.quarkus:quarkus-logging-json") implementation("io.quarkus:quarkus-rest-jackson") implementation("io.quarkus:quarkus-reactive-routes") diff --git a/quarkus/test-commons/build.gradle.kts b/quarkus/test-commons/build.gradle.kts index b3813c90be..61d26d54cd 100644 --- a/quarkus/test-commons/build.gradle.kts +++ b/quarkus/test-commons/build.gradle.kts @@ -18,8 +18,9 @@ */ plugins { + alias(libs.plugins.quarkus) alias(libs.plugins.jandex) - id("java-test-fixtures") + id("polaris-quarkus") } configurations.all { @@ -28,11 +29,6 @@ configurations.all { exclude(group = "org.scala-lang", module = "scala-reflect") } -java { - sourceCompatibility = JavaVersion.VERSION_21 - targetCompatibility = JavaVersion.VERSION_21 -} - dependencies { implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-junit5") diff --git a/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java b/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java index 9dc230722f..5f6564609e 100644 --- a/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java +++ b/quarkus/test-commons/src/main/java/org/apache/polaris/test/commons/PostgresRelationalJdbcLifeCycleManagement.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.polaris.test.commons; import io.quarkus.test.common.DevServicesContext; @@ -56,6 +55,8 @@ public Map start() { return Map.of( "polaris.persistence.type", "relational-jdbc", + "polaris.persistence.relational.jdbc.max-retries", + "2", "quarkus.datasource.db-kind", "pgsql", "quarkus.datasource.jdbc.url",