-
Notifications
You must be signed in to change notification settings - Fork 332
[JDBC] Support Policy #1468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[JDBC] Support Policy #1468
Changes from all commits
7a3a5c9
03472a8
63460ee
bd3531e
f93e5a8
0cf173f
60c7de8
9a2860b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
|
|
||
| import static org.apache.polaris.extension.persistence.relational.jdbc.QueryGenerator.*; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import jakarta.annotation.Nonnull; | ||
| import jakarta.annotation.Nullable; | ||
| import java.sql.SQLException; | ||
|
|
@@ -45,13 +46,17 @@ | |
| import org.apache.polaris.core.persistence.BasePersistence; | ||
| import org.apache.polaris.core.persistence.EntityAlreadyExistsException; | ||
| import org.apache.polaris.core.persistence.IntegrationPersistence; | ||
| import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; | ||
| import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; | ||
| import org.apache.polaris.core.persistence.RetryOnConcurrencyException; | ||
| import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; | ||
| import org.apache.polaris.core.policy.PolicyType; | ||
| import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; | ||
| import org.apache.polaris.core.storage.PolarisStorageIntegration; | ||
| import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; | ||
| import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; | ||
| import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; | ||
| import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPolicyMappingRecord; | ||
| import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -220,7 +225,7 @@ public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBa | |
| public void deleteFromGrantRecords( | ||
| @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) { | ||
| ModelGrantRecord modelGrantRecord = ModelGrantRecord.fromGrantRecord(grantRec); | ||
| String query = generateDeleteQuery(modelGrantRecord, ModelGrantRecord.class, realmId); | ||
| String query = generateDeleteQuery(modelGrantRecord, realmId); | ||
| try { | ||
| datasourceOperations.executeUpdate(query); | ||
| } catch (SQLException e) { | ||
|
|
@@ -246,9 +251,15 @@ public void deleteAllEntityGrantRecords( | |
| @Override | ||
| public void deleteAll(@Nonnull PolarisCallContext callCtx) { | ||
| try { | ||
| datasourceOperations.executeUpdate(generateDeleteAll(ModelEntity.class, realmId)); | ||
| datasourceOperations.executeUpdate(generateDeleteAll(ModelGrantRecord.class, realmId)); | ||
| datasourceOperations.executeUpdate(generateDeleteAll(ModelEntity.class, realmId)); | ||
| datasourceOperations.runWithinTransaction( | ||
| statement -> { | ||
| statement.executeUpdate(generateDeleteAll(ModelEntity.class, realmId)); | ||
| statement.executeUpdate(generateDeleteAll(ModelGrantRecord.class, realmId)); | ||
| statement.executeUpdate( | ||
| generateDeleteAll(ModelPrincipalAuthenticationData.class, realmId)); | ||
| statement.executeUpdate(generateDeleteAll(ModelPolicyMappingRecord.class, realmId)); | ||
| return true; | ||
| }); | ||
| } catch (SQLException e) { | ||
| throw new RuntimeException( | ||
| String.format("Failed to delete all due to %s", e.getMessage()), e); | ||
|
|
@@ -701,6 +712,190 @@ public void deletePrincipalSecrets( | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void writeToPolicyMappingRecords( | ||
| @Nonnull PolarisCallContext callCtx, @Nonnull PolarisPolicyMappingRecord record) { | ||
| try { | ||
| datasourceOperations.runWithinTransaction( | ||
| statement -> { | ||
| PolicyType policyType = PolicyType.fromCode(record.getPolicyTypeCode()); | ||
| Preconditions.checkArgument( | ||
| policyType != null, "Invalid policy type code: %s", record.getPolicyTypeCode()); | ||
| String insertPolicyMappingQuery = | ||
| generateInsertQuery( | ||
| ModelPolicyMappingRecord.fromPolicyMappingRecord(record), realmId); | ||
| if (policyType.isInheritable()) { | ||
| return handleInheritablePolicy(callCtx, record, insertPolicyMappingQuery, statement); | ||
| } else { | ||
| statement.executeUpdate(insertPolicyMappingQuery); | ||
| } | ||
| return true; | ||
| }); | ||
| } catch (SQLException e) { | ||
| throw new RuntimeException( | ||
| String.format("Failed to write to policy mapping records due to %s", e.getMessage()), e); | ||
| } | ||
| } | ||
|
|
||
| private boolean handleInheritablePolicy( | ||
| @Nonnull PolarisCallContext callCtx, | ||
| @Nonnull PolarisPolicyMappingRecord record, | ||
| @Nonnull String insertQuery, | ||
| Statement statement) | ||
| throws SQLException { | ||
| List<PolarisPolicyMappingRecord> existingRecords = | ||
| loadPoliciesOnTargetByType( | ||
| callCtx, record.getTargetCatalogId(), record.getTargetId(), record.getPolicyTypeCode()); | ||
| if (existingRecords.size() > 1) { | ||
| throw new PolicyMappingAlreadyExistsException(existingRecords.getFirst()); | ||
| } else if (existingRecords.size() == 1) { | ||
| PolarisPolicyMappingRecord existingRecord = existingRecords.getFirst(); | ||
| if (existingRecord.getPolicyCatalogId() != record.getPolicyCatalogId() | ||
| || existingRecord.getPolicyId() != record.getPolicyId()) { | ||
| // Only one policy of the same type can be attached to an entity when the policy is | ||
| // inheritable. | ||
| throw new PolicyMappingAlreadyExistsException(existingRecord); | ||
| } | ||
| Map<String, Object> updateClause = | ||
| Map.of( | ||
| "target_catalog_id", | ||
| record.getTargetCatalogId(), | ||
| "target_id", | ||
| record.getTargetId(), | ||
| "policy_type_code", | ||
| record.getPolicyTypeCode(), | ||
| "policy_id", | ||
| record.getPolicyId(), | ||
| "policy_catalog_id", | ||
| record.getPolicyCatalogId(), | ||
| "realm_id", | ||
| realmId); | ||
| // In case of the mapping exist, update the policy mapping with the new parameters. | ||
| String updateQuery = | ||
| generateUpdateQuery( | ||
| ModelPolicyMappingRecord.fromPolicyMappingRecord(record), updateClause); | ||
| statement.executeUpdate(updateQuery); | ||
| } else { | ||
| // record doesn't exist do an insert. | ||
| statement.executeUpdate(insertQuery); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void deleteFromPolicyMappingRecords( | ||
| @Nonnull PolarisCallContext callCtx, @Nonnull PolarisPolicyMappingRecord record) { | ||
| var modelPolicyMappingRecord = ModelPolicyMappingRecord.fromPolicyMappingRecord(record); | ||
| String query = generateDeleteQuery(modelPolicyMappingRecord, realmId); | ||
| try { | ||
| datasourceOperations.executeUpdate(query); | ||
| } catch (SQLException e) { | ||
| throw new RuntimeException( | ||
| String.format("Failed to write to policy records due to %s", e.getMessage()), e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void deleteAllEntityPolicyMappingRecords( | ||
| @Nonnull PolarisCallContext callCtx, | ||
| @Nonnull PolarisEntityCore entity, | ||
| @Nonnull List<PolarisPolicyMappingRecord> mappingOnTarget, | ||
| @Nonnull List<PolarisPolicyMappingRecord> mappingOnPolicy) { | ||
| try { | ||
| datasourceOperations.executeUpdate( | ||
| generateDeleteQueryForEntityPolicyMappingRecords(entity, realmId)); | ||
| } catch (SQLException e) { | ||
| throw new RuntimeException( | ||
| String.format("Failed to delete policy mapping records due to %s", e.getMessage()), e); | ||
| } | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public PolarisPolicyMappingRecord lookupPolicyMappingRecord( | ||
| @Nonnull PolarisCallContext callCtx, | ||
| long targetCatalogId, | ||
| long targetId, | ||
| int policyTypeCode, | ||
| long policyCatalogId, | ||
| long policyId) { | ||
| Map<String, Object> params = | ||
| Map.of( | ||
| "target_catalog_id", | ||
| targetCatalogId, | ||
| "target_id", | ||
| targetId, | ||
| "policy_type_code", | ||
| policyTypeCode, | ||
| "policy_id", | ||
| policyId, | ||
| "policy_catalog_id", | ||
| policyCatalogId, | ||
| "realm_id", | ||
| realmId); | ||
| String query = generateSelectQuery(ModelPolicyMappingRecord.class, params); | ||
| List<PolarisPolicyMappingRecord> results = fetchPolicyMappingRecords(query); | ||
| Preconditions.checkState(results.size() <= 1, "More than one policy mapping records found"); | ||
| return results.size() == 1 ? results.getFirst() : null; | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public List<PolarisPolicyMappingRecord> loadPoliciesOnTargetByType( | ||
| @Nonnull PolarisCallContext callCtx, | ||
| long targetCatalogId, | ||
| long targetId, | ||
| int policyTypeCode) { | ||
| Map<String, Object> params = | ||
| Map.of( | ||
| "target_catalog_id", | ||
| targetCatalogId, | ||
| "target_id", | ||
| targetId, | ||
| "policy_type_code", | ||
| policyTypeCode, | ||
| "realm_id", | ||
| realmId); | ||
| String query = generateSelectQuery(ModelPolicyMappingRecord.class, params); | ||
| return fetchPolicyMappingRecords(query); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public List<PolarisPolicyMappingRecord> loadAllPoliciesOnTarget( | ||
| @Nonnull PolarisCallContext callCtx, long targetCatalogId, long targetId) { | ||
| Map<String, Object> params = | ||
| Map.of("target_catalog_id", targetCatalogId, "target_id", targetId, "realm_id", realmId); | ||
| String query = generateSelectQuery(ModelPolicyMappingRecord.class, params); | ||
| return fetchPolicyMappingRecords(query); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public List<PolarisPolicyMappingRecord> loadAllTargetsOnPolicy( | ||
| @Nonnull PolarisCallContext callCtx, long policyCatalogId, long policyId) { | ||
|
Comment on lines
+875
to
+876
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just wanna to confirm: we are going to add policy type as a parameter, right? cc @HonahX
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My current plan is that if we want to optimize this we can do another
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if business layer got the policy type already? The major concern is the performance, esp. for postgres, adding another query inside means we have to wrap 2 queries in a transaction(remember this method is supposed to be atomic), which is usually slower. It makes more sense to look up in either in business layer or metastore manager layer.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation! The main reason I’m hesitant to add That said, I agree it makes sense to add it for optimization purposes. How about I open a follow-up PR for that change after this one is merged, so it doesn’t block the current work?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having the |
||
| Map<String, Object> params = | ||
| Map.of("policy_catalog_id", policyCatalogId, "policy_id", policyId, "realm_id", realmId); | ||
| String query = generateSelectQuery(ModelPolicyMappingRecord.class, params); | ||
| return fetchPolicyMappingRecords(query); | ||
| } | ||
|
|
||
| private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(String query) { | ||
| try { | ||
| List<PolarisPolicyMappingRecord> results = | ||
| datasourceOperations.executeSelect( | ||
| query, | ||
| ModelPolicyMappingRecord.class, | ||
| ModelPolicyMappingRecord::toPolicyMappingRecord, | ||
| null, | ||
| Integer.MAX_VALUE); | ||
| return results == null ? Collections.emptyList() : results; | ||
| } catch (SQLException e) { | ||
| throw new RuntimeException( | ||
| String.format("Failed to retrieve policy mapping records %s", e.getMessage()), e); | ||
| } | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public <T extends PolarisStorageConfigurationInfo> | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.