Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ the authentication parameters are picked from the environment or configuration f
- The `DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED` feature was added to support placing tables
at locations that better optimize for object storage.

- The `LIST_PAGINATION_ENABLED` (default: false) feature flag can be used to enable pagination
in the Iceberg REST Catalog API.

- The Helm chart now supports Pod Disruption Budgets (PDBs) for Polaris components. This allows users to define
the minimum number of pods that must be available during voluntary disruptions, such as node maintenance.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ public List<TableIdentifier> listViews(String catalog, Namespace namespace) {
}
}

public ListTablesResponse listViews(
String catalog, Namespace namespace, String pageToken, String pageSize) {
String ns = RESTUtil.encodeNamespace(namespace);
Map<String, String> queryParams = new HashMap<>();
queryParams.put("pageToken", pageToken);
queryParams.put("pageSize", pageSize);
try (Response res =
request("v1/{cat}/namespaces/" + ns + "/views", Map.of("cat", catalog), queryParams)
.get()) {
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
return res.readEntity(ListTablesResponse.class);
}
}

public void dropView(String catalog, TableIdentifier id) {
String ns = RESTUtil.encodeNamespace(id.namespace());
try (Response res =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2160,4 +2160,52 @@ public void testPaginatedListTables() {
restCatalog.dropNamespace(namespace);
}
}

@Test
public void testNonPaginatedListTablesViewNamespaces() {
Catalog catalog = managementApi.getCatalog(currentCatalogName);
Map<String, String> catalogProps = new HashMap<>(catalog.getProperties().toMap());
catalogProps.put(FeatureConfiguration.LIST_PAGINATION_ENABLED.catalogConfig(), "false");
managementApi.updateCatalog(catalog, catalogProps);

String prefix = "testNonPaginatedListTablesViewNamespaces";
Namespace namespace = Namespace.of(prefix);
restCatalog.createNamespace(namespace);
for (int i = 0; i < 5; i++) {
restCatalog.createNamespace(Namespace.of(prefix, "nested-ns" + i));
restCatalog.createTable(TableIdentifier.of(namespace, "table" + i), SCHEMA);
restCatalog
.buildView(TableIdentifier.of(namespace, "view" + i))
.withSchema(SCHEMA)
.withDefaultNamespace(namespace)
.withQuery("spark", VIEW_QUERY)
.create();
}

assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(5);
// Note: no pagination per feature config
ListTablesResponse response = catalogApi.listTables(currentCatalogName, namespace, null, "2");
assertThat(response.identifiers()).hasSize(5);
assertThat(response.nextPageToken()).isNull();
response = catalogApi.listTables(currentCatalogName, namespace, "fake-token", null);
assertThat(response.identifiers()).hasSize(5);
assertThat(response.nextPageToken()).isNull();

assertThat(catalogApi.listViews(currentCatalogName, namespace)).hasSize(5);
response = catalogApi.listViews(currentCatalogName, namespace, null, "2");
assertThat(response.identifiers()).hasSize(5);
assertThat(response.nextPageToken()).isNull();
response = catalogApi.listViews(currentCatalogName, namespace, "fake-token", null);
assertThat(response.identifiers()).hasSize(5);
assertThat(response.nextPageToken()).isNull();

assertThat(catalogApi.listNamespaces(currentCatalogName, namespace)).hasSize(5);
ListNamespacesResponse nsResponse =
catalogApi.listNamespaces(currentCatalogName, namespace, null, "2");
assertThat(nsResponse.namespaces()).hasSize(5);
assertThat(nsResponse.nextPageToken()).isNull();
nsResponse = catalogApi.listNamespaces(currentCatalogName, namespace, "fake-token", null);
assertThat(nsResponse.namespaces()).hasSize(5);
assertThat(nsResponse.nextPageToken()).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jakarta.annotation.Nullable;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.BooleanSupplier;
import org.apache.polaris.immutables.PolarisImmutable;

/** A wrapper for pagination information passed in as part of a request. */
Expand Down Expand Up @@ -86,7 +87,10 @@ static PageToken fromLimit(int limit) {
* @see Page#encodedResponseToken()
*/
static PageToken build(
@Nullable String serializedPageToken, @Nullable Integer requestedPageSize) {
return PageTokenUtil.decodePageRequest(serializedPageToken, requestedPageSize);
@Nullable String serializedPageToken,
@Nullable Integer requestedPageSize,
BooleanSupplier shouldDecodeToken) {
return PageTokenUtil.decodePageRequest(
serializedPageToken, requestedPageSize, shouldDecodeToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.ServiceLoader;
import java.util.function.BooleanSupplier;

final class PageTokenUtil {

Expand Down Expand Up @@ -118,8 +119,10 @@ private PageTokenUtil() {}
* token.
*/
static PageToken decodePageRequest(
@Nullable String requestedPageToken, @Nullable Integer requestedPageSize) {
if (requestedPageToken != null) {
@Nullable String requestedPageToken,
@Nullable Integer requestedPageSize,
BooleanSupplier shouldDecodeToken) {
if (requestedPageToken != null && shouldDecodeToken.getAsBoolean()) {
var bytes = Base64.getUrlDecoder().decode(requestedPageToken);
try {
var pageToken = SMILE_MAPPER.readValue(bytes, PageToken.class);
Expand All @@ -134,7 +137,7 @@ static PageToken decodePageRequest(
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (requestedPageSize != null) {
} else if (requestedPageSize != null && shouldDecodeToken.getAsBoolean()) {
int pageSizeInt = requestedPageSize;
checkArgument(pageSizeInt >= 0, "Invalid page size");
return fromLimit(pageSizeInt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testReadEverything() {
soft.assertThat(pageEverything.encodedResponseToken()).isNull();
soft.assertThat(pageEverything.items()).containsExactly(1, 2, 3, 4);

r = PageToken.build(null, null);
r = PageToken.build(null, null, () -> true);
soft.assertThat(r.paginationRequested()).isFalse();
soft.assertThat(r.pageSize()).isEmpty();
soft.assertThat(r.value()).isEmpty();
Expand All @@ -62,7 +62,7 @@ public void testReadEverything() {
@Test
public void testLimit() {
PageToken r = PageToken.fromLimit(123);
soft.assertThat(r).isEqualTo(PageToken.build(null, 123));
soft.assertThat(r).isEqualTo(PageToken.build(null, 123, () -> true));
soft.assertThat(r.paginationRequested()).isTrue();
soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123));
soft.assertThat(r.value()).isEmpty();
Expand All @@ -71,7 +71,7 @@ public void testLimit() {
@Test
public void testTokenValueForPaging() {
PageToken r = PageToken.fromLimit(2);
soft.assertThat(r).isEqualTo(PageToken.build(null, 2));
soft.assertThat(r).isEqualTo(PageToken.build(null, 2, () -> true));
Page<Integer> pageMoreData =
Page.mapped(
r,
Expand Down Expand Up @@ -103,7 +103,7 @@ public void testTokenValueForPaging() {
soft.assertThat(lastPageNotSaturated.items()).containsExactly(3);

r = PageToken.fromLimit(200);
soft.assertThat(r).isEqualTo(PageToken.build(null, 200));
soft.assertThat(r).isEqualTo(PageToken.build(null, 200, () -> true));
Page<Integer> page200 =
Page.mapped(
r,
Expand All @@ -117,8 +117,10 @@ public void testTokenValueForPaging() {
@ParameterizedTest
@MethodSource
public void testDeSer(Integer pageSize, String serializedPageToken, PageToken expectedPageToken) {
soft.assertThat(PageTokenUtil.decodePageRequest(serializedPageToken, pageSize))
soft.assertThat(PageTokenUtil.decodePageRequest(serializedPageToken, pageSize, () -> true))
.isEqualTo(expectedPageToken);
soft.assertThat(PageTokenUtil.decodePageRequest(serializedPageToken, pageSize, () -> false))
.isEqualTo(PageToken.readEverything());
}

static Stream<Arguments> testDeSer() {
Expand All @@ -142,16 +144,16 @@ static Stream<Arguments> testDeSer() {
@ParameterizedTest
@MethodSource
public void testApiRoundTrip(Token token) {
PageToken request = PageToken.build(null, 123);
PageToken request = PageToken.build(null, 123, () -> true);
Page<?> page = Page.mapped(request, Stream.of("i1"), Function.identity(), x -> token);
soft.assertThat(page.encodedResponseToken()).isNotBlank();

PageToken r = PageToken.build(page.encodedResponseToken(), null);
PageToken r = PageToken.build(page.encodedResponseToken(), null, () -> true);
soft.assertThat(r.value()).contains(token);
soft.assertThat(r.paginationRequested()).isTrue();
soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123));

r = PageToken.build(page.encodedResponseToken(), 456);
r = PageToken.build(page.encodedResponseToken(), 456, () -> true);
soft.assertThat(r.value()).contains(token);
soft.assertThat(r.paginationRequested()).isTrue();
soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(456));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.polaris.service.catalog.iceberg;

import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import io.smallrye.common.annotation.Identifier;
Expand Down Expand Up @@ -187,13 +189,17 @@ public static boolean isCreate(UpdateTableRequest request) {
return isCreate;
}

private boolean shouldDecodeToken() {
return realmConfig.getConfig(LIST_PAGINATION_ENABLED, getResolvedCatalogEntity());
}

public ListNamespacesResponse listNamespaces(
Namespace parent, String pageToken, Integer pageSize) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES;
authorizeBasicNamespaceOperationOrThrow(op, parent);

if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
PageToken pageRequest = PageToken.build(pageToken, pageSize);
PageToken pageRequest = PageToken.build(pageToken, pageSize, this::shouldDecodeToken);
Page<Namespace> results = polarisCatalog.listNamespaces(parent, pageRequest);
return ListNamespacesResponse.builder()
.addAll(results.items())
Expand Down Expand Up @@ -332,7 +338,7 @@ public ListTablesResponse listTables(Namespace namespace, String pageToken, Inte
authorizeBasicNamespaceOperationOrThrow(op, namespace);

if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
PageToken pageRequest = PageToken.build(pageToken, pageSize);
PageToken pageRequest = PageToken.build(pageToken, pageSize, this::shouldDecodeToken);
Page<TableIdentifier> results = polarisCatalog.listTables(namespace, pageRequest);
return ListTablesResponse.builder()
.addAll(results.items())
Expand Down Expand Up @@ -935,7 +941,7 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ
authorizeBasicNamespaceOperationOrThrow(op, namespace);

if (baseCatalog instanceof IcebergCatalog polarisCatalog) {
PageToken pageRequest = PageToken.build(pageToken, pageSize);
PageToken pageRequest = PageToken.build(pageToken, pageSize, this::shouldDecodeToken);
Page<TableIdentifier> results = polarisCatalog.listViews(namespace, pageRequest);
return ListTablesResponse.builder()
.addAll(results.items())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,7 @@ public void testEventsAreEmitted() {
}

private static PageToken nextRequest(Page<?> previousPage) {
return PageToken.build(previousPage.encodedResponseToken(), null);
return PageToken.build(previousPage.encodedResponseToken(), null, () -> true);
}

@Test
Expand Down