diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 0564351ff1..945ce22de2 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -53,6 +53,7 @@ dependencies { api(project(":polaris-persistence-nosql-testextension")) api(project(":polaris-persistence-nosql-inmemory")) + api(project(":polaris-persistence-nosql-mongodb")) api(project(":polaris-config-docs-annotations")) api(project(":polaris-config-docs-generator")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index cdfa650052..2e714797c2 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -85,6 +85,7 @@ micrometer-bom = { module = "io.micrometer:micrometer-bom", version = "1.15.5" } microprofile-fault-tolerance-api = { module = "org.eclipse.microprofile.fault-tolerance:microprofile-fault-tolerance-api", version = "4.1.2" } mockito-core = { module = "org.mockito:mockito-core", version = "5.20.0" } mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version = "5.20.0" } +mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "5.6.1" } opentelemetry-bom = { module = "io.opentelemetry:opentelemetry-bom", version = "1.55.0" } opentelemetry-instrumentation-bom-alpha = { module = "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha", version= "2.20.1-alpha" } opentelemetry-semconv = { module = "io.opentelemetry.semconv:opentelemetry-semconv", version = "1.37.0" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 11f74e681b..1a68e1a9e3 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -69,3 +69,4 @@ polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextens polaris-persistence-nosql-varint=persistence/nosql/persistence/varint # persistence / database specific implementations polaris-persistence-nosql-inmemory=persistence/nosql/persistence/db/inmemory +polaris-persistence-nosql-mongodb=persistence/nosql/persistence/db/mongodb diff --git a/persistence/nosql/persistence/db/mongodb/build.gradle.kts b/persistence/nosql/persistence/db/mongodb/build.gradle.kts new file mode 100644 index 0000000000..301d48ed38 --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/build.gradle.kts @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris NoSQL persistence, MongoDB implementation" + +dependencies { + implementation(project(":polaris-persistence-nosql-api")) + implementation(project(":polaris-persistence-nosql-impl")) + implementation(project(":polaris-idgen-api")) + + implementation(libs.mongodb.driver.sync) + + implementation(libs.guava) + implementation(libs.slf4j.api) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + compileOnly(libs.smallrye.config.core) + compileOnly(platform(libs.quarkus.bom)) + compileOnly("io.quarkus:quarkus-core") + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + compileOnly("com.fasterxml.jackson.core:jackson-databind") + + testFixturesApi(testFixtures(project(":polaris-persistence-nosql-impl"))) + + testFixturesCompileOnly(libs.jakarta.annotation.api) + testFixturesCompileOnly(libs.jakarta.validation.api) + + testFixturesCompileOnly(project(":polaris-immutables")) + testFixturesAnnotationProcessor(project(":polaris-immutables", configuration = "processor")) + + testFixturesImplementation(project(":polaris-container-spec-helper")) + testFixturesImplementation(platform(libs.testcontainers.bom)) + testFixturesImplementation("org.testcontainers:testcontainers-mongodb") +} + +testing { + suites { + val intTest by + registering(JvmTestSuite::class) { + dependencies { + runtimeOnly(platform(libs.testcontainers.bom)) + runtimeOnly("org.testcontainers:testcontainers-mongodb") + } + } + } +} diff --git a/persistence/nosql/persistence/db/mongodb/src/intTest/java/org/apache/polaris/persistence/nosql/mongodb/TestMongoDbPersistence.java b/persistence/nosql/persistence/db/mongodb/src/intTest/java/org/apache/polaris/persistence/nosql/mongodb/TestMongoDbPersistence.java new file mode 100644 index 0000000000..e0fcb6ce47 --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/intTest/java/org/apache/polaris/persistence/nosql/mongodb/TestMongoDbPersistence.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.mongodb; + +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.impl.AbstractPersistenceTests; +import org.apache.polaris.persistence.nosql.testextension.BackendSpec; +import org.apache.polaris.persistence.nosql.testextension.PolarisPersistence; + +@BackendSpec(name = MongoDbBackendFactory.NAME) +public class TestMongoDbPersistence extends AbstractPersistenceTests { + @PolarisPersistence protected Persistence persistence; + + @Override + protected Persistence persistence() { + return persistence; + } +} diff --git a/persistence/nosql/persistence/db/mongodb/src/intTest/resources/logback-test.xml b/persistence/nosql/persistence/db/mongodb/src/intTest/resources/logback-test.xml new file mode 100644 index 0000000000..fb74fc2c54 --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/intTest/resources/logback-test.xml @@ -0,0 +1,30 @@ + + + + + + + %date{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + diff --git a/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackend.java b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackend.java new file mode 100644 index 0000000000..cdb46946ba --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackend.java @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.mongodb; + +import static com.mongodb.ErrorCategory.DUPLICATE_KEY; +import static com.mongodb.client.model.Filters.and; +import static com.mongodb.client.model.Filters.eq; +import static com.mongodb.client.model.Filters.in; +import static com.mongodb.client.model.Projections.fields; +import static com.mongodb.client.model.Projections.include; +import static com.mongodb.client.model.Updates.set; +import static java.util.stream.Collectors.toList; +import static org.apache.polaris.persistence.nosql.api.backend.PersistId.persistId; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_CREATED_AT; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_ID; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_PART; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_REAL_PART_NUM; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_TYPE; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_VALUE; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_OBJ_VERSION; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_REALM; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_REF_CREATED_AT; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_REF_NAME; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_REF_POINTER; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.COL_REF_PREVIOUS; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.TABLE_OBJS; +import static org.apache.polaris.persistence.nosql.impl.Identifiers.TABLE_REFS; +import static org.apache.polaris.persistence.nosql.impl.PersistenceImplementation.deserialize; +import static org.apache.polaris.persistence.nosql.impl.PersistenceImplementation.serialize; + +import com.google.common.collect.Maps; +import com.mongodb.CursorType; +import com.mongodb.MongoBulkWriteException; +import com.mongodb.MongoException; +import com.mongodb.MongoExecutionTimeoutException; +import com.mongodb.MongoInterruptedException; +import com.mongodb.MongoServerUnavailableException; +import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.MongoTimeoutException; +import com.mongodb.MongoWriteException; +import com.mongodb.WriteError; +import com.mongodb.bulk.BulkWriteError; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.FindOneAndReplaceOptions; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.ReturnDocument; +import com.mongodb.client.model.WriteModel; +import com.mongodb.client.result.UpdateResult; +import jakarta.annotation.Nonnull; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import org.apache.polaris.ids.api.IdGenerator; +import org.apache.polaris.ids.api.MonotonicClock; +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.api.PersistenceParams; +import org.apache.polaris.persistence.nosql.api.backend.Backend; +import org.apache.polaris.persistence.nosql.api.backend.FetchedObj; +import org.apache.polaris.persistence.nosql.api.backend.PersistId; +import org.apache.polaris.persistence.nosql.api.backend.WriteObj; +import org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException; +import org.apache.polaris.persistence.nosql.api.exceptions.UnknownOperationResultException; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.ref.ImmutableReference; +import org.apache.polaris.persistence.nosql.api.ref.Reference; +import org.apache.polaris.persistence.nosql.impl.PersistenceImplementation; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.bson.types.Binary; + +final class MongoDbBackend implements Backend { + + static final String ID_PROPERTY_NAME = "_id"; + + private static final ReplaceOptions WRITE_OPTIONS; + + static { + ReplaceOptions options = new ReplaceOptions(); + options.upsert(true); + WRITE_OPTIONS = options; + } + + private final MongoDbBackendConfig config; + private final MongoClient client; + private MongoCollection refs; + private MongoCollection objs; + + MongoDbBackend(MongoDbBackendConfig config) { + this.config = config; + this.client = config.client(); + } + + @Nonnull + MongoCollection refs() { + return refs; + } + + @Nonnull + MongoCollection objs() { + return objs; + } + + private synchronized void initialize() { + if (refs == null) { + String databaseName = config.databaseName(); + MongoDatabase database = + client.getDatabase(Objects.requireNonNull(databaseName, "Database name must be set")); + + refs = database.getCollection(TABLE_REFS); + objs = database.getCollection(TABLE_OBJS); + } + } + + @Override + @Nonnull + public String type() { + return MongoDbBackendFactory.NAME; + } + + @Override + public boolean supportsRealmDeletion() { + return config.allowPrefixDeletion(); + } + + @Override + public synchronized void close() { + if (config.closeClient()) { + client.close(); + } + } + + @Nonnull + @Override + public Persistence newPersistence( + Function backendWrapper, + @Nonnull PersistenceParams persistenceParams, + String realmId, + MonotonicClock monotonicClock, + IdGenerator idGenerator) { + initialize(); + return new PersistenceImplementation( + backendWrapper.apply(this), persistenceParams, realmId, monotonicClock, idGenerator); + } + + @Override + public Optional setupSchema() { + initialize(); + return Optional.of("database name: " + config.databaseName()); + } + + @Override + public void deleteRealms(Set realmIds) { + if (realmIds.isEmpty()) { + return; + } + + try { + Bson realmIdFilter = in(ID_PROPERTY_NAME + "." + COL_REALM, new HashSet<>(realmIds)); + var failed = new ArrayList(); + for (var coll : List.of(refs, objs)) { + var res = coll.deleteMany(realmIdFilter); + var ack = res.wasAcknowledged(); + var deleted = res.getDeletedCount(); + if (!ack) { + failed.add( + "realms deletion of collection " + + coll + + " was not acknowledged, reported " + + deleted + + " documents"); + } + } + if (!failed.isEmpty()) { + throw new RuntimeException(String.join(", ", failed)); + } + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public void batchDeleteRefs(Map> realmRefs) { + var idDocs = + realmRefs.entrySet().stream() + .flatMap( + e -> { + var realmId = e.getKey(); + return e.getValue().stream() + .map( + ref -> { + var idDoc = new Document(); + idDoc.put(COL_REALM, realmId); + idDoc.put(COL_REF_NAME, ref); + return idDoc; + }); + }) + .toList(); + if (idDocs.isEmpty()) { + return; + } + try { + refs.deleteMany(in(ID_PROPERTY_NAME, idDocs)); + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public void batchDeleteObjs(Map> realmObjs) { + var idDocs = + realmObjs.entrySet().stream() + .flatMap( + e -> { + var realmId = e.getKey(); + return e.getValue().stream() + .map( + id -> { + var idDoc = new Document(); + idDoc.put(COL_REALM, realmId); + idDoc.put(COL_OBJ_ID, id.id()); + idDoc.put(COL_OBJ_PART, id.part()); + return idDoc; + }); + }) + .toList(); + if (idDocs.isEmpty()) { + return; + } + try { + objs.deleteMany(in(ID_PROPERTY_NAME, idDocs)); + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public void scanBackend( + @Nonnull ReferenceScanCallback referenceConsumer, @Nonnull ObjScanCallback objConsumer) { + try (var cursor = + refs.find() + .batchSize(50) + .cursorType(CursorType.NonTailable) + .projection( + fields(include(ID_PROPERTY_NAME, COL_REALM, COL_REF_NAME, COL_REF_CREATED_AT))) + .cursor()) { + while (cursor.hasNext()) { + var d = cursor.next(); + var id = d.get(ID_PROPERTY_NAME, Document.class); + var realmId = id.getString(COL_REALM); + var ref = id.getString(COL_REF_NAME); + var cr = d.getLong(COL_REF_CREATED_AT); + referenceConsumer.call(realmId, ref, cr); + } + } catch (RuntimeException e) { + throw unhandledException(e); + } + try (var cursor = + objs.find() + .batchSize(50) + .cursorType(CursorType.NonTailable) + .projection(fields(include(ID_PROPERTY_NAME, COL_OBJ_TYPE, COL_OBJ_CREATED_AT))) + .cursor()) { + while (cursor.hasNext()) { + var d = cursor.next(); + var t = d.getString(COL_OBJ_TYPE); + var id = d.get(ID_PROPERTY_NAME, Document.class); + var realmId = id.getString(COL_REALM); + var i = id.getLong(COL_OBJ_ID); + var p = id.getInteger(COL_OBJ_PART); + var cr = d.getLong(COL_OBJ_CREATED_AT); + objConsumer.call(realmId, t, persistId(i, p), cr); + } + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public boolean createReference(@Nonnull String realmId, @Nonnull Reference newRef) { + var doc = newReferenceDoc(realmId, newRef); + try { + refs().insertOne(doc); + } catch (MongoWriteException e) { + if (e.getError().getCategory() == DUPLICATE_KEY) { + return false; + } + throw unhandledException(e); + } catch (RuntimeException e) { + throw unhandledException(e); + } + return true; + } + + private Document newReferenceDoc(@Nonnull String realmId, @Nonnull Reference newRef) { + var doc = new Document(); + doc.put(ID_PROPERTY_NAME, idRefDoc(realmId, newRef)); + doc.put(COL_REF_POINTER, serialize(newRef.pointer())); + doc.put(COL_REF_CREATED_AT, newRef.createdAtMicros()); + byte[] previous = serialize(newRef.previousPointers()); + if (previous != null) { + doc.put(COL_REF_PREVIOUS, new Binary(previous)); + } + return doc; + } + + @Override + public void createReferences(@Nonnull String realmId, @Nonnull List newRefs) { + if (newRefs.isEmpty()) { + return; + } + var docs = newRefs.stream().map(r -> newReferenceDoc(realmId, r)).toList(); + try { + refs().insertMany(docs); + } catch (MongoBulkWriteException e) { + if (e.getWriteErrors().stream().anyMatch(we -> we.getCategory() != DUPLICATE_KEY)) { + throw unhandledException(e); + } + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public boolean updateReference( + @Nonnull String realmId, + @Nonnull Reference updatedRef, + @Nonnull Optional expectedPointer) { + var updates = new ArrayList(); + updates.add(set(COL_REF_POINTER, serialize(updatedRef.pointer()))); + var previous = serialize(updatedRef.previousPointers()); + if (previous != null) { + updates.add(set(COL_REF_PREVIOUS, new Binary(previous))); + } + + UpdateResult result; + try { + + var filters = new ArrayList(5); + filters.add(eq(ID_PROPERTY_NAME, idRefDoc(realmId, updatedRef))); + filters.add(eq(COL_REF_POINTER, serialize(expectedPointer))); + filters.add(eq(COL_REF_CREATED_AT, updatedRef.createdAtMicros())); + var condition = and(filters); + + result = refs().updateOne(condition, updates); + } catch (RuntimeException e) { + throw unhandledException(e); + } + + if (result.getModifiedCount() == 1) { + return true; + } + if (result.getModifiedCount() == 0 && result.getMatchedCount() == 1) { + // not updated + return false; + } + + fetchReference(realmId, updatedRef.name()); + return false; + } + + @Override + @Nonnull + public Reference fetchReference(@Nonnull String realmId, @Nonnull String name) { + FindIterable result; + try { + result = refs().find(eq(ID_PROPERTY_NAME, idRefDoc(realmId, name))); + } catch (RuntimeException e) { + throw unhandledException(e); + } + + var doc = result.first(); + if (doc == null) { + throw new ReferenceNotFoundException(name); + } + + return ImmutableReference.builder() + .name(name) + .pointer( + Optional.ofNullable( + deserialize(doc.get(COL_REF_POINTER, Binary.class).getData(), ObjRef.class))) + .createdAtMicros(doc.getLong(COL_REF_CREATED_AT)) + .previousPointers( + deserialize(doc.get(COL_REF_PREVIOUS, Binary.class).getData(), long[].class)) + .build(); + } + + @Override + @Nonnull + public Map fetch(@Nonnull String realmId, @Nonnull Set ids) { + var list = ids.stream().map(id -> idObjDoc(realmId, id)).toList(); + + var r = Maps.newHashMapWithExpectedSize(ids.size()); + + if (!list.isEmpty()) { + FindIterable result; + try { + result = objs().find(in(ID_PROPERTY_NAME, list)); + } catch (RuntimeException e) { + throw unhandledException(e); + } + for (var doc : result) { + var obj = docToFetched(doc); + var id = docToPersistId(doc); + r.put(id, obj); + } + } + + return r; + } + + @Override + public void write(@Nonnull String realmId, @Nonnull List writes) { + var docs = new ArrayList>(writes.size()); + for (WriteObj write : writes) { + var idDoc = idFetchedDoc(realmId, write); + docs.add( + new ReplaceOneModel<>( + eq(ID_PROPERTY_NAME, idDoc), + objToDoc( + idDoc, + write.type(), + write.serialized(), + write.createdAtMicros(), + null, + write.partNum()), + WRITE_OPTIONS)); + } + + List> updates = new ArrayList<>(docs); + if (!updates.isEmpty()) { + BulkWriteResult res; + try { + res = objs().bulkWrite(updates); + } catch (RuntimeException e) { + throw unhandledException(e); + } + if (!res.wasAcknowledged()) { + throw new RuntimeException("Upsert not acknowledged"); + } + } + } + + @Override + public void delete(@Nonnull String realmId, @Nonnull Set ids) { + var list = ids.stream().map(id -> idObjDoc(realmId, id)).collect(toList()); + if (list.isEmpty()) { + return; + } + try { + objs().deleteMany(in(ID_PROPERTY_NAME, list)); + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public boolean conditionalInsert( + @Nonnull String realmId, + String objTypeId, + @Nonnull PersistId persistId, + long createdAtMicros, + @Nonnull String versionToken, + @Nonnull byte[] serializedValue) { + try { + var idDoc = idObjDoc(realmId, persistId); + var doc = objToDoc(idDoc, objTypeId, serializedValue, createdAtMicros, versionToken, 1); + objs().insertOne(doc); + return true; + } catch (MongoWriteException e) { + if (e.getError().getCategory() == DUPLICATE_KEY) { + return false; + } + throw handleMongoWriteException(e); + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public boolean conditionalUpdate( + @Nonnull String realmId, + String objTypeId, + @Nonnull PersistId persistId, + long createdAtMicros, + @Nonnull String updateToken, + @Nonnull String expectedToken, + @Nonnull byte[] serializedValue) { + var idDoc = idObjDoc(realmId, persistId); + var doc = objToDoc(idDoc, objTypeId, serializedValue, createdAtMicros, updateToken, 1); + + try { + var options = new FindOneAndReplaceOptions().returnDocument(ReturnDocument.BEFORE); + var updateResult = + objs() + .findOneAndReplace( + and(eq(ID_PROPERTY_NAME, idDoc), eq(COL_OBJ_VERSION, expectedToken)), + doc, + options); + return updateResult != null; + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + @Override + public boolean conditionalDelete( + @Nonnull String realmId, @Nonnull PersistId persistId, @Nonnull String expectedToken) { + var idDoc = idObjDoc(realmId, persistId); + + try { + return objs() + .findOneAndDelete( + and(eq(ID_PROPERTY_NAME, idDoc), eq(COL_OBJ_VERSION, expectedToken))) + != null; + } catch (RuntimeException e) { + throw unhandledException(e); + } + } + + private FetchedObj docToFetched(Document doc) { + if (doc == null) { + return null; + } + var bin = doc.get(COL_OBJ_VALUE, Binary.class); + var objTypeId = doc.getString(COL_OBJ_TYPE); + var versionToken = doc.getString(COL_OBJ_VERSION); + var createdAtMicros = doc.getLong(COL_OBJ_CREATED_AT); + var realPartNum = doc.getInteger(COL_OBJ_REAL_PART_NUM); + return new FetchedObj(objTypeId, createdAtMicros, versionToken, bin.getData(), realPartNum); + } + + private PersistId docToPersistId(Document doc) { + var idDoc = doc.get(ID_PROPERTY_NAME, Document.class); + var id = idDoc.getLong(COL_OBJ_ID); + var part = idDoc.getInteger(COL_OBJ_PART); + return persistId(id, part); + } + + private Document idDocBase(String realmId) { + Document idDoc = new Document(); + idDoc.put(COL_REALM, realmId); + return idDoc; + } + + private Document idRefDoc(String realmId, Reference ref) { + return idRefDoc(realmId, ref.name()); + } + + private Document idRefDoc(String realmId, String name) { + Document idDoc = idDocBase(realmId); + idDoc.put(COL_REF_NAME, name); + return idDoc; + } + + private Document idObjDoc(String realmId, PersistId id) { + Document idDoc = idDocBase(realmId); + idDoc.put(COL_OBJ_ID, id.id()); + idDoc.put(COL_OBJ_PART, id.part()); + return idDoc; + } + + private Document idFetchedDoc(String realmId, WriteObj id) { + Document idDoc = idDocBase(realmId); + idDoc.put(COL_OBJ_ID, id.id()); + idDoc.put(COL_OBJ_PART, id.part()); + return idDoc; + } + + private Document objToDoc( + @Nonnull Document idDoc, + @Nonnull String objTypeId, + @Nonnull byte[] serialized, + long createdAtMicros, + String versionToken, + int partNum) { + var doc = new Document(); + doc.put(ID_PROPERTY_NAME, idDoc); + doc.put(COL_OBJ_TYPE, objTypeId); + doc.put(COL_OBJ_CREATED_AT, createdAtMicros); + doc.put(COL_OBJ_VALUE, new Binary(serialized)); + if (versionToken != null) { + doc.put(COL_OBJ_VERSION, versionToken); + } + doc.put(COL_OBJ_REAL_PART_NUM, partNum); + return doc; + } + + static RuntimeException unhandledException(RuntimeException e) { + if (e instanceof MongoInterruptedException + || e instanceof MongoTimeoutException + || e instanceof MongoServerUnavailableException + || e instanceof MongoSocketReadTimeoutException + || e instanceof MongoExecutionTimeoutException) { + return new UnknownOperationResultException(e); + } + if (e instanceof MongoWriteException mongoWriteException) { + return handleMongoWriteException(mongoWriteException); + } + if (e instanceof MongoBulkWriteException specific) { + for (BulkWriteError error : specific.getWriteErrors()) { + switch (error.getCategory()) { + case EXECUTION_TIMEOUT: + case UNCATEGORIZED: + return new UnknownOperationResultException(e); + default: + break; + } + } + } + return e; + } + + static RuntimeException handleMongoWriteException(MongoWriteException e) { + return handleMongoWriteError(e, e.getError()); + } + + static RuntimeException handleMongoWriteError(MongoException e, WriteError error) { + return switch (error.getCategory()) { + case EXECUTION_TIMEOUT, UNCATEGORIZED -> new UnknownOperationResultException(e); + default -> e; + }; + } +} diff --git a/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendConfig.java b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendConfig.java new file mode 100644 index 0000000000..544baf609e --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendConfig.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.mongodb; + +import com.mongodb.client.MongoClient; + +public record MongoDbBackendConfig( + String databaseName, MongoClient client, boolean closeClient, boolean allowPrefixDeletion) {} diff --git a/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendFactory.java b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendFactory.java new file mode 100644 index 0000000000..92f686668f --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.mongodb; + +import com.mongodb.client.MongoClients; +import jakarta.annotation.Nonnull; +import org.apache.polaris.persistence.nosql.api.backend.Backend; +import org.apache.polaris.persistence.nosql.api.backend.BackendFactory; + +public class MongoDbBackendFactory + implements BackendFactory { + public static final String NAME = "MongoDb"; + + @Override + @Nonnull + public String name() { + return NAME; + } + + @Override + @Nonnull + public Backend buildBackend(@Nonnull MongoDbBackendConfig backendConfig) { + return new MongoDbBackend(backendConfig); + } + + @Override + public Class configurationInterface() { + return MongoDbConfiguration.class; + } + + @Override + public MongoDbBackendConfig buildConfiguration(MongoDbConfiguration config) { + return new MongoDbBackendConfig( + config + .databaseName() + .orElseThrow( + () -> new IllegalStateException("Mandatory MongoDb database name missing")), + MongoClients.create( + config + .connectionString() + .orElseThrow( + () -> + new IllegalStateException("Mandatory MongoDb connection string missing"))), + true, + config.allowPrefixDeletion().orElse(false)); + } +} diff --git a/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbConfiguration.java b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbConfiguration.java new file mode 100644 index 0000000000..5ff744822c --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/main/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbConfiguration.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.mongodb; + +import io.smallrye.config.ConfigMapping; +import java.util.Optional; + +/** Polaris persistence, MongoDB backend specific configuration. */ +@ConfigMapping(prefix = "polaris.persistence.backend.mongodb") +public interface MongoDbConfiguration { + Optional connectionString(); + + Optional databaseName(); + + /** + * Optionally enable realm-deletion using a prefix-delete. + * + *

Prefix-deletion is disabled by default. + */ + Optional allowPrefixDeletion(); +} diff --git a/persistence/nosql/persistence/db/mongodb/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.backend.BackendFactory b/persistence/nosql/persistence/db/mongodb/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.backend.BackendFactory new file mode 100644 index 0000000000..ef5f633be8 --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.backend.BackendFactory @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.persistence.nosql.mongodb.MongoDbBackendFactory diff --git a/persistence/nosql/persistence/db/mongodb/src/testFixtures/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendTestFactory.java b/persistence/nosql/persistence/db/mongodb/src/testFixtures/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendTestFactory.java new file mode 100644 index 0000000000..f7643a2fde --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/testFixtures/java/org/apache/polaris/persistence/nosql/mongodb/MongoDbBackendTestFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.mongodb; + +import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper; + +import java.util.Optional; +import org.apache.polaris.persistence.nosql.api.backend.Backend; +import org.apache.polaris.persistence.nosql.testextension.BackendTestFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.ContainerLaunchException; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.mongodb.MongoDBContainer; + +public class MongoDbBackendTestFactory implements BackendTestFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbBackendTestFactory.class); + public static final String NAME = MongoDbBackendFactory.NAME; + public static final String MONGO_DB_NAME = "test"; + public static final int MONGO_PORT = 27017; + + private MongoDBContainer container; + private String connectionString; + + @Override + public Backend createNewBackend() { + var factory = new MongoDbBackendFactory(); + var config = + factory.buildConfiguration( + new MongoDbConfiguration() { + @Override + public Optional connectionString() { + return Optional.of(connectionString); + } + + @Override + public Optional databaseName() { + return Optional.of(MONGO_DB_NAME); + } + + @Override + public Optional allowPrefixDeletion() { + return Optional.empty(); + } + }); + return factory.buildBackend(config); + } + + public String connectionString() { + return connectionString; + } + + @Override + public void start() { + start(Optional.empty()); + } + + @Override + public void start(Optional containerNetworkId) { + if (container != null) { + throw new IllegalStateException("Already started"); + } + + var dockerImage = + containerSpecHelper("mongodb", MongoDbBackendTestFactory.class) + .dockerImageName(null) + .asCompatibleSubstituteFor("mongo"); + + for (var retry = 0; ; retry++) { + var c = new MongoDBContainer(dockerImage).withLogConsumer(new Slf4jLogConsumer(LOGGER)); + containerNetworkId.ifPresent(c::withNetworkMode); + try { + c.start(); + container = c; + break; + } catch (ContainerLaunchException e) { + c.close(); + if (e.getCause() != null && retry < 3) { + LOGGER.warn("Launch of container {} failed, will retry...", c.getDockerImageName(), e); + continue; + } + LOGGER.error("Launch of container {} failed", c.getDockerImageName(), e); + throw new RuntimeException(e); + } + } + + connectionString = container.getReplicaSetUrl(MONGO_DB_NAME); + + if (containerNetworkId.isPresent()) { + var hostPort = container.getHost() + ':' + container.getMappedPort(MONGO_PORT); + var networkHostPort = + container.getCurrentContainerInfo().getConfig().getHostName() + ':' + MONGO_PORT; + connectionString = connectionString.replace(hostPort, networkHostPort); + } + } + + @Override + public void stop() { + try { + if (container != null) { + container.stop(); + } + } finally { + container = null; + } + } + + @Override + public String name() { + return NAME; + } +} diff --git a/persistence/nosql/persistence/db/mongodb/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.testextension.BackendTestFactory b/persistence/nosql/persistence/db/mongodb/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.testextension.BackendTestFactory new file mode 100644 index 0000000000..89b9cafc44 --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.testextension.BackendTestFactory @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.persistence.nosql.mongodb.MongoDbBackendTestFactory diff --git a/persistence/nosql/persistence/db/mongodb/src/testFixtures/resources/org/apache/polaris/persistence/nosql/mongodb/Dockerfile-mongodb-version b/persistence/nosql/persistence/db/mongodb/src/testFixtures/resources/org/apache/polaris/persistence/nosql/mongodb/Dockerfile-mongodb-version new file mode 100644 index 0000000000..21897d491c --- /dev/null +++ b/persistence/nosql/persistence/db/mongodb/src/testFixtures/resources/org/apache/polaris/persistence/nosql/mongodb/Dockerfile-mongodb-version @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Dockerfile to provide the image name and tag to a test. +# Version is managed by Renovate - do not edit. +FROM docker.io/mongo:8.2.1