2020
2121import static java .nio .charset .StandardCharsets .UTF_8 ;
2222
23+ import com .google .common .annotations .VisibleForTesting ;
2324import jakarta .annotation .Nonnull ;
2425import java .io .BufferedReader ;
2526import java .io .IOException ;
3031import java .sql .Statement ;
3132import java .util .ArrayList ;
3233import java .util .List ;
34+ import java .util .Locale ;
3335import java .util .Objects ;
36+ import java .util .Random ;
37+ import java .util .concurrent .TimeUnit ;
3438import java .util .function .Consumer ;
3539import java .util .stream .Stream ;
3640import javax .sql .DataSource ;
41+ import org .apache .polaris .core .persistence .EntityAlreadyExistsException ;
3742import org .apache .polaris .extension .persistence .relational .jdbc .models .Converter ;
43+ import org .slf4j .Logger ;
44+ import org .slf4j .LoggerFactory ;
3845
3946public class DatasourceOperations {
4047
48+ private static final Logger LOGGER = LoggerFactory .getLogger (DatasourceOperations .class );
49+
4150 private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505" ;
4251
52+ // POSTGRES RETRYABLE EXCEPTIONS
53+ private static final String SERIALIZATION_FAILURE_SQL_CODE = "40001" ;
54+
4355 private final DataSource datasource ;
56+ private final RelationalJdbcConfiguration relationalJdbcConfiguration ;
57+
58+ private final Random random = new Random ();
4459
45- public DatasourceOperations (DataSource datasource ) {
60+ public DatasourceOperations (
61+ DataSource datasource , RelationalJdbcConfiguration relationalJdbcConfiguration ) {
4662 this .datasource = datasource ;
63+ this .relationalJdbcConfiguration = relationalJdbcConfiguration ;
4764 }
4865
4966 /**
@@ -121,22 +138,16 @@ public <T> void executeSelectOverStream(
121138 @ Nonnull Converter <T > converterInstance ,
122139 @ Nonnull Consumer <Stream <T >> consumer )
123140 throws SQLException {
124- try (Connection connection = borrowConnection ();
125- Statement statement = connection .createStatement ();
126- ResultSet resultSet = statement .executeQuery (query )) {
127- ResultSetIterator <T > iterator = new ResultSetIterator <>(resultSet , converterInstance );
128- consumer .accept (iterator .toStream ());
129- } catch (SQLException e ) {
130- throw e ;
131- } catch (RuntimeException e ) {
132- if (e .getCause () instanceof SQLException ) {
133- throw (SQLException ) e .getCause ();
134- } else {
135- throw e ;
136- }
137- } catch (Exception e ) {
138- throw new RuntimeException (e );
139- }
141+ withRetries (
142+ () -> {
143+ try (Connection connection = borrowConnection ();
144+ Statement statement = connection .createStatement ();
145+ ResultSet resultSet = statement .executeQuery (query )) {
146+ ResultSetIterator <T > iterator = new ResultSetIterator <>(resultSet , converterInstance );
147+ consumer .accept (iterator .toStream ());
148+ return null ;
149+ }
150+ });
140151 }
141152
142153 /**
@@ -147,16 +158,19 @@ public <T> void executeSelectOverStream(
147158 * @throws SQLException : Exception during Query Execution.
148159 */
149160 public int executeUpdate (String query ) throws SQLException {
150- try (Connection connection = borrowConnection ();
151- Statement statement = connection .createStatement ()) {
152- boolean autoCommit = connection .getAutoCommit ();
153- connection .setAutoCommit (true );
154- try {
155- return statement .executeUpdate (query );
156- } finally {
157- connection .setAutoCommit (autoCommit );
158- }
159- }
161+ return withRetries (
162+ () -> {
163+ try (Connection connection = borrowConnection ();
164+ Statement statement = connection .createStatement ()) {
165+ boolean autoCommit = connection .getAutoCommit ();
166+ connection .setAutoCommit (true );
167+ try {
168+ return statement .executeUpdate (query );
169+ } finally {
170+ connection .setAutoCommit (autoCommit );
171+ }
172+ }
173+ });
160174 }
161175
162176 /**
@@ -166,23 +180,113 @@ public int executeUpdate(String query) throws SQLException {
166180 * @throws SQLException : Exception caught during transaction execution.
167181 */
168182 public void runWithinTransaction (TransactionCallback callback ) throws SQLException {
169- try (Connection connection = borrowConnection ()) {
170- boolean autoCommit = connection .getAutoCommit ();
171- connection .setAutoCommit (false );
172- boolean success = false ;
183+ withRetries (
184+ () -> {
185+ try (Connection connection = borrowConnection ()) {
186+ boolean autoCommit = connection .getAutoCommit ();
187+ boolean success = false ;
188+ connection .setAutoCommit (false );
189+ try {
190+ try {
191+ try (Statement statement = connection .createStatement ()) {
192+ success = callback .execute (statement );
193+ }
194+ } finally {
195+ if (success ) {
196+ connection .commit ();
197+ } else {
198+ connection .rollback ();
199+ }
200+ }
201+ } finally {
202+ connection .setAutoCommit (autoCommit );
203+ }
204+ }
205+ return null ;
206+ });
207+ }
208+
209+ private boolean isRetryable (SQLException e ) {
210+ String sqlState = e .getSQLState ();
211+
212+ if (sqlState != null ) {
213+ return sqlState .equals (SERIALIZATION_FAILURE_SQL_CODE ); // Serialization failure
214+ }
215+
216+ // Additionally, one might check for specific error messages or other conditions
217+ return e .getMessage ().toLowerCase (Locale .ROOT ).contains ("connection refused" )
218+ || e .getMessage ().toLowerCase (Locale .ROOT ).contains ("connection reset" );
219+ }
220+
221+ // TODO: consider refactoring to use a retry library, inorder to have fair retries
222+ // and more knobs for tuning retry pattern.
223+ @ VisibleForTesting
224+ <T > T withRetries (Operation <T > operation ) throws SQLException {
225+ int attempts = 0 ;
226+ // maximum number of retries.
227+ int maxAttempts = relationalJdbcConfiguration .maxRetries ().orElse (1 );
228+ // How long we should try, since the first attempt.
229+ long maxDuration = relationalJdbcConfiguration .maxDurationInMs ().orElse (5000L );
230+ // How long to wait before first failure.
231+ long delay = relationalJdbcConfiguration .initialDelayInMs ().orElse (100L );
232+
233+ // maximum time we will retry till.
234+ long maxRetryTime = TimeUnit .NANOSECONDS .toMillis (System .nanoTime ()) + maxDuration ;
235+
236+ while (attempts < maxAttempts ) {
173237 try {
174- try (Statement statement = connection .createStatement ()) {
175- success = callback .execute (statement );
176- }
177- } finally {
178- if (success ) {
179- connection .commit ();
238+ return operation .execute ();
239+ } catch (SQLException | RuntimeException e ) {
240+ SQLException sqlException ;
241+ if (e instanceof RuntimeException ) {
242+ // Handle Exceptions from ResultSet Iterator consumer, as it throws a RTE, ignore RTE from
243+ // the transactions.
244+ if (e .getCause () instanceof SQLException
245+ && !(e instanceof EntityAlreadyExistsException )) {
246+ sqlException = (SQLException ) e .getCause ();
247+ } else {
248+ throw e ;
249+ }
180250 } else {
181- connection . rollback () ;
251+ sqlException = ( SQLException ) e ;
182252 }
183- connection .setAutoCommit (autoCommit );
253+
254+ attempts ++;
255+ long timeLeft =
256+ Math .max ((maxRetryTime - TimeUnit .NANOSECONDS .toMillis (System .nanoTime ())), 0L );
257+ if (timeLeft == 0 || attempts >= maxAttempts || !isRetryable (sqlException )) {
258+ String exceptionMessage =
259+ String .format (
260+ "Failed due to %s, after , %s attempts and %s milliseconds" ,
261+ sqlException .getMessage (), attempts , maxDuration );
262+ throw new SQLException (
263+ exceptionMessage , sqlException .getSQLState (), sqlException .getErrorCode (), e );
264+ }
265+ // Add jitter
266+ long timeToSleep = Math .min (timeLeft , delay + (long ) (random .nextFloat () * 0.2 * delay ));
267+ LOGGER .debug (
268+ "Sleeping {} ms before retrying {} on attempt {} / {}, reason {}" ,
269+ timeToSleep ,
270+ operation ,
271+ attempts ,
272+ maxAttempts ,
273+ e .getMessage (),
274+ e );
275+ try {
276+ Thread .sleep (timeToSleep );
277+ } catch (InterruptedException ie ) {
278+ Thread .currentThread ().interrupt ();
279+ throw new RuntimeException ("Retry interrupted" , ie );
280+ }
281+ delay *= 2 ; // Exponential backoff
184282 }
185283 }
284+ // This should never be reached
285+ return null ;
286+ }
287+
288+ public interface Operation <T > {
289+ T execute () throws SQLException ;
186290 }
187291
188292 // Interface for transaction callback
0 commit comments