diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java index 6fec8e67fe..66ee8f9710 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java @@ -31,8 +31,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Predicate; +import java.util.stream.Stream; import javax.sql.DataSource; import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; @@ -90,41 +91,56 @@ public void executeScript(String scriptFilePath) throws SQLException { } /** - * Executes SELECT Query + * Executes SELECT Query and returns the results after applying a transformer * * @param query : Query to executed - * @param entityClass : Class of the entity being selected - * @param transformer : Transformation of entity class to Result class - * @param entityFilter : Filter to applied on the Result class - * @param limit : Limit to to enforced. - * @return List of Result class objects - * @param : Entity class - * @param : Result class + * @param converterInstance : An instance of the type being selected, used to convert to a + * business entity like PolarisBaseEntity + * @param transformer Transformation of entity class to Result class + * @return The list of results yielded by the query + * @param : Persistence entity class + * @param : Business entity class * @throws SQLException : Exception during the query execution. */ public List executeSelect( @Nonnull String query, - @Nonnull Class entityClass, - @Nonnull Function transformer, - Predicate entityFilter, - int limit) + @Nonnull Converter converterInstance, + @Nonnull Function transformer) + throws SQLException { + ArrayList results = new ArrayList<>(); + executeSelectOverStream( + query, converterInstance, stream -> stream.map(transformer).forEach(results::add)); + return results; + } + + /** + * Executes SELECT Query and takes a consumer over the results. For callers that want more + * sophisticated control over how query results are handled. + * + * @param query : Query to executed + * @param converterInstance : An entity of the type being selected + * @param consumer : An function to consume the returned results + * @param : Entity class + * @throws SQLException : Exception during the query execution. + */ + public void executeSelectOverStream( + @Nonnull String query, + @Nonnull Converter converterInstance, + @Nonnull Consumer> consumer) throws SQLException { try (Connection connection = borrowConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(query)) { - List resultList = new ArrayList<>(); - while (resultSet.next() && resultList.size() < limit) { - Converter object = - (Converter) - entityClass.getDeclaredConstructor().newInstance(); // Create a new instance - R entity = transformer.apply(object.fromResultSet(resultSet)); - if (entityFilter == null || entityFilter.test(entity)) { - resultList.add(entity); - } - } - return resultList; + ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); + consumer.accept(iterator.toStream()); } catch (SQLException e) { throw e; + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException) { + throw (SQLException) e.getCause(); + } else { + throw e; + } } catch (Exception e) { throw new RuntimeException(e); } diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 0d74011cc6..38448934f1 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -297,8 +297,7 @@ public PolarisBaseEntity lookupEntityByName( private PolarisBaseEntity getPolarisBaseEntity(String query) { try { List results = - datasourceOperations.executeSelect( - query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE); + datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity); if (results.isEmpty()) { return null; } else if (results.size() > 1) { @@ -322,8 +321,7 @@ public List lookupEntities( if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>(); String query = generateSelectQueryWithEntityIds(realmId, entityIds); try { - return datasourceOperations.executeSelect( - query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE); + return datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); @@ -412,9 +410,17 @@ public List listEntities( // absence of transaction. String query = QueryGenerator.generateSelectQuery(new ModelEntity(), params); try { - List results = - datasourceOperations.executeSelect( - query, ModelEntity.class, ModelEntity::toEntity, entityFilter, limit); + List results = new ArrayList<>(); + datasourceOperations.executeSelectOverStream( + query, + new ModelEntity(), + stream -> { + stream + .map(ModelEntity::toEntity) + .filter(entityFilter) + .limit(limit) + .forEach(results::add); + }); return results == null ? Collections.emptyList() : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); @@ -461,11 +467,7 @@ public PolarisGrantRecord lookupGrantRecord( try { List results = datasourceOperations.executeSelect( - query, - ModelGrantRecord.class, - ModelGrantRecord::toGrantRecord, - null, - Integer.MAX_VALUE); + query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord); if (results.size() > 1) { throw new IllegalStateException( String.format( @@ -496,11 +498,7 @@ public List loadAllGrantRecordsOnSecurable( try { List results = datasourceOperations.executeSelect( - query, - ModelGrantRecord.class, - ModelGrantRecord::toGrantRecord, - null, - Integer.MAX_VALUE); + query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord); return results == null ? Collections.emptyList() : results; } catch (SQLException e) { throw new RuntimeException( @@ -522,11 +520,7 @@ public List loadAllGrantRecordsOnGrantee( try { List results = datasourceOperations.executeSelect( - query, - ModelGrantRecord.class, - ModelGrantRecord::toGrantRecord, - null, - Integer.MAX_VALUE); + query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord); return results == null ? Collections.emptyList() : results; } catch (SQLException e) { throw new RuntimeException( @@ -553,8 +547,7 @@ public boolean hasChildren( String query = generateSelectQuery(new ModelEntity(), params); try { List results = - datasourceOperations.executeSelect( - query, ModelEntity.class, Function.identity(), null, Integer.MAX_VALUE); + datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity()); return results != null && !results.isEmpty(); } catch (SQLException e) { throw new RuntimeException( @@ -574,10 +567,8 @@ public PolarisPrincipalSecrets loadPrincipalSecrets( List results = datasourceOperations.executeSelect( query, - ModelPrincipalAuthenticationData.class, - ModelPrincipalAuthenticationData::toPrincipalAuthenticationData, - null, - Integer.MAX_VALUE); + new ModelPrincipalAuthenticationData(), + ModelPrincipalAuthenticationData::toPrincipalAuthenticationData); return results == null || results.isEmpty() ? null : results.getFirst(); } catch (SQLException e) { LOGGER.error( @@ -880,10 +871,8 @@ private List fetchPolicyMappingRecords(String query) List results = datasourceOperations.executeSelect( query, - ModelPolicyMappingRecord.class, - ModelPolicyMappingRecord::toPolicyMappingRecord, - null, - Integer.MAX_VALUE); + new ModelPolicyMappingRecord(), + ModelPolicyMappingRecord::toPolicyMappingRecord); return results == null ? Collections.emptyList() : results; } catch (SQLException e) { throw new RuntimeException( diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/ResultSetIterator.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/ResultSetIterator.java new file mode 100644 index 0000000000..f5acfb076c --- /dev/null +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/ResultSetIterator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; + +/** + * Used to wrap a ResultSet and to build a stream from the data it contains. This data structure + * will not close the ResultSet passed in, so the caller is still responsible for managing its + * lifecycle + */ +public class ResultSetIterator implements Iterator { + private final ResultSet resultSet; + private final Converter converterInstance; + private boolean hasNext; + + public ResultSetIterator(ResultSet resultSet, Converter converterInstance) + throws SQLException { + this.resultSet = resultSet; + this.converterInstance = converterInstance; + advance(); + } + + private void advance() throws SQLException { + try { + hasNext = resultSet.next(); + } catch (SQLException e) { + hasNext = false; + throw e; + } + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public T next() { + if (!hasNext) { + throw new NoSuchElementException(); + } + try { + T object = converterInstance.fromResultSet(resultSet); + advance(); + return object; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Stream toStream() { + Spliterator spliterator = Spliterators.spliteratorUnknownSize(this, 0); + return StreamSupport.stream(spliterator, false); + } +} diff --git a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java index 48d2aa39bf..f5694982c7 100644 --- a/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java +++ b/extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/DatasourceOperationsTest.java @@ -27,6 +27,7 @@ import java.util.function.Function; import javax.sql.DataSource; import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -76,9 +77,7 @@ void testExecuteSelect_exception() throws Exception { assertThrows( SQLException.class, - () -> - datasourceOperations.executeSelect( - query, Object.class, Function.identity(), null, Integer.MAX_VALUE)); + () -> datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity())); } @Test