From 2cebf024caf1fc9b27bedcf885cd462002b9ca3f Mon Sep 17 00:00:00 2001
From: Alexandre Dutra
Date: Mon, 2 Jun 2025 22:04:54 +0200
Subject: [PATCH] feat(cdi): Remove CallContext.close()

Now that every catalog created by PolarisCallContextCatalogFactory is
correctly closed by IcebergCatalogAdapter, we can finally remove the
CallContext.close() method and the associated closeables group.

This simplification will hopefully pave the way for more robust
handling of request-scoped beans in task executor threads.
---
 .../polaris/core/context/CallContext.java     |  62 +--
 .../InMemoryStorageIntegrationTest.java       |  72 ++--
 .../BasePolarisMetaStoreManagerTest.java      | 225 +++++-----
 .../quarkus/config/QuarkusProducers.java      |   4 -
 .../auth/JWTSymmetricKeyGeneratorTest.java    |  20 -
 .../task/BatchFileCleanupTaskHandlerTest.java | 396 +++++++++---------
 .../ManifestFileCleanupTaskHandlerTest.java   | 273 ++++++------
 .../test/PolarisIntegrationTestFixture.java   |   9 +-
 .../catalog/iceberg/IcebergCatalog.java       |   1 -
 .../PolarisCallContextCatalogFactory.java     |   2 -
 .../service/catalog/io/FileIOFactoryTest.java |   6 -
 .../apache/polaris/service/TestServices.java  |   7 -
 12 files changed, 470 insertions(+), 607 deletions(-)

diff --git a/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java b/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java
index 6cd56bc304..54859647d0 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java
@@ -18,16 +18,8 @@
  */
 package org.apache.polaris.core.context;
 
-import jakarta.annotation.Nonnull;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.iceberg.io.CloseableGroup;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Stores elements associated with an individual REST request such as RealmContext, caller
@@ -37,15 +29,9 @@
  * principal/role entities may be defined within a Realm-specific persistence layer, and the
  * underlying nature of the persistence layer may differ between different realms.
  */
-public interface CallContext extends AutoCloseable {
+public interface CallContext {
   InheritableThreadLocal<CallContext> CURRENT_CONTEXT = new InheritableThreadLocal<>();
 
-  // For requests that make use of a Catalog instance, this holds the instance that was
-  // created, scoped to the current call context.
-  String REQUEST_PATH_CATALOG_INSTANCE_KEY = "REQUEST_PATH_CATALOG_INSTANCE";
-
-  String CLOSEABLES = "closeables";
-
   static CallContext setCurrentContext(CallContext context) {
     CURRENT_CONTEXT.set(context);
     return context;
@@ -65,7 +51,6 @@ static void unsetCurrentContext() {
 
   static CallContext of(
       final RealmContext realmContext, final PolarisCallContext polarisCallContext) {
-    Map<String, Object> map = new HashMap<>();
     return new CallContext() {
       @Override
       public RealmContext getRealmContext() {
@@ -76,28 +61,14 @@ public RealmContext getRealmContext() {
       public PolarisCallContext getPolarisCallContext() {
         return polarisCallContext;
       }
-
-      @Override
-      public Map<String, Object> contextVariables() {
-        return map;
-      }
     };
   }
 
-  /**
-   * Copy the {@link CallContext}. {@link #contextVariables()} will be copied except for {@link
-   * #closeables()}.
The original {@link #contextVariables()} map is untouched and {@link - * #closeables()} in the original {@link CallContext} should be closed along with the {@link - * CallContext}. - */ + /** Copy the {@link CallContext}. */ static CallContext copyOf(CallContext base) { String realmId = base.getRealmContext().getRealmIdentifier(); RealmContext realmContext = () -> realmId; PolarisCallContext polarisCallContext = PolarisCallContext.copyOf(base.getPolarisCallContext()); - Map contextVariables = - base.contextVariables().entrySet().stream() - .filter(e -> !e.getKey().equals(CLOSEABLES)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return new CallContext() { @Override public RealmContext getRealmContext() { @@ -108,11 +79,6 @@ public RealmContext getRealmContext() { public PolarisCallContext getPolarisCallContext() { return polarisCallContext; } - - @Override - public Map contextVariables() { - return contextVariables; - } }; } @@ -122,28 +88,4 @@ public Map contextVariables() { * @return the inner context used for delegating services */ PolarisCallContext getPolarisCallContext(); - - Map contextVariables(); - - default @Nonnull CloseableGroup closeables() { - return (CloseableGroup) - contextVariables().computeIfAbsent(CLOSEABLES, key -> new CloseableGroup()); - } - - @Override - default void close() { - if (CURRENT_CONTEXT.get() == this) { - unsetCurrentContext(); - CloseableGroup closeables = closeables(); - try { - closeables.close(); - } catch (IOException e) { - Logger logger = LoggerFactory.getLogger(CallContext.class); - logger - .atWarn() - .addKeyValue("closeableGroup", closeables) - .log("Unable to close closeable group", e); - } - } - } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index 12e83e2d2f..c060974f44 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -105,43 +105,41 @@ public void testValidateAccessToLocationsWithWildcard() { } }, Clock.systemUTC()); - try (CallContext ignored = - CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext))) { - Map> result = - storage.validateAccessToLocations( - new FileStorageConfigurationInfo(List.of("file://", "*")), - Set.of(PolarisStorageActions.READ), - Set.of( - "s3://bucket/path/to/warehouse/namespace/table", - "file:///etc/passwd", - "a/relative/subdirectory")); - Assertions.assertThat(result) - .hasSize(3) - .hasEntrySatisfying( - "s3://bucket/path/to/warehouse/namespace/table", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) - .hasEntrySatisfying( - "file:///etc/passwd", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) - .hasEntrySatisfying( - "a/relative/subdirectory", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)); - } + CallContext.setCurrentContext(CallContext.of(() -> 
"realm", polarisCallContext)); + Map> result = + storage.validateAccessToLocations( + new FileStorageConfigurationInfo(List.of("file://", "*")), + Set.of(PolarisStorageActions.READ), + Set.of( + "s3://bucket/path/to/warehouse/namespace/table", + "file:///etc/passwd", + "a/relative/subdirectory")); + Assertions.assertThat(result) + .hasSize(3) + .hasEntrySatisfying( + "s3://bucket/path/to/warehouse/namespace/table", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) + .hasEntrySatisfying( + "file:///etc/passwd", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) + .hasEntrySatisfying( + "a/relative/subdirectory", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)); } @Test diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index 0f834bc760..5abda44606 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -104,137 +104,126 @@ protected void testBrowse() { @Test protected void testCreateEntities() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - List createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(2) - .extracting(PolarisEntity::toCore) - .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); - - List listedEntities = - metaStoreManager - .listEntities( - polarisTestMetaStoreManager.polarisCallContext, - null, - PolarisEntityType.TASK, - PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) - .getEntities(); - Assertions.assertThat(listedEntities) - .isNotNull() - .hasSize(2) - .containsExactly( - new EntityNameLookupRecord( - task1.getCatalogId(), - task1.getId(), - task1.getParentId(), - task1.getName(), - task1.getTypeCode(), - task1.getSubTypeCode()), - new EntityNameLookupRecord( - task2.getCatalogId(), - task2.getId(), - task2.getParentId(), - task2.getName(), - task2.getTypeCode(), - task2.getSubTypeCode())); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + List createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + 
polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(2) + .extracting(PolarisEntity::toCore) + .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); + + List listedEntities = + metaStoreManager + .listEntities( + polarisTestMetaStoreManager.polarisCallContext, + null, + PolarisEntityType.TASK, + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) + .getEntities(); + Assertions.assertThat(listedEntities) + .isNotNull() + .hasSize(2) + .containsExactly( + new EntityNameLookupRecord( + task1.getCatalogId(), + task1.getId(), + task1.getParentId(), + task1.getName(), + task1.getTypeCode(), + task1.getSubTypeCode()), + new EntityNameLookupRecord( + task2.getCatalogId(), + task2.getId(), + task2.getParentId(), + task2.getName(), + task2.getTypeCode(), + task2.getSubTypeCode())); } @Test protected void testCreateEntitiesAlreadyExisting() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - List createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(2) - .extracting(PolarisEntity::toCore) - .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); - - TaskEntity task3 = createTask("task3", 103L); - - // entities task1 and task2 already exist with the same identifier, so the full list is - // returned - createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, - null, - List.of(task1, task2, task3)) - .getEntities(); - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(3) - .extracting(PolarisEntity::toCore) - .containsExactly( - PolarisEntity.toCore(task1), - PolarisEntity.toCore(task2), - PolarisEntity.toCore(task3)); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + List createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(2) + .extracting(PolarisEntity::toCore) + .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); + + TaskEntity task3 = createTask("task3", 103L); + + // entities task1 and task2 already exist with the same identifier, so the full list is + // returned + createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2, task3)) + .getEntities(); + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(3) + .extracting(PolarisEntity::toCore) + .containsExactly( + PolarisEntity.toCore(task1), PolarisEntity.toCore(task2), PolarisEntity.toCore(task3)); } @Test protected void 
testCreateEntitiesWithConflict() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - TaskEntity task3 = createTask("task3", 103L); - List createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, - null, - List.of(task1, task2, task3)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(3) - .extracting(PolarisEntity::toCore) - .containsExactly( - PolarisEntity.toCore(task1), - PolarisEntity.toCore(task2), - PolarisEntity.toCore(task3)); - - TaskEntity secondTask3 = createTask("task3", 104L); - - TaskEntity task4 = createTask("task4", 105L); - createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(secondTask3, task4)) - .getEntities(); - Assertions.assertThat(createdEntities).isNull(); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + TaskEntity task3 = createTask("task3", 103L); + List createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2, task3)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(3) + .extracting(PolarisEntity::toCore) + .containsExactly( + PolarisEntity.toCore(task1), PolarisEntity.toCore(task2), PolarisEntity.toCore(task3)); + + TaskEntity secondTask3 = createTask("task3", 104L); + + TaskEntity task4 = createTask("task4", 105L); + createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(secondTask3, task4)) + .getEntities(); + Assertions.assertThat(createdEntities).isNull(); } private static TaskEntity createTask(String taskName, long id) { diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 40b79980dc..c4b3a3ae1b 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -138,10 +138,6 @@ public CallContext callContext(RealmContext realmContext, PolarisCallContext pol return CallContext.of(realmContext, polarisCallContext); } - public void closeCallContext(@Disposes CallContext callContext) { - callContext.close(); - } - // Polaris service beans - selected from @Identifier-annotated beans @Produces diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java index fed5d20db0..b7c3ceef45 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java +++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java @@ -24,10 +24,7 @@ import com.auth0.jwt.JWTVerifier; import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.interfaces.DecodedJWT; -import java.util.Map; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; @@ -49,23 +46,6 @@ public class JWTSymmetricKeyGeneratorTest { @Test public void testJWTSymmetricKeyGenerator() { PolarisCallContext polarisCallContext = new PolarisCallContext(null, null, null, null); - CallContext.setCurrentContext( - new CallContext() { - @Override - public RealmContext getRealmContext() { - return () -> "realm"; - } - - @Override - public PolarisCallContext getPolarisCallContext() { - return polarisCallContext; - } - - @Override - public Map contextVariables() { - return Map.of(); - } - }); PolarisMetaStoreManager metastoreManager = Mockito.mock(PolarisMetaStoreManager.class); String mainSecret = "test_secret"; String clientId = "test_client_id"; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index 662f88bb05..9259917841 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -94,114 +94,110 @@ public void testMetadataFileCleanup() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId1 = 100L; - ManifestFile manifestFile1 = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); - ManifestFile manifestFile2 = - TaskTestUtils.manifestFile( - fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); - Snapshot snapshot = - TaskTestUtils.newSnapshot( - fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); - StatisticsFile statisticsFile1 = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - PartitionStatisticsFile partitionStatisticsFile1 = 
- TaskTestUtils.writePartitionStatsFile( - snapshot.snapshotId(), - "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", - fileIO); - String firstMetadataFile = "v1-295495059.metadata.json"; - TableMetadata firstMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - firstMetadataFile, - List.of(statisticsFile1), - List.of(partitionStatisticsFile1), - snapshot); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + long snapshotId1 = 100L; + ManifestFile manifestFile1 = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); + ManifestFile manifestFile2 = + TaskTestUtils.manifestFile( + fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); + Snapshot snapshot = + TaskTestUtils.newSnapshot( + fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); + StatisticsFile statisticsFile1 = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); + String firstMetadataFile = "v1-295495059.metadata.json"; + TableMetadata firstMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + List.of(partitionStatisticsFile1), + snapshot); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - ManifestFile manifestFile3 = - TaskTestUtils.manifestFile( - fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); - Snapshot snapshot2 = - TaskTestUtils.newSnapshot( - fileIO, - "manifestList2.avro", - snapshot.sequenceNumber() + 1, - snapshot.snapshotId() + 1, - snapshot.snapshotId(), - manifestFile1, - manifestFile3); // exclude manifest2 from the new snapshot - StatisticsFile statisticsFile2 = - TaskTestUtils.writeStatsFile( - snapshot2.snapshotId(), - snapshot2.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - PartitionStatisticsFile partitionStatisticsFile2 = - TaskTestUtils.writePartitionStatsFile( - snapshot2.snapshotId(), - "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", - fileIO); - String secondMetadataFile = "v1-295495060.metadata.json"; - TableMetadata secondMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - secondMetadataFile, - firstMetadata, - firstMetadataFile, - List.of(statisticsFile2), - List.of(partitionStatisticsFile2), - snapshot2); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); + ManifestFile manifestFile3 = + TaskTestUtils.manifestFile( + fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); + Snapshot snapshot2 = + TaskTestUtils.newSnapshot( + fileIO, + "manifestList2.avro", + snapshot.sequenceNumber() + 1, + snapshot.snapshotId() + 1, + snapshot.snapshotId(), + manifestFile1, + manifestFile3); // exclude manifest2 from the new snapshot + StatisticsFile statisticsFile2 = + TaskTestUtils.writeStatsFile( + snapshot2.snapshotId(), + snapshot2.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); + String 
secondMetadataFile = "v1-295495060.metadata.json"; + TableMetadata secondMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + secondMetadataFile, + firstMetadata, + firstMetadataFile, + List.of(statisticsFile2), + List.of(partitionStatisticsFile2), + snapshot2); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); - List cleanupFiles = - Stream.of( - secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), - secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), - secondMetadata.partitionStatisticsFiles().stream() - .map(PartitionStatisticsFile::path)) - .flatMap(s -> s) - .filter(file -> TaskUtils.exists(file, fileIO)) - .toList(); + List cleanupFiles = + Stream.of( + secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), + secondMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) + .filter(file -> TaskUtils.exists(file, fileIO)) + .toList(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, cleanupFiles)) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask(tableIdentifier, cleanupFiles)) + .setName(UUID.randomUUID().toString()) + .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); - for (String cleanupFile : cleanupFiles) { - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); - } + for (String cleanupFile : cleanupFiles) { + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); } } @@ -211,45 +207,43 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + 
CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - fileIO.deleteFile(statisticsFile.path()); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + fileIO.deleteFile(statisticsFile.path()); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -258,77 +252,73 @@ public void testCleanupWithRetries() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + Map retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("Simulating failure to test retries"); - } else { - super.deleteFile(location); - } + @Override + public void deleteFile(String location) { + int attempts = + retryCounter.computeIfAbsent(location, k -> new AtomicInteger(0)).incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("Simulating failure to test retries"); + } else { + super.deleteFile(location); } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), 
Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); - CompletableFuture future = - CompletableFuture.runAsync( - () -> { - CallContext.setCurrentContext(callCtx); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - handler.handleTask(task, callCtx); // this will schedule the batch deletion - }); + CompletableFuture future = + CompletableFuture.runAsync( + () -> { + CallContext.setCurrentContext(callCtx); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + handler.handleTask(task, callCtx); // this will schedule the batch deletion + }); - // Wait for all async tasks to finish - future.join(); + // Wait for all async tasks to finish + future.join(); - // Check if the file was successfully deleted after retries - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + // Check if the file was successfully deleted after retries + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - // Ensure that retries happened as expected - assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); - assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); - } + // Ensure that retries happened as expected + assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); + assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); } } diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java index 39cd619bd4..58fa14d7ec 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java @@ -93,32 +93,28 @@ public void testCleanupFileNotExists() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", 1L, "dataFile1.parquet", "dataFile2.parquet"); - fileIO.deleteFile(manifestFile.path()); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", 1L, "dataFile1.parquet", "dataFile2.parquet"); + fileIO.deleteFile(manifestFile.path()); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -127,30 +123,27 @@ public void testCleanupFileManifestExistsDataFilesDontExist() throws IOException new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet"); - 
TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet"); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -159,47 +152,44 @@ public void testCleanupFiles() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - String dataFile1Path = "dataFile1.parquet"; - OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); - PositionOutputStream out1 = dataFile1.createOrOverwrite(); - out1.write("the data".getBytes(UTF_8)); - out1.close(); - String dataFile2Path = "dataFile2.parquet"; - OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); - PositionOutputStream out2 = dataFile2.createOrOverwrite(); - out2.write("the data".getBytes(UTF_8)); - out2.close(); - ManifestFile manifestFile = - TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // 
no-op + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + String dataFile1Path = "dataFile1.parquet"; + OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); + PositionOutputStream out1 = dataFile1.createOrOverwrite(); + out1.write("the data".getBytes(UTF_8)); + out1.close(); + String dataFile2Path = "dataFile2.parquet"; + OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); + PositionOutputStream out2 = dataFile2.createOrOverwrite(); + out2.write("the data".getBytes(UTF_8)); + out2.close(); + ManifestFile manifestFile = + TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } @Test @@ -208,62 +198,57 @@ public void testCleanupFilesWithRetries() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + Map retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("I'm failing to test retries"); - } else { - // succeed on the third attempt - super.deleteFile(location); - } + @Override + public void deleteFile(String location) { + int attempts = + retryCounter.computeIfAbsent(location, k -> new AtomicInteger(0)).incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("I'm failing to test retries"); + } else { + // succeed on the third attempt + super.deleteFile(location); } - }; + } + }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - String dataFile1Path = "dataFile1.parquet"; - OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); - PositionOutputStream out1 = dataFile1.createOrOverwrite(); - out1.write("the data".getBytes(UTF_8)); - out1.close(); - String dataFile2Path = "dataFile2.parquet"; - OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); - PositionOutputStream out2 = dataFile2.createOrOverwrite(); - out2.write("the 
data".getBytes(UTF_8)); - out2.close(); - ManifestFile manifestFile = - TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); - } + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + String dataFile1Path = "dataFile1.parquet"; + OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); + PositionOutputStream out1 = dataFile1.createOrOverwrite(); + out1.write("the data".getBytes(UTF_8)); + out1.close(); + String dataFile2Path = "dataFile2.parquet"; + OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); + PositionOutputStream out2 = dataFile2.createOrOverwrite(); + out2.write("the data".getBytes(UTF_8)); + out2.close(); + ManifestFile manifestFile = + TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java index 259cfc648f..2dc0400bcb 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java @@ -115,13 +115,12 @@ private PolarisPrincipalSecrets fetchAdminSecrets() { PolarisCallContext polarisContext = new PolarisCallContext( metaStoreSession, helper.diagServices, helper.configurationStore, helper.clock); - try (CallContext ctx = CallContext.of(realmContext, polarisContext)) { - CallContext.setCurrentContext(ctx); + try { PolarisMetaStoreManager metaStoreManager = - helper.metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); + helper.metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); EntityResult principal = metaStoreManager.readEntityByName( - ctx.getPolarisCallContext(), + polarisContext, null, PolarisEntityType.PRINCIPAL, PolarisEntitySubType.NULL_SUBTYPE, @@ -129,7 +128,7 @@ private PolarisPrincipalSecrets fetchAdminSecrets() { Map propertiesMap = 
readInternalProperties(principal); return metaStoreManager - .loadPrincipalSecrets(ctx.getPolarisCallContext(), propertiesMap.get("client_id")) + .loadPrincipalSecrets(polarisContext, propertiesMap.get("client_id")) .getPrincipalSecrets(); } finally { CallContext.unsetCurrentContext(); diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index c972b2b643..5ab88e862a 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -255,7 +255,6 @@ public void initialize(String name, Map properties) { CatalogProperties.FILE_IO_IMPL); } - callContext.closeables().addCloseable(this); this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); diff --git a/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java index 96445f9493..5ee0cea23f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java @@ -99,8 +99,6 @@ public Catalog createCallContextCatalog( fileIOFactory, polarisEventListener); - context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, catalogInstance); - CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity); Map catalogProperties = new HashMap<>(catalog.getPropertiesAsMap()); String defaultBaseLocation = catalog.getDefaultBaseLocation(); diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 64a85df323..28a181ef8b 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -26,7 +26,6 @@ import jakarta.annotation.Nonnull; import java.lang.reflect.Method; import java.time.Clock; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iceberg.Schema; @@ -154,11 +153,6 @@ public PolarisCallContext getPolarisCallContext() { testServices.configurationStore(), Mockito.mock(Clock.class)); } - - @Override - public Map contextVariables() { - return new HashMap<>(); - } }; } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 16f20c1c6d..19de127774 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -27,7 +27,6 @@ import java.time.Clock; import java.time.Instant; import java.util.Date; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -178,13 +177,7 @@ public PolarisCallContext getPolarisCallContext() { configurationStore, Mockito.mock(Clock.class)); } - - @Override - public Map contextVariables() { - return new HashMap<>(); - } }; - 
CallContext.setCurrentContext(callContext); PolarisEntityManager entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext); PolarisMetaStoreManager metaStoreManager =
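
Call-site pattern after this change (a minimal sketch, not part of the patch itself):
with CallContext no longer AutoCloseable, callers install the context on the thread via
setCurrentContext and, where cleanup matters (as PolarisIntegrationTestFixture does above),
clear the thread-local in a finally block. Only methods already visible in this diff are
used; realmContext and polarisCallContext stand in for whatever the caller already holds.

    CallContext callCtx = CallContext.of(realmContext, polarisCallContext);
    CallContext.setCurrentContext(callCtx);
    try {
      // request-scoped work; CallContext.getCurrentContext() returns callCtx on this thread
    } finally {
      // there is no CallContext.close() anymore; only the thread-local needs clearing
      CallContext.unsetCurrentContext();
    }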