Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,29 @@ public static <T> Builder<T> builder() {
"If set to true, allows tables to have external locations outside the default structure.")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> CLEANUP_ON_NAMESPACE_DROP =
PolarisConfiguration.<Boolean>builder()
.key("CLEANUP_ON_NAMESPACE_DROP")
.catalogConfig("cleanup.on.namespace.drop")
.description("If set to true, clean up data when a namespace is dropped")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> CLEANUP_ON_CATALOG_DROP =
PolarisConfiguration.<Boolean>builder()
.key("CLEANUP_ON_CATALOG_DROP")
.catalogConfig("cleanup.on.catalog.drop")
.description("If set to true, clean up data when a catalog is dropped")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> DROP_WITH_PURGE_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("DROP_WITH_PURGE_ENABLED")
.catalogConfig("drop-with-purge.enabled")
.description(
"If set to true, allows tables to be dropped with the purge parameter set to true.")
.defaultValue(true)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
*/
public class PolarisAdminService {
private static final Logger LOGGER = LoggerFactory.getLogger(PolarisAdminService.class);
public static final String CLEANUP_ON_CATALOG_DROP = "CLEANUP_ON_CATALOG_DROP";

private final CallContext callContext;
private final PolarisEntityManager entityManager;
Expand Down Expand Up @@ -578,7 +577,7 @@ public void deleteCatalog(String name) {
boolean cleanup =
polarisCallContext
.getConfigurationStore()
.getConfiguration(polarisCallContext, CLEANUP_ON_CATALOG_DROP, false);
.getConfiguration(polarisCallContext, PolarisConfiguration.CLEANUP_ON_CATALOG_DROP);
PolarisMetaStoreManager.DropEntityResult dropEntityResult =
entityManager
.getMetaStoreManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
&& !(ex instanceof UnprocessableEntityException)
&& isStorageProviderRetryableException(ex);
};
public static final String CLEANUP_ON_NAMESPACE_DROP = "CLEANUP_ON_NAMESPACE_DROP";

private final PolarisEntityManager entityManager;
private final CallContext callContext;
Expand Down Expand Up @@ -628,7 +627,8 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept
Map.of(),
polarisCallContext
.getConfigurationStore()
.getConfiguration(polarisCallContext, CLEANUP_ON_NAMESPACE_DROP, false));
.getConfiguration(
polarisCallContext, PolarisConfiguration.CLEANUP_ON_NAMESPACE_DROP));

if (!dropEntityResult.isSuccess() && dropEntityResult.failedBecauseNotEmpty()) {
throw new NamespaceNotEmptyException("Namespace %s is not empty", namespace);
Expand Down Expand Up @@ -1811,6 +1811,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
}
}

@SuppressWarnings("FormatStringAnnotation")
private @NotNull PolarisMetaStoreManager.DropEntityResult dropTableLike(
PolarisEntitySubType subType,
TableIdentifier identifier,
Expand All @@ -1826,6 +1827,28 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {

List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
PolarisEntity leafEntity = resolvedEntities.getRawLeafEntity();

// Check that purge is enabled, if it is set:
if (catalogPath != null && !catalogPath.isEmpty() && purge) {
boolean dropWithPurgeEnabled =
callContext
.getPolarisCallContext()
.getConfigurationStore()
.getConfiguration(
callContext.getPolarisCallContext(),
catalogEntity,
PolarisConfiguration.DROP_WITH_PURGE_ENABLED);
if (!dropWithPurgeEnabled) {
throw new ForbiddenException(
String.format(
"Unable to purge entity: %s. To enable this feature, set the Polaris configuration %s "
+ "or the catalog configuration %s",
identifier.name(),
PolarisConfiguration.DROP_WITH_PURGE_ENABLED.key,
PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig()));
}
}

return entityManager
.getMetaStoreManager()
.dropEntityIfExists(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,67 @@ public void testDropTableWithPurge() {
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
}

@Test
public void testDropTableWithPurgeDisabled() {
// Create a catalog with purge disabled:
String noPurgeCatalogName = CATALOG_NAME + "_no_purge";
String storageLocation = "s3://testDropTableWithPurgeDisabled/data";
AwsStorageConfigInfo noPurgeStorageConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::012345678901:role/jdoe")
.setExternalId("externalId")
.setUserArn("aws::a:user:arn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.build();
adminService.createCatalog(
new CatalogEntity.Builder()
.setName(noPurgeCatalogName)
.setDefaultBaseLocation(storageLocation)
.setReplaceNewLocationPrefixWithCatalogDefault("file:")
.addProperty(PolarisConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(), "true")
.addProperty(
PolarisConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true")
.addProperty(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "false")
.setStorageConfigurationInfo(noPurgeStorageConfigModel, storageLocation)
.build());
RealmContext realmContext = () -> "realm";
CallContext callContext = CallContext.of(realmContext, polarisContext);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, authenticatedRoot, noPurgeCatalogName);
BasePolarisCatalog noPurgeCatalog =
new BasePolarisCatalog(
entityManager,
callContext,
passthroughView,
authenticatedRoot,
Mockito.mock(),
new DefaultFileIOFactory());
noPurgeCatalog.initialize(
noPurgeCatalogName,
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));

if (this.requiresNamespaceCreate()) {
((SupportsNamespaces) noPurgeCatalog).createNamespace(NS);
}

Assertions.assertThatPredicate(noPurgeCatalog::tableExists)
.as("Table should not exist before create")
.rejects(TABLE);

Table table = noPurgeCatalog.buildTable(TABLE, SCHEMA).create();
Assertions.assertThatPredicate(noPurgeCatalog::tableExists)
.as("Table should exist after create")
.accepts(TABLE);
Assertions.assertThat(table).isInstanceOf(BaseTable.class);

// Attempt to drop the table:
Assertions.assertThatThrownBy(() -> noPurgeCatalog.dropTable(TABLE, true))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.key);
}

private TableMetadata createSampleTableMetadata(String tableLocation) {
Schema schema =
new Schema(
Expand Down