diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 5ffce813f9..f58101b915 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -93,11 +93,7 @@ public void writeEntity( boolean nameOrParentChanged, PolarisBaseEntity originalEntity) { try { - datasourceOperations.runWithinTransaction( - statement -> { - persistEntity(callCtx, entity, originalEntity, statement); - return true; - }); + persistEntity(callCtx, entity, originalEntity, datasourceOperations::executeUpdate); } catch (SQLException e) { throw new RuntimeException("Error persisting entity", e); } @@ -115,7 +111,6 @@ public void writeEntities( PolarisBaseEntity entity = entities.get(i); PolarisBaseEntity originalEntity = originalEntities != null ? originalEntities.get(i) : null; - // first, check if the entity has already been created, in which case we will simply // return it. PolarisBaseEntity entityFound = @@ -127,7 +122,7 @@ public void writeEntities( // already been updated after the creation. continue; } - persistEntity(callCtx, entity, originalEntity, statement); + persistEntity(callCtx, entity, originalEntity, statement::executeUpdate); } return true; }); @@ -143,12 +138,12 @@ private void persistEntity( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity, PolarisBaseEntity originalEntity, - Statement statement) + QueryAction queryAction) throws SQLException { ModelEntity modelEntity = ModelEntity.fromEntity(entity); if (originalEntity == null) { try { - statement.executeUpdate(generateInsertQuery(modelEntity, realmId)); + queryAction.apply(generateInsertQuery(modelEntity, realmId)); } catch (SQLException e) { if (datasourceOperations.isConstraintViolation(e)) { PolarisBaseEntity existingEntity = @@ -176,7 +171,7 @@ private void persistEntity( "realm_id", realmId); try { - int rowsUpdated = statement.executeUpdate(generateUpdateQuery(modelEntity, params)); + int rowsUpdated = queryAction.apply(generateUpdateQuery(modelEntity, params)); if (rowsUpdated == 0) { throw new RetryOnConcurrencyException( "Entity '%s' id '%s' concurrently modified; expected version %s", @@ -923,4 +918,9 @@ PolarisStorageIntegration loadPolarisStorageIntegration( BaseMetaStoreManager.extractStorageConfiguration(callContext, entity); return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig); } + + @FunctionalInterface + private interface QueryAction { + Integer apply(String query) throws SQLException; + } }