diff --git a/api/polaris-catalog-service/build.gradle.kts b/api/polaris-catalog-service/build.gradle.kts index 2fd861d80b..5f0a8134fd 100644 --- a/api/polaris-catalog-service/build.gradle.kts +++ b/api/polaris-catalog-service/build.gradle.kts @@ -36,7 +36,6 @@ val policyManagementModels = "CatalogIdentifier", "CreatePolicyRequest", "LoadPolicyResponse", - "PolicyIdentifier", "Policy", "PolicyAttachmentTarget", "AttachPolicyRequest", @@ -59,6 +58,7 @@ dependencies { implementation(libs.jakarta.inject.api) implementation(libs.jakarta.validation.api) implementation(libs.swagger.annotations) + implementation(libs.guava) implementation(libs.jakarta.servlet.api) implementation(libs.jakarta.ws.rs.api) @@ -103,6 +103,9 @@ openApiGenerate { "ErrorModel" to "org.apache.iceberg.rest.responses.ErrorResponse", "IcebergErrorResponse" to "org.apache.iceberg.rest.responses.ErrorResponse", "TableIdentifier" to "org.apache.iceberg.catalog.TableIdentifier", + + // Custom types defined below + "PolicyIdentifier" to "org.apache.polaris.service.types.PolicyIdentifier", ) } diff --git a/api/polaris-catalog-service/src/main/java/org/apache/polaris/service/types/PolicyIdentifier.java b/api/polaris-catalog-service/src/main/java/org/apache/polaris/service/types/PolicyIdentifier.java new file mode 100644 index 0000000000..59dfd30249 --- /dev/null +++ b/api/polaris-catalog-service/src/main/java/org/apache/polaris/service/types/PolicyIdentifier.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.types; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import java.util.Objects; +import org.apache.iceberg.catalog.Namespace; + +/** + * Represents a modified version of the PolicyIdentifier that is different from the one generated by + * the OpenAPI generator + * + *

the open api generation inlines the namespace definition, generates a {@code List} + * directly, instead of generating a Namespace class. This version uses {@link + * org.apache.iceberg.catalog.Namespace} for namespace field. + * + *

TODO: make code generation use {@link org.apache.iceberg.catalog.Namespace} directly + */ +public class PolicyIdentifier { + + private final Namespace namespace; + private final String name; + + /** Reference to one or more levels of a namespace */ + @ApiModelProperty( + example = "[\"accounting\",\"tax\"]", + required = true, + value = "Reference to one or more levels of a namespace") + @JsonProperty(value = "namespace", required = true) + public Namespace getNamespace() { + return namespace; + } + + /** */ + @ApiModelProperty(required = true, value = "") + @JsonProperty(value = "name", required = true) + public String getName() { + return name; + } + + @JsonCreator + public PolicyIdentifier( + @JsonProperty(value = "namespace", required = true) Namespace namespace, + @JsonProperty(value = "name", required = true) String name) { + this.namespace = Objects.requireNonNullElse(namespace, Namespace.empty()); + this.name = name; + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(Namespace namespace, String name) { + return new Builder(namespace, name); + } + + public static final class Builder { + private Namespace namespace; + private String name; + + private Builder() {} + + private Builder(Namespace namespace, String name) { + this.namespace = Objects.requireNonNullElse(namespace, Namespace.empty()); + this.name = name; + } + + public Builder setNamespace(Namespace namespace) { + this.namespace = namespace; + return this; + } + + public Builder setName(String name) { + this.name = name; + return this; + } + + public PolicyIdentifier build() { + PolicyIdentifier inst = new PolicyIdentifier(namespace, name); + return inst; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PolicyIdentifier policyIdentifier = (PolicyIdentifier) o; + return Objects.equals(this.namespace, policyIdentifier.namespace) + && Objects.equals(this.name, policyIdentifier.name); + } + + @Override + public int hashCode() { + return Objects.hash(namespace, name); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class PolicyIdentifier {\n"); + + sb.append(" namespace: ").append(toIndentedString(namespace)).append("\n"); + sb.append(" name: ").append(toIndentedString(name)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java index ce0345bf39..e10c24f2f1 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java @@ -36,10 +36,14 @@ public class PolarisCatalogHelpers { private PolarisCatalogHelpers() {} public static List tableIdentifierToList(TableIdentifier identifier) { + return identifierToList(identifier.namespace(), identifier.name()); + } + + public static List identifierToList(Namespace namespace, String name) { ImmutableList.Builder fullList = - ImmutableList.builderWithExpectedSize(identifier.namespace().length() + 1); - fullList.addAll(Arrays.asList(identifier.namespace().levels())); - fullList.add(identifier.name()); + ImmutableList.builderWithExpectedSize(namespace.length() + 1); + fullList.addAll(Arrays.asList(namespace.levels())); + fullList.add(name); return fullList.build(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/NoSuchPolicyException.java b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/NoSuchPolicyException.java new file mode 100644 index 0000000000..642bd36adb --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/NoSuchPolicyException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.policy.exceptions; + +import org.apache.polaris.core.exceptions.PolarisException; + +public class NoSuchPolicyException extends PolarisException { + + public NoSuchPolicyException(String message) { + super(message); + } + + public NoSuchPolicyException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyVersionMismatchException.java b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyVersionMismatchException.java new file mode 100644 index 0000000000..c2f8f8f78e --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyVersionMismatchException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.policy.exceptions; + +import org.apache.polaris.core.exceptions.PolarisException; + +public class PolicyVersionMismatchException extends PolarisException { + public PolicyVersionMismatchException(String message) { + super(message); + } + + public PolicyVersionMismatchException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java new file mode 100644 index 0000000000..83bdecaaa4 --- /dev/null +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.catalog; + +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusMock; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import jakarta.inject.Inject; +import jakarta.ws.rs.core.SecurityContext; +import java.io.IOException; +import java.lang.reflect.Method; +import java.time.Clock; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; +import org.apache.polaris.core.auth.PolarisAuthorizerImpl; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PrincipalEntity; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisEntityManager; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet; +import org.apache.polaris.core.persistence.cache.EntityCache; +import org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; +import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; +import org.apache.polaris.core.policy.PredefinedPolicyTypes; +import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException; +import org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException; +import org.apache.polaris.core.policy.validator.InvalidPolicyException; +import org.apache.polaris.core.storage.PolarisStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; +import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; +import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo; +import org.apache.polaris.core.storage.cache.StorageCredentialCache; +import org.apache.polaris.service.admin.PolarisAdminService; +import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; +import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; +import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; +import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.catalog.policy.PolicyCatalog; +import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; +import org.apache.polaris.service.task.TaskExecutor; +import org.apache.polaris.service.types.Policy; +import org.apache.polaris.service.types.PolicyIdentifier; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.mockito.Mockito; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; + +@QuarkusTest +@TestProfile(PolicyCatalogTest.Profile.class) +public class PolicyCatalogTest { + public static class Profile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + return Map.of( + "polaris.features.defaults.\"ALLOW_SPECIFYING_FILE_IO_IMPL\"", + "true", + "polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"", + "true", + "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"", + "[\"FILE\"]"); + } + } + + protected static final Namespace NS = Namespace.of("newdb"); + protected static final TableIdentifier TABLE = TableIdentifier.of(NS, "table"); + public static final String CATALOG_NAME = "polaris-catalog"; + public static final String TEST_ACCESS_KEY = "test_access_key"; + public static final String SECRET_ACCESS_KEY = "secret_access_key"; + public static final String SESSION_TOKEN = "session_token"; + + private static final Namespace NS1 = Namespace.of("ns1"); + private static final PolicyIdentifier POLICY1 = new PolicyIdentifier(NS1, "p1"); + private static final PolicyIdentifier POLICY2 = new PolicyIdentifier(NS1, "p2"); + private static final PolicyIdentifier POLICY3 = new PolicyIdentifier(NS1, "p3"); + private static final PolicyIdentifier POLICY4 = new PolicyIdentifier(NS1, "p4"); + + @Inject MetaStoreManagerFactory managerFactory; + @Inject PolarisConfigurationStore configurationStore; + @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; + @Inject PolarisDiagnostics diagServices; + + private PolicyCatalog policyCatalog; + private IcebergCatalog icebergCatalog; + private CallContext callContext; + private AwsStorageConfigInfo storageConfigModel; + private String realmName; + private PolarisMetaStoreManager metaStoreManager; + private PolarisCallContext polarisContext; + private PolarisAdminService adminService; + private PolarisEntityManager entityManager; + private FileIOFactory fileIOFactory; + private AuthenticatedPolarisPrincipal authenticatedRoot; + private PolarisEntity catalogEntity; + private SecurityContext securityContext; + + @BeforeAll + public static void setUpMocks() { + PolarisStorageIntegrationProviderImpl mock = + Mockito.mock(PolarisStorageIntegrationProviderImpl.class); + QuarkusMock.installMockForType(mock, PolarisStorageIntegrationProviderImpl.class); + } + + @BeforeEach + @SuppressWarnings("unchecked") + public void before(TestInfo testInfo) { + realmName = + "realm_%s_%s" + .formatted( + testInfo.getTestMethod().map(Method::getName).orElse("test"), System.nanoTime()); + RealmContext realmContext = () -> realmName; + metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext); + polarisContext = + new PolarisCallContext( + managerFactory.getOrCreateSessionSupplier(realmContext).get(), + diagServices, + configurationStore, + Clock.systemDefaultZone()); + entityManager = + new PolarisEntityManager( + metaStoreManager, new StorageCredentialCache(), new EntityCache(metaStoreManager)); + + callContext = CallContext.of(realmContext, polarisContext); + + PrincipalEntity rootEntity = + new PrincipalEntity( + PolarisEntity.of( + metaStoreManager + .readEntityByName( + polarisContext, + null, + PolarisEntityType.PRINCIPAL, + PolarisEntitySubType.NULL_SUBTYPE, + "root") + .getEntity())); + + authenticatedRoot = new AuthenticatedPolarisPrincipal(rootEntity, Set.of()); + + securityContext = Mockito.mock(SecurityContext.class); + when(securityContext.getUserPrincipal()).thenReturn(authenticatedRoot); + when(securityContext.isUserInRole(isA(String.class))).thenReturn(true); + adminService = + new PolarisAdminService( + callContext, + entityManager, + metaStoreManager, + securityContext, + new PolarisAuthorizerImpl(new PolarisConfigurationStore() {})); + + String storageLocation = "s3://my-bucket/path/to/data"; + storageConfigModel = + AwsStorageConfigInfo.builder() + .setRoleArn("arn:aws:iam::012345678901:role/jdoe") + .setExternalId("externalId") + .setUserArn("aws::a:user:arn") + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket")) + .build(); + catalogEntity = + adminService.createCatalog( + new CatalogEntity.Builder() + .setName(CATALOG_NAME) + .setDefaultBaseLocation(storageLocation) + .setReplaceNewLocationPrefixWithCatalogDefault("file:") + .addProperty( + FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(), "true") + .addProperty( + FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true") + .setStorageConfigurationInfo(storageConfigModel, storageLocation) + .build()); + + PolarisPassthroughResolutionView passthroughView = + new PolarisPassthroughResolutionView( + callContext, entityManager, securityContext, CATALOG_NAME); + TaskExecutor taskExecutor = Mockito.mock(); + RealmEntityManagerFactory realmEntityManagerFactory = + new RealmEntityManagerFactory(createMockMetaStoreManagerFactory()); + this.fileIOFactory = + new DefaultFileIOFactory(realmEntityManagerFactory, managerFactory, configurationStore); + + StsClient stsClient = Mockito.mock(StsClient.class); + when(stsClient.assumeRole(isA(AssumeRoleRequest.class))) + .thenReturn( + AssumeRoleResponse.builder() + .credentials( + Credentials.builder() + .accessKeyId(TEST_ACCESS_KEY) + .secretAccessKey(SECRET_ACCESS_KEY) + .sessionToken(SESSION_TOKEN) + .build()) + .build()); + PolarisStorageIntegration storageIntegration = + new AwsCredentialsStorageIntegration(stsClient); + when(storageIntegrationProvider.getStorageIntegrationForConfig( + isA(AwsStorageConfigurationInfo.class))) + .thenReturn((PolarisStorageIntegration) storageIntegration); + + this.policyCatalog = new PolicyCatalog(metaStoreManager, callContext, passthroughView); + this.icebergCatalog = + new IcebergCatalog( + entityManager, + metaStoreManager, + callContext, + passthroughView, + securityContext, + taskExecutor, + fileIOFactory); + this.icebergCatalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + } + + @AfterEach + public void after() throws IOException { + metaStoreManager.purge(polarisContext); + } + + private MetaStoreManagerFactory createMockMetaStoreManagerFactory() { + return new MetaStoreManagerFactory() { + @Override + public PolarisMetaStoreManager getOrCreateMetaStoreManager(RealmContext realmContext) { + return metaStoreManager; + } + + @Override + public Supplier getOrCreateSessionSupplier( + RealmContext realmContext) { + return () -> ((TransactionalPersistence) polarisContext.getMetaStore()); + } + + @Override + public StorageCredentialCache getOrCreateStorageCredentialCache(RealmContext realmContext) { + return new StorageCredentialCache(); + } + + @Override + public EntityCache getOrCreateEntityCache(RealmContext realmContext) { + return new EntityCache(metaStoreManager); + } + + @Override + public Map bootstrapRealms( + Iterable realms, RootCredentialsSet rootCredentialsSet) { + throw new NotImplementedException("Bootstrapping realms is not supported"); + } + + @Override + public Map purgeRealms(Iterable realms) { + throw new NotImplementedException("Purging realms is not supported"); + } + }; + } + + @Test + public void testCreatePolicyDoesNotThrow() { + icebergCatalog.createNamespace(NS1); + Assertions.assertThatCode( + () -> + policyCatalog.createPolicy( + POLICY1, + PredefinedPolicyTypes.DATA_COMPACTION.getName(), + "test", + "{\"enable\": false}")) + .doesNotThrowAnyException(); + } + + @Test + public void testCreatePolicyAlreadyExists() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + Assertions.assertThatThrownBy( + () -> + policyCatalog.createPolicy( + POLICY1, + PredefinedPolicyTypes.DATA_COMPACTION.getName(), + "test", + "{\"enable\": false}")) + .isInstanceOf(AlreadyExistsException.class); + + Assertions.assertThatThrownBy( + () -> + policyCatalog.createPolicy( + POLICY1, + PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), + "test", + "{\"enable\": false}")) + .isInstanceOf(AlreadyExistsException.class); + } + + @Test + public void testListPolicies() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + policyCatalog.createPolicy( + POLICY2, + PredefinedPolicyTypes.METADATA_COMPACTION.getName(), + "test", + "{\"enable\": false}"); + + policyCatalog.createPolicy( + POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test", "{\"enable\": false}"); + + policyCatalog.createPolicy( + POLICY4, + PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(), + "test", + "{\"enable\": false}"); + + List listResult = policyCatalog.listPolicies(NS1, null); + Assertions.assertThat(listResult).hasSize(4); + Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY1, POLICY2, POLICY3, POLICY4); + } + + @Test + public void testListPoliciesFilterByPolicyType() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + policyCatalog.createPolicy( + POLICY2, + PredefinedPolicyTypes.METADATA_COMPACTION.getName(), + "test", + "{\"enable\": false}"); + + policyCatalog.createPolicy( + POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test", "{\"enable\": false}"); + + policyCatalog.createPolicy( + POLICY4, + PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(), + "test", + "{\"enable\": false}"); + + List listResult = + policyCatalog.listPolicies(NS1, PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL); + Assertions.assertThat(listResult).hasSize(1); + Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY4); + } + + @Test + public void testLoadPolicy() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + Policy policy = policyCatalog.loadPolicy(POLICY1); + Assertions.assertThat(policy.getVersion()).isEqualTo(0); + Assertions.assertThat(policy.getPolicyType()) + .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName()); + Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": false}"); + Assertions.assertThat(policy.getName()).isEqualTo("p1"); + Assertions.assertThat(policy.getDescription()).isEqualTo("test"); + } + + @Test + public void testCreatePolicyWithInvalidContent() { + icebergCatalog.createNamespace(NS1); + + Assertions.assertThatThrownBy( + () -> + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "invalid")) + .isInstanceOf(InvalidPolicyException.class); + } + + @Test + public void testLoadPolicyNotExist() { + icebergCatalog.createNamespace(NS1); + + Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1)) + .isInstanceOf(NoSuchPolicyException.class); + } + + @Test + public void testUpdatePolicy() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 0); + + Policy policy = policyCatalog.loadPolicy(POLICY1); + Assertions.assertThat(policy.getVersion()).isEqualTo(1); + Assertions.assertThat(policy.getPolicyType()) + .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName()); + Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": true}"); + Assertions.assertThat(policy.getName()).isEqualTo("p1"); + Assertions.assertThat(policy.getDescription()).isEqualTo("updated"); + } + + @Test + public void testUpdatePolicyWithWrongVersion() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + Assertions.assertThatThrownBy( + () -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 1)) + .isInstanceOf(PolicyVersionMismatchException.class); + } + + @Test + public void testUpdatePolicyWithInvalidContent() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + Assertions.assertThatThrownBy( + () -> policyCatalog.updatePolicy(POLICY1, "updated", "invalid", 0)) + .isInstanceOf(InvalidPolicyException.class); + } + + @Test + public void testUpdatePolicyNotExist() { + icebergCatalog.createNamespace(NS1); + + Assertions.assertThatThrownBy( + () -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 0)) + .isInstanceOf(NoSuchPolicyException.class); + } + + @Test + public void testDropPolicy() { + icebergCatalog.createNamespace(NS1); + policyCatalog.createPolicy( + POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}"); + + Assertions.assertThat(policyCatalog.dropPolicy(POLICY1, false)).isTrue(); + Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1)) + .isInstanceOf(NoSuchPolicyException.class); + } + + @Test + public void testDropPolicyNotExist() { + icebergCatalog.createNamespace(NS1); + + Assertions.assertThatThrownBy(() -> policyCatalog.dropPolicy(POLICY1, false)) + .isInstanceOf(NoSuchPolicyException.class); + } +} diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts index fec8f12c74..ca00893e07 100644 --- a/service/common/build.gradle.kts +++ b/service/common/build.gradle.kts @@ -102,6 +102,7 @@ dependencies { testFixturesImplementation(project(":polaris-api-management-model")) testFixturesImplementation(project(":polaris-api-management-service")) testFixturesImplementation(project(":polaris-api-iceberg-service")) + testFixturesImplementation(project(":polaris-api-catalog-service")) testFixturesImplementation(libs.jakarta.enterprise.cdi.api) testFixturesImplementation(libs.jakarta.annotation.api) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java new file mode 100644 index 0000000000..a4fb23da7d --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.policy; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; +import org.apache.polaris.core.persistence.dao.entity.EntityResult; +import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; +import org.apache.polaris.core.policy.PolicyEntity; +import org.apache.polaris.core.policy.PolicyType; +import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException; +import org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException; +import org.apache.polaris.core.policy.validator.PolicyValidators; +import org.apache.polaris.service.types.Policy; +import org.apache.polaris.service.types.PolicyIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PolicyCatalog { + private static final Logger LOGGER = LoggerFactory.getLogger(PolicyCatalog.class); + + private final CallContext callContext; + private final PolarisResolutionManifestCatalogView resolvedEntityView; + private final CatalogEntity catalogEntity; + private long catalogId = -1; + private PolarisMetaStoreManager metaStoreManager; + + public PolicyCatalog( + PolarisMetaStoreManager metaStoreManager, + CallContext callContext, + PolarisResolutionManifestCatalogView resolvedEntityView) { + this.callContext = callContext; + this.resolvedEntityView = resolvedEntityView; + this.catalogEntity = + CatalogEntity.of(resolvedEntityView.getResolvedReferenceCatalogEntity().getRawLeafEntity()); + this.catalogId = catalogEntity.getId(); + this.metaStoreManager = metaStoreManager; + } + + public Policy createPolicy( + PolicyIdentifier policyIdentifier, String type, String description, String content) { + PolarisResolvedPathWrapper resolvedParent = + resolvedEntityView.getResolvedPath(policyIdentifier.getNamespace()); + if (resolvedParent == null) { + // Illegal state because the namespace should've already been in the static resolution set. + throw new IllegalStateException( + String.format("Failed to fetch resolved parent for Policy '%s'", policyIdentifier)); + } + + List catalogPath = resolvedParent.getRawFullPath(); + + PolarisResolvedPathWrapper resolvedPolicyEntities = + resolvedEntityView.getPassthroughResolvedPath( + policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE); + + PolicyEntity entity = + PolicyEntity.of( + resolvedPolicyEntities == null ? null : resolvedPolicyEntities.getRawLeafEntity()); + + if (entity != null) { + throw new AlreadyExistsException("Policy already exists %s", policyIdentifier); + } + + PolicyType policyType = PolicyType.fromName(type); + if (policyType == null) { + throw new BadRequestException("Unknown policy type: %s", type); + } + + entity = + new PolicyEntity.Builder( + policyIdentifier.getNamespace(), policyIdentifier.getName(), policyType) + .setCatalogId(catalogId) + .setParentId(resolvedParent.getRawLeafEntity().getId()) + .setDescription(description) + .setContent(content) + .setId( + metaStoreManager.generateNewEntityId(callContext.getPolarisCallContext()).getId()) + .setCreateTimestamp(System.currentTimeMillis()) + .build(); + + PolicyValidators.validate(entity); + + EntityResult res = + metaStoreManager.createEntityIfNotExists( + callContext.getPolarisCallContext(), PolarisEntity.toCoreList(catalogPath), entity); + + if (!res.isSuccess()) { + + switch (res.getReturnStatus()) { + case ENTITY_ALREADY_EXISTS: + throw new AlreadyExistsException("Policy already exists %s", policyIdentifier); + + default: + throw new IllegalStateException( + String.format( + "Unknown error status for identifier %s: %s with extraInfo: %s", + policyIdentifier, res.getReturnStatus(), res.getExtraInformation())); + } + } + + PolicyEntity resultEntity = PolicyEntity.of(res.getEntity()); + LOGGER.debug( + "Created Policy entity {} with PolicyIdentifier {}", resultEntity, policyIdentifier); + return constructPolicy(resultEntity); + } + + public List listPolicies(Namespace namespace, PolicyType policyType) { + PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); + if (resolvedEntities == null) { + throw new IllegalStateException( + String.format("Failed to fetch resolved namespace '%s'", namespace)); + } + + List catalogPath = resolvedEntities.getRawFullPath(); + List policyEntities = + metaStoreManager + .listEntities( + callContext.getPolarisCallContext(), + PolarisEntity.toCoreList(catalogPath), + PolarisEntityType.POLICY, + PolarisEntitySubType.NULL_SUBTYPE) + .getEntities() + .stream() + .map( + polarisEntityActiveRecord -> + PolicyEntity.of( + metaStoreManager + .loadEntity( + callContext.getPolarisCallContext(), + polarisEntityActiveRecord.getCatalogId(), + polarisEntityActiveRecord.getId(), + polarisEntityActiveRecord.getType()) + .getEntity())) + .filter( + policyEntity -> policyType == null || policyEntity.getPolicyType() == policyType) + .toList(); + + List entities = + policyEntities.stream().map(PolarisEntity::nameAndId).toList(); + + return entities.stream() + .map( + entity -> + PolicyIdentifier.builder() + .setNamespace(namespace) + .setName(entity.getName()) + .build()) + .toList(); + } + + public Policy loadPolicy(PolicyIdentifier policyIdentifier) { + PolarisResolvedPathWrapper resolvedEntities = + resolvedEntityView.getPassthroughResolvedPath( + policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE); + + PolicyEntity policy = + PolicyEntity.of(resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity()); + + if (policy == null) { + throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier)); + } + return constructPolicy(policy); + } + + public Policy updatePolicy( + PolicyIdentifier policyIdentifier, + String newDescription, + String newContent, + int currentPolicyVersion) { + PolarisResolvedPathWrapper resolvedEntities = + resolvedEntityView.getPassthroughResolvedPath( + policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE); + + PolicyEntity policy = + PolicyEntity.of(resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity()); + + if (policy == null) { + throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier)); + } + + // Verify that the current version of the policy matches the version that the user is trying to + // update + int policyVersion = policy.getPolicyVersion(); + if (currentPolicyVersion != policyVersion) { + throw new PolicyVersionMismatchException( + String.format( + "Policy version mismatch. Given version is %d, current version is %d", + currentPolicyVersion, policyVersion)); + } + + if (newDescription.equals(policy.getDescription()) && newContent.equals(policy.getContent())) { + // No need to update the policy if the new description and content are the same as the current + return constructPolicy(policy); + } + + PolicyEntity.Builder newPolicyBuilder = new PolicyEntity.Builder(policy); + newPolicyBuilder.setContent(newContent); + newPolicyBuilder.setDescription(newDescription); + newPolicyBuilder.setPolicyVersion(policyVersion + 1); + PolicyEntity newPolicyEntity = newPolicyBuilder.build(); + + PolicyValidators.validate(newPolicyEntity); + + List catalogPath = resolvedEntities.getRawParentPath(); + newPolicyEntity = + Optional.ofNullable( + metaStoreManager + .updateEntityPropertiesIfNotChanged( + callContext.getPolarisCallContext(), + PolarisEntity.toCoreList(catalogPath), + newPolicyEntity) + .getEntity()) + .map(PolicyEntity::of) + .orElse(null); + + if (newPolicyEntity == null) { + throw new IllegalStateException( + String.format("Failed to update policy %s", policyIdentifier)); + } + + return constructPolicy(newPolicyEntity); + } + + public boolean dropPolicy(PolicyIdentifier policyIdentifier, boolean detachAll) { + // TODO: Implement detachAll when we support attach/detach policy + PolarisResolvedPathWrapper resolvedEntities = + resolvedEntityView.getPassthroughResolvedPath( + policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE); + if (resolvedEntities == null) { + throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier)); + } + + List catalogPath = resolvedEntities.getRawParentPath(); + PolarisEntity leafEntity = resolvedEntities.getRawLeafEntity(); + + DropEntityResult dropEntityResult = + metaStoreManager.dropEntityIfExists( + callContext.getPolarisCallContext(), + PolarisEntity.toCoreList(catalogPath), + leafEntity, + Map.of(), + false); + + return dropEntityResult.isSuccess(); + } + + private static Policy constructPolicy(PolicyEntity policyEntity) { + return Policy.builder() + .setPolicyType(policyEntity.getPolicyType().getName()) + .setInheritable(policyEntity.getPolicyType().isInheritable()) + .setName(policyEntity.getName()) + .setDescription(policyEntity.getDescription()) + .setContent(policyEntity.getContent()) + .setVersion(policyEntity.getPolicyVersion()) + .build(); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java index 0636f5b97b..22b9e187db 100644 --- a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java +++ b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java @@ -25,6 +25,8 @@ import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.polaris.core.exceptions.AlreadyExistsException; import org.apache.polaris.core.exceptions.PolarisException; +import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException; +import org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException; import org.apache.polaris.core.policy.validator.InvalidPolicyException; import org.apache.polaris.service.context.UnresolvableRealmContextException; import org.slf4j.Logger; @@ -47,6 +49,10 @@ private Response.Status getStatus(PolarisException exception) { return Response.Status.NOT_FOUND; } else if (exception instanceof InvalidPolicyException) { return Response.Status.BAD_REQUEST; + } else if (exception instanceof NoSuchPolicyException) { + return Response.Status.NOT_FOUND; + } else if (exception instanceof PolicyVersionMismatchException) { + return Response.Status.CONFLICT; } else { return Response.Status.INTERNAL_SERVER_ERROR; } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java index 8c98e910b1..8919ef5f02 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java @@ -31,6 +31,7 @@ import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.persistence.resolver.ResolverPath; +import org.apache.polaris.service.types.PolicyIdentifier; /** * For test purposes or for elevated-privilege scenarios where entity resolution is allowed to @@ -94,6 +95,15 @@ public PolarisResolvedPathWrapper getResolvedPath( identifier); manifest.resolveAll(); return manifest.getResolvedPath(identifier, entityType, subType); + } else if (key instanceof PolicyIdentifier policyIdentifier) { + manifest.addPath( + new ResolverPath( + PolarisCatalogHelpers.identifierToList( + policyIdentifier.getNamespace(), policyIdentifier.getName()), + entityType), + policyIdentifier); + manifest.resolveAll(); + return manifest.getResolvedPath(policyIdentifier, entityType, subType); } else { throw new IllegalStateException( String.format( @@ -130,6 +140,14 @@ public PolarisResolvedPathWrapper getPassthroughResolvedPath( new ResolverPath(PolarisCatalogHelpers.tableIdentifierToList(identifier), entityType), identifier); return manifest.getPassthroughResolvedPath(identifier, entityType, subType); + } else if (key instanceof PolicyIdentifier policyIdentifier) { + manifest.addPassthroughPath( + new ResolverPath( + PolarisCatalogHelpers.identifierToList( + policyIdentifier.getNamespace(), policyIdentifier.getName()), + entityType), + policyIdentifier); + return manifest.getPassthroughResolvedPath(policyIdentifier, entityType, subType); } else { throw new IllegalStateException( String.format(