From a27c2e55cffc4bc1181642b797fa38372627f327 Mon Sep 17 00:00:00 2001
From: Michael Collado
Date: Thu, 22 Aug 2024 13:37:40 -0700
Subject: [PATCH 1/5] Add support for custom write.data.path and write.metadata.path with test for object store location provider

---
 .../PolarisStorageConfigurationInfo.java      | 34 +++++-
 .../service/catalog/BasePolarisCatalog.java   | 77 ++++++++++----
 .../PolarisApplicationIntegrationTest.java    | 6 +-
 .../src/test_spark_sql_s3_with_privileges.py  | 100 +++++++++++++++++-
 4 files changed, 192 insertions(+), 25 deletions(-)

diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index a169cd555d..2ddab7420f 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -25,11 +25,16 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
 import org.apache.polaris.core.PolarisConfiguration;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.Catalog;
@@ -174,11 +179,38 @@ public static Optional<PolarisStorageConfigurationInfo> forEntityPath(
               LOGGER.debug(
                   "Allowing unstructured table location for entity: {}",
                   entityPathReversed.get(0).getName());
-              return configInfo;
+              return userSpecifiedWriteLocations(entityPathReversed.get(0).getPropertiesAsMap())
+                  .map(
+                      locs ->
+                          (PolarisStorageConfigurationInfo)
+                              new StorageConfigurationOverride(
+                                  configInfo,
+                                  ImmutableList.<String>builder()
+                                      .addAll(configInfo.getAllowedLocations())
+                                      .addAll(locs)
+                                      .build()))
+                  .orElse(configInfo);
             }
           });
   }
 
+  private static Optional<List<String>> userSpecifiedWriteLocations(
+      Map<String, String> properties) {
+    return Optional.ofNullable(properties)
+        .flatMap(
+            p ->
+                Stream.of(
+                        p.get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY),
+                        p.get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY))
+                    .filter(Objects::nonNull)
+                    .collect(
+                        Collector.<String, List<String>, Optional<List<String>>>of(
+                            ArrayList::new,
+                            List::add,
+                            (l, r) -> ImmutableList.<String>builder().addAll(l).addAll(r).build(),
+                            l -> l.isEmpty() ?
Optional.empty() : Optional.of(l)))); + } + private static @NotNull Optional findStorageInfoFromHierarchy( List entityPath) { for (int i = entityPath.size() - 1; i >= 0; i--) { diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index 2d818760f4..ec8f9b3b01 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -367,6 +367,20 @@ private Set getLocationsAllowedToBeAccessed(TableMetadata tableMetadata) Set locations = new HashSet<>(); locations.add(concatFilePrefixes(basicLocation, "data/", "/")); locations.add(concatFilePrefixes(basicLocation, "metadata/", "/")); + if (tableMetadata + .properties() + .containsKey(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)) { + locations.add( + tableMetadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)); + } + if (tableMetadata + .properties() + .containsKey(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) { + locations.add( + tableMetadata + .properties() + .get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)); + } return locations; } @@ -854,10 +868,6 @@ private Map refreshCredentials( Set storageActions, Set tableLocations, PolarisEntity entity) { - // Important: Any locations added to the set of requested locations need to be validated - // prior to requested subscoped credentials. - tableLocations.forEach(tl -> validateLocationForTableLike(tableIdentifier, tl)); - Boolean skipCredentialSubscopingIndirection = getBooleanContextConfiguration( SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION, SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION_DEFAULT); @@ -924,6 +934,17 @@ private void validateLocationForTableLike( TableIdentifier identifier, String location, PolarisResolvedPathWrapper resolvedStorageEntity) { + validateLocationsForTableLike(identifier, Set.of(location), resolvedStorageEntity); + } + + /** + * Validates that the specified {@code locations} are valid for whatever storage config is found + * for this TableLike's parent hierarchy. 
+ */ + private void validateLocationsForTableLike( + TableIdentifier identifier, + Set locations, + PolarisResolvedPathWrapper resolvedStorageEntity) { Optional optStorageConfiguration = PolarisStorageConfigurationInfo.forEntityPath( callContext.getPolarisCallContext().getDiagServices(), @@ -934,7 +955,7 @@ private void validateLocationForTableLike( Map> validationResults = InMemoryStorageIntegration.validateSubpathsOfAllowedLocations( - storageConfigInfo, Set.of(PolarisStorageActions.ALL), Set.of(location)); + storageConfigInfo, Set.of(PolarisStorageActions.ALL), locations); validationResults .values() .forEach( @@ -945,12 +966,12 @@ private void validateLocationForTableLike( result -> { if (!result.isSuccess()) { throw new ForbiddenException( - "Invalid location '%s' for identifier '%s': %s", - location, identifier, result.getMessage()); + "Invalid locations '%s' for identifier '%s': %s", + locations, identifier, result.getMessage()); } else { LOGGER.debug( - "Validated location '{}' for identifier '{}'", - location, + "Validated locations '{}' for identifier '{}'", + locations, identifier); } })); @@ -975,10 +996,11 @@ private void validateLocationForTableLike( // } }, () -> { - if (location.startsWith("file:") || location.startsWith("http")) { + if (locations.stream() + .anyMatch(location -> location.startsWith("file:") || location.startsWith("http"))) { throw new ForbiddenException( - "Invalid location '%s' for identifier '%s': File locations are not allowed", - location, identifier); + "Invalid locations '%s' for identifier '%s': File locations are not allowed", + locations, identifier); } }); } @@ -998,8 +1020,7 @@ private void validateNoLocationOverlap( LOGGER.debug("Skipping location overlap validation for identifier '{}'", identifier); } else { // if (entity.getSubType().equals(PolarisEntitySubType.TABLE)) { // TODO - is this necessary for views? overlapping views do not expose subdirectories via the - // credential vending - // so this feels like an unnecessary restriction + // credential vending so this feels like an unnecessary restriction LOGGER.debug("Validating no overlap with sibling tables or namespaces"); validateNoLocationOverlap(location, resolvedNamespace, identifier.name()); } @@ -1249,12 +1270,31 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { : resolvedTableEntities.getRawParentPath(); CatalogEntity catalog = CatalogEntity.of(resolvedNamespace.getFirst()); - if (base == null || !metadata.location().equals(base.location())) { + if (base == null + || !metadata.location().equals(base.location()) + || !Objects.equal( + base.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY), + metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY))) { // If location is changing then we must validate that the requested location is valid // for the storage configuration inherited under this entity's path. 
- validateLocationForTableLike(tableIdentifier, metadata.location(), resolvedStorageEntity); - // also validate that the view location doesn't overlap an existing table - validateNoLocationOverlap(tableIdentifier, resolvedNamespace, metadata.location()); + Set dataLocations = new HashSet<>(); + dataLocations.add(metadata.location()); + if (metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY) + != null) { + dataLocations.add( + metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)); + } + if (metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY) + != null) { + dataLocations.add( + metadata + .properties() + .get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)); + } + validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity); + // also validate that the table location doesn't overlap an existing table + dataLocations.forEach( + location -> validateNoLocationOverlap(tableIdentifier, resolvedNamespace, location)); // and that the metadata file points to a location within the table's directory structure if (metadata.metadataFileLocation() != null) { validateMetadataFileInTableDir(tableIdentifier, metadata, catalog); @@ -1921,7 +1961,6 @@ private List listTableLike(PolarisEntitySubType subType, Namesp * @return FileIO object */ private FileIO loadFileIO(String ioImpl, Map properties) { - blockedUserSpecifiedWriteLocation(properties); Map propertiesWithS3CustomizedClientFactory = new HashMap<>(properties); propertiesWithS3CustomizedClientFactory.put( S3FileIOProperties.CLIENT_FACTORY, PolarisS3FileIOClientFactory.class.getName()); diff --git a/polaris-service/src/test/java/org/apache/polaris/service/PolarisApplicationIntegrationTest.java b/polaris-service/src/test/java/org/apache/polaris/service/PolarisApplicationIntegrationTest.java index 50f30b417b..175da7fc59 100644 --- a/polaris-service/src/test/java/org/apache/polaris/service/PolarisApplicationIntegrationTest.java +++ b/polaris-service/src/test/java/org/apache/polaris/service/PolarisApplicationIntegrationTest.java @@ -442,8 +442,7 @@ public void testIcebergCreateTablesWithWritePathBlocked(TestInfo testInfo) throw .withProperties(Map.of("write.data.path", "s3://my-bucket/path/to/data")) .create()) .isInstanceOf(ForbiddenException.class) - .hasMessage( - "Forbidden: Delegate access to table with user-specified write location is temporarily not supported."); + .hasMessageContaining("Forbidden: Invalid locations"); Assertions.assertThatThrownBy( () -> @@ -461,8 +460,7 @@ public void testIcebergCreateTablesWithWritePathBlocked(TestInfo testInfo) throw Map.of("write.metadata.path", "s3://my-bucket/path/to/data")) .create()) .isInstanceOf(ForbiddenException.class) - .hasMessage( - "Forbidden: Delegate access to table with user-specified write location is temporarily not supported."); + .hasMessageContaining("Forbidden: Invalid locations"); } catch (BadRequestException e) { LOGGER.info("Received expected exception {}", e.getMessage()); } diff --git a/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py b/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py index edc6401eea..be4cc81193 100644 --- a/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py +++ b/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py @@ -39,7 +39,7 @@ from polaris.management import ApiClient as ManagementApiClient from polaris.management import PolarisDefaultApi, Principal, PrincipalRole, CatalogRole, \ CatalogGrant, 
CatalogPrivilege, ApiException, CreateCatalogRoleRequest, CreatePrincipalRoleRequest, \ - CreatePrincipalRequest, AddGrantRequest, GrantCatalogRoleRequest, GrantPrincipalRoleRequest + CreatePrincipalRequest, AddGrantRequest, GrantCatalogRoleRequest, GrantPrincipalRoleRequest, UpdateCatalogRequest from polaris.management.exceptions import ForbiddenException @@ -513,6 +513,104 @@ def test_spark_credentials_can_delete_after_purge(root_client, snowflake_catalog pytest.fail(f"Expected all data to be deleted, but found data files {objects['Contents']}") +@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true', reason='AWS_TEST_ENABLED is not set or is false') +def test_spark_credentials_can_write_with_random_prefix(root_client, snowflake_catalog, polaris_catalog_url, snowman, + snowman_catalog_client, test_bucket): + """ + Using snowman, create namespaces and a table. Insert into the table in multiple operations and update existing records + to generate multiple metadata.json files and manfiests. Drop the table with purge=true. Poll S3 and validate all of + the files are deleted. + + Using the reader principal's credentials verify read access. Validate the reader cannot insert into the table. + :param root_client: + :param snowflake_catalog: + :param polaris_catalog_url: + :param snowman: + :param reader: + :return: + """ + snowflake_catalog.properties.additional_properties['allow.unstructured.table.location'] = 'true' + root_client.update_catalog(catalog_name=snowflake_catalog.name, + update_catalog_request=UpdateCatalogRequest(properties=snowflake_catalog.properties.to_dict(), + current_entity_version=snowflake_catalog.entity_version)) + with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret}', + catalog_name=snowflake_catalog.name, + polaris_url=polaris_catalog_url) as spark: + table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}' + spark.sql(f'USE {snowflake_catalog.name}') + spark.sql('CREATE NAMESPACE db1') + spark.sql('CREATE NAMESPACE db1.schema') + spark.sql('SHOW NAMESPACES') + spark.sql('USE db1.schema') + spark.sql(f"CREATE TABLE {table_name} (col1 int, col2 string) TBLPROPERTIES ('write.object-storage.enabled'='true','write.data.path'='s3://{test_bucket}/polaris_test/snowflake_catalog/{table_name}data')") + spark.sql('SHOW TABLES') + + # several inserts and an update, which should cause earlier files to show up as deleted in the later manifests + spark.sql(f"""INSERT INTO {table_name} VALUES + (10, 'mystring'), + (20, 'anotherstring'), + (30, null) + """) + spark.sql(f"""INSERT INTO {table_name} VALUES + (40, 'mystring'), + (50, 'anotherstring'), + (60, null) + """) + spark.sql(f"""INSERT INTO {table_name} VALUES + (70, 'mystring'), + (80, 'anotherstring'), + (90, null) + """) + spark.sql(f"UPDATE {table_name} SET col2='changed string' WHERE col1 BETWEEN 20 AND 50") + count = spark.sql(f"SELECT * FROM {table_name}").count() + + assert count == 9 + + # fetch aws credentials to examine the metadata files + response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), table_name, + "true") + assert response.config is not None + assert 's3.access-key-id' in response.config + assert 's3.secret-access-key' in response.config + assert 's3.session-token' in response.config + + s3 = boto3.client('s3', + aws_access_key_id=response.config['s3.access-key-id'], + aws_secret_access_key=response.config['s3.secret-access-key'], + aws_session_token=response.config['s3.session-token']) + 
+ objects = s3.list_objects(Bucket=test_bucket, Delimiter='/', + Prefix=f'polaris_test/snowflake_catalog/{table_name}data/') + assert objects is not None + assert len(objects['CommonPrefixes']) >= 3 + + print(f"Found common prefixes in S3 {objects['CommonPrefixes']}") + objs_to_delete = [] + for prefix in objects['CommonPrefixes']: + data_objects = s3.list_objects(Bucket=test_bucket, Delimiter='/', + Prefix=f'{prefix["Prefix"]}schema/{table_name}/') + assert data_objects is not None + print(data_objects) + assert 'Contents' in data_objects + objs_to_delete.extend([{'Key': obj['Key']} for obj in data_objects['Contents']]) + + objects = s3.list_objects(Bucket=test_bucket, Delimiter='/', + Prefix=f'polaris_test/snowflake_catalog/db1/schema/{table_name}/metadata/') + assert objects is not None + assert 'Contents' in objects + assert len(objects['Contents']) == 15 # 5 metadata.json files, 4 manifest lists, and 6 manifests + print(f"Found {len(objects['Contents'])} metadata files in S3 before drop") + + # use the api client to ensure the purge flag is set to true + snowman_catalog_client.drop_table(snowflake_catalog.name, + codecs.decode("1F", "hex").decode("UTF-8").join(['db1', 'schema']), table_name, + purge_requested=True) + spark.sql('DROP NAMESPACE db1.schema') + spark.sql('DROP NAMESPACE db1') + s3.delete_objects(Bucket=test_bucket, + Delete={'Objects': objs_to_delete}) + + @pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true', reason='AWS_TEST_ENABLED is not set or is false') # @pytest.mark.skip(reason="This test is flaky") def test_spark_credentials_can_create_views(snowflake_catalog, polaris_catalog_url, snowman): From 77c784ed3acad81481cee65ad522cc776eec4407 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Fri, 23 Aug 2024 11:50:46 -0700 Subject: [PATCH 2/5] Add test for object-store layout under default table directory --- .../src/test_spark_sql_s3_with_privileges.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py b/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py index be4cc81193..af8a9633ad 100644 --- a/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py +++ b/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py @@ -611,6 +611,101 @@ def test_spark_credentials_can_write_with_random_prefix(root_client, snowflake_c Delete={'Objects': objs_to_delete}) +@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true', reason='AWS_TEST_ENABLED is not set or is false') +def test_spark_object_store_layout_under_table_dir(root_client, snowflake_catalog, polaris_catalog_url, snowman, + snowman_catalog_client, test_bucket): + """ + Using snowman, create namespaces and a table. Insert into the table in multiple operations and update existing records + to generate multiple metadata.json files and manfiests. Drop the table with purge=true. Poll S3 and validate all of + the files are deleted. + + Using the reader principal's credentials verify read access. Validate the reader cannot insert into the table. 
+ :param root_client: + :param snowflake_catalog: + :param polaris_catalog_url: + :param snowman: + :param reader: + :return: + """ + with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret}', + catalog_name=snowflake_catalog.name, + polaris_url=polaris_catalog_url) as spark: + table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}' + spark.sql(f'USE {snowflake_catalog.name}') + spark.sql('CREATE NAMESPACE db1') + spark.sql('CREATE NAMESPACE db1.schema') + spark.sql('SHOW NAMESPACES') + spark.sql('USE db1.schema') + table_base_dir = f'polaris_test/snowflake_catalog/db1/schema/{table_name}/obj_layout/' + spark.sql(f"CREATE TABLE {table_name} (col1 int, col2 string) TBLPROPERTIES ('write.object-storage.enabled'='true','write.data.path'='s3://{test_bucket}/{table_base_dir}')") + spark.sql('SHOW TABLES') + + # several inserts and an update, which should cause earlier files to show up as deleted in the later manifests + spark.sql(f"""INSERT INTO {table_name} VALUES + (10, 'mystring'), + (20, 'anotherstring'), + (30, null) + """) + spark.sql(f"""INSERT INTO {table_name} VALUES + (40, 'mystring'), + (50, 'anotherstring'), + (60, null) + """) + spark.sql(f"""INSERT INTO {table_name} VALUES + (70, 'mystring'), + (80, 'anotherstring'), + (90, null) + """) + spark.sql(f"UPDATE {table_name} SET col2='changed string' WHERE col1 BETWEEN 20 AND 50") + count = spark.sql(f"SELECT * FROM {table_name}").count() + + assert count == 9 + + # fetch aws credentials to examine the metadata files + response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), table_name, + "true") + assert response.config is not None + assert 's3.access-key-id' in response.config + assert 's3.secret-access-key' in response.config + assert 's3.session-token' in response.config + + s3 = boto3.client('s3', + aws_access_key_id=response.config['s3.access-key-id'], + aws_secret_access_key=response.config['s3.secret-access-key'], + aws_session_token=response.config['s3.session-token']) + + objects = s3.list_objects(Bucket=test_bucket, Delimiter='/', + Prefix=table_base_dir) + assert objects is not None + assert len(objects['CommonPrefixes']) >= 3 + + print(f"Found common prefixes in S3 {objects['CommonPrefixes']}") + objs_to_delete = [] + for prefix in objects['CommonPrefixes']: + data_objects = s3.list_objects(Bucket=test_bucket, Delimiter='/', + Prefix=f'{prefix["Prefix"]}') + assert data_objects is not None + print(data_objects) + assert 'Contents' in data_objects + objs_to_delete.extend([{'Key': obj['Key']} for obj in data_objects['Contents']]) + + objects = s3.list_objects(Bucket=test_bucket, Delimiter='/', + Prefix=f'polaris_test/snowflake_catalog/db1/schema/{table_name}/metadata/') + assert objects is not None + assert 'Contents' in objects + assert len(objects['Contents']) == 15 # 5 metadata.json files, 4 manifest lists, and 6 manifests + print(f"Found {len(objects['Contents'])} metadata files in S3 before drop") + + # use the api client to ensure the purge flag is set to true + snowman_catalog_client.drop_table(snowflake_catalog.name, + codecs.decode("1F", "hex").decode("UTF-8").join(['db1', 'schema']), table_name, + purge_requested=True) + spark.sql('DROP NAMESPACE db1.schema') + spark.sql('DROP NAMESPACE db1') + s3.delete_objects(Bucket=test_bucket, + Delete={'Objects': objs_to_delete}) + + @pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true', reason='AWS_TEST_ENABLED is not set or is false') # 
@pytest.mark.skip(reason="This test is flaky") def test_spark_credentials_can_create_views(snowflake_catalog, polaris_catalog_url, snowman): From 4650c8bec5b64a16611fadb017d4b49e096e060d Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Fri, 23 Aug 2024 12:08:03 -0700 Subject: [PATCH 3/5] Updated comments in the new tests --- .../src/test_spark_sql_s3_with_privileges.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py b/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py index af8a9633ad..f58e911273 100644 --- a/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py +++ b/regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py @@ -517,9 +517,10 @@ def test_spark_credentials_can_delete_after_purge(root_client, snowflake_catalog def test_spark_credentials_can_write_with_random_prefix(root_client, snowflake_catalog, polaris_catalog_url, snowman, snowman_catalog_client, test_bucket): """ - Using snowman, create namespaces and a table. Insert into the table in multiple operations and update existing records - to generate multiple metadata.json files and manfiests. Drop the table with purge=true. Poll S3 and validate all of - the files are deleted. + Update the catalog configuration to support unstructured table locations. Using snowman, create namespaces and a + table configured to use object-store layout in a folder under the catalog root, outside of the default table + directory. Insert into the table in multiple operations and update existing records to generate multiple metadata.json + files and manifests. Validate the data files are present under the expected subdirectory. Delete the files afterward. Using the reader principal's credentials verify read access. Validate the reader cannot insert into the table. :param root_client: @@ -615,9 +616,10 @@ def test_spark_credentials_can_write_with_random_prefix(root_client, snowflake_c def test_spark_object_store_layout_under_table_dir(root_client, snowflake_catalog, polaris_catalog_url, snowman, snowman_catalog_client, test_bucket): """ - Using snowman, create namespaces and a table. Insert into the table in multiple operations and update existing records - to generate multiple metadata.json files and manfiests. Drop the table with purge=true. Poll S3 and validate all of - the files are deleted. + Using snowman, create namespaces and a table configured to use object-store layout, using a folder under the default + table directory structure. Insert into the table in multiple operations and update existing records + to generate multiple metadata.json files and manifests. Validate the data files are present under the expected + subdirectory. Delete the files afterward. Using the reader principal's credentials verify read access. Validate the reader cannot insert into the table. 
:param root_client: From 0c32a7a9aa28d93636458e5b909ef4471eacf349 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Tue, 27 Aug 2024 10:49:06 -0700 Subject: [PATCH 4/5] address comments --- .../PolarisStorageConfigurationInfo.java | 35 ++++++++----------- .../service/catalog/BasePolarisCatalog.java | 9 +++-- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java index 2ddab7420f..67d5b04e09 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java @@ -33,7 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.stream.Collector; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.polaris.core.PolarisConfiguration; import org.apache.polaris.core.PolarisDiagnostics; @@ -179,36 +179,29 @@ public static Optional forEntityPath( LOGGER.debug( "Allowing unstructured table location for entity: {}", entityPathReversed.get(0).getName()); - return userSpecifiedWriteLocations(entityPathReversed.get(0).getPropertiesAsMap()) - .map( - locs -> - (PolarisStorageConfigurationInfo) - new StorageConfigurationOverride( - configInfo, - ImmutableList.builder() - .addAll(configInfo.getAllowedLocations()) - .addAll(locs) - .build())) - .orElse(configInfo); + + List locs = + userSpecifiedWriteLocations(entityPathReversed.get(0).getPropertiesAsMap()); + return new StorageConfigurationOverride( + configInfo, + ImmutableList.builder() + .addAll(configInfo.getAllowedLocations()) + .addAll(locs) + .build()); } }); } - private static Optional> userSpecifiedWriteLocations( - Map properties) { + private static List userSpecifiedWriteLocations(Map properties) { return Optional.ofNullable(properties) - .flatMap( + .map( p -> Stream.of( p.get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY), p.get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) .filter(Objects::nonNull) - .collect( - Collector., Optional>>of( - ArrayList::new, - List::add, - (l, r) -> ImmutableList.builder().addAll(l).addAll(r).build(), - l -> l.isEmpty() ? 
Optional.empty() : Optional.of(l)))); + .collect(Collectors.toList())) + .orElse(List.of()); } private static @NotNull Optional findStorageInfoFromHierarchy( diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index ec8f9b3b01..552d6a93eb 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -996,11 +996,14 @@ private void validateLocationsForTableLike( // } }, () -> { - if (locations.stream() - .anyMatch(location -> location.startsWith("file:") || location.startsWith("http"))) { + List invalidLocations = + locations.stream() + .filter(location -> location.startsWith("file:") || location.startsWith("http")) + .collect(Collectors.toList()); + if (!invalidLocations.isEmpty()) { throw new ForbiddenException( "Invalid locations '%s' for identifier '%s': File locations are not allowed", - locations, identifier); + invalidLocations, identifier); } }); } From df92bf5bc2db3eaa8e39811c3d2b779e52b52aa9 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Tue, 27 Aug 2024 11:14:17 -0700 Subject: [PATCH 5/5] Fix merge error --- .../polaris/core/storage/PolarisStorageConfigurationInfo.java | 1 + 1 file changed, 1 insertion(+) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java index 67d5b04e09..3c73da3946 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java @@ -42,6 +42,7 @@ import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.entity.TableLikeEntity; import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo; import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo; import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
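
A minimal usage sketch of the custom write-location support added by this series, modeled on the regression tests above. It assumes an existing PySpark session named spark (for example, one created by the IcebergSparkSession fixture) already connected to a Polaris catalog with 'allow.unstructured.table.location' enabled; the bucket, prefix, and table names below are hypothetical placeholders, not values required by the change.

    # Hypothetical names; any path permitted by the catalog's storage configuration works.
    table_name = 'demo_table'
    data_path = 's3://my-bucket/custom/prefix/demo_table_data'

    # Object-store layout plus a user-specified write.data.path outside the table's
    # default data directory. Polaris validates this location against the catalog's
    # storage configuration and includes it in the locations covered by vended credentials.
    spark.sql(
        f"CREATE TABLE {table_name} (col1 int, col2 string) "
        "TBLPROPERTIES ('write.object-storage.enabled'='true', "
        f"'write.data.path'='{data_path}')")

    # Data files land under data_path; metadata stays in the table's default
    # metadata directory unless write.metadata.path is set as well.
    spark.sql(f"INSERT INTO {table_name} VALUES (1, 'a'), (2, 'b')")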