diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 3a0688880c..e0f1838584 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -48,6 +48,8 @@ dependencies {
api(project(":polaris-nodes-impl"))
api(project(":polaris-nodes-spi"))
+ api(project(":polaris-persistence-nosql-api"))
+
api(project(":polaris-config-docs-annotations"))
api(project(":polaris-config-docs-generator"))
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 1e419a0636..dea126c804 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -76,6 +76,7 @@ jakarta-validation-api = { module = "jakarta.validation:jakarta.validation-api",
jakarta-ws-rs-api = { module = "jakarta.ws.rs:jakarta.ws.rs-api", version = "4.0.0" }
javax-servlet-api = { module = "javax.servlet:javax.servlet-api", version = "4.0.1" }
junit-bom = { module = "org.junit:junit-bom", version = "5.14.1" }
+junit-pioneer = { module = "org.junit-pioneer:junit-pioneer", version = "2.3.0" }
keycloak-admin-client = { module = "org.keycloak:keycloak-admin-client", version = "26.0.7" }
jcstress-core = { module = "org.openjdk.jcstress:jcstress-core", version = "0.16" }
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 5869bc5bc7..51308902bb 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -64,4 +64,5 @@ polaris-nodes-api=persistence/nosql/nodes/api
polaris-nodes-impl=persistence/nosql/nodes/impl
polaris-nodes-spi=persistence/nosql/nodes/spi
# persistence / database agnostic
+polaris-persistence-nosql-api=persistence/nosql/persistence/api
polaris-persistence-nosql-varint=persistence/nosql/persistence/varint
diff --git a/persistence/nosql/persistence/README.md b/persistence/nosql/persistence/README.md
new file mode 100644
index 0000000000..f42a941ca0
--- /dev/null
+++ b/persistence/nosql/persistence/README.md
@@ -0,0 +1,252 @@
+
+
+# Database agnostic persistence framework
+
+The NoSQL persistence API and functional implementations are based on the assumption that all databases targeted as
+backing stores for Polaris support "compare and swap" operations on a single row.
+These CAS operations are the only requirement.
+
+Since some databases do enforce hard size limits, for example, DynamoDB has a hard 400kB row size limit.
+MariaDB/MySQL has a default 512kB packet size limit.
+Other databases have row-size recommendations around similar sizes.
+Polaris persistence respects those limits and recommendations using a common hard limit of 350kB.
+
+Objects exposed via the `Persistence` interface are typed Java objects that must be immutable and serializable using
+Jackson.
+Each type is described via an implementation of the `ObjType` interface using a name, which must be unique
+in Polaris, and a target Java type: the Jackson serializable Java type.
+Object types are registered using the Java service API using `ObjType`.
+The actual java target types must extend the `Obj` interface.
+The (logical) key for each `Obj` is a composite of the `ObjType.id()` and a `long` ID (64-bit signed int),
+combined using the `ObjId` composite type.
+
+The "primary key" of each object in a database is always _realmId + object-ID_, where realm-ID is a string and
+object-ID is a 64-bit integer.
+This allows, but does not enforce, storing multiple realms in one backend database.
+
+Data in/for/of each Polaris realm (think: _tenant_) is isolated using the realm's ID (string).
+The base `Persistence` API interface is always scoped to exactly one realm ID.
+
+## Supporting more databases
+
+The code to support a particular database is isolated in a project, for example `polaris-persistence-nosql-inmemory` and
+`polaris-persistence-nosql-mongodb`.
+
+When adding another database, it must also be wired up to Quarkus in `polaris-persistence-nosql-cdi-quarkus` preferably
+using Quarkus extensions, added to the `polaris-persitence-corectness` tests and available in
+`polaris-persistence-nosql-benchmark` for low-level benchmarks.
+
+## Named pointers
+
+Polaris represents a catalog for data lakehouses, which means that the information of and for catalog entities like
+Iceberg tables, views, and namespaces must be consistent, even if multiple catalog entities are changes in a single
+atomic operation.
+
+Polaris leverages a concept called "Named pointers."
+The state of the whole catalog is referenced via the so-called HEAD (think: Git HEAD),
+which _points to_ all catalog entities.
+This state is persisted as an `Obj` with an index of the catalog entities,
+the ID of that "current catalog state `Obj`" is maintained in one named pointer.
+
+Named pointers are also used for other purposes than catalog entities, for example, to maintain realms or
+configurations.
+
+## Committing changes
+
+Changes are persisted using a commit mechanism, providing atomic changes across multiple entities against one named
+pointer.
+The logic implementation ensures that even high-frequency concurrent changes do neither let clients fail
+nor cause timeouts.
+The behavior and achievable throughput depend on the database being used; some databases perform
+_much_ better than others.
+
+A use-case agnostic "committer" abstraction exists to ease implementing committing operations.
+For catalog operations, there is a more specialized abstraction.
+
+## `long` IDs
+
+Polaris NoSQL persistence uses so-called Snowflake IDs, which are 64-bit integers that represent a timestamp, a
+node-ID, and a sequence number.
+The epoch of these timestamps is 2025-03-01-00:00:00.0 GMT.
+Timestamps occupy 41 bits at millisecond precision, which lasts for about 69 years.
+Node-IDs are 10 bits, which allows 1024 concurrently active "JVMs running Polaris."
+Twelve (12) bits are used by the sequence number, which then allows each node to generate 4096 IDs per
+millisecond.
+One bit is reserved for future use.
+
+Node IDs are leased by every "JVM running Polaris" for a period of time.
+The ID generator implementation guarantees that no IDs will be generated for a timestamp that exceeds the "lease time."
+Leases can be extended.
+The implementation leverages atomic database operations (CAS) for the lease implementation.
+
+ID generators must not use timestamps before or after the lease period, nor must they re-use an older timestamp.
+This requirement is satisfied using a monotonic clock implementation.
+
+## Caching
+
+Since most `Obj`s are by default assumed to be immutable, caching is very straight forward and does not require any
+coordination, which simplifies the design and implementation quite a bit.
+
+## Strong vs. eventual consistency
+
+Polaris NoSQL persistence offers two ways to persist `Obj`s: strongly consistent and eventually consistent.
+The former is slower than the latter.
+
+Since Polaris NoSQL persistence respects the hard size limitations mentioned above, it cannot persist the serialized
+representation of objects that exceed those limits in a single database row.
+However, some objects legibly exceed those limits.
+Polaris NoSQL persistence allows such "big object serializations" and writes those into multiple database rows,
+with the restriction that this is only supported for eventually consistent write operations.
+The serialized representation for strong consistency writes must always be within the hard limit.
+
+## Indexes
+
+The state of a data-lakehouse catalog can contain many thousand, potentially a few 100,000, tables/views/namespaces.
+Even space-efficient serialization of an index for that many entries can exceed the "common hard 350kB limit."
+New changes end in the index, which is "embedded" in the "current catalog state `Obj`".
+If the respective index size limit of this "embedded" index is being approached,
+the index is spilled out to separate rows in the database.
+The implementation is built to split and combine when needed.
+
+## Change log / events / notifications
+
+The commit mechanism described above builds a commit log.
+All changes can be inspected via that log in exactly the order in which those happened (think: `git log`).
+Since the log of changes is already present, it is possible to retrieve the changes from some point in time or
+commit log ID.
+This allows clients to receive all changes that have happened since the last known commit ID,
+offering a mechanism to poll for changes.
+Since the necessary `Obj`s are immutable,
+such change-log-requests likely hit already cached data and rather not the database.
+
+## Clean up old commits / unused data
+
+Despite the beauty of having a "commit log" and all metadata representation in the backing database,
+the size of that database would always grow.
+
+Purging unused table/view metadata memoized in the database is one piece.
+Purging old commit log entries is the second part.
+Purging (then) unreferenced `Obj`s the third part.
+
+See [maintenance service](#maintenance-service) below.
+
+## Realms (aka tenants)
+
+Bootstrapping but more importantly, deleting/purging a realm is a non-trivial operation, which requires its own
+lifecycle.
+Bootstrapping is a straight forward operation as the necessary information can be validated and enhanced if necessary.
+
+Both the logical but also the physical process of realm deletion are more complex.
+From a logical point of view,
+users want to disable the realm for a while before they eventually are okay with deleting the information.
+
+The process to delete a realm's data from the database can be quite time-consuming, and how that happens is
+database-specific.
+While some databases can do bulk-deletions, which "just" take some time (RDBMS, BigTable), other databases
+require that the process of deleting a realm must happen during a full scan of the database (for example, RocksDB
+and Apache Cassandra).
+Since scanning the whole database itself can take quite long, and no more than one instance should scan the database
+at any time.
+
+The realm has a status to reflect its lifecycle.
+The initial status of a realm is `CREATED`, which effectively only means that the realm-ID has been reserved and that
+the necessary data needs to be populated (bootstrap).
+Once a realm has been fully bootstrapped, its status is changed to `ACTIVE`.
+Only `ACTIVE` realms can be used for user requests.
+
+Between `CREATED` and `ACTIVE`/`INACTIVE` there are two states that are mutually exclusive.
+The state `INITIALIZING` means that Polaris will initialize the realm as a fresh, new realm.
+The state `LOADING` means that realm data, which has been exported from another Polaris instance, is to be imported.
+
+Realm deletion is a multistep approach as well: Realms are first put into `INACTIVE` state, which can be reverted
+to `ACTIVE` state or into `PURGING` state.
+The state `PURGING` means that the realm's data is being deleted from the database,
+once purging has been started, the realm's information in the database is inconsistent and cannot be restored.
+Once the realm's data has been purged, the realm is put into `PURGED` state. Only realms that are in state `PURGED`
+can be deleted.
+
+The multi-state approach also prevents that a realm can only be used when the system knows that all necessary
+information is present.
+
+**Note**: the realm state machine is not fully implemented yet.
+
+## `::system::` realm
+
+Polaris NoSQL persistence uses a system realm which is used for node ID leases and realm management.
+The realm-IDs starting with two colons (`::`) are reserved for system use.
+
+### Named pointers in the `::system::` realm
+
+| Named pointer | Meaning |
+|---------------|-----------------|
+| `realms` | Realms, by name |
+
+## "User" realms
+
+### Named pointers in the user realms
+
+| Named pointer | Meaning |
+|-------------------|------------------------------|
+| `root` | Pointer to the "root" entity |
+| `catalogs` | Catalogs |
+| `principals` | Principals |
+| `principal-roles` | Principal roles |
+| `grants` | All grants |
+| `immediate-tasks` | Immediately scheduled tasks |
+| `policy-mappings` | Policy mappings |
+
+Per catalog named pointers, where `%d` refers to the catalog's integer ID:
+
+| Named pointer | Meaning |
+|---------------------|--------------------------------------------------|
+| `cat/%d/roles` | Catalog roles |
+| `cat/%d/heads/main` | Catalog content (namespaces, tables, views, etc) |
+| `cat/%d/grants` | Catalog related grants (*) |
+
+(*) = currently not used, stored in the realm grants.
+
+## Maintenance Service
+
+**Note**: maintenance service not yet in the code base.
+
+The maintenance service is a mechanism to scan the backend database and perform necessary maintenance operations
+as a background service.
+
+The most important maintenance operation is to purge unreferenced objects from the database.
+Pluggable "identifiers" are used to "mark" objects to retain.
+
+The implementation calls all per-realm "identifiers," which then "mark" the named pointers and objects that have to be
+retained.
+Plugins and/or extensions can provide per-object-type "identifiers," which get called for "identified" objects.
+The second phase of the maintenance service scans the whole backend database and purges those objects,
+which have not been "marked" to be retained.
+
+Maintenance service invocations require two sets of realm-ids: the set of realms to retain and the set of realms
+to purge.
+These sets can be derived using `RealmManagement.list()` and grouping realms by their status.
+
+### Purging realms
+
+Eventually, purging realm from the backend database can happen in two different ways, depending on the database.
+Some databases support deleting one or more realms using bulk deletions.
+Other databases do not support this kind of bulk deletion.
+Both ways are supported by the maintenance service.
+
+Eventually, purging realms is a responsibility of the [maintenance service](#maintenance-service).
diff --git a/persistence/nosql/persistence/api/build.gradle.kts b/persistence/nosql/persistence/api/build.gradle.kts
new file mode 100644
index 0000000000..ff0d441897
--- /dev/null
+++ b/persistence/nosql/persistence/api/build.gradle.kts
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ id("org.kordamp.gradle.jandex")
+ id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence API, no concrete implementations"
+
+dependencies {
+ api(project(":polaris-version"))
+ api(project(":polaris-misc-types"))
+
+ implementation(project(":polaris-idgen-api"))
+ implementation(project(":polaris-nodes-api"))
+ implementation(project(":polaris-persistence-nosql-varint"))
+
+ implementation(platform(libs.jackson.bom))
+ implementation("com.fasterxml.jackson.core:jackson-annotations")
+ implementation("com.fasterxml.jackson.core:jackson-core")
+ implementation("com.fasterxml.jackson.core:jackson-databind")
+ runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-guava")
+ runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jdk8")
+ runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
+
+ implementation(libs.guava)
+ implementation(libs.slf4j.api)
+
+ compileOnly(libs.smallrye.config.core)
+ compileOnly(platform(libs.quarkus.bom))
+ compileOnly("io.quarkus:quarkus-core")
+
+ compileOnly(project(":polaris-immutables"))
+ annotationProcessor(project(":polaris-immutables", configuration = "processor"))
+
+ compileOnly(libs.jakarta.annotation.api)
+ compileOnly(libs.jakarta.validation.api)
+ compileOnly(libs.jakarta.inject.api)
+ compileOnly(libs.jakarta.enterprise.cdi.api)
+
+ testImplementation(platform(libs.jackson.bom))
+ testImplementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
+
+ testImplementation(libs.junit.pioneer)
+
+ testFixturesImplementation(platform(libs.jackson.bom))
+ testFixturesImplementation("com.fasterxml.jackson.core:jackson-databind")
+
+ testFixturesCompileOnly(project(":polaris-immutables"))
+ testFixturesAnnotationProcessor(project(":polaris-immutables", configuration = "processor"))
+
+ testFixturesCompileOnly(libs.jakarta.annotation.api)
+ testFixturesCompileOnly(libs.jakarta.validation.api)
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/Persistence.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/Persistence.java
new file mode 100644
index 0000000000..6795036235
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/Persistence.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.commit.Commits;
+import org.apache.polaris.persistence.nosql.api.commit.Committer;
+import org.apache.polaris.persistence.nosql.api.exceptions.ReferenceAlreadyExistsException;
+import org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException;
+import org.apache.polaris.persistence.nosql.api.index.Index;
+import org.apache.polaris.persistence.nosql.api.index.IndexContainer;
+import org.apache.polaris.persistence.nosql.api.index.IndexValueSerializer;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+/**
+ * Polaris NoSQL persistence interface providing fundamental primitive operations to manage
+ * named-references including atomic updates and to read and write {@code Obj}s. Batch operations
+ * are provided where applicable.
+ *
+ *
{@code Obj}s are usually only written but never updated. This enables efficient caching of
+ * persisted data. In certain, exceptional use cases, which should always almost be avoided, CAS
+ * primitives allow conditional creates/updates/deletes. {@code ObjType} implementations can provide
+ * custom positive and negative caching rules.
+ *
+ *
Databases often have hard limits or at least more-or-less strong recommendations on the size
+ * of serialized {@link Obj}s. The "main" implementation of this interface in {@code
+ * :polaris-persistence-nosql-impl} takes care of transparently splitting and re-assembling {@link
+ * Obj}s across multiple database rows. The latter is not supported for conditionally updated {@link
+ * Obj}s.
+ *
+ *
This interface is a Polaris-internal low-level API interface for NoSQL. Instances of this
+ * interface are scoped to a specific realm.
+ *
+ *
The behavior when fetching a non-existing reference is to throw, which is different from
+ * fetching non-existing {@link Obj}s, because references are supposed to exist and a non-existence
+ * is usually a sign of a missing initialization step, whereas a missing {@link Obj} is often
+ * expected.
+ *
+ *
Database-specific implementations do implement the {@link Backend} interface, not this one.
+ */
+public interface Persistence {
+ /**
+ * Creates the reference with the given name and {@linkplain Reference#pointer() pointer} value.
+ *
+ *
Reference creation is always a strongly consistent operation.
+ *
+ * @throws ReferenceAlreadyExistsException if a reference with the same name already exists
+ */
+ @Nonnull
+ Reference createReference(@Nonnull String name, @Nonnull Optional pointer)
+ throws ReferenceAlreadyExistsException;
+
+ /**
+ * Convenience function to create a reference with an empty {@linkplain Reference#pointer()
+ * pointer}, if it does not already exist.
+ *
+ * @see #createReferencesSilent(Set)
+ */
+ default void createReferenceSilent(@Nonnull String name) {
+ createReferencesSilent(Set.of(name));
+ }
+
+ /**
+ * Ensures that multiple references exist, leveraging bulk operations, if possible. References are
+ * created with empty {@linkplain Reference#pointer() pointers}.
+ *
+ *
This whole operation is not guaranteed to be atomic, the creation of each reference is
+ * atomic.
+ *
+ * @see #createReferenceSilent(String)
+ */
+ void createReferencesSilent(Set referenceNames);
+
+ /**
+ * Convenience function to return an existing reference or to create the reference with a supplied
+ * {@linkplain Reference#pointer() pointer}, if it does not already exist.
+ */
+ @Nonnull
+ default Reference fetchOrCreateReference(
+ @Nonnull String name, @Nonnull Supplier> pointerForCreate) {
+ try {
+ return fetchReference(name);
+ } catch (ReferenceNotFoundException e) {
+ try {
+ return createReference(name, pointerForCreate.get());
+ } catch (ReferenceAlreadyExistsException x) {
+ // Unlikely that we ever get here (ref does not exist (but then concurrently created)
+ return fetchReference(name);
+ }
+ }
+ }
+
+ /**
+ * Updates the {@linkplain Reference#pointer() pointer} to {@code newPointer}, if the reference
+ * exists and the current persisted pointer is the same as in {@code reference}.
+ *
+ *
Reference update is always a strongly consistent operation.
+ *
+ * @param reference the existing reference including the expected pointer
+ * @param newPointer the pointer to update the reference to. If the reference has a current
+ * pointer value, both the current and the new pointer must use the same {@link ObjType
+ * ObjType}.
+ * @return If the reference was successfully updated, an updated {@link Reference} instances will
+ * be returned.
+ * @throws ReferenceNotFoundException if the reference does not exist
+ */
+ @Nonnull
+ Optional updateReferencePointer(
+ @Nonnull Reference reference, @Nonnull ObjRef newPointer) throws ReferenceNotFoundException;
+
+ /**
+ * Fetch the reference with the given name, leveraging the reference cache.
+ *
+ * @throws ReferenceNotFoundException if the reference does not exist
+ * @see #fetchReferenceForUpdate(String)
+ * @see #fetchReferenceHead(String, Class)
+ */
+ @Nonnull
+ Reference fetchReference(@Nonnull String name) throws ReferenceNotFoundException;
+
+ /**
+ * Fetches the reference with the given name, but will always fetch the most recent state from the
+ * backend database.
+ *
+ * @see #fetchReference(String)
+ */
+ @Nonnull
+ default Reference fetchReferenceForUpdate(@Nonnull String name)
+ throws ReferenceNotFoundException {
+ return fetchReference(name);
+ }
+
+ /**
+ * Convenience function to return the {@link Obj} as pointed to from the reference with the given
+ * name.
+ *
+ * @see #fetchReference(String)
+ * @see #fetch(ObjRef, Class)
+ */
+ default Optional fetchReferenceHead(
+ @Nonnull String name, @Nonnull Class clazz) throws ReferenceNotFoundException {
+ var ref = fetchReference(name);
+ return ref.pointer()
+ .map(
+ id -> {
+ var head = fetch(id, clazz);
+ checkState(head != null, "%s referenced by '%s' does not exist", id, name);
+ return head;
+ });
+ }
+
+ /**
+ * Fetch the objects for the given object Ids.
+ *
+ *
Supports assembling object splits across multiple rows by {@link #write(Obj, Class)} or
+ * {@link #writeMany(Class, Obj[])}.
+ *
+ * @param id ID of the object to load
+ * @param clazz expected {@link Obj} subtype, passing {@code Obj.class} is fine
+ * @return loaded object or {@code null} if it does not exist
+ * @param returned type can also be just {@code Obj}
+ * @see #fetchMany(Class, ObjRef[])
+ */
+ @Nullable
+ T fetch(@Nonnull ObjRef id, @Nonnull Class clazz);
+
+ /**
+ * Fetch multiple objects for the given object Ids.
+ *
+ *
Supports assembling object splits across multiple rows by {@link #write(Obj, Class)} or
+ * {@link #writeMany(Class, Obj[])}.
+ *
+ * @param returned type can also be just {@code Obj}
+ * @param clazz expected {@link Obj} subtype, passing {@code Obj.class} is fine
+ * @param ids ID of the object to load, callers must ensure that the IDs are not duplicated within
+ * the array
+ * @return array of the same length as {@code ids} containing the loaded objects, with {@code
+ * null} elements for objects that do not exist
+ * @see #fetch(ObjRef, Class)
+ */
+ @Nonnull
+ T[] fetchMany(@Nonnull Class clazz, @Nonnull ObjRef... ids);
+
+ /**
+ * Persist {@code obj} with eventually consistent guarantees.
+ *
+ *
Supports splitting the serialized representation across multiple rows in the backend
+ * database, if the serialized representation does not fit entirely in a single row, limited by
+ * {@link #maxSerializedValueSize()}.
+ *
+ *
This function (and {@link #writeMany(Class, Obj[])}) are not meant to actually
+ * update existing objects with different information, especially not when the size of the
+ * serialized object changes the number of splits in the backend database. Note that there is
+ * no protection against this scenario.
+ *
+ * @return {@code obj} with the {@link Obj#createdAtMicros()} and {@link Obj#numParts()} fields
+ * updated
+ * @see #writeMany(Class, Obj[])
+ */
+ @Nonnull
+ T write(@Nonnull T obj, @Nonnull Class clazz);
+
+ /**
+ * Persist multiple {@code objs} with eventually consistent guarantees.
+ *
+ *
See {@link #write(Obj, Class)} for more information.
+ *
+ *
Supports splitting the serialized representation across multiple rows in the backend
+ * database, if the serialized representation does not fit entirely in a single row, limited by
+ * {@link #maxSerializedValueSize()}.
+ *
+ *
This function and {@link #write(Obj, Class)} are not meant to actually update
+ * existing objects with different information, especially not when the size of the serialized
+ * object changes the number of splits in the backend database. Note that there is no
+ * protection against this scenario.
+ *
+ * @return {@code objs} with the {@link Obj#createdAtMicros()} and {@link Obj#numParts()} fields
+ * updated, callers must ensure that the IDs are not duplicated within the array. {@code null}
+ * elements in the returned array will appear for {@code null} elements in the {@code objs}
+ * array.
+ * @see #write(Obj, Class)
+ */
+ @SuppressWarnings("unchecked")
+ @Nonnull
+ T[] writeMany(@Nonnull Class clazz, @Nonnull T... objs);
+
+ /**
+ * Unconditionally delete the object with the given id.
+ *
+ *
Note that it is generally not advised to actively (or prematurely) delete objects. In
+ * general, it is better to just leave the object and let the maintenance service take care of
+ * purging it.
+ *
+ *
If the object has been split across multiple database rows, only the number of parts
+ * mentioned in {@link ObjRef#numParts()} will be deleted. However, the maintenance service will
+ * take care of purging possibly left-over parts.
+ *
+ * @see #deleteMany(ObjRef[])
+ */
+ void delete(@Nonnull ObjRef id);
+
+ /**
+ * Unconditionally delete the objects with the given ids.
+ *
+ *
Note that it is generally not advised to actively (or prematurely) delete objects. In
+ * general, it is better to just leave the object and let the maintenance service take care of
+ * purging it.
+ *
+ *
If the object has been split across multiple database rows, only the number of parts
+ * mentioned in {@link ObjRef#numParts()} will be deleted. However, the maintenance service will
+ * take care of purging possibly left-over parts.
+ *
+ * @param ids IDs of objects to delete, callers must ensure that the IDs are not duplicated within
+ * the array
+ * @see #delete(ObjRef)
+ */
+ void deleteMany(@Nonnull ObjRef... ids);
+
+ /**
+ * Persist {@code obj} with strong consistent guarantees.
+ *
+ *
Unlike {@linkplain #write(Obj, Class) eventually consistent writes}, conditional write
+ * operations do not support splitting the serialized representation across multiple rows in the
+ * backend database.
+ *
+ *
The serialized representation must fit entirely in a single row, limited by {@link
+ * #maxSerializedValueSize()}.
+ *
+ * @return {@code obj} with the {@link Obj#createdAtMicros()} field updated if and only if no
+ * other object with the same object id existed before, otherwise {@code null}
+ */
+ @Nullable
+ T conditionalInsert(@Nonnull T obj, @Nonnull Class clazz);
+
+ /**
+ * Update an object with strong consistent guarantees.
+ *
+ *
Unlike {@linkplain #write(Obj, Class) eventually consistent writes}, conditional write
+ * operations do not support splitting the serialized representation across multiple rows in the
+ * backend database.
+ *
+ *
The serialized representation must fit entirely in a single row, limited by {@link
+ * #maxSerializedValueSize()}.
+ *
+ * @param expected the object expected to have the same {@link Obj#versionToken()} as this one
+ * @param update the object to be updated to, must have the same {@linkplain Obj#id() id},
+ * {@linkplain Obj#type() type} but a different {@linkplain Obj#versionToken() version token}
+ * @return updated state in the database, if successful, otherwise {@code null}
+ */
+ @Nullable
+ T conditionalUpdate(
+ @Nonnull T expected, @Nonnull T update, @Nonnull Class clazz);
+
+ /**
+ * Delete an object with strong consistent guarantees.
+ *
+ * @param expected the object expected to have the same {@link Obj#versionToken()} as this one
+ * @return {@code true} if the object existed with the expected version token and was deleted in
+ * the database, if successful, otherwise {@code false}
+ */
+ boolean conditionalDelete(@Nonnull T expected, Class clazz);
+
+ PersistenceParams params();
+
+ /**
+ * Defines the maximum allowed {@linkplain Obj serialized object} size. Serialized representation
+ * larger than this value will be split into multiple database rows.
+ */
+ int maxSerializedValueSize();
+
+ long generateId();
+
+ ObjRef generateObjId(ObjType type);
+
+ /**
+ * If the {@linkplain Persistence persistence implementation} is caching, this function returns
+ * the object with the ID from the cache, but does not consult the backend.
+ *
+ *
Non-caching implementations default to {@link #fetch(ObjRef, Class)}.
+ */
+ @Nullable
+ T getImmediate(@Nonnull ObjRef id, @Nonnull Class clazz);
+
+ Commits commits();
+
+ Committer createCommitter(
+ @Nonnull String refName,
+ @Nonnull Class referencedObjType,
+ @Nonnull Class resultType);
+
+ Index buildReadIndex(
+ @Nullable IndexContainer indexContainer,
+ @Nonnull IndexValueSerializer indexValueSerializer);
+
+ UpdatableIndex buildWriteIndex(
+ @Nullable IndexContainer indexContainer,
+ @Nonnull IndexValueSerializer indexValueSerializer);
+
+ @Nonnull
+ default Duration objAge(@Nonnull Obj obj) {
+ return Duration.ofNanos(
+ TimeUnit.MICROSECONDS.toNanos(Math.max(currentTimeMicros() - obj.createdAtMicros(), 0L)));
+ }
+
+ String realmId();
+
+ MonotonicClock monotonicClock();
+
+ IdGenerator idGenerator();
+
+ /**
+ * Convenience for {@link #monotonicClock() monotonicClock().}{@link
+ * MonotonicClock#currentTimeMicros()}.
+ */
+ @SuppressWarnings("resource")
+ default long currentTimeMicros() {
+ return monotonicClock().currentTimeMicros();
+ }
+
+ /**
+ * Convenience for {@link #monotonicClock() monotonicClock().}{@link
+ * MonotonicClock#currentTimeMillis()}.
+ */
+ @SuppressWarnings("resource")
+ default long currentTimeMillis() {
+ return monotonicClock().currentTimeMillis();
+ }
+
+ /**
+ * Convenience for {@link #monotonicClock() monotonicClock().}{@link
+ * MonotonicClock#currentInstant()}.
+ */
+ @SuppressWarnings("resource")
+ default Instant currentInstant() {
+ return monotonicClock().currentInstant();
+ }
+
+ /**
+ * Convenience function to perform {@link #fetchMany(Class, ObjRef...)} on an arbitrary number of
+ * objects to fetch.
+ *
+ * @param objRefs all {@link ObjRef}s to fetch
+ * @param clazz type of {@link Obj} to fetch
+ * @return stream of fetched {@link Obj}s, not found {@link Obj}s are filtered out
+ * @see StreamUtil#bucketized(Stream, Function, int) for a more generic implementation
+ */
+ default Stream bucketizedBulkFetches(Stream objRefs, Class clazz) {
+ var fetchSize = params().bucketizedBulkFetchSize();
+
+ return StreamUtil.bucketized(
+ objRefs,
+ refs -> {
+ var toFetch = refs.toArray(new ObjRef[0]);
+ var objs = fetchMany(clazz, toFetch);
+ return Arrays.asList(objs);
+ },
+ fetchSize)
+ .filter(Objects::nonNull);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/PersistenceDecorator.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/PersistenceDecorator.java
new file mode 100644
index 0000000000..746d0d90ae
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/PersistenceDecorator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+public interface PersistenceDecorator {
+ /** Flag whether the decorator should be considered. */
+ boolean active();
+
+ /**
+ * Indicates the priority. Decorators with a lower priority are applied before those with a higher
+ * priority.
+ */
+ int priority();
+
+ Persistence decorate(Persistence persistence);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/PersistenceParams.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/PersistenceParams.java
new file mode 100644
index 0000000000..0fa9036ae2
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/PersistenceParams.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.util.stream.Stream;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.misc.types.memorysize.MemorySize;
+import org.apache.polaris.persistence.nosql.api.commit.RetryConfig;
+import org.immutables.value.Value;
+
+@ConfigMapping(prefix = "polaris.persistence")
+@JsonSerialize(as = ImmutableBuildablePersistenceParams.class)
+@JsonDeserialize(as = ImmutableBuildablePersistenceParams.class)
+public interface PersistenceParams {
+ String DEFAULT_REFERENCE_PREVIOUS_HEAD_COUNT_STRING = "20";
+ int DEFAULT_REFERENCE_PREVIOUS_HEAD_COUNT =
+ Integer.parseInt(DEFAULT_REFERENCE_PREVIOUS_HEAD_COUNT_STRING);
+
+ @WithDefault(DEFAULT_REFERENCE_PREVIOUS_HEAD_COUNT_STRING)
+ int referencePreviousHeadCount();
+
+ String DEFAULT_MAX_INDEX_STRIPES_STRING = "20";
+ int DEFAULT_MAX_INDEX_STRIPES = Integer.parseInt(DEFAULT_MAX_INDEX_STRIPES_STRING);
+
+ @WithDefault(DEFAULT_MAX_INDEX_STRIPES_STRING)
+ int maxIndexStripes();
+
+ String DEFAULT_MAX_EMBEDDED_INDEX_SIZE_STRING = "32k";
+ MemorySize DEFAULT_MAX_EMBEDDED_INDEX_SIZE =
+ MemorySize.valueOf(DEFAULT_MAX_EMBEDDED_INDEX_SIZE_STRING);
+
+ @WithDefault(DEFAULT_MAX_EMBEDDED_INDEX_SIZE_STRING)
+ MemorySize maxEmbeddedIndexSize();
+
+ String DEFAULT_MAX_INDEX_STRIPE_SIZE_STRING = "128k";
+ MemorySize DEFAULT_MAX_INDEX_STRIPE_SIZE =
+ MemorySize.valueOf(DEFAULT_MAX_INDEX_STRIPE_SIZE_STRING);
+
+ @WithDefault(DEFAULT_MAX_INDEX_STRIPE_SIZE_STRING)
+ MemorySize maxIndexStripeSize();
+
+ @Value.Default
+ default RetryConfig retryConfig() {
+ return RetryConfig.DEFAULT_RETRY_CONFIG;
+ }
+
+ String DEFAULT_BUCKETIZED_BULK_FETCH_SIZE_STRING = "16";
+ int DEFAULT_BUCKETIZED_BULK_FETCH_SIZE =
+ Integer.parseInt(DEFAULT_BUCKETIZED_BULK_FETCH_SIZE_STRING);
+
+ /**
+ * The number of objects to fetch at once via {@link Persistence#bucketizedBulkFetches(Stream,
+ * Class)}.
+ */
+ @WithDefault(DEFAULT_BUCKETIZED_BULK_FETCH_SIZE_STRING)
+ int bucketizedBulkFetchSize();
+
+ String DEFAULT_MAX_SERIALIZED_VALUE_SIZE_STRING = "350k";
+ MemorySize DEFAULT_MAX_SERIALIZED_VALUE_SIZE =
+ MemorySize.valueOf(DEFAULT_MAX_SERIALIZED_VALUE_SIZE_STRING);
+
+ /** The maximum size of a serialized value in a persisted database row. */
+ @WithDefault(DEFAULT_MAX_SERIALIZED_VALUE_SIZE_STRING)
+ MemorySize maxSerializedValueSize();
+
+ @PolarisImmutable
+ interface BuildablePersistenceParams extends PersistenceParams {
+ static ImmutableBuildablePersistenceParams.Builder builder() {
+ return ImmutableBuildablePersistenceParams.builder();
+ }
+
+ @Override
+ @Value.Default
+ default int referencePreviousHeadCount() {
+ return DEFAULT_REFERENCE_PREVIOUS_HEAD_COUNT;
+ }
+
+ @Override
+ @Value.Default
+ default int maxIndexStripes() {
+ return DEFAULT_MAX_INDEX_STRIPES;
+ }
+
+ @Override
+ @Value.Default
+ default MemorySize maxEmbeddedIndexSize() {
+ return DEFAULT_MAX_EMBEDDED_INDEX_SIZE;
+ }
+
+ @Override
+ @Value.Default
+ default MemorySize maxIndexStripeSize() {
+ return DEFAULT_MAX_INDEX_STRIPE_SIZE;
+ }
+
+ @Override
+ @Value.Default
+ default RetryConfig retryConfig() {
+ return RetryConfig.BuildableRetryConfig.builder().build();
+ }
+
+ @Override
+ @Value.Default
+ default int bucketizedBulkFetchSize() {
+ return DEFAULT_BUCKETIZED_BULK_FETCH_SIZE;
+ }
+
+ @Override
+ @Value.Default
+ default MemorySize maxSerializedValueSize() {
+ return DEFAULT_MAX_SERIALIZED_VALUE_SIZE;
+ }
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/RealmPersistenceFactory.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/RealmPersistenceFactory.java
new file mode 100644
index 0000000000..cd2bdf196b
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/RealmPersistenceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+
+/**
+ * Builder factory to generate "realm-scoped" {@link Persistence} instances.
+ *
+ *
{@link RealmPersistenceFactory} instance is available as an {@link ApplicationScoped} bean.
+ */
+public interface RealmPersistenceFactory {
+ /**
+ * Return a new builder for per-realm persistence.
+ *
+ *
Builders must only be used once.
+ */
+ RealmPersistenceBuilder newBuilder();
+
+ interface RealmPersistenceBuilder {
+ RealmPersistenceBuilder realmId(@Nonnull String realmId);
+
+ RealmPersistenceBuilder skipDecorators();
+
+ Persistence build();
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/Realms.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/Realms.java
new file mode 100644
index 0000000000..6275875871
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/Realms.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+public final class Realms {
+ private Realms() {}
+
+ /**
+ * Realms with special meanings and "non-standard behavior" (as per {@code
+ * org.apache.polaris.realms.api.RealmDefinition.RealmStatus}) have to have an ID that starts with
+ * this prefix.
+ */
+ public static final String SYSTEM_REALM_PREFIX = "::";
+
+ public static final String SYSTEM_REALM_ID = SYSTEM_REALM_PREFIX + "system::";
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/StartupPersistence.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/StartupPersistence.java
new file mode 100644
index 0000000000..4530531f71
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/StartupPersistence.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.util.AnnotationLiteral;
+import jakarta.inject.Inject;
+import jakarta.inject.Qualifier;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.persistence.nosql.nodeids.api.NodeManagement;
+
+/**
+ * Qualifier for system-level {@link Persistence} instance against the {@linkplain
+ * Realms#SYSTEM_REALM_ID system realm} needed for {@linkplain NodeManagement node management}.
+ *
+ *
This qualifier is only needed and should only be used by code used to initialize the
+ * application. There is really no need to use this qualifier in any application code.
+ *
+ *
The qualified {@link Persistence} instance has no functional {@link IdGenerator}.
+ *
+ *
A system-realm {@link Persistence} instance can be {@link Inject @Inject}ed as an {@link
+ * ApplicationScoped @ApplicationScoped} bean using
+ *
+ * {@snippet :
+ * @ApplicationScoped
+ * class MyBean {
+ * @Inject @StartupPersistence Persistence startupPersistence; // @highlight
+ * }
+ * }
+ *
+ * @see SystemPersistence
+ */
+@Target({TYPE, METHOD, PARAMETER, FIELD})
+@Retention(RUNTIME)
+@Documented
+@Qualifier
+public @interface StartupPersistence {
+ @SuppressWarnings("ClassExplicitlyAnnotation")
+ final class Literal extends AnnotationLiteral implements StartupPersistence {}
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/StreamUtil.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/StreamUtil.java
new file mode 100644
index 0000000000..f0d7640a03
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/StreamUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public final class StreamUtil {
+ /**
+ * Bucketizes the elements of the source stream, passes each bucket through the {@code
+ * bucketFetcher} function, eventually yielding a stream the elements of all buckets.
+ *
+ *
A classic use case for this function is {@link Persistence#bucketizedBulkFetches(Stream,
+ * Class)}.
+ */
+ public static Stream bucketized(
+ Stream source, Function, List> bucketFetcher, int bucketSize) {
+ var sourceIter = source.iterator();
+
+ var split =
+ new Spliterator>() {
+ @Override
+ public boolean tryAdvance(Consumer super List> action) {
+ if (!sourceIter.hasNext()) {
+ // nothing more to do
+ return false;
+ }
+
+ var bucket = new ArrayList(bucketSize);
+ for (int i = 0; i < bucketSize && sourceIter.hasNext(); i++) {
+ bucket.add(sourceIter.next());
+ }
+ var fetched = bucketFetcher.apply(bucket);
+ action.accept(fetched);
+
+ return true;
+ }
+
+ @Override
+ public Spliterator> trySplit() {
+ return null;
+ }
+
+ @Override
+ public long estimateSize() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public int characteristics() {
+ return 0;
+ }
+ };
+ return StreamSupport.stream(split, false).flatMap(List::stream);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/SystemPersistence.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/SystemPersistence.java
new file mode 100644
index 0000000000..6520da4d68
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/SystemPersistence.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.util.AnnotationLiteral;
+import jakarta.inject.Inject;
+import jakarta.inject.Qualifier;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.persistence.nosql.nodeids.api.NodeManagement;
+
+/**
+ * Qualifier for system-level {@link Persistence} instance against the {@linkplain
+ * Realms#SYSTEM_REALM_ID system realm} needed for realm management.
+ *
+ *
The qualified {@link Persistence} instance has a functional {@link IdGenerator}, enabled via a
+ * valid {@linkplain NodeManagement#lease() node lease}.
+ *
+ *
A system-realm {@link Persistence} instance can be {@link Inject @Inject}ed as an {@link
+ * ApplicationScoped @ApplicationScoped} bean using
+ *
+ * {@snippet :
+ * @ApplicationScoped
+ * class MyBean {
+ * @Inject @SystemPersistence Persistence systemPersistence; // @highlight
+ * }
+ * }
+ *
+ * @see StartupPersistence
+ */
+@Target({TYPE, METHOD, PARAMETER, FIELD})
+@Retention(RUNTIME)
+@Documented
+@Qualifier
+public @interface SystemPersistence {
+ @SuppressWarnings("ClassExplicitlyAnnotation")
+ final class Literal extends AnnotationLiteral implements SystemPersistence {}
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/Backend.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/Backend.java
new file mode 100644
index 0000000000..6eea4a5940
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/Backend.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+import jakarta.annotation.Nonnull;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.PersistenceParams;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+/** Provides "low-level" access to the database-specific backend. */
+public interface Backend extends AutoCloseable {
+ /** Name of this backend. This value serves as an identifier to select the correct backend. */
+ @Nonnull
+ String type();
+
+ /**
+ * Called to set up the database schema.
+ *
+ * @return optional, human-readable information
+ */
+ Optional setupSchema();
+
+ @Nonnull
+ Persistence newPersistence(
+ Function backendWrapper,
+ @Nonnull PersistenceParams persistenceParams,
+ String realmId,
+ MonotonicClock monotonicClock,
+ IdGenerator idGenerator);
+
+ /** Whether the implementation supports {@link #deleteRealms(Set)}. */
+ boolean supportsRealmDeletion();
+
+ /**
+ * Delete the given realms.
+ *
+ *
This function works, if {@link #supportsRealmDeletion()} yields {@code true}.
+ *
+ *
Throws an {@link UnsupportedOperationException}, if {@link #supportsRealmDeletion()} yields
+ * {@code false}.
+ */
+ void deleteRealms(Set realmIds);
+
+ /**
+ * Bulk reference deletion, grouped by realm. This functionality is primarily needed for the
+ * maintenance service.
+ */
+ void batchDeleteRefs(Map> realmRefs);
+
+ /**
+ * Bulk object-part deletion, grouped by realm. This functionality is primarily needed for the
+ * maintenance service.
+ */
+ void batchDeleteObjs(Map> realmObjs);
+
+ /** Callback interface for {@link #scanBackend(ReferenceScanCallback, ObjScanCallback)}. */
+ @FunctionalInterface
+ interface ReferenceScanCallback {
+ /**
+ * Called for each discovered reference and object-part ("item").
+ *
+ * @param realmId the realm to which the item belongs
+ * @param refName the reference name
+ * @param createdAtMicros the timestamp in microseconds since (Unix) epoch at which the item was
+ * created in the database
+ */
+ void call(@Nonnull String realmId, @Nonnull String refName, long createdAtMicros);
+ }
+
+ /** Callback interface for {@link #scanBackend(ReferenceScanCallback, ObjScanCallback)}. */
+ @FunctionalInterface
+ interface ObjScanCallback {
+ /**
+ * Called for each discovered reference and object-part ("item").
+ *
+ * @param realmId the realm to which the item belongs
+ * @param type the object type ID
+ * @param id object-part ID
+ * @param createdAtMicros the timestamp in microseconds since (Unix) epoch at which the item was
+ * created in the database
+ */
+ void call(
+ @Nonnull String realmId, @Nonnull String type, @Nonnull PersistId id, long createdAtMicros);
+ }
+
+ /**
+ * Scan the whole backend database and return each discovered reference and object-part via the
+ * provided callbacks. This functionality is primarily needed for the maintenance service.
+ */
+ void scanBackend(
+ @Nonnull ReferenceScanCallback referenceConsumer, @Nonnull ObjScanCallback objConsumer);
+
+ boolean createReference(@Nonnull String realmId, @Nonnull Reference newRef);
+
+ void createReferences(@Nonnull String realmId, @Nonnull List newRefs);
+
+ boolean updateReference(
+ @Nonnull String realmId,
+ @Nonnull Reference updatedRef,
+ @Nonnull Optional expectedPointer);
+
+ @Nonnull
+ Reference fetchReference(@Nonnull String realmId, @Nonnull String name);
+
+ @Nonnull
+ Map fetch(@Nonnull String realmId, @Nonnull Set ids);
+
+ void write(@Nonnull String realmId, @Nonnull List writes);
+
+ void delete(@Nonnull String realmId, @Nonnull Set ids);
+
+ boolean conditionalInsert(
+ @Nonnull String realmId,
+ String objTypeId,
+ @Nonnull PersistId persistId,
+ long createdAtMicros,
+ @Nonnull String versionToken,
+ @Nonnull byte[] serializedValue);
+
+ boolean conditionalUpdate(
+ @Nonnull String realmId,
+ String objTypeId,
+ @Nonnull PersistId persistId,
+ long createdAtMicros,
+ @Nonnull String updateToken,
+ @Nonnull String expectedToken,
+ @Nonnull byte[] serializedValue);
+
+ boolean conditionalDelete(
+ @Nonnull String realmId, @Nonnull PersistId persistId, @Nonnull String expectedToken);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendConfiguration.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendConfiguration.java
new file mode 100644
index 0000000000..5e178d23bf
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import java.util.Optional;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Polaris persistence backend configuration. */
+@ConfigMapping(prefix = "polaris.persistence.backend")
+@JsonSerialize(as = ImmutableBuildableBackendConfiguration.class)
+@JsonDeserialize(as = ImmutableBuildableBackendConfiguration.class)
+public interface BackendConfiguration {
+ /** Name of the persistence backend to use. */
+ Optional type();
+
+ @PolarisImmutable
+ interface BuildableBackendConfiguration extends BackendConfiguration {
+ static ImmutableBuildableBackendConfiguration.Builder builder() {
+ return ImmutableBuildableBackendConfiguration.builder();
+ }
+
+ @Override
+ Optional type();
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendFactory.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendFactory.java
new file mode 100644
index 0000000000..b9cc72ce84
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+import jakarta.annotation.Nonnull;
+
+/**
+ * Factory responsible to produce {@link Backend} instances. Usually only one {@link Backend}
+ * instance is ever produced and active in a production environment.
+ */
+public interface BackendFactory {
+ /** Human-readable name. */
+ String name();
+
+ @Nonnull
+ Backend buildBackend(@Nonnull RUNTIME_CONFIG backendConfig);
+
+ Class configurationInterface();
+
+ RUNTIME_CONFIG buildConfiguration(CONFIG_INTERFACE config);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendLoader.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendLoader.java
new file mode 100644
index 0000000000..346e3bf96d
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/BackendLoader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import jakarta.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public final class BackendLoader {
+ private BackendLoader() {}
+
+ @Nonnull
+ public static BackendFactory findFactoryByName(@Nonnull String name) {
+ return findFactory(f -> f.name().equals(name));
+ }
+
+ @Nonnull
+ public static Stream> availableFactories() {
+ @SuppressWarnings("rawtypes")
+ var x = (Stream) loader().stream().map(ServiceLoader.Provider::get);
+ @SuppressWarnings("unchecked")
+ var r = (Stream>) x;
+ return r;
+ }
+
+ @Nonnull
+ public static BackendFactory findFactory(
+ @Nonnull Predicate> filter) {
+ ServiceLoader> loader = loader();
+ List> candidates = new ArrayList<>();
+ boolean any = false;
+ for (BackendFactory backendFactory : loader) {
+ any = true;
+ if (filter.test(backendFactory)) {
+ candidates.add(backendFactory);
+ }
+ }
+ checkState(any, "No BackendFactory on class path");
+ checkArgument(!candidates.isEmpty(), "No BackendFactory matched the given filter");
+
+ if (candidates.size() == 1) {
+ return cast(candidates.getFirst());
+ }
+
+ throw new IllegalStateException(
+ "More than one BackendFactory matched the given filter: "
+ + candidates.stream().map(BackendFactory::name).collect(Collectors.joining(", ")));
+ }
+
+ // Helper for ugly generics casting
+ private static ServiceLoader> loader() {
+ @SuppressWarnings("rawtypes")
+ ServiceLoader f = ServiceLoader.load(BackendFactory.class);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ServiceLoader> r = (ServiceLoader) f;
+ return r;
+ }
+
+ // Helper for ugly generics casting
+ private static BackendFactory cast(BackendFactory, ?> backendFactory) {
+ @SuppressWarnings("unchecked")
+ BackendFactory r = (BackendFactory) backendFactory;
+ return r;
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/FetchedObj.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/FetchedObj.java
new file mode 100644
index 0000000000..61f21db1b4
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/FetchedObj.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+public record FetchedObj(
+ String type, long createdAtMicros, String versionToken, byte[] serialized, int realNumParts) {}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/PersistId.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/PersistId.java
new file mode 100644
index 0000000000..5a0de664c5
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/PersistId.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.polaris.persistence.varint.VarInt.putVarInt;
+import static org.apache.polaris.persistence.varint.VarInt.readVarInt;
+import static org.apache.polaris.persistence.varint.VarInt.varIntLen;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.immutables.value.Value;
+
+/**
+ * Represents the key of a serialized part of an {@link Obj}, where {@link #part()} defines
+ * the {@code 0}-based offset of the serialized part.
+ *
+ *
This type is used internally when dealing with individual database rows/documents and for
+ * maintenance operations. This type is not part of any application/user facing API.
+ */
+@JsonSerialize(using = PersistId.PersistIdSerializer.class)
+@JsonDeserialize(using = PersistId.PersistIdDeserializer.class)
+@PolarisImmutable
+public interface PersistId {
+ @Value.Parameter(order = 1)
+ long id();
+
+ @Value.Parameter(order = 2)
+ int part();
+
+ @Value.Check
+ default void check() {
+ checkState(part() >= 0, "part must not be negative");
+ }
+
+ static PersistId persistId(long id, int part) {
+ return ImmutablePersistId.of(id, part);
+ }
+
+ static PersistId persistIdPart0(Obj obj) {
+ return persistId(obj.id(), 0);
+ }
+
+ class PersistIdSerializer extends JsonSerializer {
+ @Override
+ public void serialize(PersistId value, JsonGenerator gen, SerializerProvider serializers)
+ throws IOException {
+ gen.writeBinary(serializeAsBytes(value));
+ }
+ }
+
+ class PersistIdDeserializer extends JsonDeserializer {
+ @Override
+ public PersistId deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ return fromBytes(p.getBinaryValue());
+ }
+ }
+
+ static int serializedSize(PersistId persistId) {
+ var part = persistId.part();
+ var hasPart = part > 0;
+ var partLen = hasPart ? varIntLen(part) : 0;
+ return 1 + Long.BYTES + partLen;
+ }
+
+ @Value.NonAttribute
+ @JsonIgnore
+ default byte[] toBytes() {
+ return serializeAsBytes(this);
+ }
+
+ static byte[] serializeAsBytes(PersistId persistId) {
+ var part = persistId.part();
+ var hasPart = part > 0;
+ var partLen = hasPart ? varIntLen(part) : 0;
+ var type = (byte) (hasPart ? 2 : 1);
+
+ var bytes = new byte[1 + Long.BYTES + partLen];
+ var buf = ByteBuffer.wrap(bytes);
+ buf.put(type);
+ buf.putLong(persistId.id());
+ if (hasPart) {
+ putVarInt(buf, part);
+ }
+ return bytes;
+ }
+
+ static PersistId fromBytes(byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+ var buf = ByteBuffer.wrap(bytes);
+ var type = buf.get();
+ return switch (type) {
+ case 1 -> persistId(buf.getLong(), 0);
+ case 2 -> {
+ var id = buf.getLong();
+ var part = readVarInt(buf);
+ yield persistId(id, part);
+ }
+ default -> throw new IllegalArgumentException("Unsupported PersistId type: " + type);
+ };
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/WriteObj.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/WriteObj.java
new file mode 100644
index 0000000000..3379dd8fa6
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/backend/WriteObj.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.backend;
+
+public record WriteObj(
+ String type, long id, int part, long createdAtMicros, byte[] serialized, int partNum) {}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheBackend.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheBackend.java
new file mode 100644
index 0000000000..38d6653dfd
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheBackend.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.cache;
+
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+/**
+ * Provides the cache primitives for a caching {@link Persistence} facade, suitable for multiple
+ * repositories. It is advisable to have one {@link CacheBackend} per {@link Backend}.
+ */
+public interface CacheBackend {
+ /**
+ * Special sentinel reference instance to indicate that a reference object has been marked as "not
+ * found". This object is only for cache-internal purposes.
+ */
+ Reference NON_EXISTENT_REFERENCE_SENTINEL =
+ Reference.builder()
+ .name("NON_EXISTENT")
+ .pointer(objRef("CACHE_SENTINEL", 0L))
+ .createdAtMicros(0L)
+ .previousPointers()
+ .build();
+
+ /**
+ * Special sentinel object instance to indicate that an object has been marked as "not found".
+ * This object is only for cache-internal purposes.
+ */
+ Obj NOT_FOUND_OBJ_SENTINEL =
+ new Obj() {
+ @Override
+ public ObjType type() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long id() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int numParts() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String versionToken() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long createdAtMicros() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ @Nonnull
+ public Obj withCreatedAtMicros(long createdAtMicros) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ @Nonnull
+ public Obj withNumParts(int numParts) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /** Returns the {@link Obj} for the given {@link ObjRef id}. */
+ Obj get(@Nonnull String realmId, @Nonnull ObjRef id);
+
+ /**
+ * Adds the given object to the local cache and sends a cache-invalidation message to Polaris
+ * peers.
+ */
+ void put(@Nonnull String realmId, @Nonnull Obj obj);
+
+ /** Adds the given object only to the local cache, does not send a cache-invalidation message. */
+ void putLocal(@Nonnull String realmId, @Nonnull Obj obj);
+
+ /** Record the "not found" sentinel for the given {@link ObjRef id} and {@link ObjType type}. */
+ void putNegative(@Nonnull String realmId, @Nonnull ObjRef id);
+
+ void remove(@Nonnull String realmId, @Nonnull ObjRef id);
+
+ void clear(@Nonnull String realmId);
+
+ void purge();
+
+ long estimatedSize();
+
+ Persistence wrap(@Nonnull Persistence persist);
+
+ Reference getReference(@Nonnull String realmId, @Nonnull String name);
+
+ void removeReference(@Nonnull String realmId, @Nonnull String name);
+
+ /**
+ * Adds the given reference to the local cache and sends a cache-invalidation message to Polaris
+ * peers.
+ */
+ void putReference(@Nonnull String realmId, @Nonnull Reference reference);
+
+ /**
+ * Adds the given reference only to the local cache, does not send a cache-invalidation message.
+ */
+ void putReferenceLocal(@Nonnull String realmId, @Nonnull Reference reference);
+
+ void putReferenceNegative(@Nonnull String realmId, @Nonnull String name);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheConfig.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheConfig.java
new file mode 100644
index 0000000000..8753d4e0ac
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.cache;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.function.LongSupplier;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/** Persistence cache configuration. */
+@ConfigMapping(prefix = "polaris.persistence.cache")
+@JsonSerialize(as = ImmutableBuildableCacheConfig.class)
+@JsonDeserialize(as = ImmutableBuildableCacheConfig.class)
+public interface CacheConfig {
+
+ String INVALID_REFERENCE_NEGATIVE_TTL =
+ "Cache reference-negative-TTL, if present, must be positive.";
+ String INVALID_REFERENCE_TTL =
+ "Cache reference-TTL must be positive, 0 disables reference caching.";
+
+ String DEFAULT_REFERENCE_TTL_STRING = "PT15M";
+ Duration DEFAULT_REFERENCE_TTL = Duration.parse(DEFAULT_REFERENCE_TTL_STRING);
+
+ boolean DEFAULT_ENABLE = true;
+
+ /**
+ * Optionally disable the cache, the default value is {@code true}, meaning that the cache is
+ * enabled by default.
+ */
+ @WithDefault("" + DEFAULT_ENABLE)
+ Optional enable();
+
+ /** Duration to cache the state of references. */
+ @WithDefault(DEFAULT_REFERENCE_TTL_STRING)
+ Optional referenceTtl();
+
+ /** Duration to cache whether a reference does not exist (negative caching). */
+ @JsonFormat(shape = JsonFormat.Shape.STRING)
+ Optional referenceNegativeTtl();
+
+ Optional sizing();
+
+ @Value.Default
+ @JsonIgnore
+ default LongSupplier clockNanos() {
+ return System::nanoTime;
+ }
+
+ @PolarisImmutable
+ interface BuildableCacheConfig extends CacheConfig {
+
+ static Builder builder() {
+ return ImmutableBuildableCacheConfig.builder();
+ }
+
+ @Value.Check
+ default void check() {
+ var referenceTtl = referenceTtl().orElse(DEFAULT_REFERENCE_TTL);
+ checkState(referenceTtl.compareTo(Duration.ZERO) >= 0, INVALID_REFERENCE_TTL);
+ referenceNegativeTtl()
+ .ifPresent(
+ ttl ->
+ checkState(
+ referenceTtl.compareTo(Duration.ZERO) > 0 && ttl.compareTo(Duration.ZERO) > 0,
+ INVALID_REFERENCE_NEGATIVE_TTL));
+ }
+
+ interface Builder {
+ @CanIgnoreReturnValue
+ Builder referenceTtl(Duration referenceTtl);
+
+ @CanIgnoreReturnValue
+ Builder referenceNegativeTtl(Duration referenceNegativeTtl);
+
+ @CanIgnoreReturnValue
+ Builder sizing(CacheSizing sizing);
+
+ @CanIgnoreReturnValue
+ Builder clockNanos(LongSupplier clockNanos);
+
+ CacheConfig build();
+ }
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheInvalidations.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheInvalidations.java
new file mode 100644
index 0000000000..f98afd07cd
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheInvalidations.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.cache;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.List;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Value.Style(jdkOnly = true)
+@JsonSerialize(as = ImmutableCacheInvalidations.class)
+@JsonDeserialize(as = ImmutableCacheInvalidations.class)
+public interface CacheInvalidations {
+ @Value.Parameter(order = 1)
+ List invalidations();
+
+ static CacheInvalidations cacheInvalidations(List invalidations) {
+ return ImmutableCacheInvalidations.of(invalidations);
+ }
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "t")
+ @JsonSubTypes({
+ @JsonSubTypes.Type(
+ value = CacheInvalidationEvictObj.class,
+ name = CacheInvalidationEvictObj.TYPE),
+ @JsonSubTypes.Type(
+ value = CacheInvalidationEvictReference.class,
+ name = CacheInvalidationEvictReference.TYPE),
+ })
+ interface CacheInvalidation {
+ String type();
+ }
+
+ @Value.Immutable
+ @JsonSerialize(as = ImmutableCacheInvalidationEvictObj.class)
+ @JsonDeserialize(as = ImmutableCacheInvalidationEvictObj.class)
+ @JsonTypeName(value = CacheInvalidationEvictObj.TYPE)
+ interface CacheInvalidationEvictObj extends CacheInvalidation {
+ String TYPE = "obj";
+
+ @Override
+ default String type() {
+ return TYPE;
+ }
+
+ @JsonProperty("r")
+ @Value.Parameter(order = 1)
+ String realmId();
+
+ @Value.Parameter(order = 2)
+ ObjRef id();
+
+ static CacheInvalidationEvictObj cacheInvalidationEvictObj(String realmId, ObjRef id) {
+ return ImmutableCacheInvalidationEvictObj.of(realmId, id);
+ }
+ }
+
+ @Value.Immutable
+ @JsonSerialize(as = ImmutableCacheInvalidationEvictReference.class)
+ @JsonDeserialize(as = ImmutableCacheInvalidationEvictReference.class)
+ @JsonTypeName(value = CacheInvalidationEvictReference.TYPE)
+ interface CacheInvalidationEvictReference extends CacheInvalidation {
+ String TYPE = "ref";
+
+ @Override
+ default String type() {
+ return TYPE;
+ }
+
+ @JsonProperty("r")
+ @Value.Parameter(order = 1)
+ String realmId();
+
+ @Value.Parameter(order = 2)
+ String ref();
+
+ static CacheInvalidationEvictReference cacheInvalidationEvictReference(
+ String realmId, String refName) {
+ return ImmutableCacheInvalidationEvictReference.of(realmId, refName);
+ }
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheSizing.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheSizing.java
new file mode 100644
index 0000000000..81a7ecf096
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/CacheSizing.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.cache;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import io.smallrye.config.WithDefault;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.misc.types.memorysize.MemorySize;
+import org.immutables.value.Value;
+
+/**
+ * Parameters to size the persistence cache. It is recommended to leave the defaults. If changes are
+ * necessary, prefer the heap-size relative options over a fixed cache size, because relative sizing
+ * is portable across instances with different heap sizes.
+ */
+@PolarisImmutable
+public interface CacheSizing {
+
+ double DEFAULT_HEAP_FRACTION = .6d;
+
+ /**
+ * Fraction of Java’s max heap size to use for cache objects, set to 0 to disable. Must not be
+ * used with fixed cache sizing. If neither this value nor a fixed size is configured, a default
+ * of {@code .4} (40%) is assumed, if {@code enable-soft-references} is enabled, else {@code .6}
+ * (60%) is assumed.
+ */
+ OptionalDouble fractionOfMaxHeapSize();
+
+ String DEFAULT_MIN_SIZE_STRING = "64M";
+ MemorySize DEFAULT_MIN_SIZE = MemorySize.valueOf(DEFAULT_MIN_SIZE_STRING);
+
+ /** When using fractional cache sizing, this amount in MB is the minimum cache size. */
+ @WithDefault(DEFAULT_MIN_SIZE_STRING)
+ Optional fractionMinSize();
+
+ String DEFAULT_HEAP_SIZE_KEEP_FREE_STRING = "256M";
+ MemorySize DEFAULT_HEAP_SIZE_KEEP_FREE = MemorySize.valueOf(DEFAULT_HEAP_SIZE_KEEP_FREE_STRING);
+
+ /**
+ * When using fractional cache sizing, this amount in MB of the heap will always be "kept free"
+ * when calculating the cache size.
+ */
+ @WithDefault(DEFAULT_HEAP_SIZE_KEEP_FREE_STRING)
+ Optional fractionAdjustment();
+
+ /** Capacity of the persistence cache in MiB. */
+ Optional fixedSize();
+
+ double DEFAULT_CACHE_CAPACITY_OVERSHOOT = 0.1d;
+ String DEFAULT_CACHE_CAPACITY_OVERSHOOT_STRING = "0.1";
+
+ /**
+ * Admitted cache-capacity-overshoot fraction, defaults to {@code 0.1} (10 %).
+ *
+ *
New elements are admitted to be added to the cache, if the cache's size is less than {@code
+ * cache-capacity * (1 + cache-capacity-overshoot}.
+ *
+ *
Cache eviction happens asynchronously. Situations when eviction cannot keep up with the
+ * amount of data added could lead to out-of-memory situations.
+ *
+ *
The value, if present, must be greater than 0.
+ */
+ @WithDefault(DEFAULT_CACHE_CAPACITY_OVERSHOOT_STRING)
+ OptionalDouble cacheCapacityOvershoot();
+
+ default long calculateEffectiveSize(long maxHeapInBytes, double defaultHeapFraction) {
+ if (fixedSize().isPresent()) {
+ return fixedSize().get().asLong();
+ }
+
+ long fractionAsBytes =
+ (long) (fractionOfMaxHeapSize().orElse(defaultHeapFraction) * maxHeapInBytes);
+
+ long freeHeap = maxHeapInBytes - fractionAsBytes;
+ long minFree = fractionAdjustment().orElse(DEFAULT_HEAP_SIZE_KEEP_FREE).asLong();
+
+ long capacityInBytes = (minFree > freeHeap) ? maxHeapInBytes - minFree : fractionAsBytes;
+
+ long fractionMin = fractionMinSize().orElse(DEFAULT_MIN_SIZE).asLong();
+ if (capacityInBytes < fractionMin) {
+ capacityInBytes = fractionMin;
+ }
+
+ return capacityInBytes;
+ }
+
+ static Builder builder() {
+ return ImmutableCacheSizing.builder();
+ }
+
+ @SuppressWarnings("unused")
+ interface Builder {
+ @CanIgnoreReturnValue
+ Builder fixedSize(MemorySize fixedSize);
+
+ @CanIgnoreReturnValue
+ Builder fixedSize(Optional extends MemorySize> fixedSize);
+
+ @CanIgnoreReturnValue
+ Builder fractionOfMaxHeapSize(double fractionOfMaxHeapSize);
+
+ @CanIgnoreReturnValue
+ Builder fractionOfMaxHeapSize(OptionalDouble fractionOfMaxHeapSize);
+
+ @CanIgnoreReturnValue
+ Builder fractionMinSize(MemorySize fractionMinSize);
+
+ @CanIgnoreReturnValue
+ Builder fractionMinSize(Optional extends MemorySize> fractionMinSize);
+
+ @CanIgnoreReturnValue
+ Builder fractionAdjustment(MemorySize fractionAdjustment);
+
+ @CanIgnoreReturnValue
+ Builder fractionAdjustment(Optional extends MemorySize> fractionAdjustment);
+
+ @CanIgnoreReturnValue
+ Builder cacheCapacityOvershoot(double cacheCapacityOvershoot);
+
+ @CanIgnoreReturnValue
+ Builder cacheCapacityOvershoot(OptionalDouble cacheCapacityOvershoot);
+
+ CacheSizing build();
+ }
+
+ @Value.Check
+ default void check() {
+ if (fractionOfMaxHeapSize().isPresent()) {
+ checkState(
+ fractionOfMaxHeapSize().getAsDouble() > 0d && fractionOfMaxHeapSize().getAsDouble() < 1d,
+ "Cache sizing: fractionOfMaxHeapSize must be > 0 and < 1, but is %s",
+ fractionOfMaxHeapSize());
+ }
+ if (fixedSize().isPresent()) {
+ long fixed = fixedSize().get().asLong();
+ checkState(
+ fixed >= 0, "Cache sizing: sizeInBytes must be greater than 0, but is %s", fixedSize());
+ }
+ checkState(
+ fractionAdjustment().orElse(DEFAULT_HEAP_SIZE_KEEP_FREE).asLong() > 64L * 1024L * 1024L,
+ "Cache sizing: heapSizeAdjustment must be greater than 64 MB, but is %s",
+ fractionAdjustment());
+ checkState(
+ cacheCapacityOvershoot().orElse(DEFAULT_CACHE_CAPACITY_OVERSHOOT) > 0d,
+ "Cache sizing: cacheCapacityOvershoot must be greater than 0, but is %s",
+ cacheCapacityOvershoot());
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/DistributedCacheInvalidation.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/DistributedCacheInvalidation.java
new file mode 100644
index 0000000000..7ba5de407d
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/cache/DistributedCacheInvalidation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.cache;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+
+public interface DistributedCacheInvalidation {
+ void evictObj(@Nonnull String realmId, @Nonnull ObjRef objRef);
+
+ void evictReference(@Nonnull String realmId, @Nonnull String refName);
+
+ interface Receiver extends DistributedCacheInvalidation {}
+
+ interface Sender extends DistributedCacheInvalidation {}
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitException.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitException.java
new file mode 100644
index 0000000000..b3d7382b78
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+public abstract class CommitException extends RuntimeException {
+
+ public CommitException(String message) {
+ super(message);
+ }
+
+ public CommitException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitRetryable.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitRetryable.java
new file mode 100644
index 0000000000..2dffdf44e0
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitRetryable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+@FunctionalInterface
+public interface CommitRetryable {
+
+ /**
+ * Called from {@linkplain Committer committer} implementations.
+ *
+ *
Implementations call the {@code refObjSupplier} to retrieve the current reference object
+ * using the current state of the reference. Long-running attempt implementations that need to
+ * have the reference object early should call the supplier again shortly before from this
+ * function and attempt to perform the required checks against the latest state of the reference
+ * object. This helps in reducing unnecessary retries when the attempt can be safely applied to
+ * the latest state of the reference object.
+ *
+ *
Writes must be triggered via the various {@code write*()} functions on {@link
+ * CommitterState}, preferable via {@link CommitterState#writeOrReplace(Object, Obj, Class)}. The
+ * {@link String} keys are used as symbolic identifiers, implementations are responsible for
+ * providing keys that are unique.
+ *
+ *
Reads must happen via the specialized {@link CommitterState#persistence() Persistence}
+ * provided by the committer implementation.
+ *
+ * @param state Communicate {@linkplain Obj objects} to be persisted via {@link CommitterState}
+ * @param refObjSupplier supplier returning the {@linkplain Reference#pointer() current object},
+ * if present. Must be invoked.
+ * @return Successful attempts return a non-empty {@link Optional} containing the result. An
+ * {@linkplain Optional#empty() empty optional} indicates that a retry should be attempted.
+ * @throws CommitException Instances of this class let the whole commit operation abort.
+ */
+ @Nonnull
+ Optional attempt(
+ @Nonnull CommitterState state,
+ @Nonnull Supplier> refObjSupplier)
+ throws CommitException;
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/Commits.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/Commits.java
new file mode 100644
index 0000000000..cf06219f7f
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/Commits.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import java.util.Iterator;
+import java.util.OptionalLong;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+
+/** Provides iterator-based access to the history of a named reference. */
+public interface Commits {
+
+ /**
+ * Retrieves the commit log in the natural, chronologically reverse order - most recent commit
+ * first.
+ */
+ Iterator commitLog(
+ String refName, OptionalLong offset, Class clazz);
+
+ /**
+ * Retrieves the commit log in chronological order starting at the given offset.
+ *
+ *
This function is useful when retrieving commits to serve events/notification use cases.
+ */
+ Iterator commitLogReversed(
+ String refName, long offset, Class clazz);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/Committer.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/Committer.java
new file mode 100644
index 0000000000..a148b3e9ec
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/Committer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+/**
+ * A {@link Committer} performs an atomic change against a named reference. This is a higher-level
+ * functionality building on top of the low-level {@code RetryLoop}.
+ *
+ *
Committing use cases ensure that a {@linkplain Reference#pointer() reference} always points to
+ * a consistent state, and that the change is atomic.
+ *
+ *
Committing use cases usually need to write more {@linkplain Obj objects} than just the
+ * {@linkplain Reference#pointer() referenced} one. Implementations must use {@link
+ * CommitterState#writeIntent(Object, Obj)} to get those objects being persisted. Retries can
+ * {@linkplain CommitterState#getWrittenByKey(Object) check} whether an object has already been
+ * written to prevent unnecessary write operations against the backend database.
+ *
+ *
A committing use case {@linkplain Persistence#createCommitter(String, Class, Class) creates} a
+ * {@link Committer} instance using a {@link CommitRetryable} implementation, which {@linkplain
+ * CommitRetryable#attempt(CommitterState, Supplier) receives} the {@linkplain Obj object} pointed
+ * in the {@linkplain Reference reference} and returns the new object to which the reference shall
+ * point to.
+ *
+ * @param type of the {@link Obj} {@linkplain Reference#pointer() referenced}
+ * @param the commit result type for successful commits including non-changing
+ */
+public interface Committer {
+
+ /**
+ * When called, commits to the same reference will be synchronized locally.
+ *
+ *
Using local reference-synchronization prevents commit retries. When using this feature, the
+ * actual {@link CommitRetryable#attempt(CommitterState, Supplier)} implementation must not block
+ * and complete quickly.
+ */
+ Committer synchronizingLocally();
+
+ /**
+ * Perform an atomic change.
+ *
+ *
The given {@link CommitRetryable} is called to perform the actual change. The implementation
+ * of the {@link CommitRetryable} must be side-effect-free and prepared to be called multiple
+ * times.
+ *
+ * @param commitRetryable performs the state change, must be side-effect-free
+ * @return the result as returned via {@link CommitterState#commitResult(Object,
+ * BaseCommitObj.Builder, Optional)} or an empty optional if {@linkplain
+ * CommitterState#noCommit() no change happened}
+ */
+ Optional commit(CommitRetryable commitRetryable)
+ throws CommitException, RetryTimeoutException;
+
+ /**
+ * Same as {@link #commit(CommitRetryable)}, but wraps the checked exceptions in a {@link
+ * RuntimeException}.
+ */
+ default Optional commitRuntimeException(
+ CommitRetryable commitRetryable) {
+ try {
+ return commit(commitRetryable);
+ } catch (RetryTimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitterState.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitterState.java
new file mode 100644
index 0000000000..4142c4b581
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/CommitterState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+
+public interface CommitterState extends UpdateState {
+ > Optional commitResult(
+ @Nonnull RESULT result, @Nonnull B refObjBuilder, @Nonnull Optional refObj);
+
+ Optional noCommit();
+
+ Optional noCommit(@Nonnull RESULT result);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/FairRetriesType.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/FairRetriesType.java
new file mode 100644
index 0000000000..f27b0a7ec8
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/FairRetriesType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+public enum FairRetriesType {
+ UNFAIR,
+ SLEEPING,
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/RetryConfig.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/RetryConfig.java
new file mode 100644
index 0000000000..109a333d84
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/RetryConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+public interface RetryConfig {
+ RetryConfig DEFAULT_RETRY_CONFIG = BuildableRetryConfig.builder().build();
+
+ /**
+ * Maximum allowed time until a retry-loop and {@linkplain Committer#commit(CommitRetryable)
+ * commits} fails with a {@link RetryTimeoutException}, defaults to {@value #DEFAULT_TIMEOUT}.
+ */
+ @WithDefault(DEFAULT_TIMEOUT)
+ Duration timeout();
+
+ /** Maximum number of allowed retries, defaults to {@value #DEFAULT_RETRIES}. */
+ @WithDefault(DEFAULT_RETRIES)
+ int retries();
+
+ /**
+ * Initial lower bound for a retry-sleep duration for the retry-loop, defaults to {@link
+ * #DEFAULT_RETRY_INITIAL_SLEEP_LOWER}. This value will be doubled after each retry, as long as
+ * {@link #maxSleep()} is not exceeded. A concrete sleep duration will be randomly chosen between
+ * the current lower and upper bounds.
+ */
+ @WithDefault(DEFAULT_RETRY_INITIAL_SLEEP_LOWER)
+ Duration initialSleepLower();
+
+ /**
+ * Initial upper bound for a retry-sleep duration for the retry-loop, defaults to {@link
+ * #DEFAULT_RETRY_INITIAL_SLEEP_UPPER}. This value will be doubled after each retry, as long as
+ * {@link #maxSleep()} is not exceeded. A concrete sleep duration will be randomly chosen between
+ * the current lower and upper bounds.
+ */
+ @WithDefault(DEFAULT_RETRY_INITIAL_SLEEP_UPPER)
+ Duration initialSleepUpper();
+
+ /** Maximum retry-sleep duration, defaults to {@link #DEFAULT_RETRY_MAX_SLEEP}. */
+ @WithDefault(DEFAULT_RETRY_MAX_SLEEP)
+ Duration maxSleep();
+
+ /**
+ * Without mitigation, very frequently started retry-loops running against highly contended
+ * resources can result in some retry-loops invocations never making any progress and eventually
+ * time out.
+ *
+ *
The default "fair retries type" helps in these scenarios with sacrificing the overall
+ * throughput too much.
+ */
+ @WithDefault("SLEEPING")
+ FairRetriesType fairRetries();
+
+ String DEFAULT_TIMEOUT = "PT15S";
+ String DEFAULT_RETRIES = "10000";
+ String DEFAULT_RETRY_INITIAL_SLEEP_LOWER = "PT0.010S";
+ String DEFAULT_RETRY_INITIAL_SLEEP_UPPER = "PT0.020S";
+ String DEFAULT_RETRY_MAX_SLEEP = "PT0.250S";
+
+ @PolarisImmutable
+ interface BuildableRetryConfig extends RetryConfig {
+
+ static ImmutableBuildableRetryConfig.Builder builder() {
+ return ImmutableBuildableRetryConfig.builder();
+ }
+
+ @Override
+ @Value.Default
+ default Duration timeout() {
+ return Duration.parse(DEFAULT_TIMEOUT);
+ }
+
+ @Override
+ @Value.Default
+ default int retries() {
+ return Integer.parseInt(DEFAULT_RETRIES);
+ }
+
+ @Override
+ @Value.Default
+ default Duration initialSleepLower() {
+ return Duration.parse(DEFAULT_RETRY_INITIAL_SLEEP_LOWER);
+ }
+
+ @Override
+ @Value.Default
+ default Duration initialSleepUpper() {
+ return Duration.parse(DEFAULT_RETRY_INITIAL_SLEEP_UPPER);
+ }
+
+ @Override
+ @Value.Default
+ default Duration maxSleep() {
+ return Duration.parse(DEFAULT_RETRY_MAX_SLEEP);
+ }
+
+ @Override
+ @Value.Default
+ default FairRetriesType fairRetries() {
+ return FairRetriesType.SLEEPING;
+ }
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/RetryTimeoutException.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/RetryTimeoutException.java
new file mode 100644
index 0000000000..cac5b2b3ef
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/RetryTimeoutException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import java.time.Duration;
+
+/**
+ * Thrown to indicate that a retryable ({@linkplain Committer#commit(CommitRetryable) commit})
+ * attempt eventually failed due to a timeout.
+ */
+public final class RetryTimeoutException extends Exception {
+
+ private final int retry;
+ private final long timeNanos;
+
+ public RetryTimeoutException(int retry, long timeNanos) {
+ super("Retry timeout after " + Duration.ofNanos(timeNanos) + ", " + retry + " retries");
+ this.retry = retry;
+ this.timeNanos = timeNanos;
+ }
+
+ public int getRetry() {
+ return retry;
+ }
+
+ public long getTimeNanos() {
+ return timeNanos;
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/UpdateState.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/UpdateState.java
new file mode 100644
index 0000000000..ed1611cce4
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/commit/UpdateState.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.commit;
+
+import jakarta.annotation.Nonnull;
+import java.util.function.Supplier;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+
+public interface UpdateState {
+
+ /**
+ * Use this instance of {@link Persistence} instead for operations related to this state,
+ * especially from {@link CommitRetryable#attempt(CommitterState, Supplier)}.
+ */
+ Persistence persistence();
+
+ /**
+ * Add {@code obj} to the list of objects to be persisted, using {@code key} to {@linkplain
+ * #getWrittenByKey(Object) identify/reuse} an already persisted object in a retried attempt.
+ *
+ *
Prefer this function over {@link #writeIntent(Object, Obj)} and {@link
+ * #getWrittenByKey(Object)}.
+ *
+ *
Note that objects will not be immediately persisted, but after the {@linkplain
+ * CommitRetryable#attempt(CommitterState, Supplier) attempt returns}, but before the {@linkplain
+ * Committer#commit(CommitRetryable) commit returns}.
+ *
+ *
A {@linkplain Committer#commit(CommitRetryable) failed commit} will delete objects passed to
+ * this function.
+ *
+ * @param key key identifying {@code obj}
+ * @param obj object to persist
+ * @return returns the given {@code obj}, if {@code key} is new, or the previous {@linkplain Obj},
+ * if {@code key} was already used in a call to this function or {@link #writeIntent(Object,
+ * Obj)}.
+ */
+ O writeIfNew(@Nonnull Object key, @Nonnull O obj, @Nonnull Class type);
+
+ default Obj writeIfNew(@Nonnull Object key, @Nonnull Obj obj) {
+ return writeIfNew(key, obj, Obj.class);
+ }
+
+ /**
+ * Add {@code obj} to the list of objects to be persisted, using {@code key} to {@linkplain
+ * #getWrittenByKey(Object) identify/reuse} an already persisted object in a retried attempt.
+ *
+ *
If an object was already associated with the same {@code key}, the previous object will be
+ * eventually deleted.
+ *
+ * @param key key identifying {@code obj}
+ * @param obj object to persist
+ * @return returns {@code obj}
+ */
+ O writeOrReplace(@Nonnull Object key, @Nonnull O obj, @Nonnull Class type);
+
+ default Obj writeOrReplace(@Nonnull Object key, @Nonnull Obj obj) {
+ return writeOrReplace(key, obj, Obj.class);
+ }
+
+ /**
+ * Get an already present object by a use-case defined key.
+ *
+ * @return the already present object or {@code null}, if no object is associated with the {@code
+ * key}
+ */
+ Obj getWrittenByKey(@Nonnull Object key);
+
+ /**
+ * Get an already present object by its {@link ObjRef}.
+ *
+ * @return the already present object or {@code null}, if no object is associated with the {@code
+ * id}
+ */
+ C getWrittenById(ObjRef id, Class clazz);
+
+ /**
+ * Add {@code obj} to the list of objects to be persisted, using {@code key} to {@linkplain
+ * #getWrittenByKey(Object) identify/reuse} an already persisted object in a retried attempt.
+ *
+ *
Note that objects will not be immediately persisted, but after the {@linkplain
+ * CommitRetryable#attempt(CommitterState, Supplier) attempt returns}, but before the {@linkplain
+ * Committer#commit(CommitRetryable) commit returns}.
+ *
+ *
A {@linkplain Committer#commit(CommitRetryable) failed commit} will delete objects passed to
+ * this function.
+ *
+ *
Prefer {@link #writeIfNew(Object, Obj)}, if possible.
+ *
+ * @param key key identifying {@code obj}, must be unique across all objects. Throws an {@link
+ * IllegalStateException}, if the {@code key} has already been used.
+ * @param obj object to persist
+ */
+ void writeIntent(@Nonnull Object key, @Nonnull Obj obj);
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/PersistenceException.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/PersistenceException.java
new file mode 100644
index 0000000000..65f289bb97
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/PersistenceException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.exceptions;
+
+public abstract class PersistenceException extends RuntimeException {
+ public PersistenceException(Throwable cause) {
+ super(cause);
+ }
+
+ public PersistenceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PersistenceException(String message) {
+ super(message);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/ReferenceAlreadyExistsException.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/ReferenceAlreadyExistsException.java
new file mode 100644
index 0000000000..59804f0eba
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/ReferenceAlreadyExistsException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.exceptions;
+
+public class ReferenceAlreadyExistsException extends PersistenceException {
+ public ReferenceAlreadyExistsException(String message) {
+ super(message);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/ReferenceNotFoundException.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/ReferenceNotFoundException.java
new file mode 100644
index 0000000000..85c8afe6bf
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/ReferenceNotFoundException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.exceptions;
+
+public class ReferenceNotFoundException extends PersistenceException {
+ public ReferenceNotFoundException(String message) {
+ super(message);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/UnknownOperationResultException.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/UnknownOperationResultException.java
new file mode 100644
index 0000000000..22ec764d6b
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/exceptions/UnknownOperationResultException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.api.exceptions;
+
+import org.apache.polaris.persistence.nosql.api.Persistence;
+
+/**
+ * Thrown by {@link Persistence} implementations when the result of a database operation is unknown,
+ * for example, due to a timeout.
+ */
+public class UnknownOperationResultException extends PersistenceException {
+ public UnknownOperationResultException(Throwable cause) {
+ super(cause);
+ }
+
+ public UnknownOperationResultException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/index/EmptyIndex.java b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/index/EmptyIndex.java
new file mode 100644
index 0000000000..cda55ba743
--- /dev/null
+++ b/persistence/nosql/persistence/api/src/main/java/org/apache/polaris/persistence/nosql/api/index/EmptyIndex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.api.index;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+final class EmptyIndex {
+ private static final Index> EMPTY =
+ new Index