Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4f775a6
add missing apis
eric-maynard Mar 20, 2025
e2fcd5e
more tests, fixes
eric-maynard Mar 21, 2025
467ebcb
clean up drop
eric-maynard Mar 21, 2025
5f5ca42
autolint
eric-maynard Mar 21, 2025
c1b2b73
Merge branch 'main' of github.com:apache/polaris into implement-Polar…
eric-maynard Mar 21, 2025
34c8e96
changes per review
eric-maynard Mar 24, 2025
dab7279
revert iceberg messages to comply with oss tests
eric-maynard Mar 24, 2025
54a47a5
another revert
eric-maynard Mar 24, 2025
1ffbf07
more iceberg catalog changes
eric-maynard Mar 24, 2025
113c941
autolint
eric-maynard Mar 24, 2025
b672986
dependency issues
eric-maynard Mar 24, 2025
4a9ed69
more wiring
eric-maynard Mar 24, 2025
6d7c59b
continuing rebase
eric-maynard Mar 24, 2025
5c969d7
remaining issues are related to task loading
eric-maynard Mar 24, 2025
e67b0e9
re-add tests
eric-maynard Mar 24, 2025
663dafb
debugging
eric-maynard Mar 27, 2025
90b4c73
resolve conflicts
eric-maynard Mar 27, 2025
133b658
fix failing tests
eric-maynard Mar 27, 2025
98661be
fix another test
eric-maynard Mar 27, 2025
0798b63
changes per review
eric-maynard Apr 1, 2025
241e7ed
autolint
eric-maynard Apr 1, 2025
0ad4ced
some fixes
eric-maynard Apr 1, 2025
fcaa2b7
rebase
eric-maynard Apr 3, 2025
dc86441
stable
eric-maynard Apr 3, 2025
4b58278
rebase
eric-maynard Apr 16, 2025
6050494
pull main
eric-maynard Apr 28, 2025
0fcec2f
updates for new persistence
eric-maynard Apr 28, 2025
6b9031c
fix
eric-maynard Apr 28, 2025
01a8468
merge conflicts
eric-maynard Apr 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.pagination.PolarisPage;
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
import org.apache.polaris.jpa.models.EntityIdPageToken;
import org.apache.polaris.jpa.models.ModelEntity;
import org.apache.polaris.jpa.models.ModelEntityActive;
import org.apache.polaris.jpa.models.ModelEntityChangeTracking;
Expand Down Expand Up @@ -419,29 +422,30 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(

/** {@inheritDoc} */
@Override
public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
public @Nonnull PolarisPage<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType) {
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
return this.listEntitiesInCurrentTxn(
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken);
}

@Override
public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
public @Nonnull PolarisPage<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull PageToken pageToken) {
// full range scan under the parent for that type
return this.listEntitiesInCurrentTxn(
callCtx,
catalogId,
parentId,
entityType,
Integer.MAX_VALUE,
entityFilter,
entity ->
new EntityNameLookupRecord(
Expand All @@ -450,27 +454,57 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
entity.getParentId(),
entity.getName(),
entity.getTypeCode(),
entity.getSubTypeCode()));
entity.getSubTypeCode()),
pageToken);
}

@Override
public @Nonnull <T> List<T> listEntitiesInCurrentTxn(
public @Nonnull <T> PolarisPage<T> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer) {
// full range scan under the parent for that type
return this.store
.lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType)
.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter)
.limit(limit)
.map(transformer)
.collect(Collectors.toList());
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
List<T> data;
if (entityFilter.equals(Predicates.alwaysTrue())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Predicate.alwaysTrue return something that can be compared and will that hold true for future?
Predicates are rather functions/lambdas - those are not meant to be compared with anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. This is only meant to capture the predicate that's provided in the method overload that doesn't have a predicate, e.g. here. It is not meant to capture any predicate that's semantically "always true".

// In this case, we can push the filter down into the query
data =
this.store
.lookupFullEntitiesActive(
localSession.get(), catalogId, parentId, entityType, pageToken)
.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter)
.map(transformer)
.collect(Collectors.toList());
} else {
// In this case, we cannot push the filter down into the query. We must therefore remove
// the page size limit from the PageToken and filter on the client side.
// TODO Implement a generic predicate that can be pushed down into different metastores
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

PageToken unlimitedPageSizeToken = pageToken.withPageSize(Integer.MAX_VALUE);
List<ModelEntity> rawData =
this.store.lookupFullEntitiesActive(
localSession.get(), catalogId, parentId, entityType, unlimitedPageSizeToken);
Comment on lines +487 to +489
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doubt] Let say we want to 10 records and table had 10k records and we have filter currently since this is a client side filter. And client requested page size of 2 we will get 5 time 10 K records.

I was seeing similar issue had 2 options.

  1. Load all the entities in one shot
  2. keep triggering the queries in loop until we get the requested page results.

I am not sure which is lesser poison pill to swallow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Luckily, we do not use this filter very often at all... but yes, this situation is less than ideal.

If the metastore itself provided a lazily-fetched Iterator here instead of a List (presumably with its own pagination/cache built into the client), we should definitely prefer to use that. But I think the right place for that logic is in the metastore itself and not at this layer, so I kept with the simplest approach. I think we need a solution for predicate push down in place eventually anyway.

There is actually a third option as well, which is that we just return more results than the client asked for. This is what we currently do actually, since we don't respect the page token.

Copy link
Contributor Author

@eric-maynard eric-maynard Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into this a little more, EclipseLink does support TypedQuery.getResultStream, so maybe it's as simple as just refactoring lookupFullEntitiesActive to return a Stream instead of a List. I'm not sure how well that actually performs though; we'd want to benchmark it.

edit: 😔

Copy link
Contributor

@singhpk234 singhpk234 Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does listing requires now to make this transactionally consistent, consider for ex: we need to list records, and some record got added while we were listing ? whats the behaviour expected ?

for ex Snowflake id generator which NoSQL impl generates or the ID generator that i am writing just makes sure entity id is unique, it doesn't guarantee incremental, am I missing something ?

Copy link
Contributor Author

@eric-maynard eric-maynard Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be transactionally consistent. You can miss an entity, but you will never get the same entity twice.

Entity ID is guaranteed to always be unique.

I suppose there is a case where you transactionally add some table and drop some other table that was already returned in an existing page -- the list results in aggregate will be inconsistent with that transaction. But I think this is outside the realm of what we need to support for the time being. I would probably not want to fail either the list operation or the table update operation to ensure consistency when clients who care about transactional consistency have the option to list without a page token.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree that an inconsistency (duplicate and omitted) results is acceptable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have a strong concern that there's more data processed here than absolutely necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no duplicate or omitted results here if we define omitted as "not present in the results of a list when the entity was in fact present through the duration of the list". Results are only omitted when they are deleted in the middle of a list operation; that is, the entity is gone by the time we reach the page that should contain it.

if (pageToken.pageSize < Integer.MAX_VALUE && rawData.size() > pageToken.pageSize) {
LOGGER.info(
"A page token could not be respected due to a predicate. "
+ "{} records were read but the client was asked to return {}.",
rawData.size(),
pageToken.pageSize);
}

data =
rawData.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter)
.limit(pageToken.pageSize)
.map(transformer)
.collect(Collectors.toList());
}

return pageToken.buildNextPage(data);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -762,4 +796,9 @@ public void rollback() {
session.getTransaction().rollback();
}
}

@Override
public @Nonnull PageToken.PageTokenBuilder<?> pageTokenBuilder() {
return EntityIdPageToken.builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.pagination.ReadEverythingPageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.jpa.models.EntityIdPageToken;
import org.apache.polaris.jpa.models.ModelEntity;
import org.apache.polaris.jpa.models.ModelEntityActive;
import org.apache.polaris.jpa.models.ModelEntityChangeTracking;
Expand Down Expand Up @@ -282,21 +285,40 @@ long countActiveChildEntities(
}

List<ModelEntity> lookupFullEntitiesActive(
EntityManager session, long catalogId, long parentId, @Nonnull PolarisEntityType entityType) {
EntityManager session,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
diagnosticServices.check(session != null, "session_is_null");
diagnosticServices.check(
(pageToken instanceof EntityIdPageToken || pageToken instanceof ReadEverythingPageToken),
"unexpected_page_token");
checkInitialized();

// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
String hql =
"SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
"SELECT m from ModelEntity m "
+ "where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode and m.id > :tokenId";

if (pageToken instanceof EntityIdPageToken) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it is not an EntityIdPageToken?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should have failed above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite... The check is an OR 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess what I meant to say is that having logic conditional on the runtime type of the PageToken looks risky to me... and hard to evolve/maintain.

I'm totally fine with each meta store impl. having its own PageToken implementation, but I do not think token polymorphism makes sense at this level.

Copy link
Contributor Author

@eric-maynard eric-maynard Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I understand your comment better now. You are pointing out that depending on the page token type, the results may or may not get sorted by entity ID? That's true.

Not sorting is more of an optimization for the case where we have a ReadEverythingPageToken (i.e. no page token). We could remove this optimization and always sort, but I added it to improve performance as in the majority of cases we have no page token.

In fact, I am trying not to couple page token implementations with metastore implementations here. In theory, the treemap metastore could also work with EntityIdPageToken.

I agree that the runtime check is a little awkward -- although we have such runtime checks all over the place currently. Is there a way you think we can achieve this same behavior without one?

I'm really hesitant to just always sort here. On the other hand, I think adding methods to PageToken that are really only relevant to one implementation (e.g. requiresSortingByEntityId) does not make a ton of sense from the interface perspective. Further, I think flipping the dependency between the metastore and page token types (e.g. PageToken::rewriteEclipseLinkQueryAsNeeded) is not a good pattern either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx for that extra insight. I need some time for a deeper review 😅 ⏳

hql += " order by m.id asc";
}

TypedQuery<ModelEntity> query =
session
.createQuery(hql, ModelEntity.class)
.setParameter("catalogId", catalogId)
.setParameter("parentId", parentId)
.setParameter("typeCode", entityType.getCode());

.setParameter("typeCode", entityType.getCode())
.setParameter("tokenId", -1L);

if (pageToken instanceof EntityIdPageToken) {
query =
query
.setParameter("tokenId", ((EntityIdPageToken) pageToken).id)
.setMaxResults(pageToken.pageSize);
}
return query.getResultList();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.jpa.models;

import java.util.List;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.persistence.pagination.PageToken;

/**
* A {@link PageToken} implementation that tracks the greatest ID from either {@link
* PolarisBaseEntity} or {@link ModelEntity} objects supplied in updates. Entities are meant to be
* filtered during listing such that only entities with and ID greater than the ID of the token are
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we assume that entities are loaded in the order of their IDs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metastores that use this PageToken implementation must enforce this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to add some new method to the metastore to canonicalize this check I think we can. The current check is similar to some other checks we have throughout the codebase that enforce a certain entity type is passed in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I missed that this page token was specific to JPA 🤦

Copy link
Contributor Author

@eric-maynard eric-maynard Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a little brittle though. Related to your comment above about the varying semantics of pageSize it could make sense to add a method to the metastore interface like supportedPageTokenTypes or something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting: the order of values returned by database sequences is not necessarily strictly increasing. Aka: it is wrong to assume that each retrieved values from a database sequence is higher than any value retrieved before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not an assumption; it's something the metastore has to enforce in order for this page token to be a valid implementation to use.

* returned.
*/
public class EntityIdPageToken extends PageToken {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JPA-model module should only be used by EclipseLink. We need to either found a common place to put this class, or duplicate it in different persistence implmentations.

public long id;

private EntityIdPageToken(long id, int pageSize) {
this.id = id;
this.pageSize = pageSize;
validate();
}

/** The minimum ID that could be attached to an entity */
private static final long MINIMUM_ID = 0;

/** The entity ID to use to start with. */
private static final long BASE_ID = MINIMUM_ID - 1;

@Override
protected List<String> getComponents() {
return List.of(String.valueOf(id), String.valueOf(pageSize));
}

/** Get a new `EntityIdPageTokenBuilder` instance */
public static PageTokenBuilder<EntityIdPageToken> builder() {
return new EntityIdPageToken.EntityIdPageTokenBuilder();
}

@Override
protected PageTokenBuilder<?> getBuilder() {
return EntityIdPageToken.builder();
}

/** A {@link PageTokenBuilder} implementation for {@link EntityIdPageToken} */
public static class EntityIdPageTokenBuilder extends PageTokenBuilder<EntityIdPageToken> {

@Override
public String tokenPrefix() {
return "polaris-entity-id";
}

@Override
public int expectedComponents() {
// id, pageSize
return 2;
}

@Override
protected EntityIdPageToken fromStringComponents(List<String> components) {
return new EntityIdPageToken(
Integer.parseInt(components.get(0)), Integer.parseInt(components.get(1)));
}

@Override
protected EntityIdPageToken fromLimitImpl(int limit) {
return new EntityIdPageToken(BASE_ID, limit);
}
}

@Override
public PageToken updated(List<?> newData) {
if (newData == null || newData.size() < this.pageSize) {
return DONE;
} else {
var head = newData.getFirst();
if (head instanceof ModelEntity) {
return new EntityIdPageToken(((ModelEntity) newData.getLast()).getId(), this.pageSize);
} else if (head instanceof PolarisBaseEntity) {
// Assumed to be sorted with the greatest entity ID last
return new EntityIdPageToken(
((PolarisBaseEntity) newData.getLast()).getId(), this.pageSize);
} else {
throw new IllegalArgumentException(
"Cannot build a page token from: " + newData.getFirst().getClass().getSimpleName());
}
}
}

@Override
public PageToken withPageSize(Integer pageSize) {
if (pageSize == null) {
return new EntityIdPageToken(BASE_ID, this.pageSize);
} else {
return new EntityIdPageToken(this.id, pageSize);
}
}
}
1 change: 1 addition & 0 deletions extension/persistence/relational-jdbc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ plugins {

dependencies {
implementation(project(":polaris-core"))
implementation(project(":polaris-jpa-model"))
implementation(libs.slf4j.api)
implementation(libs.guava)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import javax.sql.DataSource;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;

public class DatasourceOperations {
Expand Down Expand Up @@ -95,8 +96,8 @@ public void executeScript(String scriptFilePath) throws SQLException {
* @param query : Query to executed
* @param entityClass : Class of the entity being selected
* @param transformer : Transformation of entity class to Result class
* @param entityFilter : Filter to applied on the Result class
* @param limit : Limit to to enforced.
* @param entityFilter : Client-side filter to applied on the Result class
* @param pageToken : Page token to be enforced.
* @return List of Result class objects
* @param <T> : Entity class
* @param <R> : Result class
Expand All @@ -107,13 +108,13 @@ public <T, R> List<R> executeSelect(
@Nonnull Class<T> entityClass,
@Nonnull Function<T, R> transformer,
Predicate<R> entityFilter,
int limit)
PageToken pageToken)
throws SQLException {
try (Connection connection = borrowConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) {
List<R> resultList = new ArrayList<>();
while (resultSet.next() && resultList.size() < limit) {
List<R> resultList = new ArrayList<>(PageToken.DEFAULT_PAGE_SIZE);
while (resultSet.next() && resultList.size() < pageToken.pageSize) {
Converter<T> object =
(Converter<T>)
entityClass.getDeclaredConstructor().newInstance(); // Create a new instance
Expand Down
Loading