Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.catalog.common;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;

/** Utility methods for working with Polaris catalog entities. */
public class CatalogUtils {

/**
* Find the resolved entity path that may contain storage information
*
* @param resolvedEntityView The resolved entity view containing catalog entities.
* @param tableIdentifier The table identifier for which to find storage information.
* @return The resolved path wrapper that may contain storage information.
*/
public static PolarisResolvedPathWrapper findResolvedStorageEntity(
PolarisResolutionManifestCatalogView resolvedEntityView, TableIdentifier tableIdentifier) {
PolarisResolvedPathWrapper resolvedTableEntities =
resolvedEntityView.getResolvedPath(
tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE);
if (resolvedTableEntities != null) {
return resolvedTableEntities;
}
return resolvedEntityView.getResolvedPath(tableIdentifier.namespace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,12 @@
import org.apache.polaris.core.persistence.resolver.ResolverFactory;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisCredentialVendor;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.StorageUtil;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.common.CatalogUtils;
import org.apache.polaris.service.catalog.common.LocationUtils;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOUtil;
Expand All @@ -143,7 +141,7 @@

/** Defines the relationship between PolarisEntities and Iceberg's business logic. */
public class IcebergCatalog extends BaseMetastoreViewCatalog
implements SupportsNamespaces, SupportsNotifications, Closeable, SupportsCredentialDelegation {
implements SupportsNamespaces, SupportsNotifications, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergCatalog.class);

private static final Joiner SLASH = Joiner.on("/");
Expand All @@ -163,7 +161,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
};

private final PolarisDiagnostics diagnostics;
private final StorageCredentialCache storageCredentialCache;
private final ResolverFactory resolverFactory;
private final CallContext callContext;
private final RealmConfig realmConfig;
Expand Down Expand Up @@ -194,7 +191,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
*/
public IcebergCatalog(
PolarisDiagnostics diagnostics,
StorageCredentialCache storageCredentialCache,
ResolverFactory resolverFactory,
PolarisMetaStoreManager metaStoreManager,
CallContext callContext,
Expand All @@ -204,7 +200,6 @@ public IcebergCatalog(
FileIOFactory fileIOFactory,
PolarisEventListener polarisEventListener) {
this.diagnostics = diagnostics;
this.storageCredentialCache = storageCredentialCache;
this.resolverFactory = resolverFactory;
this.callContext = callContext;
this.realmConfig = callContext.getRealmConfig();
Expand Down Expand Up @@ -385,7 +380,9 @@ public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) {
lastMetadata = null;
}

Optional<PolarisEntity> storageInfoEntity = findStorageInfo(tableIdentifier);
Optional<PolarisEntity> storageInfoEntity =
FileIOUtil.findStorageInfoFromHierarchy(
CatalogUtils.findResolvedStorageEntity(resolvedEntityView, tableIdentifier));

// The storageProperties we stash away in the Task should be the superset of the
// internalProperties of the StorageInfoEntity to be able to use its StorageIntegration
Expand Down Expand Up @@ -831,31 +828,6 @@ public boolean sendNotification(
PolarisEntitySubType.ICEBERG_TABLE, identifier, notificationRequest);
}

@Override
public AccessConfig getAccessConfig(
TableIdentifier tableIdentifier,
TableMetadata tableMetadata,
Set<PolarisStorageActions> storageActions,
Optional<String> refreshCredentialsEndpoint) {
Optional<PolarisEntity> storageInfo = findStorageInfo(tableIdentifier);
if (storageInfo.isEmpty()) {
LOGGER
.atWarn()
.addKeyValue("tableIdentifier", tableIdentifier)
.log("Table entity has no storage configuration in its hierarchy");
return AccessConfig.builder().supportsCredentialVending(false).build();
}
return FileIOUtil.refreshAccessConfig(
callContext,
storageCredentialCache,
getCredentialVendor(),
tableIdentifier,
StorageUtil.getLocationsUsedByTable(tableMetadata),
storageActions,
storageInfo.get(),
refreshCredentialsEndpoint);
}

private String buildPrefixedLocation(TableIdentifier tableIdentifier) {
StringBuilder locationBuilder = new StringBuilder();
locationBuilder.append(defaultBaseLocation);
Expand Down Expand Up @@ -961,19 +933,6 @@ public String transformTableLikeLocation(TableIdentifier tableIdentifier, String
tableIdentifier, applyReplaceNewLocationWithCatalogDefault(location));
}

private @Nonnull Optional<PolarisEntity> findStorageInfo(TableIdentifier tableIdentifier) {
PolarisResolvedPathWrapper resolvedTableEntities =
resolvedEntityView.getResolvedPath(
tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE);

PolarisResolvedPathWrapper resolvedStorageEntity =
resolvedTableEntities == null
? resolvedEntityView.getResolvedPath(tableIdentifier.namespace())
: resolvedTableEntities;

return FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity);
}

/**
* Validates that the specified {@code location} is valid for whatever storage config is found for
* this TableLike's parent hierarchy.
Expand Down Expand Up @@ -2130,10 +2089,6 @@ private PolarisMetaStoreManager getMetaStoreManager() {
return metaStoreManager;
}

private PolarisCredentialVendor getCredentialVendor() {
return metaStoreManager;
}

@VisibleForTesting
public void setFileIOFactory(FileIOFactory newFactory) {
this.fileIOFactory = newFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
import org.apache.polaris.service.catalog.common.CatalogAdapter;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
Expand Down Expand Up @@ -154,6 +155,7 @@ public class IcebergCatalogAdapter
private final CatalogHandlerUtils catalogHandlerUtils;
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
private final PolarisEventListener polarisEventListener;
private final AccessConfigProvider accessConfigProvider;

@Inject
public IcebergCatalogAdapter(
Expand All @@ -170,7 +172,8 @@ public IcebergCatalogAdapter(
ReservedProperties reservedProperties,
CatalogHandlerUtils catalogHandlerUtils,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories,
PolarisEventListener polarisEventListener) {
PolarisEventListener polarisEventListener,
AccessConfigProvider accessConfigProvider) {
this.diagnostics = diagnostics;
this.realmContext = realmContext;
this.callContext = callContext;
Expand All @@ -186,6 +189,7 @@ public IcebergCatalogAdapter(
this.catalogHandlerUtils = catalogHandlerUtils;
this.externalCatalogFactories = externalCatalogFactories;
this.polarisEventListener = polarisEventListener;
this.accessConfigProvider = accessConfigProvider;
}

/**
Expand Down Expand Up @@ -225,7 +229,8 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String
reservedProperties,
catalogHandlerUtils,
externalCatalogFactories,
polarisEventListener);
polarisEventListener,
accessConfigProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,12 @@
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.StorageUtil;
import org.apache.polaris.service.catalog.AccessDelegationMode;
import org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.common.CatalogHandler;
import org.apache.polaris.service.catalog.common.CatalogUtils;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
Expand Down Expand Up @@ -135,6 +138,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
private final ReservedProperties reservedProperties;
private final CatalogHandlerUtils catalogHandlerUtils;
private final PolarisEventListener polarisEventListener;
private final AccessConfigProvider accessConfigProvider;

// Catalog instance will be initialized after authorizing resolver successfully resolves
// the catalog entity.
Expand All @@ -158,7 +162,8 @@ public IcebergCatalogHandler(
ReservedProperties reservedProperties,
CatalogHandlerUtils catalogHandlerUtils,
Instance<ExternalCatalogFactory> externalCatalogFactories,
PolarisEventListener polarisEventListener) {
PolarisEventListener polarisEventListener,
AccessConfigProvider accessConfigProvider) {
super(
diagnostics,
callContext,
Expand All @@ -173,6 +178,7 @@ public IcebergCatalogHandler(
this.reservedProperties = reservedProperties;
this.catalogHandlerUtils = catalogHandlerUtils;
this.polarisEventListener = polarisEventListener;
this.accessConfigProvider = accessConfigProvider;
}

private CatalogEntity getResolvedCatalogEntity() {
Expand Down Expand Up @@ -793,16 +799,19 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
Optional<String> refreshCredentialsEndpoint) {
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
PolarisResolvedPathWrapper resolvedStoragePath =
CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier);

if (baseCatalog instanceof IcebergCatalog && resolvedStoragePath != null) {

if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
LOGGER
.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
AccessConfig accessConfig =
credentialDelegation.getAccessConfig(
tableIdentifier, tableMetadata, actions, refreshCredentialsEndpoint);
accessConfigProvider.getAccessConfig(
callContext,
tableIdentifier,
StorageUtil.getLocationsUsedByTable(tableMetadata),
actions,
refreshCredentialsEndpoint,
resolvedStoragePath);
Map<String, String> credentialConfig = accessConfig.credentials();
if (delegationModes.contains(VENDED_CREDENTIALS)) {
if (!credentialConfig.isEmpty()) {
Expand Down

This file was deleted.

Loading