From 577211c491d4f726ffae30d24bb4b686d4fcbded Mon Sep 17 00:00:00 2001 From: Prashant Date: Tue, 29 Apr 2025 11:41:38 -0700 Subject: [PATCH 1/2] Remove transaction from atomic writes --- .../jdbc/JdbcBasePersistenceImpl.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 5ffce813f9..9b877c5b10 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -93,11 +93,7 @@ public void writeEntity( boolean nameOrParentChanged, PolarisBaseEntity originalEntity) { try { - datasourceOperations.runWithinTransaction( - statement -> { - persistEntity(callCtx, entity, originalEntity, statement); - return true; - }); + persistEntity(callCtx, entity, originalEntity, datasourceOperations); } catch (SQLException e) { throw new RuntimeException("Error persisting entity", e); } @@ -143,12 +139,12 @@ private void persistEntity( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity, PolarisBaseEntity originalEntity, - Statement statement) + Object executor) throws SQLException { ModelEntity modelEntity = ModelEntity.fromEntity(entity); if (originalEntity == null) { try { - statement.executeUpdate(generateInsertQuery(modelEntity, realmId)); + execute(executor, generateInsertQuery(modelEntity, realmId)); } catch (SQLException e) { if (datasourceOperations.isConstraintViolation(e)) { PolarisBaseEntity existingEntity = @@ -176,7 +172,7 @@ private void persistEntity( "realm_id", realmId); try { - int rowsUpdated = statement.executeUpdate(generateUpdateQuery(modelEntity, params)); + int rowsUpdated = execute(executor, generateUpdateQuery(modelEntity, params)); if (rowsUpdated == 0) { throw new RetryOnConcurrencyException( "Entity '%s' id '%s' concurrently modified; expected version %s", @@ -189,6 +185,17 @@ private void persistEntity( } } + private int execute(Object executor, String query) throws SQLException { + if (executor instanceof Statement) { + // used for running in transaction + return ((Statement) executor).executeUpdate(query); + } else if (executor instanceof DatasourceOperations) { + return ((DatasourceOperations) executor).executeUpdate(query); + } else { + throw new IllegalArgumentException("Unsupported executor: " + executor); + } + } + @Override public void writeToGrantRecords( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) { From 3e4ec1f01a33098db954b17e20ebcb46829f28e8 Mon Sep 17 00:00:00 2001 From: Prashant Date: Wed, 30 Apr 2025 09:50:26 -0700 Subject: [PATCH 2/2] remove if-else --- .../jdbc/JdbcBasePersistenceImpl.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 9b877c5b10..f58101b915 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -93,7 +93,7 @@ public void writeEntity( boolean nameOrParentChanged, PolarisBaseEntity originalEntity) { try { - persistEntity(callCtx, entity, originalEntity, datasourceOperations); + persistEntity(callCtx, entity, originalEntity, datasourceOperations::executeUpdate); } catch (SQLException e) { throw new RuntimeException("Error persisting entity", e); } @@ -111,7 +111,6 @@ public void writeEntities( PolarisBaseEntity entity = entities.get(i); PolarisBaseEntity originalEntity = originalEntities != null ? originalEntities.get(i) : null; - // first, check if the entity has already been created, in which case we will simply // return it. PolarisBaseEntity entityFound = @@ -123,7 +122,7 @@ public void writeEntities( // already been updated after the creation. continue; } - persistEntity(callCtx, entity, originalEntity, statement); + persistEntity(callCtx, entity, originalEntity, statement::executeUpdate); } return true; }); @@ -139,12 +138,12 @@ private void persistEntity( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity, PolarisBaseEntity originalEntity, - Object executor) + QueryAction queryAction) throws SQLException { ModelEntity modelEntity = ModelEntity.fromEntity(entity); if (originalEntity == null) { try { - execute(executor, generateInsertQuery(modelEntity, realmId)); + queryAction.apply(generateInsertQuery(modelEntity, realmId)); } catch (SQLException e) { if (datasourceOperations.isConstraintViolation(e)) { PolarisBaseEntity existingEntity = @@ -172,7 +171,7 @@ private void persistEntity( "realm_id", realmId); try { - int rowsUpdated = execute(executor, generateUpdateQuery(modelEntity, params)); + int rowsUpdated = queryAction.apply(generateUpdateQuery(modelEntity, params)); if (rowsUpdated == 0) { throw new RetryOnConcurrencyException( "Entity '%s' id '%s' concurrently modified; expected version %s", @@ -185,17 +184,6 @@ private void persistEntity( } } - private int execute(Object executor, String query) throws SQLException { - if (executor instanceof Statement) { - // used for running in transaction - return ((Statement) executor).executeUpdate(query); - } else if (executor instanceof DatasourceOperations) { - return ((DatasourceOperations) executor).executeUpdate(query); - } else { - throw new IllegalArgumentException("Unsupported executor: " + executor); - } - } - @Override public void writeToGrantRecords( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) { @@ -930,4 +918,9 @@ PolarisStorageIntegration loadPolarisStorageIntegration( BaseMetaStoreManager.extractStorageConfiguration(callContext, entity); return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig); } + + @FunctionalInterface + private interface QueryAction { + Integer apply(String query) throws SQLException; + } }