Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
api(project(":polaris-jpa-model"))

api(project(":polaris-quarkus-admin"))
api(project(":polaris-quarkus-test-commons"))
api(project(":polaris-quarkus-defaults"))
api(project(":polaris-quarkus-server"))
api(project(":polaris-quarkus-service"))
Expand Down
5 changes: 4 additions & 1 deletion extension/persistence/relational-jdbc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
* under the License.
*/

plugins { id("polaris-server") }
plugins {
id("polaris-server")
alias(libs.plugins.jandex)
}

dependencies {
implementation(project(":polaris-core"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.extension.persistence.relational.jdbc;

import java.util.Locale;

public enum DatabaseType {
POSTGRES("postgres"),
H2("h2");

private final String displayName; // Store the user-friendly name

DatabaseType(String displayName) {
this.displayName = displayName;
}

// Method to get the user-friendly display name
public String getDisplayName() {
return displayName;
}

public static DatabaseType fromDisplayName(String displayName) {
return switch (displayName.toLowerCase(Locale.ROOT)) {
case "h2" -> DatabaseType.H2;
case "postgresql" -> DatabaseType.POSTGRES;
default -> throw new IllegalStateException("Unsupported DatabaseType: '" + displayName + "'");
};
}

public String getInitScriptResource() {
return String.format("%s/schema-v1.sql", this.getDisplayName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@
import java.util.function.Predicate;
import javax.sql.DataSource;
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatasourceOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class);

private static final String ALREADY_EXISTS_STATE_POSTGRES = "42P07";
private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505";

private final DataSource datasource;
Expand Down Expand Up @@ -189,10 +185,6 @@ public boolean isConstraintViolation(SQLException e) {
return CONSTRAINT_VIOLATION_SQL_CODE.equals(e.getSQLState());
}

public boolean isAlreadyExistsException(SQLException e) {
return ALREADY_EXISTS_STATE_POSTGRES.equals(e.getSQLState());
}

private Connection borrowConnection() throws SQLException {
return datasource.getConnection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -86,44 +87,14 @@ public void writeEntity(
@Nonnull PolarisBaseEntity entity,
boolean nameOrParentChanged,
PolarisBaseEntity originalEntity) {
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
String query;
if (originalEntity == null) {
try {
query = generateInsertQuery(modelEntity, realmId);
datasourceOperations.executeUpdate(query);
} catch (SQLException e) {
if ((datasourceOperations.isConstraintViolation(e)
|| datasourceOperations.isAlreadyExistsException(e))) {
throw new EntityAlreadyExistsException(entity, e);
} else {
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
}
}
} else {
Map<String, Object> params =
Map.of(
"id",
originalEntity.getId(),
"catalog_id",
originalEntity.getCatalogId(),
"entity_version",
originalEntity.getEntityVersion(),
"realm_id",
realmId);
query = generateUpdateQuery(modelEntity, params);
try {
int rowsUpdated = datasourceOperations.executeUpdate(query);
if (rowsUpdated == 0) {
throw new RetryOnConcurrencyException(
"Entity '%s' id '%s' concurrently modified; expected version %s",
entity.getName(), entity.getId(), originalEntity.getEntityVersion());
}
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
}
try {
datasourceOperations.runWithinTransaction(
statement -> {
persistEntity(callCtx, entity, originalEntity, statement);
return true;
});
} catch (SQLException e) {
throw new RuntimeException("Error persisting entity", e);
}
}

Expand All @@ -137,70 +108,21 @@ public void writeEntities(
statement -> {
for (int i = 0; i < entities.size(); i++) {
PolarisBaseEntity entity = entities.get(i);
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
PolarisBaseEntity originalEntity =
originalEntities != null ? originalEntities.get(i) : null;

// first, check if the entity has already been created, in which case we will simply
// return it.
PolarisBaseEntity entityFound =
lookupEntity(
callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode());
if (entityFound != null) {
if (entityFound != null && originalEntity == null) {
// probably the client retried, simply return it
// TODO: Check correctness of returning entityFound vs entity here. It may have
// already been updated after the creation.
continue;
}
// lookup by name
EntityNameLookupRecord exists =
lookupEntityIdAndSubTypeByName(
callCtx,
entity.getCatalogId(),
entity.getParentId(),
entity.getTypeCode(),
entity.getName());
if (exists != null) {
throw new EntityAlreadyExistsException(entity);
}
String query;
if (originalEntities == null || originalEntities.get(i) == null) {
try {
query = generateInsertQuery(modelEntity, realmId);
statement.executeUpdate(query);
} catch (SQLException e) {
if ((datasourceOperations.isConstraintViolation(e)
|| datasourceOperations.isAlreadyExistsException(e))) {
throw new EntityAlreadyExistsException(entity, e);
} else {
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
}
}
} else {
Map<String, Object> params =
Map.of(
"id",
originalEntities.get(i).getId(),
"catalog_id",
originalEntities.get(i).getCatalogId(),
"entity_version",
originalEntities.get(i).getEntityVersion(),
"realm_id",
realmId);
query = generateUpdateQuery(modelEntity, params);
try {
int rowsUpdated = statement.executeUpdate(query);
if (rowsUpdated == 0) {
throw new RetryOnConcurrencyException(
"Entity '%s' id '%s' concurrently modified; expected version %s",
entity.getName(),
entity.getId(),
originalEntities.get(i).getEntityVersion());
}
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
}
}
persistEntity(callCtx, entity, originalEntity, statement);
}
return true;
});
Expand All @@ -212,6 +134,56 @@ public void writeEntities(
}
}

private void persistEntity(
@Nonnull PolarisCallContext callCtx,
@Nonnull PolarisBaseEntity entity,
PolarisBaseEntity originalEntity,
Statement statement)
throws SQLException {
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
if (originalEntity == null) {
try {
statement.executeUpdate(generateInsertQuery(modelEntity, realmId));
} catch (SQLException e) {
if (datasourceOperations.isConstraintViolation(e)) {
PolarisBaseEntity existingEntity =
lookupEntityByName(
callCtx,
entity.getCatalogId(),
entity.getParentId(),
entity.getTypeCode(),
entity.getName());
throw new EntityAlreadyExistsException(existingEntity, e);
} else {
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
}
}
} else {
Map<String, Object> params =
Map.of(
"id",
originalEntity.getId(),
"catalog_id",
originalEntity.getCatalogId(),
"entity_version",
originalEntity.getEntityVersion(),
"realm_id",
realmId);
try {
int rowsUpdated = statement.executeUpdate(generateUpdateQuery(modelEntity, params));
if (rowsUpdated == 0) {
throw new RetryOnConcurrencyException(
"Entity '%s' id '%s' concurrently modified; expected version %s",
originalEntity.getName(), originalEntity.getId(), originalEntity.getEntityVersion());
}
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
}
}
}

@Override
public void writeToGrantRecords(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) {
Expand Down Expand Up @@ -492,6 +464,8 @@ public PolarisGrantRecord lookupGrantRecord(
throw new IllegalStateException(
String.format(
"More than one grant record %s for a given Grant record", results.getFirst()));
} else if (results.isEmpty()) {
return null;
}
return results.getFirst();
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.smallrye.common.annotation.Identifier;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -37,6 +39,7 @@
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
Expand All @@ -46,7 +49,6 @@
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.slf4j.Logger;
Expand All @@ -68,10 +70,9 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory {
final Map<String, EntityCache> entityCacheMap = new HashMap<>();
final Map<String, Supplier<BasePersistence>> sessionSupplierMap = new HashMap<>();
protected final PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
// TODO: Pending discussion of if we should have one Database per realm or 1 schema per realm
// or realm should be a primary key on all the tables.
@Inject DataSource dataSource;

@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
@Inject Instance<DataSource> dataSource;

protected JdbcMetaStoreManagerFactory() {}

Expand All @@ -86,7 +87,7 @@ protected PrincipalSecretsGenerator secretsGenerator(
}

protected PolarisMetaStoreManager createNewMetaStoreManager() {
return new TransactionalMetaStoreManagerImpl();
return new AtomicOperationMetaStoreManager();
}

private void initializeForRealm(
Expand All @@ -106,12 +107,16 @@ private void initializeForRealm(
}

private DatasourceOperations getDatasourceOperations(boolean isBootstrap) {
DatasourceOperations databaseOperations = new DatasourceOperations(dataSource);
DatasourceOperations databaseOperations = new DatasourceOperations(dataSource.get());
if (isBootstrap) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to recap: I believe it is preferable to run DDL from the dedicated bootstrap command or higher-level class and avoid conditional DDL yes/no logic on every call to getDatasourceOperations (for follow-up).

// TODO: see if we need to take script from Quarkus or can we just
// use the script committed in the repo.
try {
databaseOperations.executeScript("scripts/postgres/schema-v1-postgres.sql");
DatabaseType databaseType;
try (Connection connection = dataSource.get().getConnection()) {
String productName = connection.getMetaData().getDatabaseProductName();
databaseType = DatabaseType.fromDisplayName(productName);
}
databaseOperations.executeScript(
String.format("%s/schema-v1.sql", databaseType.getDisplayName()));
} catch (SQLException e) {
throw new RuntimeException(
String.format("Error executing sql script: %s", e.getMessage()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@ public ModelEntity fromResultSet(ResultSet r) throws SQLException {
.purgeTimestamp(r.getObject("purge_timestamp", Long.class))
.toPurgeTimestamp(r.getObject("to_purge_timestamp", Long.class))
.lastUpdateTimestamp(r.getObject("last_update_timestamp", Long.class))
.properties(r.getObject("properties", String.class))
.internalProperties(r.getObject("internal_properties", String.class))
.properties(
r.getString("properties")) // required for extracting when the underlying type is JSONB
.internalProperties(
r.getString(
"internal_properties")) // required for extracting when the underlying type is JSONB
.grantRecordsVersion(r.getObject("grant_records_version", Integer.class))
.build();
}
Expand Down
Loading