diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java index db5176e7c3..e17af970f0 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.config; import java.util.Optional; +import java.util.function.Function; /** * Internal configuration flags for non-feature behavior changes in Polaris. These flags control @@ -37,8 +38,9 @@ protected BehaviorChangeConfiguration( String description, T defaultValue, Optional catalogConfig, - Optional catalogConfigUnsafe) { - super(key, description, defaultValue, catalogConfig, catalogConfigUnsafe); + Optional catalogConfigUnsafe, + Optional> validation) { + super(key, description, defaultValue, catalogConfig, catalogConfigUnsafe, validation); } public static final BehaviorChangeConfiguration VALIDATE_VIEW_LOCATION_OVERLAP = diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 7546212282..cf290f3da3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Optional; +import java.util.function.Function; import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.context.CallContext; @@ -38,8 +39,14 @@ protected FeatureConfiguration( String description, T defaultValue, Optional catalogConfig, - Optional catalogConfigUnsafe) { - super(key, description, defaultValue, catalogConfig, catalogConfigUnsafe); + Optional catalogConfigUnsafe, + Optional> validation) { + super(key, description, defaultValue, catalogConfig, catalogConfigUnsafe, validation); + } + + public static final class Constants { + public static final int METADATA_CACHE_MAX_BYTES_NO_CACHING = 0; + public static final int METADATA_CACHE_MAX_BYTES_INFINITE_CACHING = -1; } /** @@ -287,4 +294,25 @@ public static void enforceFeatureEnabledOrThrow( + "This should only be set to 'true' for tests!") .defaultValue(false) .buildFeatureConfiguration(); + + public static final PolarisConfiguration METADATA_CACHE_MAX_BYTES = + PolarisConfiguration.builder() + .key("METADATA_CACHE_MAX_BYTES") + .catalogConfig("polaris.config.metadata-cache-max-bytes") + .description( + "If nonzero, the approximate max size a table's metadata can be in order to be cached in the persistence" + + " layer. If zero, no metadata will be cached or served from the cache. If -1, all metadata" + + " will be cached.") + .defaultValue(Constants.METADATA_CACHE_MAX_BYTES_NO_CACHING) + .validation(value -> value >= -1) + .buildFeatureConfiguration(); + + public static final PolarisConfiguration ALWAYS_FILTER_SNAPSHOTS = + PolarisConfiguration.builder() + .key("ALWAYS_FILTER_SNAPSHOTS") + .description( + "If set, Polaris will always attempt to filter snapshots from a LoadTableResponse even when " + + "doing so requires additional serialization of the TableMetadata") + .defaultValue(true) + .buildFeatureConfiguration(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfiguration.java index fd983e6e88..25892fe81e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfiguration.java @@ -46,6 +46,7 @@ public abstract class PolarisConfiguration { private final Optional catalogConfigImpl; private final Optional catalogConfigUnsafeImpl; private final Class typ; + private final Optional> validation; /** catalog configs are expected to start with this prefix */ private static final String SAFE_CATALOG_CONFIG_PREFIX = "polaris.config."; @@ -90,13 +91,18 @@ protected PolarisConfiguration( String description, T defaultValue, Optional catalogConfig, - Optional catalogConfigUnsafe) { + Optional catalogConfigUnsafe, + Optional> validation) { this.key = key; this.description = description; this.defaultValue = defaultValue; this.catalogConfigImpl = catalogConfig; this.catalogConfigUnsafeImpl = catalogConfigUnsafe; this.typ = (Class) defaultValue.getClass(); + this.validation = validation; + + // Force validation: + apply(defaultValue); } public boolean hasCatalogConfig() { @@ -110,6 +116,22 @@ public String catalogConfig() { "Attempted to read a catalog config key from a configuration that doesn't have one.")); } + T apply(Object value) { + T result = this.typ.cast(value); + validate(result); + return result; + } + + private void validate(T value) { + this.validation.ifPresent( + v -> { + if (!v.apply(value)) { + throw new IllegalArgumentException( + String.format("Configuration %s has invalid value %s", key, defaultValue)); + } + }); + } + public boolean hasCatalogConfigUnsafe() { return catalogConfigUnsafeImpl.isPresent(); } @@ -121,16 +143,13 @@ public String catalogConfigUnsafe() { "Attempted to read an unsafe catalog config key from a configuration that doesn't have one.")); } - T cast(Object value) { - return this.typ.cast(value); - } - public static class Builder { private String key; private String description; private T defaultValue; private Optional catalogConfig = Optional.empty(); private Optional catalogConfigUnsafe = Optional.empty(); + private Optional> validation = Optional.empty(); public Builder key(String key) { this.key = key; @@ -162,6 +181,11 @@ public Builder catalogConfig(String catalogConfig) { return this; } + public Builder validation(Function validation) { + this.validation = Optional.of(validation); + return this; + } + /** * Used to support backwards compatability before there were reserved properties. Usage of this * method should be removed over time. @@ -177,7 +201,7 @@ public Builder catalogConfigUnsafe(String catalogConfig) { return this; } - private void validateOrThrow() { + private void validateFieldsOrThrow() { if (key == null || description == null || defaultValue == null) { throw new IllegalArgumentException("key, description, and defaultValue are required"); } @@ -187,23 +211,23 @@ private void validateOrThrow() { } public FeatureConfiguration buildFeatureConfiguration() { - validateOrThrow(); + validateFieldsOrThrow(); FeatureConfiguration config = new FeatureConfiguration<>( - key, description, defaultValue, catalogConfig, catalogConfigUnsafe); + key, description, defaultValue, catalogConfig, catalogConfigUnsafe, validation); PolarisConfiguration.registerConfiguration(config); return config; } public BehaviorChangeConfiguration buildBehaviorChangeConfiguration() { - validateOrThrow(); + validateFieldsOrThrow(); if (catalogConfig.isPresent() || catalogConfigUnsafe.isPresent()) { throw new IllegalArgumentException( "catalog configs are not valid for behavior change configs"); } BehaviorChangeConfiguration config = new BehaviorChangeConfiguration<>( - key, description, defaultValue, catalogConfig, catalogConfigUnsafe); + key, description, defaultValue, catalogConfig, catalogConfigUnsafe, validation); PolarisConfiguration.registerConfiguration(config); return config; } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfigurationStore.java b/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfigurationStore.java index 662a88d72d..299832e9c7 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfigurationStore.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfigurationStore.java @@ -75,17 +75,17 @@ public interface PolarisConfigurationStore { } if (config.defaultValue instanceof Boolean) { - return config.cast(Boolean.valueOf(String.valueOf(value))); + return config.apply(Boolean.valueOf(String.valueOf(value))); } else if (config.defaultValue instanceof Integer) { - return config.cast(Integer.valueOf(String.valueOf(value))); + return config.apply(Integer.valueOf(String.valueOf(value))); } else if (config.defaultValue instanceof Long) { - return config.cast(Long.valueOf(String.valueOf(value))); + return config.apply(Long.valueOf(String.valueOf(value))); } else if (config.defaultValue instanceof Double) { - return config.cast(Double.valueOf(String.valueOf(value))); + return config.apply(Double.valueOf(String.valueOf(value))); } else if (config.defaultValue instanceof List) { - return config.cast(new ArrayList<>((List) value)); + return config.apply(new ArrayList<>((List) value)); } else { - return config.cast(value); + return config.apply(value); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java index 026a569b34..9372e25700 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java @@ -38,6 +38,15 @@ public class IcebergTableLikeEntity extends TableLikeEntity { // of the internalProperties JSON file. public static final String METADATA_LOCATION_KEY = "metadata-location"; + // For applicable types, this key on the "internalProperties" map will return the content of the + // metadata.json file located at `METADATA_CACHE_LOCATION_KEY` + public static final String METADATA_CACHE_CONTENT_KEY = "metadata-cache-content"; + + // For applicable types, this key on the "internalProperties" map will return the location of the + // `metadata.json` that is cached in `METADATA_CACHE_CONTENT_KEY`. This will often match the + // current metadata location in `METADATA_LOCATION_KEY`; if it does not the cache is invalid + public static final String METADATA_CACHE_LOCATION_KEY = "metadata-cache-location"; + public static final String USER_SPECIFIED_WRITE_DATA_LOCATION_KEY = "write.data.path"; public static final String USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY = "write.metadata.path"; @@ -60,6 +69,16 @@ public String getMetadataLocation() { return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY); } + @JsonIgnore + public String getMetadataCacheContent() { + return getInternalPropertiesAsMap().get(METADATA_CACHE_CONTENT_KEY); + } + + @JsonIgnore + public String getMetadataCacheLocation() { + return getInternalPropertiesAsMap().get(METADATA_CACHE_LOCATION_KEY); + } + @JsonIgnore public Optional getLastAdmittedNotificationTimestamp() { return Optional.ofNullable( @@ -114,6 +133,12 @@ public Builder setMetadataLocation(String location) { return this; } + public Builder setMetadataContent(String location, String content) { + internalProperties.put(METADATA_CACHE_LOCATION_KEY, location); + internalProperties.put(METADATA_CACHE_CONTENT_KEY, content); + return this; + } + public Builder setLastNotificationTimestamp(long timestamp) { internalProperties.put(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY, String.valueOf(timestamp)); return this; diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityWeigher.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityWeigher.java index b3409d3b90..3677984366 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityWeigher.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityWeigher.java @@ -35,7 +35,7 @@ public class EntityWeigher implements Weigher { private static final int APPROXIMATE_ENTITY_OVERHEAD = 1000; /* Represents the amount of bytes that a character is expected to take up */ - private static final int APPROXIMATE_BYTES_PER_CHAR = 3; + public static final int APPROXIMATE_BYTES_PER_CHAR = 3; /** Singleton instance */ private static final EntityWeigher instance = new EntityWeigher(); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java index 72b75ad05b..d85e1d2ad6 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java @@ -94,7 +94,7 @@ public InMemoryEntityCacheTest() { callCtx = new PolarisCallContext(metaStore, diagServices); metaStoreManager = new TransactionalMetaStoreManagerImpl(); - // bootstrap the mata store with our test schema + // bootstrap the metastore with our test schema tm = new PolarisTestMetaStoreManager(metaStoreManager, callCtx); tm.testCreateTestCatalog(); } diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisConfigurationStoreTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisConfigurationStoreTest.java index 9994b3833f..77fc1e0d06 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisConfigurationStoreTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/PolarisConfigurationStoreTest.java @@ -72,7 +72,7 @@ public void testConfigsCanBeCastedFromString() { } @Test - public void testInvalidCastThrowsException() { + public void testInvalidApplyThrowsException() { // Bool not included because Boolean.valueOf turns non-boolean strings to false List> configs = List.of(buildConfig("int2", 12), buildConfig("long2", 34L), buildConfig("double2", 5.6D)); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java index 5c488809d1..3ad973a098 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java @@ -39,6 +39,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.rest.RESTResponse; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -48,6 +49,7 @@ import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.view.ImmutableSQLViewRepresentation; import org.apache.iceberg.view.ImmutableViewVersion; import org.apache.polaris.core.admin.model.CreateCatalogRequest; @@ -757,6 +759,11 @@ public void testCreateTableStagedWithWriteDelegationInsufficientPermissions() { }); } + private static String getMetadataLocation(RESTResponse resp) { + final String metadataLocation = ((LoadTableResponse) resp).metadataLocation(); + return metadataLocation; + } + @Test public void testRegisterTableAllSufficientPrivileges() { Assertions.assertThat( @@ -770,7 +777,7 @@ public void testRegisterTableAllSufficientPrivileges() { // To get a handy metadata file we can use one from another table. // to avoid overlapping directories, drop the original table and recreate it via registerTable - final String metadataLocation = newWrapper().loadTable(TABLE_NS1_1, "all").metadataLocation(); + final String metadataLocation = getMetadataLocation(newWrapper().loadTable(TABLE_NS1_1, "all")); newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithoutPurge(TABLE_NS1_1); final RegisterTableRequest registerRequest = @@ -808,7 +815,7 @@ public void testRegisterTableInsufficientPermissions() { .isTrue(); // To get a handy metadata file we can use one from another table. - final String metadataLocation = newWrapper().loadTable(TABLE_NS1_1, "all").metadataLocation(); + final String metadataLocation = getMetadataLocation(newWrapper().loadTable(TABLE_NS1_1, "all")); final RegisterTableRequest registerRequest = new RegisterTableRequest() { diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 7ac194ffda..26c657c0bd 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -58,6 +58,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.Namespace; @@ -127,6 +128,7 @@ import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.exception.FakeAzureHttpResponse; import org.apache.polaris.service.exception.IcebergExceptionMapper; +import org.apache.polaris.service.persistence.MetadataCacheManager; import org.apache.polaris.service.quarkus.config.QuarkusReservedProperties; import org.apache.polaris.service.quarkus.test.TestData; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -181,7 +183,9 @@ public Map getConfigOverrides() { "polaris.event-listener.type", "test", "polaris.readiness.ignore-severe-issues", - "true"); + "true", + "polaris.features." + FeatureConfiguration.METADATA_CACHE_MAX_BYTES.key, + String.valueOf(FeatureConfiguration.Constants.METADATA_CACHE_MAX_BYTES_INFINITE_CACHING)); } } @@ -222,6 +226,7 @@ public Map getConfigOverrides() { private PolarisEntityManager entityManager; private FileIOFactory fileIOFactory; private PolarisEntity catalogEntity; + private PolarisResolutionManifestCatalogView passthroughView; private SecurityContext securityContext; private TestPolarisEventListener testPolarisEventListener; private ReservedProperties reservedProperties; @@ -340,6 +345,7 @@ public void before(TestInfo testInfo) { .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) .setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket")) .build(); + catalogEntity = adminService.createCatalog( new CreateCatalogRequest( @@ -358,6 +364,10 @@ public void before(TestInfo testInfo) { .build() .asCatalog())); + passthroughView = + new PolarisPassthroughResolutionView( + callContext, entityManager, securityContext, CATALOG_NAME); + RealmEntityManagerFactory realmEntityManagerFactory = new RealmEntityManagerFactory(createMockMetaStoreManagerFactory()); this.fileIOFactory = @@ -1841,6 +1851,63 @@ public FileIO loadFileIO( Assertions.assertThat(measured.getNumDeletedFiles()).as("A table was deleted").isGreaterThan(0); } + private Schema buildSchema(int fields) { + Types.NestedField[] fieldsArray = new Types.NestedField[fields]; + for (int i = 0; i < fields; i++) { + fieldsArray[i] = Types.NestedField.optional(i, "field_" + i, Types.IntegerType.get()); + } + return new Schema(fieldsArray); + } + + @Test + public void testMetadataCachingWithManualFallback() { + Namespace namespace = Namespace.of("manual-namespace"); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "t1"); + + Schema schema = buildSchema(10); + + catalog.createNamespace(namespace); + catalog.createTable(tableIdentifier, schema); + TableMetadata originalMetadata = catalog.loadTableMetadata(tableIdentifier); + + TableMetadata cachedMetadata = + MetadataCacheManager.loadTableMetadata( + tableIdentifier, + Integer.MAX_VALUE, + polarisContext, + metaStoreManager, + passthroughView, + () -> { + throw new IllegalStateException("Fell back even though a cache entry should exist!"); + }); + + // The content should match what was cached + Assertions.assertThat(TableMetadataParser.toJson(cachedMetadata)) + .isEqualTo(TableMetadataParser.toJson(originalMetadata)); + + // Update the table + TableOperations tableOps = catalog.newTableOps(tableIdentifier); + TableMetadata updatedMetadata = tableOps.current().updateSchema(buildSchema(100)); + tableOps.commit(tableOps.current(), updatedMetadata); + + // Read from the cache; it should detect a change due to the update and load the new fallback + TableMetadata reloadedMetadata = + MetadataCacheManager.loadTableMetadata( + tableIdentifier, + Integer.MAX_VALUE, + polarisContext, + metaStoreManager, + passthroughView, + () -> { + throw new IllegalStateException( + "Fell back even though a cache entry should be updated on write"); + }); + + Assertions.assertThat(TableMetadataParser.toJson(reloadedMetadata)) + .isNotSameAs(TableMetadataParser.toJson(cachedMetadata)); + Assertions.assertThat(reloadedMetadata.schema().columns().size()).isEqualTo(100); + } + @Test public void testRegisterTableWithSlashlessMetadataLocation() { IcebergCatalog catalog = catalog(); diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index e39a4d0acb..997874fe0f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -44,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -104,6 +104,7 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.cache.EntityWeigher; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; @@ -134,6 +135,7 @@ import org.apache.polaris.service.events.BeforeViewCommitedEvent; import org.apache.polaris.service.events.BeforeViewRefreshedEvent; import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.persistence.MetadataCacheManager; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; import org.apache.polaris.service.types.NotificationType; @@ -342,8 +344,9 @@ public TableOperations newTableOps( return new BasePolarisTableOperations(getIo(), tableIdentifier, makeMetadataCurrentOnCommit); } + @VisibleForTesting @Override - protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + public TableOperations newTableOps(TableIdentifier tableIdentifier) { boolean makeMetadataCurrentOnCommit = getCurrentPolarisContext() .getConfigurationStore() @@ -379,32 +382,6 @@ private String defaultNamespaceLocation(Namespace namespace) { } } - private Set getLocationsAllowedToBeAccessed(TableMetadata tableMetadata) { - Set locations = new HashSet<>(); - locations.add(tableMetadata.location()); - if (tableMetadata - .properties() - .containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)) { - locations.add( - tableMetadata - .properties() - .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)); - } - if (tableMetadata - .properties() - .containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) { - locations.add( - tableMetadata - .properties() - .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)); - } - return locations; - } - - private Set getLocationsAllowedToBeAccessed(ViewMetadata viewMetadata) { - return Set.of(viewMetadata.location()); - } - @Override public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { TableOperations ops = newTableOps(tableIdentifier); @@ -871,7 +848,7 @@ public boolean sendNotification( @Override public AccessConfig getAccessConfig( TableIdentifier tableIdentifier, - TableMetadata tableMetadata, + Set locationsAllowedToBeAccessed, Set storageActions) { Optional storageInfo = findStorageInfo(tableIdentifier); if (storageInfo.isEmpty()) { @@ -887,11 +864,43 @@ public AccessConfig getAccessConfig( getCredentialVendor(), callContext.getPolarisCallContext().getConfigurationStore(), tableIdentifier, - getLocationsAllowedToBeAccessed(tableMetadata), + locationsAllowedToBeAccessed, storageActions, storageInfo.get()); } + public TableMetadata loadTableMetadata(TableIdentifier identifier) { + int maxMetadataCacheBytes = + callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), FeatureConfiguration.METADATA_CACHE_MAX_BYTES); + Supplier fallback = + () -> { + return loadTableMetadata(loadTable(identifier)); + }; + if (maxMetadataCacheBytes + == FeatureConfiguration.Constants.METADATA_CACHE_MAX_BYTES_NO_CACHING) { + return fallback.get(); + } else { + return MetadataCacheManager.loadTableMetadata( + identifier, + maxMetadataCacheBytes, + callContext.getPolarisCallContext(), + metaStoreManager, + resolvedEntityView, + fallback); + } + } + + private static TableMetadata loadTableMetadata(Table table) { + if (table instanceof BaseTable baseTable) { + return baseTable.operations().current(); + } + throw new IllegalArgumentException("Cannot load metadata for " + table.name()); + } + /** * Based on configuration settings, for callsites that need to handle potentially setting a new * base location for a TableLike entity, produces the transformed location if applicable, or else @@ -1257,22 +1266,33 @@ public TableMetadata refresh() { return current(); } + /** + * With metadata caching, the `base` may not be exactly `current()` by reference so we compare + * locations instead + */ @Override public void commit(TableMetadata base, TableMetadata metadata) { // if the metadata is already out of date, reject it - if (base != current()) { - if (base != null) { - throw new CommitFailedException("Cannot commit: stale table metadata"); - } else { + if (base == null) { + if (current() != null) { // when current is non-null, the table exists. but when base is null, the commit is trying // to create the table throw new AlreadyExistsException("Table already exists: %s", fullTableName); } + } else if (current() != null + && !current().metadataFileLocation().equals(base.metadataFileLocation())) { + throw new CommitFailedException("Cannot commit: stale table metadata"); } // if the metadata is not changed, return early if (base == metadata) { LOGGER.info("Nothing to commit."); return; + } else if (base != null + && metadata != null + && base.metadataFileLocation().equals(metadata.metadataFileLocation())) { + // if the metadata is not changed, return early + LOGGER.info("Nothing to commit."); + return; } long start = System.currentTimeMillis(); @@ -1379,7 +1399,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { tableFileIO = loadFileIOForTableLike( tableIdentifier, - getLocationsAllowedToBeAccessed(metadata), + IcebergMetadataUtil.getLocationsAllowedToBeAccessed(metadata), resolvedStorageEntity, new HashMap<>(metadata.properties()), Set.of( @@ -1402,24 +1422,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY))) { // If location is changing then we must validate that the requested location is valid // for the storage configuration inherited under this entity's path. - Set dataLocations = new HashSet<>(); - dataLocations.add(metadata.location()); - if (metadata.properties().get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY) - != null) { - dataLocations.add( - metadata - .properties() - .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)); - } - if (metadata - .properties() - .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY) - != null) { - dataLocations.add( - metadata - .properties() - .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)); - } + Set dataLocations = IcebergMetadataUtil.getLocationsAllowedToBeAccessed(metadata); validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity); // also validate that the table location doesn't overlap an existing table dataLocations.forEach( @@ -1461,24 +1464,51 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { IcebergTableLikeEntity entity = IcebergTableLikeEntity.of(resolvedPath == null ? null : resolvedPath.getRawLeafEntity()); String existingLocation; + int maxMetadataCacheBytes = + callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), + catalogEntity, + FeatureConfiguration.METADATA_CACHE_MAX_BYTES); + Optional metadataJsonToCache = + switch (maxMetadataCacheBytes) { + case FeatureConfiguration.Constants.METADATA_CACHE_MAX_BYTES_NO_CACHING -> { + yield Optional.empty(); + } + case FeatureConfiguration.Constants.METADATA_CACHE_MAX_BYTES_INFINITE_CACHING -> { + yield Optional.of(TableMetadataParser.toJson(metadata)); + } + default -> { + String rawMetadataJson = TableMetadataParser.toJson(metadata); + if (rawMetadataJson.length() * EntityWeigher.APPROXIMATE_BYTES_PER_CHAR + < maxMetadataCacheBytes) { + yield Optional.of(rawMetadataJson); + } else { + yield Optional.empty(); + } + } + }; + final IcebergTableLikeEntity.Builder builder; if (null == entity) { existingLocation = null; - entity = + builder = new IcebergTableLikeEntity.Builder(tableIdentifier, newLocation) .setCatalogId(getCatalogId()) .setSubType(PolarisEntitySubType.ICEBERG_TABLE) .setBaseLocation(metadata.location()) .setId( - getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()) - .build(); + getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()); } else { existingLocation = entity.getMetadataLocation(); - entity = + builder = new IcebergTableLikeEntity.Builder(entity) .setBaseLocation(metadata.location()) - .setMetadataLocation(newLocation) - .build(); + .setMetadataLocation(newLocation); } + metadataJsonToCache.ifPresent(s -> builder.setMetadataContent(newLocation, s)); + entity = builder.build(); if (!Objects.equal(existingLocation, oldLocation)) { if (null == base) { throw new AlreadyExistsException("Table already exists: %s", fullTableName); @@ -1496,12 +1526,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { // We diverge from `BaseMetastoreTableOperations` in the below code block if (makeMetadataCurrentOnCommit) { - currentMetadata = - TableMetadata.buildFrom(metadata) - .withMetadataLocation(newLocation) - .discardChanges() - .build(); - currentMetadataLocation = newLocation; + setCurrentMetadata(newLocation, metadata); } if (null == existingLocation) { @@ -1514,6 +1539,15 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { new AfterTableCommitedEvent(tableIdentifier, base, metadata)); } + public void setCurrentMetadata(String metadataLocation, TableMetadata metadata) { + currentMetadata = + TableMetadata.buildFrom(metadata) + .withMetadataLocation(metadataLocation) + .discardChanges() + .build(); + currentMetadataLocation = metadataLocation; + } + @Override public TableOperations temp(TableMetadata uncommittedMetadata) { return new TableOperations() { @@ -1780,7 +1814,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { viewFileIO = loadFileIOForTableLike( identifier, - getLocationsAllowedToBeAccessed(metadata), + IcebergMetadataUtil.getLocationsAllowedToBeAccessed(metadata), resolvedStorageEntity, tableProperties, Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE)); @@ -2289,6 +2323,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) { } } + // Drop the table: return getMetaStoreManager() .dropEntityIfExists( getCurrentPolarisContext(), diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index c28fd491a6..ef5c5a7fd8 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -32,6 +32,7 @@ import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -43,8 +44,10 @@ import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.rest.Endpoint; +import org.apache.iceberg.rest.RESTResponse; import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.ResourcePaths; +import org.apache.iceberg.rest.credentials.Credential; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -252,15 +255,19 @@ public Response loadNamespaceMetadata( * unable to get metadata location and logs a warning. */ private Response.ResponseBuilder tryInsertETagHeader( - Response.ResponseBuilder builder, - LoadTableResponse response, - String namespace, - String tableName) { - if (response.metadataLocation() != null) { + Response.ResponseBuilder builder, RESTResponse response, String namespace, String tableName) { + final String metadataLocation; + if (response instanceof LoadTableResponse loadTableResponse) { + metadataLocation = loadTableResponse.metadataLocation(); + } else { + throw new IllegalStateException("Cannot build etag from: " + response); + } + + if (metadataLocation != null) { builder = builder.header( HttpHeaders.ETAG, - IcebergHttpUtil.generateETagForMetadataFileLocation(response.metadataLocation())); + IcebergHttpUtil.generateETagForMetadataFileLocation(metadataLocation)); } else { LOGGER .atWarn() @@ -361,7 +368,7 @@ public Response createTable( Response.ok(response), response, namespace, createTableRequest.name()) .build(); } else { - LoadTableResponse response = + RESTResponse response = catalog.createTableDirectWithWriteDelegation(ns, createTableRequest); return tryInsertETagHeader( Response.ok(response), response, namespace, createTableRequest.name()) @@ -410,7 +417,7 @@ public Response loadTable( securityContext, prefix, catalog -> { - LoadTableResponse response; + RESTResponse response; if (delegationModes.isEmpty()) { response = @@ -586,12 +593,15 @@ public Response loadCredentials( securityContext, prefix, catalog -> { - LoadTableResponse loadTableResponse = - catalog.loadTableWithAccessDelegation(tableIdentifier, "all"); + RESTResponse restResponse = catalog.loadTableWithAccessDelegation(tableIdentifier, "all"); + final List credentials; + if (restResponse instanceof LoadTableResponse loadTableResponse) { + credentials = loadTableResponse.credentials(); + } else { + throw new IllegalStateException("Cannot extract credentials from " + restResponse); + } return Response.ok( - ImmutableLoadCredentialsResponse.builder() - .credentials(loadTableResponse.credentials()) - .build()) + ImmutableLoadCredentialsResponse.builder().credentials(credentials).build()) .build(); }); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 972f6871ea..ced5545b1c 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -57,6 +57,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.RESTResponse; import org.apache.iceberg.rest.credentials.ImmutableCredential; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; @@ -404,7 +405,7 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque * @param request the table creation request * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata */ - public LoadTableResponse createTableDirectWithWriteDelegation( + public RESTResponse createTableDirectWithWriteDelegation( Namespace namespace, CreateTableRequest request) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION; @@ -439,23 +440,17 @@ public LoadTableResponse createTableDirectWithWriteDelegation( .withSortOrder(request.writeOrder()) .withProperties(properties) .create(); - if (table instanceof BaseTable baseTable) { TableMetadata tableMetadata = baseTable.operations().current(); return buildLoadTableResponseWithDelegationCredentials( - tableIdentifier, - tableMetadata, - Set.of( - PolarisStorageActions.READ, - PolarisStorageActions.WRITE, - PolarisStorageActions.LIST), - SNAPSHOTS_ALL) - .build(); + tableIdentifier, + tableMetadata, + Set.of( + PolarisStorageActions.READ, PolarisStorageActions.WRITE, PolarisStorageActions.LIST)); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); } - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } @@ -520,7 +515,7 @@ public LoadTableResponse createTableStaged(Namespace namespace, CreateTableReque return LoadTableResponse.builder().withTableMetadata(metadata).build(); } - public LoadTableResponse createTableStagedWithWriteDelegation( + public RESTResponse createTableStagedWithWriteDelegation( Namespace namespace, CreateTableRequest request) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION; @@ -540,8 +535,7 @@ public LoadTableResponse createTableStagedWithWriteDelegation( TableMetadata metadata = stageTableCreateHelper(namespace, request); return buildLoadTableResponseWithDelegationCredentials( - ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL) - .build(); + ident, metadata, Set.of(PolarisStorageActions.ALL)); } /** @@ -605,11 +599,10 @@ public boolean sendNotification(TableIdentifier identifier, NotificationRequest */ private IcebergTableLikeEntity getTableEntity(TableIdentifier tableIdentifier) { PolarisResolvedPathWrapper target = resolutionManifest.getResolvedPath(tableIdentifier); - return IcebergTableLikeEntity.of(target.getRawLeafEntity()); } - public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snapshots) { + public RESTResponse loadTable(TableIdentifier tableIdentifier, String snapshots) { return loadTableIfStale(tableIdentifier, null, snapshots).get(); } @@ -623,7 +616,7 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps * @return {@link Optional#empty()} if the ETag is current, an {@link Optional} containing the * load table response, otherwise */ - public Optional loadTableIfStale( + public Optional loadTableIfStale( TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String snapshots) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_TABLE; authorizeBasicTableLikeOperationOrThrow( @@ -649,11 +642,17 @@ public Optional loadTableIfStale( } } - LoadTableResponse rawResponse = catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); + LoadTableResponse rawResponse; + if (baseCatalog instanceof IcebergCatalog icebergCatalog) { + TableMetadata metadata = icebergCatalog.loadTableMetadata(tableIdentifier); + rawResponse = LoadTableResponse.builder().withTableMetadata(metadata).build(); + } else { + rawResponse = catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); + } return Optional.of(filterResponseToSnapshots(rawResponse, snapshots)); } - public LoadTableResponse loadTableWithAccessDelegation( + public RESTResponse loadTableWithAccessDelegation( TableIdentifier tableIdentifier, String snapshots) { return loadTableWithAccessDelegationIfStale(tableIdentifier, null, snapshots).get(); } @@ -668,7 +667,7 @@ public LoadTableResponse loadTableWithAccessDelegation( * @return {@link Optional#empty()} if the ETag is current, an {@link Optional} containing the * load table response, otherwise */ - public Optional loadTableWithAccessDelegationIfStale( + public Optional loadTableWithAccessDelegationIfStale( TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String snapshots) { // Here we have a single method that falls through multiple candidate // PolarisAuthorizableOperations because instead of identifying the desired operation up-front @@ -740,30 +739,26 @@ public Optional loadTableWithAccessDelegationIfStale( } } } + TableMetadata metadata = null; + if (baseCatalog instanceof IcebergCatalog icebergCatalog) { + metadata = icebergCatalog.loadTableMetadata(tableIdentifier); + } - // TODO: Find a way for the configuration or caller to better express whether to fail or omit - // when data-access is specified but access delegation grants are not found. - Table table = baseCatalog.loadTable(tableIdentifier); - - if (table instanceof BaseTable baseTable) { - TableMetadata tableMetadata = baseTable.operations().current(); + // The metadata failed to load + if (metadata == null) { + throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); + } else { + metadata = filterMetadataToSnapshots(metadata, snapshots); return Optional.of( buildLoadTableResponseWithDelegationCredentials( - tableIdentifier, tableMetadata, actionsRequested, snapshots) - .build()); - } else if (table instanceof BaseMetadataTable) { - // metadata tables are loaded on the client side, return NoSuchTableException for now - throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); + tableIdentifier, metadata, actionsRequested)); } - - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } - private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials( + private RESTResponse buildLoadTableResponseWithDelegationCredentials( TableIdentifier tableIdentifier, TableMetadata tableMetadata, - Set actions, - String snapshots) { + Set actions) { LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { @@ -785,7 +780,7 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential .build()); } } - return responseBuilder; + return responseBuilder.build(); } private UpdateTableRequest applyUpdateFilters(UpdateTableRequest request) { @@ -879,8 +874,11 @@ public void tableExists(TableIdentifier tableIdentifier) { authorizeBasicTableLikeOperationOrThrow( op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); - // TODO: Just skip CatalogHandlers for this one maybe - catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); + if (baseCatalog instanceof IcebergCatalog icebergCatalog) { + icebergCatalog.loadTableMetadata(tableIdentifier); + } else { + catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); + } } public void renameTable(RenameTableRequest request) { @@ -938,20 +936,34 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest) commitTransactionRequest.tableChanges().stream() .forEach( change -> { - Table table = baseCatalog.loadTable(change.identifier()); - if (!(table instanceof BaseTable)) { - throw new IllegalStateException( - "Cannot wrap catalog that does not produce BaseTable"); + // TODO we are still loading metadata redundantly against the same table + final TableMetadata currentMetadata; + final TableOperations tableOps; + if (baseCatalog instanceof IcebergCatalog icebergCatalog) { + tableOps = icebergCatalog.newTableOps(change.identifier()); + currentMetadata = icebergCatalog.loadTableMetadata(change.identifier()); + // Update tableOps.current() to reflect the cached metadata + if (tableOps instanceof IcebergCatalog.BasePolarisTableOperations polarisOps) { + polarisOps.setCurrentMetadata( + currentMetadata.metadataFileLocation(), currentMetadata); + polarisOps.shouldRefresh = false; + } + } else { + final Table table = baseCatalog.loadTable(change.identifier()); + if (!(table instanceof BaseTable)) { + throw new IllegalStateException( + "Cannot wrap catalog that does not produce BaseTable"); + } + tableOps = ((BaseTable) table).operations(); + currentMetadata = tableOps.current(); } + if (isCreate(change)) { throw new BadRequestException( "Unsupported operation: commitTranaction with updateForStagedCreate: %s", change); } - TableOperations tableOps = ((BaseTable) table).operations(); - TableMetadata currentMetadata = tableOps.current(); - // Validate requirements; any CommitFailedExceptions will fail the overall request change.requirements().forEach(requirement -> requirement.validate(currentMetadata)); @@ -1105,31 +1117,32 @@ public void renameView(RenameTableRequest request) { catalogHandlerUtils.renameView(viewCatalog, request); } - private @Nonnull LoadTableResponse filterResponseToSnapshots( - LoadTableResponse loadTableResponse, String snapshots) { + private @Nonnull TableMetadata filterMetadataToSnapshots( + TableMetadata metadata, String snapshots) { if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) { - return loadTableResponse; + return metadata; } else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) { - TableMetadata metadata = loadTableResponse.tableMetadata(); - Set referencedSnapshotIds = metadata.refs().values().stream() .map(SnapshotRef::snapshotId) .collect(Collectors.toSet()); - - TableMetadata filteredMetadata = - metadata.removeSnapshotsIf(s -> !referencedSnapshotIds.contains(s.snapshotId())); - - return LoadTableResponse.builder() - .withTableMetadata(filteredMetadata) - .addAllConfig(loadTableResponse.config()) - .addAllCredentials(loadTableResponse.credentials()) - .build(); + return metadata.removeSnapshotsIf(s -> !referencedSnapshotIds.contains(s.snapshotId())); } else { throw new IllegalArgumentException("Unrecognized snapshots: " + snapshots); } } + private @Nonnull LoadTableResponse filterResponseToSnapshots( + LoadTableResponse loadTableResponse, String snapshots) { + TableMetadata filteredMetadata = + filterMetadataToSnapshots(loadTableResponse.tableMetadata(), snapshots); + return LoadTableResponse.builder() + .withTableMetadata(filteredMetadata) + .addAllConfig(loadTableResponse.config()) + .addAllCredentials(loadTableResponse.credentials()) + .build(); + } + @Override public void close() throws Exception { if (baseCatalog instanceof Closeable closeable) { diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergMetadataUtil.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergMetadataUtil.java new file mode 100644 index 0000000000..aefcf9d3a9 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergMetadataUtil.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import java.util.HashSet; +import java.util.Set; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; + +/** A collection of util methods related to handling Iceberg table & view metadata */ +public class IcebergMetadataUtil { + + public static Set getLocationsAllowedToBeAccessed(TableMetadata tableMetadata) { + Set locations = new HashSet<>(); + locations.add(tableMetadata.location()); + if (tableMetadata + .properties() + .containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)) { + locations.add( + tableMetadata + .properties() + .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)); + } + if (tableMetadata + .properties() + .containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) { + locations.add( + tableMetadata + .properties() + .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)); + } + return locations; + } + + public static Set getLocationsAllowedToBeAccessed(ViewMetadata viewMetadata) { + return Set.of(viewMetadata.location()); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java index 21ec380eb0..e191ba77cd 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java @@ -32,8 +32,18 @@ * configuration. */ public interface SupportsCredentialDelegation { - AccessConfig getAccessConfig( + default AccessConfig getAccessConfig( TableIdentifier tableIdentifier, TableMetadata tableMetadata, + Set storageActions) { + return getAccessConfig( + tableIdentifier, + IcebergMetadataUtil.getLocationsAllowedToBeAccessed(tableMetadata), + storageActions); + } + + AccessConfig getAccessConfig( + TableIdentifier tableIdentifier, + Set locationsAllowedToBeAccessed, Set storageActions); } diff --git a/service/common/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java b/service/common/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java new file mode 100644 index 0000000000..664f659b36 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.persistence; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; +import org.apache.polaris.core.entity.table.TableLikeEntity; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.dao.entity.EntityResult; +import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains utility methods related to storing TableMetadata in the metastore and retrieving it from + * the metastore + */ +public class MetadataCacheManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MetadataCacheManager.class); + + /** + * Load the cached metadata.json content and location or fall back to `fallback` if one doesn't + * exist. If the metadata is not currently cached, it may be added to the cache. + */ + public static TableMetadata loadTableMetadata( + TableIdentifier tableIdentifier, + int maxBytesToCache, + PolarisCallContext callContext, + PolarisMetaStoreManager metastoreManager, + PolarisResolutionManifestCatalogView resolvedEntityView, + Supplier fallback) { + PolarisResolvedPathWrapper resolvedEntities = + resolvedEntityView.getResolvedPath( + tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE); + // If the table doesn't exist, just fall back fast + if (resolvedEntities == null) { + return fallback.get(); + } + LOGGER.debug(String.format("Loading cached metadata for %s", tableIdentifier)); + IcebergTableLikeEntity tableLikeEntity = + IcebergTableLikeEntity.of(resolvedEntities.getRawLeafEntity()); + Map tableEntityProperties = tableLikeEntity.getInternalPropertiesAsMap(); + String entityLocation = tableEntityProperties.get(IcebergTableLikeEntity.METADATA_LOCATION_KEY); + String cacheContent = + tableEntityProperties.get(IcebergTableLikeEntity.METADATA_CACHE_CONTENT_KEY); + String cacheLocation = + tableEntityProperties.get(IcebergTableLikeEntity.METADATA_CACHE_LOCATION_KEY); + if (cacheContent != null && !cacheContent.isEmpty() && entityLocation.equals(cacheLocation)) { + LOGGER.debug(String.format("Using cached metadata for %s", tableIdentifier)); + TableMetadata tableMetadata = TableMetadataParser.fromJson(cacheContent); + return TableMetadata.buildFrom(tableMetadata).withMetadataLocation(cacheLocation).build(); + } else { + TableMetadata fallbackMetadata = fallback.get(); + var cacheResult = + cacheTableMetadata( + tableLikeEntity, + fallbackMetadata, + maxBytesToCache, + callContext, + metastoreManager, + resolvedEntityView); + if (!cacheResult.isSuccess()) { + LOGGER.debug(String.format("Failed to cache metadata for %s", tableIdentifier)); + } + return fallbackMetadata; + } + } + + /** + * Attempt to add table metadata to the cache + * + * @return The result of trying to cache the metadata + */ + private static EntityResult cacheTableMetadata( + IcebergTableLikeEntity tableLikeEntity, + TableMetadata tableMetadata, + int maxBytesToCache, + PolarisCallContext callContext, + PolarisMetaStoreManager metaStoreManager, + PolarisResolutionManifestCatalogView resolvedEntityView) { + String metadataString = TableMetadataParser.toJson(tableMetadata); + if (maxBytesToCache + != FeatureConfiguration.Constants.METADATA_CACHE_MAX_BYTES_INFINITE_CACHING) { + if (metadataString.length() * 2 > maxBytesToCache) { + LOGGER.debug( + String.format( + "Will not cache metadata for %s; metadata above the limit of %d bytes", + tableLikeEntity.getTableIdentifier(), maxBytesToCache)); + return new EntityResult(EntityResult.ReturnStatus.SUCCESS, null); + } + } + + LOGGER.debug(String.format("Caching metadata for %s", tableLikeEntity.getTableIdentifier())); + TableLikeEntity newTableLikeEntity = + new IcebergTableLikeEntity.Builder(tableLikeEntity) + .setMetadataContent(tableLikeEntity.getMetadataLocation(), metadataString) + .build(); + PolarisResolvedPathWrapper resolvedPath = + resolvedEntityView.getResolvedPath( + tableLikeEntity.getTableIdentifier(), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE); + try { + return metaStoreManager.updateEntityPropertiesIfNotChanged( + callContext, + PolarisEntity.toCoreList(resolvedPath.getRawParentPath()), + newTableLikeEntity); + } catch (RuntimeException e) { + // PersistenceException (& other extension-specific exceptions) may not be in scope, + // but we can make a best-effort attempt to swallow it and just forego caching + if (e.toString().contains("PersistenceException")) { + LOGGER.warn( + String.format( + "Encountered an error while caching %s: %s", + tableLikeEntity.getTableIdentifier(), e)); + return new EntityResult( + EntityResult.ReturnStatus.UNEXPECTED_ERROR_SIGNALED, e.getMessage()); + } else { + throw e; + } + } + } +}