diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java index 7be67f1947..5e10081d8c 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java @@ -101,6 +101,24 @@ public List listNamespaces(String catalog, Namespace parent) { } } + public ListNamespacesResponse listNamespaces( + String catalog, Namespace parent, String pageToken, String pageSize) { + Map queryParams = new HashMap<>(); + if (!parent.isEmpty()) { + // TODO change this for Iceberg 1.7.2: + // queryParams.put("parent", RESTUtil.encodeNamespace(parent)); + queryParams.put("parent", Joiner.on('\u001f').join(parent.levels())); + } + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + try (Response response = + request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) { + assertThat(response.getStatus()).isEqualTo(OK.getStatusCode()); + ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class); + return res; + } + } + public List listAllNamespacesChildFirst(String catalog) { List result = new ArrayList<>(); for (int idx = -1; idx < result.size(); idx++) { @@ -142,6 +160,20 @@ public List listTables(String catalog, Namespace namespace) { } } + public ListTablesResponse listTables( + String catalog, Namespace namespace, String pageToken, String pageSize) { + String ns = RESTUtil.encodeNamespace(namespace); + Map queryParams = new HashMap<>(); + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + try (Response res = + request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams) + .get()) { + assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + return res.readEntity(ListTablesResponse.class); + } + } + public void dropTable(String catalog, TableIdentifier id) { String ns = RESTUtil.encodeNamespace(id.namespace()); try (Response res = diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java index a859338b97..fe183aef74 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java @@ -67,6 +67,8 @@ import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.types.Types; import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; import org.apache.polaris.core.admin.model.Catalog; @@ -164,7 +166,8 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests private static final String[] DEFAULT_CATALOG_PROPERTIES = { "polaris.config.allow.unstructured.table.location", "true", - "polaris.config.allow.external.table.location", "true" + "polaris.config.allow.external.table.location", "true", + "polaris.config.list-pagination-enabled", "true" }; @Retention(RetentionPolicy.RUNTIME) @@ -1558,4 +1561,72 @@ public void testUpdateTableWithReservedProperty() { .hasMessageContaining("reserved prefix"); genericTableApi.purge(currentCatalogName, namespace); } + + @Test + public void testPaginatedListNamespaces() { + String prefix = "testPaginatedListNamespaces"; + for (int i = 0; i < 20; i++) { + Namespace namespace = Namespace.of(prefix + i); + restCatalog.createNamespace(namespace); + } + + try { + Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty())) + .hasSize(20); + for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) { + int total = 0; + String pageToken = null; + do { + ListNamespacesResponse response = + catalogApi.listNamespaces( + currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize)); + Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize); + total += response.namespaces().size(); + pageToken = response.nextPageToken(); + } while (pageToken != null); + Assertions.assertThat(total) + .as("Total paginated results for pageSize = " + pageSize) + .isEqualTo(20); + } + } finally { + for (int i = 0; i < 20; i++) { + Namespace namespace = Namespace.of(prefix + i); + restCatalog.dropNamespace(namespace); + } + } + } + + @Test + public void testPaginatedListTables() { + String prefix = "testPaginatedListTables"; + Namespace namespace = Namespace.of(prefix); + restCatalog.createNamespace(namespace); + for (int i = 0; i < 20; i++) { + restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA); + } + + try { + Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20); + for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) { + int total = 0; + String pageToken = null; + do { + ListTablesResponse response = + catalogApi.listTables( + currentCatalogName, namespace, pageToken, String.valueOf(pageSize)); + Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize); + total += response.identifiers().size(); + pageToken = response.nextPageToken(); + } while (pageToken != null); + Assertions.assertThat(total) + .as("Total paginated results for pageSize = " + pageSize) + .isEqualTo(20); + } + } finally { + for (int i = 0; i < 20; i++) { + restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i)); + } + restCatalog.dropNamespace(namespace); + } + } } diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index 02997f37ce..1e0b08c7c2 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -53,7 +53,7 @@ import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; -import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.EntityIdPaging; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence; @@ -468,6 +468,7 @@ public List lookupEntityActiveBatchInCurrentTxn( @Nonnull Predicate entityFilter, @Nonnull Function transformer, @Nonnull PageToken pageToken) { + // full range scan under the parent for that type Stream data = this.store @@ -477,11 +478,7 @@ public List lookupEntityActiveBatchInCurrentTxn( .map(ModelEntity::toEntity) .filter(entityFilter); - if (pageToken instanceof HasPageSize hasPageSize) { - data = data.limit(hasPageSize.getPageSize()); - } - - return Page.fromItems(data.map(transformer).collect(Collectors.toList())); + return Page.mapped(pageToken, data, transformer, EntityIdPaging::encodedDataReference); } /** {@inheritDoc} */ diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java index 4a889d3c03..6170958eb4 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java @@ -35,6 +35,7 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.pagination.EntityIdPaging; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; @@ -294,7 +295,16 @@ List lookupFullEntitiesActive( // Currently check against ENTITIES not joining with ENTITIES_ACTIVE String hql = - "SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode"; + "SELECT m from ModelEntity m where" + + " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode"; + + if (pageToken.hasDataReference()) { + hql += " and m.id > :tokenId"; + } + + if (pageToken.paginationRequested()) { + hql += " order by m.id asc"; + } TypedQuery query = session @@ -303,6 +313,11 @@ List lookupFullEntitiesActive( .setParameter("parentId", parentId) .setParameter("typeCode", entityType.getCode()); + if (pageToken.hasDataReference()) { + long tokenId = EntityIdPaging.entityIdBoundary(pageToken); + query = query.setParameter("tokenId", tokenId); + } + return query.getResultList(); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 2fb0c90ca0..06f4fd2a8f 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.persistence.relational.jdbc; +import static org.apache.polaris.core.persistence.pagination.EntityIdPaging.entityIdBoundary; import static org.apache.polaris.persistence.relational.jdbc.QueryGenerator.PreparedQuery; import com.google.common.base.Preconditions; @@ -31,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -50,7 +52,7 @@ import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; -import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.EntityIdPaging; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -449,7 +451,7 @@ public Page listEntities( @Nonnull Predicate entityFilter, @Nonnull Function transformer, @Nonnull PageToken pageToken) { - Map params = + Map whereEquals = Map.of( "catalog_id", catalogId, @@ -459,29 +461,37 @@ public Page listEntities( entityType.getCode(), "realm_id", realmId); + Map whereGreater = Map.of(); // Limit can't be pushed down, due to client side filtering // absence of transaction. + String orderByColumnName = null; + if (pageToken.paginationRequested()) { + orderByColumnName = ModelEntity.ID_COLUMN; + if (pageToken.hasDataReference()) { + long boundary = entityIdBoundary(pageToken); + whereGreater = Map.of(ModelEntity.ID_COLUMN, boundary); + } + } + try { PreparedQuery query = QueryGenerator.generateSelectQuery( - ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params); - List results = new ArrayList<>(); + ModelEntity.ALL_COLUMNS, + ModelEntity.TABLE_NAME, + whereEquals, + whereGreater, + orderByColumnName); + AtomicReference> results = new AtomicReference<>(); datasourceOperations.executeSelectOverStream( query, new ModelEntity(), stream -> { var data = stream.filter(entityFilter); - if (pageToken instanceof HasPageSize hasPageSize) { - data = data.limit(hasPageSize.getPageSize()); - } - data.forEach(results::add); + results.set( + Page.mapped(pageToken, data, transformer, EntityIdPaging::encodedDataReference)); }); - List resultsOrEmpty = - results == null - ? Collections.emptyList() - : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); - return Page.fromItems(resultsOrEmpty); + return results.get(); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index 1ba2ae2839..5f6a28a6ae 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import java.util.*; import java.util.stream.Collectors; import org.apache.polaris.core.entity.PolarisEntityCore; @@ -52,8 +53,27 @@ public static PreparedQuery generateSelectQuery( @Nonnull List projections, @Nonnull String tableName, @Nonnull Map whereClause) { - QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause); - PreparedQuery query = generateSelectQuery(projections, tableName, where.sql()); + return generateSelectQuery(projections, tableName, whereClause, Map.of(), null); + } + + /** + * Generates a SELECT query with projection and filtering. + * + * @param projections List of columns to retrieve. + * @param tableName Target table name. + * @param whereEquals Column-value pairs used in WHERE filtering. + * @return A parameterized SELECT query. + * @throws IllegalArgumentException if any whereClause column isn't in projections. + */ + public static PreparedQuery generateSelectQuery( + @Nonnull List projections, + @Nonnull String tableName, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater, + @Nullable String orderByColumn) { + QueryFragment where = + generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater); + PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn); return new PreparedQuery(query.sql(), where.parameters()); } @@ -101,7 +121,8 @@ public static PreparedQuery generateSelectQueryWithEntityIds( params.add(realmId); String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?"; return new PreparedQuery( - generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params); + generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(), + params); } /** @@ -150,7 +171,7 @@ public static PreparedQuery generateUpdateQuery( @Nonnull List values, @Nonnull Map whereClause) { List bindingParams = new ArrayList<>(values); - QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause); + QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of()); String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", ")); String sql = "UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql(); @@ -170,34 +191,49 @@ public static PreparedQuery generateDeleteQuery( @Nonnull List tableColumns, @Nonnull String tableName, @Nonnull Map whereClause) { - QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause); + QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of()); return new PreparedQuery( "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters()); } private static PreparedQuery generateSelectQuery( - @Nonnull List columnNames, @Nonnull String tableName, @Nonnull String filter) { + @Nonnull List columnNames, + @Nonnull String tableName, + @Nonnull String filter, + @Nullable String orderByColumn) { String sql = "SELECT " + String.join(", ", columnNames) + " FROM " + getFullyQualifiedTableName(tableName) + filter; + if (orderByColumn != null) { + sql += " ORDER BY " + orderByColumn + " ASC"; + } return new PreparedQuery(sql, Collections.emptyList()); } @VisibleForTesting static QueryFragment generateWhereClause( - @Nonnull Set tableColumns, @Nonnull Map whereClause) { + @Nonnull Set tableColumns, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater) { List conditions = new ArrayList<>(); List parameters = new ArrayList<>(); - for (Map.Entry entry : whereClause.entrySet()) { + for (Map.Entry entry : whereEquals.entrySet()) { if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); } conditions.add(entry.getKey() + " = ?"); parameters.add(entry.getValue()); } + for (Map.Entry entry : whereGreater.entrySet()) { + if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { + throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); + } + conditions.add(entry.getKey() + " > ?"); + parameters.add(entry.getValue()); + } String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); return new QueryFragment(clause, parameters); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java index b847a677f2..bf5e139a9b 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java @@ -31,6 +31,8 @@ public class ModelEntity implements Converter { public static final String TABLE_NAME = "ENTITIES"; + public static final String ID_COLUMN = "id"; + public static final List ALL_COLUMNS = List.of( "id", diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java index d1b71b841d..be5590c474 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java @@ -206,7 +206,8 @@ void testGenerateWhereClause_singleCondition() { Map whereClause = new HashMap<>(); whereClause.put("name", "test"); assertEquals( - " WHERE name = ?", QueryGenerator.generateWhereClause(Set.of("name"), whereClause).sql()); + " WHERE name = ?", + QueryGenerator.generateWhereClause(Set.of("name"), whereClause, Map.of()).sql()); } @Test @@ -216,12 +217,24 @@ void testGenerateWhereClause_multipleConditions() { whereClause.put("version", 1); assertEquals( " WHERE name = ? AND version = ?", - QueryGenerator.generateWhereClause(Set.of("name", "version"), whereClause).sql()); + QueryGenerator.generateWhereClause(Set.of("name", "version"), whereClause, Map.of()).sql()); + } + + @Test + void testGenerateWhereClause_multipleConditions_AndInequality() { + Map whereClause = new HashMap<>(); + whereClause.put("name", "test"); + whereClause.put("version", 1); + assertEquals( + " WHERE name = ? AND version = ? AND id > ?", + QueryGenerator.generateWhereClause( + Set.of("name", "version", "id"), whereClause, Map.of("id", 123)) + .sql()); } @Test void testGenerateWhereClause_emptyMap() { Map whereClause = Collections.emptyMap(); - assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause).sql()); + assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause, Map.of()).sql()); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java index e10c24f2f1..b37efaae37 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java @@ -19,7 +19,6 @@ package org.apache.polaris.core.catalog; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -62,20 +61,28 @@ public static Namespace getParentNamespace(Namespace namespace) { return Namespace.of(parentLevels); } - public static List nameAndIdToNamespaces( - List catalogPath, List entities) { + public static Namespace nameAndIdToNamespace( + List catalogPath, PolarisEntity.NameAndId entity) { + // Skip element 0 which is the catalog entity + String[] fullName = new String[catalogPath.size()]; + for (int i = 0; i < fullName.length - 1; ++i) { + fullName[i] = catalogPath.get(i + 1).getName(); + } + fullName[fullName.length - 1] = entity.getName(); + return Namespace.of(fullName); + } + + /** + * Given the shortnames/ids of entities that all live under the given catalogPath, reconstructs + * TableIdentifier objects for each that all hold the catalogPath excluding the catalog entity. + */ + public static Namespace parentNamespace(List catalogPath) { // Skip element 0 which is the catalog entity String[] parentNamespaces = new String[catalogPath.size() - 1]; for (int i = 0; i < parentNamespaces.length; ++i) { parentNamespaces[i] = catalogPath.get(i + 1).getName(); } - List namespaces = new ArrayList<>(); - for (PolarisEntity.NameAndId entity : entities) { - String[] fullName = Arrays.copyOf(parentNamespaces, parentNamespaces.length + 1); - fullName[fullName.length - 1] = entity.getName(); - namespaces.add(Namespace.of(fullName)); - } - return namespaces; + return Namespace.of(parentNamespaces); } /** diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 1e0e963297..36386d54c4 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -217,6 +217,7 @@ public static void enforceFeatureEnabledOrThrow( public static final PolarisConfiguration LIST_PAGINATION_ENABLED = PolarisConfiguration.builder() .key("LIST_PAGINATION_ENABLED") + .catalogConfig("polaris.config.list-pagination-enabled") .description("If set to true, pagination for APIs like listTables is enabled.") .defaultValue(false) .buildFeatureConfiguration(); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 060a908e15..d567d49115 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -706,17 +707,13 @@ private void revokeGrantRecord( catalogPath == null || catalogPath.size() == 0 ? 0l : catalogPath.get(catalogPath.size() - 1).getId(); - Page resultPage = - ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken); - + Predicate filter = entity -> true; // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - resultPage = - pageToken.buildNextPage( - resultPage.items.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList())); + filter = e -> e.getSubTypeCode() == entitySubType.getCode(); } + Page resultPage = + ms.listEntities(callCtx, catalogId, parentId, entityType, filter, pageToken); // TODO: Use post-validation to enforce consistent view against catalogPath. In the // meantime, happens-before ordering semantics aren't guaranteed during high-concurrency @@ -957,7 +954,7 @@ private void revokeGrantRecord( e.getExistingEntity().getSubTypeCode())); } - return new EntitiesResult(createdEntities); + return new EntitiesResult(Page.fromItems(createdEntities)); } /** {@inheritDoc} */ @@ -1028,7 +1025,7 @@ private void revokeGrantRecord( } // good, all success - return new EntitiesResult(updatedEntities); + return new EntitiesResult(Page.fromItems(updatedEntities)); } /** {@inheritDoc} */ @@ -1191,7 +1188,7 @@ private void revokeGrantRecord( entity -> true, Function.identity(), PageToken.fromLimit(2)) - .items; + .items(); // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1529,29 +1526,32 @@ private void revokeGrantRecord( List loadedTasks = new ArrayList<>(); final AtomicInteger failedLeaseCount = new AtomicInteger(0); - availableTasks.items.forEach( - task -> { - PolarisBaseEntity updatedTask = new PolarisBaseEntity(task); - Map properties = - PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); - properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); - properties.put( - PolarisTaskConstants.LAST_ATTEMPT_START_TIME, - String.valueOf(callCtx.getClock().millis())); - properties.put( - PolarisTaskConstants.ATTEMPT_COUNT, - String.valueOf( - Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) - + 1)); - updatedTask.setProperties( - PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - EntityResult result = updateEntityPropertiesIfNotChanged(callCtx, null, updatedTask); - if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { - loadedTasks.add(result.getEntity()); - } else { - failedLeaseCount.getAndIncrement(); - } - }); + availableTasks + .items() + .forEach( + task -> { + PolarisBaseEntity updatedTask = new PolarisBaseEntity(task); + Map properties = + PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); + properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt( + properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); + updatedTask.setProperties( + PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); + EntityResult result = updateEntityPropertiesIfNotChanged(callCtx, null, updatedTask); + if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + loadedTasks.add(result.getEntity()); + } else { + failedLeaseCount.getAndIncrement(); + } + }); // Since the contract of this method is to only return an empty list once no available tasks // are found anymore, if we happen to fail to lease any tasks at all due to all of them diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java index e27b69680f..13c7422f02 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java @@ -18,25 +18,20 @@ */ package org.apache.polaris.core.persistence.dao.entity; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; -import java.util.Optional; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; /** a set of returned entities result */ public class EntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned - private final List entities; - private final Optional pageTokenOpt; + private final Page entities; public static EntitiesResult fromPage(Page page) { - return new EntitiesResult(page.items, Optional.ofNullable(page.pageToken)); + return new EntitiesResult(page); } /** @@ -48,11 +43,6 @@ public static EntitiesResult fromPage(Page page) { public EntitiesResult(@Nonnull ReturnStatus errorStatus, @Nullable String extraInformation) { super(errorStatus, extraInformation); this.entities = null; - this.pageTokenOpt = Optional.empty(); - } - - public EntitiesResult(@Nonnull List entities) { - this(entities, Optional.empty()); } /** @@ -60,29 +50,12 @@ public EntitiesResult(@Nonnull List entities) { * * @param entities list of entities being returned, implies success */ - public EntitiesResult( - @Nonnull List entities, @Nonnull Optional pageTokenOpt) { + public EntitiesResult(@Nonnull Page entities) { super(ReturnStatus.SUCCESS); this.entities = entities; - this.pageTokenOpt = pageTokenOpt; - } - - @JsonCreator - private EntitiesResult( - @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, - @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List entities, - @JsonProperty("pageToken") Optional pageTokenOpt) { - super(returnStatus, extraInformation); - this.entities = entities; - this.pageTokenOpt = pageTokenOpt; - } - - public List getEntities() { - return entities; } - public Optional getPageToken() { - return pageTokenOpt; + public @Nullable List getEntities() { + return entities == null ? null : entities.items(); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java index 10669e8994..a7a51d2297 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java @@ -18,26 +18,21 @@ */ package org.apache.polaris.core.persistence.dao.entity; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; -import java.util.Optional; import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; /** the return the result for a list entities call */ public class ListEntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned - private final List entities; - private final Optional pageTokenOpt; + private final Page entities; /** Create a {@link ListEntitiesResult} from a {@link Page} */ public static ListEntitiesResult fromPage(Page page) { - return new ListEntitiesResult(page.items, Optional.ofNullable(page.pageToken)); + return new ListEntitiesResult(page); } /** @@ -46,13 +41,9 @@ public static ListEntitiesResult fromPage(Page page) { * @param errorCode error code, cannot be SUCCESS * @param extraInformation extra information */ - public ListEntitiesResult( - @Nonnull ReturnStatus errorCode, - @Nullable String extraInformation, - @Nonnull Optional pageTokenOpt) { + public ListEntitiesResult(@Nonnull ReturnStatus errorCode, @Nullable String extraInformation) { super(errorCode, extraInformation); this.entities = null; - this.pageTokenOpt = pageTokenOpt; } /** @@ -60,29 +51,16 @@ public ListEntitiesResult( * * @param entities list of entities being returned, implies success */ - public ListEntitiesResult( - @Nonnull List entities, @Nonnull Optional pageTokenOpt) { + public ListEntitiesResult(Page entities) { super(ReturnStatus.SUCCESS); this.entities = entities; - this.pageTokenOpt = pageTokenOpt; } - @JsonCreator - private ListEntitiesResult( - @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, - @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List entities, - @JsonProperty("pageToken") Optional pageTokenOpt) { - super(returnStatus, extraInformation); - this.entities = entities; - this.pageTokenOpt = pageTokenOpt; - } - - public List getEntities() { - return entities; + public @Nullable List getEntities() { + return entities == null ? null : entities.items(); } - public Optional getPageToken() { - return pageTokenOpt; + public Page getPage() { + return entities == null ? Page.fromItems(List.of()) : entities; } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java deleted file mode 100644 index d46ea7b026..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -import java.util.List; - -/** - * A {@link PageToken} string that represents the lack of a page token. Returns `null` in - * `toTokenString`, which the client will interpret as there being no more data available. - */ -public class DonePageToken extends PageToken { - - public DonePageToken() {} - - @Override - public String toTokenString() { - return null; - } - - @Override - protected PageToken updated(List newData) { - throw new IllegalStateException("DonePageToken.updated is invalid"); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdPaging.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdPaging.java new file mode 100644 index 0000000000..c4c7d6ce36 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdPaging.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import static com.google.common.base.Preconditions.checkArgument; + +import jakarta.annotation.Nullable; +import org.apache.polaris.core.entity.PolarisBaseEntity; + +/** Utility class for processing pagination of streams of entities ordered by some ID property. */ +public final class EntityIdPaging { + private EntityIdPaging() {} + + /** + * Produces the reference to the next page of data in the form of a numerical entity ID. Entities + * in the associated stream of data are assumed to be ordered by ID. + */ + public static @Nullable String encodedDataReference(PolarisBaseEntity entity) { + if (entity == null) { + return null; + } + return Long.toString(entity.getId()); + } + + /** + * Extracts the entity ID from a {@link PageToken} assuming the request is a continuation of the + * API call that previously produced an {@link #encodedDataReference(PolarisBaseEntity) entity ID + * page token}. This ID is meant to be used as a boundary between the previous and the next pages + * in a stream of entities ordered by their Entity IDs. + */ + public static long entityIdBoundary(PageToken pageToken) { + String encodedId = pageToken.encodedDataReference(); + checkArgument(encodedId != null, "Encoded data reference is null in the page request"); + return Long.parseLong(encodedId); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java deleted file mode 100644 index c6b216fcd3..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -/** - * A light interface for {@link PageToken} implementations to express that they have a page size - * that should be respected - */ -public interface HasPageSize { - int getPageSize(); -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java deleted file mode 100644 index 18586446ca..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -import java.util.List; - -/** - * A {@link PageToken} implementation that has a page size, but no start offset. This can be used to - * represent a `limit`. When updated, it returns {@link DonePageToken}. As such it should never be - * user-facing and doesn't truly paginate. - */ -public class LimitPageToken extends PageToken implements HasPageSize { - - public static final String PREFIX = "limit"; - - private final int pageSize; - - public LimitPageToken(int pageSize) { - this.pageSize = pageSize; - } - - @Override - public int getPageSize() { - return pageSize; - } - - @Override - public String toTokenString() { - return String.format("%s/%d", PREFIX, pageSize); - } - - @Override - protected PageToken updated(List newData) { - return new DonePageToken(); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java index 18287f85c1..7bfeefa39c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java @@ -18,25 +18,96 @@ */ package org.apache.polaris.core.persistence.pagination; +import static java.util.Spliterators.iterator; + +import jakarta.annotation.Nullable; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** - * An immutable page of items plus their paging cursor. The {@link PageToken} here can be used to - * continue the listing operation that generated the `items`. + * An immutable page of items plus their paging cursor. The {@link #encodedResponseToken()} here can + * be used to continue the listing operation that generated the `items`. */ public class Page { - public final PageToken pageToken; - public final List items; + private final PageToken request; + private final List items; + @Nullable private final String encodedDataReference; - public Page(PageToken pageToken, List items) { - this.pageToken = pageToken; + private Page(PageToken request, @Nullable String encodedDataReference, List items) { + this.request = request; + this.encodedDataReference = encodedDataReference; this.items = items; } /** - * Used to wrap a {@link List} of items into a {@link Page } when there are no more pages + * Builds a complete response page for the full list of relevant items. No subsequence pages of + * related data exist. */ public static Page fromItems(List items) { - return new Page<>(new DonePageToken(), items); + return new Page<>(PageToken.readEverything(), null, items); + } + + /** + * Produces a response page by consuming the number of items from the provided stream according to + * the {@code request} parameter. Source items can be converted to a different type by providing a + * {@code mapper} function. The page token for the response will be produced from the request data + * combined with the pointer to the next page of data provided by the {@code dataPointer} + * function. + * + * @param request defines pagination parameters that were uses to produce this page of data. + * @param items stream of source data + * @param mapper converter from source data types to response data types. + * @param dataPointer determines the internal pointer to the next page of data given the last item + * from the previous page. The output of this function will be available from {@link + * PageToken#encodedDataReference()} associated with the request for the next page. + */ + public static Page mapped( + PageToken request, Stream items, Function mapper, Function dataPointer) { + List data; + if (request.paginationRequested()) { + data = new ArrayList<>(request.pageSize()); + } else { + data = new ArrayList<>(); + } + + T last = null; + Iterator it = iterator(items.spliterator()); + while (it.hasNext() && (!request.paginationRequested() || data.size() < request.pageSize())) { + last = it.next(); + data.add(mapper.apply(last)); + } + + if (request.paginationRequested() && data.size() < request.pageSize()) { + // the page was not filled, inform the client that there's no next page + last = null; + } + + return new Page<>(request, dataPointer.apply(last), data); + } + + public List items() { + return items; + } + + /** + * Returns a page token in encoded form suitable for returning to API clients. The string returned + * from this method is expected to be parsed by {@link PageToken#build(String, Integer)} when + * servicing the request for the next page of related data. + */ + public @Nullable String encodedResponseToken() { + return PageTokenUtil.encodePageToken(request, encodedDataReference); + } + + /** + * Converts this page of data to objects of a different type, while maintaining the underlying + * pointer to the next page of source data. + */ + public Page map(Function mapper) { + return new Page<>( + request, encodedDataReference, items.stream().map(mapper).collect(Collectors.toList())); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java index 2e335ccd40..78589a841a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java @@ -18,82 +18,85 @@ */ package org.apache.polaris.core.persistence.pagination; -import java.util.List; -import java.util.Objects; +import static com.google.common.base.Preconditions.checkState; -/** - * Represents a page token that can be used by operations like `listTables`. Clients that specify a - * `pageSize` (or a `pageToken`) may receive a `next-page-token` in the response, the content of - * which is a serialized PageToken. - * - *

By providing that in the next query's `pageToken`, the client can resume listing where they - * left off. If the client provides a `pageToken` or `pageSize` but `next-page-token` is null in the - * response, that means there is no more data to read. - */ -public abstract class PageToken { +import jakarta.annotation.Nullable; - /** Build a new PageToken that reads everything */ - public static PageToken readEverything() { - return build(null, null); - } +/** A wrapper for pagination information passed in as part of a request. */ +public class PageToken { + private final @Nullable String encodedDataReference; + private final int pageSize; - /** Build a new PageToken from an input String, without a specified page size */ - public static PageToken fromString(String token) { - return build(token, null); + PageToken(@Nullable String encodedDataReference, int pageSize) { + this.encodedDataReference = encodedDataReference; + this.pageSize = pageSize; } - /** Build a new PageToken from a limit */ - public static PageToken fromLimit(Integer pageSize) { - return build(null, pageSize); + /** Represents a non-paginated request. */ + public static PageToken readEverything() { + return new PageToken(null, -1); } - /** Build a {@link PageToken} from the input string and page size */ - public static PageToken build(String token, Integer pageSize) { - if (token == null || token.isEmpty()) { - if (pageSize != null) { - return new LimitPageToken(pageSize); - } else { - return new ReadEverythingPageToken(); - } - } else { - // TODO implement, split out by the token's prefix - throw new IllegalArgumentException("Unrecognized page token: " + token); - } + /** Represents a request to start paginating with a particular page size. */ + public static PageToken fromLimit(int limit) { + return new PageToken(null, limit); } - /** Serialize a {@link PageToken} into a string */ - public abstract String toTokenString(); + /** + * Reconstructs a page token from the API-level page token string (returned to the client in the + * response to a previous request for similar data) and an API-level new requested page size. + * + * @param encodedPageToken page token from the {@link Page#encodedResponseToken() previous page} + * @param requestedPageSize optional page size for the next page. If not set, the page size of the + * previous page (encoded in the page token string) will be reused. + * @see Page#encodedResponseToken() + */ + public static PageToken build( + @Nullable String encodedPageToken, @Nullable Integer requestedPageSize) { + return PageTokenUtil.decodePageRequest(encodedPageToken, requestedPageSize); + } /** - * Builds a new page token to reflect new data that's been read. If the amount of data read is - * less than the pageSize, this will return a {@link DonePageToken} + * Returns whether requests using this page token should produce paginated responses ({@code + * true}) or return all available data ({@code false}). */ - protected abstract PageToken updated(List newData); + public boolean paginationRequested() { + return pageSize > 0; + } /** - * Builds a {@link Page } from a {@link List}. The {@link PageToken} attached to the new - * {@link Page } is the same as the result of calling {@link #updated(List)} on this {@link - * PageToken}. + * Returns whether this token has an opaque reference that should be used to produce the next + * response page ({@code true}), or whether the response should start from the first page of + * available data ({@code false}). */ - public final Page buildNextPage(List data) { - return new Page(updated(data), data); + public boolean hasDataReference() { + return paginationRequested() && encodedDataReference != null; } - @Override - public final boolean equals(Object o) { - if (o instanceof PageToken) { - return Objects.equals(this.toTokenString(), ((PageToken) o).toTokenString()); - } else { - return false; - } + /** + * Returns the encoded form of the internal pointer to a page of data. This data should be + * interpreted by the code that actually handles pagination for this request (usually at the + * Persistence layer). This piece of code is normally the code that produced the {@link Page} of + * data in response to the previous request. + * + *

If this request is not related to a previous page, {@code null} will be returned. + * + * @throws IllegalStateException if this method is called when {@link #paginationRequested()} is + * {@code false} + */ + public @Nullable String encodedDataReference() { + checkState(paginationRequested(), "Pagination was not requested."); + return encodedDataReference; } - @Override - public final int hashCode() { - if (toTokenString() == null) { - return 0; - } else { - return toTokenString().hashCode(); - } + /** + * Returns the requested results page size. + * + * @throws IllegalStateException if this method is called when {@link #paginationRequested()} is + * {@code false} + */ + public int pageSize() { + checkState(paginationRequested(), "Pagination was not requested."); + return pageSize; } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java new file mode 100644 index 0000000000..8e82c99f4a --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.pagination; + +import static com.google.api.client.util.Preconditions.checkArgument; + +import jakarta.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Optional; + +final class PageTokenUtil { + + private PageTokenUtil() {} + + static PageToken decodePageRequest( + @Nullable String requestedPageToken, @Nullable Integer requestedPageSize) { + int pageSize; + String encodedDataReference = null; + if (requestedPageToken != null) { + byte[] bytes = Base64.getUrlDecoder().decode(requestedPageToken); + String token = new String(bytes, StandardCharsets.UTF_8); + int idx = token.indexOf(':'); + checkArgument(idx > 0, "Invalid page token: %s", requestedPageToken); + int prevPageSize = Integer.parseInt(token.substring(0, idx)); + encodedDataReference = token.substring(idx + 1); + pageSize = Optional.ofNullable(requestedPageSize).orElse(prevPageSize); + } else { + pageSize = Optional.ofNullable(requestedPageSize).orElse(-1); + } + + return new PageToken(encodedDataReference, pageSize); + } + + static @Nullable String encodePageToken( + PageToken request, @Nullable String encodedDataReference) { + if (!request.paginationRequested()) { + return null; + } + + if (encodedDataReference == null) { + return null; + } + + Base64.Encoder encoder = Base64.getUrlEncoder(); + return encoder.encodeToString( + String.format("%d:%s", request.pageSize(), encodedDataReference) + .getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java deleted file mode 100644 index c8476c3511..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -import java.util.List; - -/** - * A {@link PageToken} implementation for readers who want to read everything. The behavior when - * using this token should be the same as when reading without a token. - */ -public class ReadEverythingPageToken extends PageToken { - - public static String PREFIX = "read-everything"; - - public ReadEverythingPageToken() {} - - @Override - public String toTokenString() { - return PREFIX; - } - - @Override - protected PageToken updated(List newData) { - return new DonePageToken(); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 0d6111d54b..15cfae23c9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -28,9 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -683,8 +683,8 @@ private void bootstrapPolarisService( } /** - * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType, - * PageToken)} + * See {@link PolarisMetaStoreManager#listEntities(PolarisCallContext, List, PolarisEntityType, + * PolarisEntitySubType, PageToken)} */ private @Nonnull ListEntitiesResult listEntities( @Nonnull PolarisCallContext callCtx, @@ -698,24 +698,25 @@ private void bootstrapPolarisService( // return if we failed to resolve if (resolver.isFailure()) { - return new ListEntitiesResult( - BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null, Optional.empty()); + return new ListEntitiesResult(BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null); } - // return list of active entities - Page resultPage = - ms.listEntitiesInCurrentTxn( - callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType, pageToken); - + Predicate filter = entity -> true; // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - resultPage = - pageToken.buildNextPage( - resultPage.items.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList())); + filter = e -> e.getSubTypeCode() == entitySubType.getCode(); } + // return list of active entities + Page resultPage = + ms.listEntitiesInCurrentTxn( + callCtx, + resolver.getCatalogIdOrNull(), + resolver.getParentId(), + entityType, + filter, + pageToken); + // done return ListEntitiesResult.fromPage(resultPage); } @@ -1070,7 +1071,7 @@ private void bootstrapPolarisService( } createdEntities.add(entityCreateResult.getEntity()); } - return new EntitiesResult(createdEntities); + return new EntitiesResult(Page.fromItems(createdEntities)); }); } @@ -1175,7 +1176,7 @@ private void bootstrapPolarisService( } // good, all success - return new EntitiesResult(updatedEntities); + return new EntitiesResult(Page.fromItems(updatedEntities)); } /** {@inheritDoc} */ @@ -1378,7 +1379,7 @@ private void bootstrapPolarisService( entity -> true, Function.identity(), PageToken.fromLimit(2)) - .items; + .items(); // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1967,37 +1968,41 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( pageToken); List loadedTasks = new ArrayList<>(); - availableTasks.items.forEach( - task -> { - // Make a copy to avoid mutating someone else's reference. - // TODO: Refactor into immutable/Builder pattern. - PolarisBaseEntity updatedTask = new PolarisBaseEntity(task); - Map properties = - PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); - properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); - properties.put( - PolarisTaskConstants.LAST_ATTEMPT_START_TIME, - String.valueOf(callCtx.getClock().millis())); - properties.put( - PolarisTaskConstants.ATTEMPT_COUNT, - String.valueOf( - Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) - + 1)); - updatedTask.setProperties( - PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - EntityResult result = updateEntityPropertiesIfNotChanged(callCtx, ms, null, updatedTask); - if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { - loadedTasks.add(result.getEntity()); - } else { - // TODO: Consider performing incremental leasing of individual tasks one at a time - // instead of requiring all-or-none semantics for all the tasks we think we listed, - // or else contention could be very bad. - ms.rollback(); - throw new RetryOnConcurrencyException( - "Failed to lease available task with status %s, info: %s", - result.getReturnStatus(), result.getExtraInformation()); - } - }); + availableTasks + .items() + .forEach( + task -> { + // Make a copy to avoid mutating someone else's reference. + // TODO: Refactor into immutable/Builder pattern. + PolarisBaseEntity updatedTask = new PolarisBaseEntity(task); + Map properties = + PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); + properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt( + properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); + updatedTask.setProperties( + PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); + EntityResult result = + updateEntityPropertiesIfNotChanged(callCtx, ms, null, updatedTask); + if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + loadedTasks.add(result.getEntity()); + } else { + // TODO: Consider performing incremental leasing of individual tasks one at a time + // instead of requiring all-or-none semantics for all the tasks we think we listed, + // or else contention could be very bad. + ms.rollback(); + throw new RetryOnConcurrencyException( + "Failed to lease available task with status %s, info: %s", + result.getReturnStatus(), result.getExtraInformation()); + } + }); return EntitiesResult.fromPage(Page.fromItems(loadedTasks)); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java index 44b37c2760..bfa0cc30b7 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicates; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.util.Comparator; import java.util.List; import java.util.function.Function; import java.util.function.Predicate; @@ -39,7 +40,7 @@ import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; -import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.EntityIdPaging; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -361,13 +362,21 @@ public List lookupEntityActiveBatchInCurrentTxn( .map( nameRecord -> this.lookupEntityInCurrentTxn( - callCtx, catalogId, nameRecord.getId(), entityType.getCode())) - .filter(entityFilter); - if (pageToken instanceof HasPageSize) { - data = data.limit(((HasPageSize) pageToken).getPageSize()); + callCtx, catalogId, nameRecord.getId(), entityType.getCode())); + + Predicate tokenFilter; + if (pageToken.hasDataReference()) { + long nextId = EntityIdPaging.entityIdBoundary(pageToken); + tokenFilter = e -> e.getId() > nextId; + } else { + tokenFilter = e -> true; } - return Page.fromItems(data.map(transformer).collect(Collectors.toList())); + data = data.sorted(Comparator.comparingLong(PolarisEntityCore::getId)).filter(tokenFilter); + + data = data.filter(entityFilter); + + return Page.mapped(pageToken, data, transformer, EntityIdPaging::encodedDataReference); } /** {@inheritDoc} */ diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java new file mode 100644 index 0000000000..bdbc21d274 --- /dev/null +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.pagination; + +import static org.assertj.core.api.Assertions.*; + +import java.util.function.Function; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; + +class PageTokenTest { + + @Test + public void testReadEverything() { + PageToken r = PageToken.readEverything(); + assertThat(r.paginationRequested()).isFalse(); + assertThatThrownBy(r::encodedDataReference).isInstanceOf(IllegalStateException.class); + assertThatThrownBy(r::pageSize).isInstanceOf(IllegalStateException.class); + + r = PageToken.build(null, null); + assertThat(r.paginationRequested()).isFalse(); + assertThatThrownBy(r::encodedDataReference).isInstanceOf(IllegalStateException.class); + assertThatThrownBy(r::pageSize).isInstanceOf(IllegalStateException.class); + } + + @Test + public void testLimit() { + PageToken r = PageToken.fromLimit(123); + assertThat(r.encodedDataReference()).isNull(); + assertThat(r.paginationRequested()).isTrue(); + assertThat(r.pageSize()).isEqualTo(123); + } + + @Test + public void testFirstRequest() { + PageToken r = PageToken.build(null, 123); + assertThat(r.encodedDataReference()).isNull(); + assertThat(r.paginationRequested()).isTrue(); + assertThat(r.pageSize()).isEqualTo(123); + } + + @Test + public void testApiRoundTrip() { + PageToken request = PageToken.build(null, 123); + Page page = Page.mapped(request, Stream.of("i1"), Function.identity(), x -> "test:123"); + assertThat(page.encodedResponseToken()).isNotNull(); + PageToken r = PageToken.build(page.encodedResponseToken(), null); + assertThat(r.paginationRequested()).isTrue(); + assertThat(r.encodedDataReference()).isEqualTo("test:123"); + assertThat(r.pageSize()).isEqualTo(123); + + r = PageToken.build(page.encodedResponseToken(), 456); + assertThat(r.paginationRequested()).isTrue(); + assertThat(r.encodedDataReference()).isEqualTo("test:123"); + assertThat(r.pageSize()).isEqualTo(456); + } +} diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 2b4c4205c9..92d74e09a8 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -98,6 +98,7 @@ import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; +import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.secrets.UserSecretsManager; @@ -180,6 +181,8 @@ public Map getConfigOverrides() { "polaris.event-listener.type", "test", "polaris.readiness.ignore-severe-issues", + "true", + "LIST_PAGINATION_ENABLED", "true"); } } @@ -2007,4 +2010,137 @@ public void testEventsAreEmitted() { Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); } + + private static PageToken nextRequest(Page previousPage) { + return PageToken.build(previousPage.encodedResponseToken(), null); + } + + @Test + public void testPaginatedListTables() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + + for (int i = 0; i < 5; i++) { + catalog.buildTable(TableIdentifier.of(NS, "pagination_table_" + i), SCHEMA).create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listTables(NS)).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listTables(NS, PageToken.fromLimit(2)); + Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); + Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = catalog.listTables(NS, nextRequest(firstListResult)); + Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); + Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = catalog.listTables(NS, nextRequest(secondListResult)); + Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); + Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_table_" + i)); + } + } + } + + @Test + public void testPaginatedListViews() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + + for (int i = 0; i < 5; i++) { + catalog + .buildView(TableIdentifier.of(NS, "pagination_view_" + i)) + .withQuery("a_" + i, "SELECT 1 id") + .withSchema(SCHEMA) + .withDefaultNamespace(NS) + .create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listViews(NS)).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listViews(NS, PageToken.fromLimit(2)); + Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); + Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = catalog.listViews(NS, nextRequest(firstListResult)); + Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); + Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = catalog.listViews(NS, nextRequest(secondListResult)); + Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); + Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_view_" + i)); + } + } + } + + @Test + public void testPaginatedListNamespaces() { + for (int i = 0; i < 5; i++) { + catalog.createNamespace(Namespace.of("pagination_namespace_" + i)); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listNamespaces()).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(2)); + Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); + Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = + catalog.listNamespaces(Namespace.empty(), nextRequest(firstListResult)); + Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); + Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = + catalog.listNamespaces(Namespace.empty(), nextRequest(secondListResult)); + Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); + Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); + + // List with page size matching the amount of data + Page firstExactListResult = + catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(5)); + Assertions.assertThat(firstExactListResult.items().size()).isEqualTo(5); + Assertions.assertThat(firstExactListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // Again list with matching page size + Page secondExactListResult = + catalog.listNamespaces(Namespace.empty(), nextRequest(firstExactListResult)); + Assertions.assertThat(secondExactListResult.items()).isEmpty(); + Assertions.assertThat(secondExactListResult.encodedResponseToken()).isNull(); + + // List with huge page size: + Page bigListResult = catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(9999)); + Assertions.assertThat(bigListResult.items().size()).isEqualTo(5); + Assertions.assertThat(bigListResult.encodedResponseToken()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropNamespace(Namespace.of("pagination_namespace_" + i)); + } + } + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index e99513155a..8c00fa9fb7 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -453,14 +453,10 @@ public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { @Override public List listTables(Namespace namespace) { - return listTables(namespace, PageToken.readEverything()).items; + return listTables(namespace, PageToken.readEverything()).items(); } - public Page listTables(Namespace namespace, String pageToken, Integer pageSize) { - return listTables(namespace, buildPageToken(pageToken, pageSize)); - } - - private Page listTables(Namespace namespace, PageToken pageToken) { + public Page listTables(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list tables for namespace. Namespace does not exist: '%s'", namespace); @@ -775,14 +771,10 @@ public List listNamespaces() { @Override public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - return listNamespaces(namespace, PageToken.readEverything()).items; - } - - public Page listNamespaces(Namespace namespace, String pageToken, Integer pageSize) { - return listNamespaces(namespace, buildPageToken(pageToken, pageSize)); + return listNamespaces(namespace, PageToken.readEverything()).items(); } - private Page listNamespaces(Namespace namespace, PageToken pageToken) + public Page listNamespaces(Namespace namespace, PageToken pageToken) throws NoSuchNamespaceException { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { @@ -798,13 +790,12 @@ private Page listNamespaces(Namespace namespace, PageToken pageToken) PolarisEntityType.NAMESPACE, PolarisEntitySubType.NULL_SUBTYPE, pageToken); - List entities = - PolarisEntity.toNameAndIdList(listResult.getEntities()); - List namespaces = PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); return listResult - .getPageToken() - .map(token -> new Page<>(token, namespaces)) - .orElseGet(() -> Page.fromItems(namespaces)); + .getPage() + .map( + record -> + PolarisCatalogHelpers.nameAndIdToNamespace( + catalogPath, new PolarisEntity.NameAndId(record.getName(), record.getId()))); } @Override @@ -816,14 +807,10 @@ public void close() throws IOException { @Override public List listViews(Namespace namespace) { - return listViews(namespace, PageToken.readEverything()).items; + return listViews(namespace, PageToken.readEverything()).items(); } - public Page listViews(Namespace namespace, String pageToken, Integer pageSize) { - return listViews(namespace, buildPageToken(pageToken, pageSize)); - } - - private Page listViews(Namespace namespace, PageToken pageToken) { + public Page listViews(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list views for namespace. Namespace does not exist: '%s'", namespace); @@ -2468,15 +2455,9 @@ private Page listTableLike( PolarisEntityType.TABLE_LIKE, subType, pageToken); - List entities = - PolarisEntity.toNameAndIdList(listResult.getEntities()); - List identifiers = - PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); - return listResult - .getPageToken() - .map(token -> new Page<>(token, identifiers)) - .orElseGet(() -> Page.fromItems(identifiers)); + Namespace ns = PolarisCatalogHelpers.parentNamespace(catalogPath); + return listResult.getPage().map(record -> TableIdentifier.of(ns, record.getName())); } /** @@ -2525,8 +2506,7 @@ private int getMaxMetadataRefreshRetries() { } /** Build a {@link PageToken} from a string and page size. */ - private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer pageSize) { - + private PageToken buildPageRequest(@Nullable String tokenString, @Nullable Integer pageSize) { boolean paginationEnabled = callContext .getPolarisCallContext() diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index c06e9d98d4..2e81738d0c 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -92,6 +92,7 @@ import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -184,10 +185,11 @@ public ListNamespacesResponse listNamespaces( authorizeBasicNamespaceOperationOrThrow(op, parent); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - Page results = polarisCatalog.listNamespaces(parent, pageToken, pageSize); + PageToken pageRequest = PageToken.build(pageToken, pageSize); + Page results = polarisCatalog.listNamespaces(parent, pageRequest); return ListNamespacesResponse.builder() - .addAll(results.items) - .nextPageToken(results.pageToken.toTokenString()) + .addAll(results.items()) + .nextPageToken(results.encodedResponseToken()) .build(); } else { return catalogHandlerUtils.listNamespaces( @@ -345,10 +347,11 @@ public ListTablesResponse listTables(Namespace namespace, String pageToken, Inte authorizeBasicNamespaceOperationOrThrow(op, namespace); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - Page results = polarisCatalog.listTables(namespace, pageToken, pageSize); + PageToken pageRequest = PageToken.build(pageToken, pageSize); + Page results = polarisCatalog.listTables(namespace, pageRequest); return ListTablesResponse.builder() - .addAll(results.items) - .nextPageToken(results.pageToken.toTokenString()) + .addAll(results.items()) + .nextPageToken(results.encodedResponseToken()) .build(); } else { return catalogHandlerUtils.listTables( @@ -1011,10 +1014,11 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ authorizeBasicNamespaceOperationOrThrow(op, namespace); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - Page results = polarisCatalog.listViews(namespace, pageToken, pageSize); + PageToken pageRequest = PageToken.build(pageToken, pageSize); + Page results = polarisCatalog.listViews(namespace, pageRequest); return ListTablesResponse.builder() - .addAll(results.items) - .nextPageToken(results.pageToken.toTokenString()) + .addAll(results.items()) + .nextPageToken(results.encodedResponseToken()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { return catalogHandlerUtils.listViews( diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/EntityIdPagingTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/EntityIdPagingTest.java new file mode 100644 index 0000000000..1ee927f779 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/EntityIdPagingTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.persistence.pagination; + +import static org.apache.polaris.core.persistence.pagination.EntityIdPaging.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doReturn; + +import java.util.function.Function; +import java.util.stream.Stream; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.persistence.pagination.EntityIdPaging; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class EntityIdPagingTest { + + @Test + void testId() { + PageToken request = PageToken.fromLimit(2); + PolarisBaseEntity e1 = Mockito.mock(PolarisBaseEntity.class); + doReturn(7L).when(e1).getId(); + PolarisBaseEntity e2 = Mockito.mock(PolarisBaseEntity.class); + doReturn(11L).when(e2).getId(); + Page page = + Page.mapped( + request, Stream.of(e1, e2), Function.identity(), EntityIdPaging::encodedDataReference); + assertThat(entityIdBoundary(PageToken.build(page.encodedResponseToken(), null))).isEqualTo(11L); + } +} diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java deleted file mode 100644 index 97e52fb842..0000000000 --- a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.persistence.pagination; - -import org.apache.polaris.core.persistence.pagination.DonePageToken; -import org.apache.polaris.core.persistence.pagination.HasPageSize; -import org.apache.polaris.core.persistence.pagination.PageToken; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PageTokenTest { - private static final Logger LOGGER = LoggerFactory.getLogger(PageTokenTest.class); - - @Test - void testDoneToken() { - Assertions.assertThat(new DonePageToken()).doesNotReturn(null, PageToken::toString); - Assertions.assertThat(new DonePageToken()).returns(null, PageToken::toTokenString); - Assertions.assertThat(new DonePageToken()).isEqualTo(new DonePageToken()); - Assertions.assertThat(new DonePageToken().hashCode()).isEqualTo(new DonePageToken().hashCode()); - } - - @Test - void testReadEverythingPageToken() { - PageToken token = PageToken.readEverything(); - - Assertions.assertThat(token.toString()).isNotNull(); - Assertions.assertThat(token.toTokenString()).isNotNull(); - Assertions.assertThat(token).isNotInstanceOf(HasPageSize.class); - - Assertions.assertThat(PageToken.readEverything()).isEqualTo(PageToken.readEverything()); - } -}