Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
api(project(":polaris-jpa-model"))

api(project(":polaris-quarkus-admin"))
api(project(":polaris-quarkus-common"))
api(project(":polaris-quarkus-test-commons"))
api(project(":polaris-quarkus-defaults"))
api(project(":polaris-quarkus-server"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -30,20 +31,36 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.apache.polaris.core.persistence.EntityAlreadyExistsException;
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatasourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class);

private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505";

// POSTGRES RETRYABLE EXCEPTIONS
private static final String SERIALIZATION_FAILURE_SQL_CODE = "40001";

private final DataSource datasource;
private final RelationalJdbcConfiguration relationalJdbcConfiguration;

private final Random random = new Random();

public DatasourceOperations(DataSource datasource) {
public DatasourceOperations(
DataSource datasource, RelationalJdbcConfiguration relationalJdbcConfiguration) {
this.datasource = datasource;
this.relationalJdbcConfiguration = relationalJdbcConfiguration;
}

/**
Expand Down Expand Up @@ -121,22 +138,16 @@ public <T> void executeSelectOverStream(
@Nonnull Converter<T> converterInstance,
@Nonnull Consumer<Stream<T>> consumer)
throws SQLException {
try (Connection connection = borrowConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) {
ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance);
consumer.accept(iterator.toStream());
} catch (SQLException e) {
throw e;
} catch (RuntimeException e) {
if (e.getCause() instanceof SQLException) {
throw (SQLException) e.getCause();
} else {
throw e;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
withRetries(
() -> {
try (Connection connection = borrowConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) {
ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance);
consumer.accept(iterator.toStream());
return null;
}
});
}

/**
Expand All @@ -147,16 +158,19 @@ public <T> void executeSelectOverStream(
* @throws SQLException : Exception during Query Execution.
*/
public int executeUpdate(String query) throws SQLException {
try (Connection connection = borrowConnection();
Statement statement = connection.createStatement()) {
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
try {
return statement.executeUpdate(query);
} finally {
connection.setAutoCommit(autoCommit);
}
}
return withRetries(
() -> {
try (Connection connection = borrowConnection();
Statement statement = connection.createStatement()) {
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
try {
return statement.executeUpdate(query);
} finally {
connection.setAutoCommit(autoCommit);
}
}
});
}

/**
Expand All @@ -166,23 +180,113 @@ public int executeUpdate(String query) throws SQLException {
* @throws SQLException : Exception caught during transaction execution.
*/
public void runWithinTransaction(TransactionCallback callback) throws SQLException {
try (Connection connection = borrowConnection()) {
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
boolean success = false;
withRetries(
() -> {
try (Connection connection = borrowConnection()) {
boolean autoCommit = connection.getAutoCommit();
boolean success = false;
connection.setAutoCommit(false);
try {
try {
try (Statement statement = connection.createStatement()) {
success = callback.execute(statement);
}
} finally {
if (success) {
connection.commit();
} else {
connection.rollback();
}
}
} finally {
connection.setAutoCommit(autoCommit);
}
}
return null;
});
}

private boolean isRetryable(SQLException e) {
String sqlState = e.getSQLState();

if (sqlState != null) {
return sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure
}

// Additionally, one might check for specific error messages or other conditions
return e.getMessage().toLowerCase(Locale.ROOT).contains("connection refused")
|| e.getMessage().toLowerCase(Locale.ROOT).contains("connection reset");
}

// TODO: consider refactoring to use a retry library, inorder to have fair retries
// and more knobs for tuning retry pattern.
@VisibleForTesting
<T> T withRetries(Operation<T> operation) throws SQLException {
int attempts = 0;
// maximum number of retries.
int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1);
// How long we should try, since the first attempt.
long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(5000L);
// How long to wait before first failure.
long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L);

// maximum time we will retry till.
long maxRetryTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + maxDuration;

while (attempts < maxAttempts) {
try {
try (Statement statement = connection.createStatement()) {
success = callback.execute(statement);
}
} finally {
if (success) {
connection.commit();
return operation.execute();
} catch (SQLException | RuntimeException e) {
SQLException sqlException;
if (e instanceof RuntimeException) {
// Handle Exceptions from ResultSet Iterator consumer, as it throws a RTE, ignore RTE from
// the transactions.
if (e.getCause() instanceof SQLException
&& !(e instanceof EntityAlreadyExistsException)) {
sqlException = (SQLException) e.getCause();
} else {
throw e;
}
} else {
connection.rollback();
sqlException = (SQLException) e;
}
connection.setAutoCommit(autoCommit);

attempts++;
long timeLeft =
Math.max((maxRetryTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), 0L);
if (timeLeft == 0 || attempts >= maxAttempts || !isRetryable(sqlException)) {
String exceptionMessage =
String.format(
"Failed due to %s, after , %s attempts and %s milliseconds",
sqlException.getMessage(), attempts, maxDuration);
throw new SQLException(
exceptionMessage, sqlException.getSQLState(), sqlException.getErrorCode(), e);
}
// Add jitter
long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay));
LOGGER.debug(
"Sleeping {} ms before retrying {} on attempt {} / {}, reason {}",
timeToSleep,
operation,
attempts,
maxAttempts,
e.getMessage(),
e);
try {
Thread.sleep(timeToSleep);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Retry interrupted", ie);
}
delay *= 2; // Exponential backoff
}
}
// This should never be reached
return null;
}

public interface Operation<T> {
T execute() throws SQLException;
}

// Interface for transaction callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory {

@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
@Inject Instance<DataSource> dataSource;
@Inject RelationalJdbcConfiguration relationalJdbcConfiguration;

protected JdbcMetaStoreManagerFactory() {}

Expand Down Expand Up @@ -108,7 +109,8 @@ private void initializeForRealm(
}

private DatasourceOperations getDatasourceOperations(boolean isBootstrap) {
DatasourceOperations databaseOperations = new DatasourceOperations(dataSource.get());
DatasourceOperations databaseOperations =
new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration);
if (isBootstrap) {
try {
DatabaseType databaseType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.extension.persistence.relational.jdbc;

import java.util.Optional;

public interface RelationalJdbcConfiguration {
// max retries before giving up
Optional<Integer> maxRetries();

// max retry duration
Optional<Long> maxDurationInMs();

// initial delay
Optional<Long> initialDelayInMs();
}
Loading