diff --git a/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java b/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java index 6cd56bc304..54859647d0 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java @@ -18,16 +18,8 @@ */ package org.apache.polaris.core.context; -import jakarta.annotation.Nonnull; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.iceberg.io.CloseableGroup; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Stores elements associated with an individual REST request such as RealmContext, caller @@ -37,15 +29,9 @@ * principal/role entities may be defined within a Realm-specific persistence layer, and the * underlying nature of the persistence layer may differ between different realms. */ -public interface CallContext extends AutoCloseable { +public interface CallContext { InheritableThreadLocal CURRENT_CONTEXT = new InheritableThreadLocal<>(); - // For requests that make use of a Catalog instance, this holds the instance that was - // created, scoped to the current call context. - String REQUEST_PATH_CATALOG_INSTANCE_KEY = "REQUEST_PATH_CATALOG_INSTANCE"; - - String CLOSEABLES = "closeables"; - static CallContext setCurrentContext(CallContext context) { CURRENT_CONTEXT.set(context); return context; @@ -65,7 +51,6 @@ static void unsetCurrentContext() { static CallContext of( final RealmContext realmContext, final PolarisCallContext polarisCallContext) { - Map map = new HashMap<>(); return new CallContext() { @Override public RealmContext getRealmContext() { @@ -76,28 +61,14 @@ public RealmContext getRealmContext() { public PolarisCallContext getPolarisCallContext() { return polarisCallContext; } - - @Override - public Map contextVariables() { - return map; - } }; } - /** - * Copy the {@link CallContext}. {@link #contextVariables()} will be copied except for {@link - * #closeables()}. The original {@link #contextVariables()} map is untouched and {@link - * #closeables()} in the original {@link CallContext} should be closed along with the {@link - * CallContext}. - */ + /** Copy the {@link CallContext}. */ static CallContext copyOf(CallContext base) { String realmId = base.getRealmContext().getRealmIdentifier(); RealmContext realmContext = () -> realmId; PolarisCallContext polarisCallContext = PolarisCallContext.copyOf(base.getPolarisCallContext()); - Map contextVariables = - base.contextVariables().entrySet().stream() - .filter(e -> !e.getKey().equals(CLOSEABLES)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return new CallContext() { @Override public RealmContext getRealmContext() { @@ -108,11 +79,6 @@ public RealmContext getRealmContext() { public PolarisCallContext getPolarisCallContext() { return polarisCallContext; } - - @Override - public Map contextVariables() { - return contextVariables; - } }; } @@ -122,28 +88,4 @@ public Map contextVariables() { * @return the inner context used for delegating services */ PolarisCallContext getPolarisCallContext(); - - Map contextVariables(); - - default @Nonnull CloseableGroup closeables() { - return (CloseableGroup) - contextVariables().computeIfAbsent(CLOSEABLES, key -> new CloseableGroup()); - } - - @Override - default void close() { - if (CURRENT_CONTEXT.get() == this) { - unsetCurrentContext(); - CloseableGroup closeables = closeables(); - try { - closeables.close(); - } catch (IOException e) { - Logger logger = LoggerFactory.getLogger(CallContext.class); - logger - .atWarn() - .addKeyValue("closeableGroup", closeables) - .log("Unable to close closeable group", e); - } - } - } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index 12e83e2d2f..c060974f44 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -105,43 +105,41 @@ public void testValidateAccessToLocationsWithWildcard() { } }, Clock.systemUTC()); - try (CallContext ignored = - CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext))) { - Map> result = - storage.validateAccessToLocations( - new FileStorageConfigurationInfo(List.of("file://", "*")), - Set.of(PolarisStorageActions.READ), - Set.of( - "s3://bucket/path/to/warehouse/namespace/table", - "file:///etc/passwd", - "a/relative/subdirectory")); - Assertions.assertThat(result) - .hasSize(3) - .hasEntrySatisfying( - "s3://bucket/path/to/warehouse/namespace/table", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) - .hasEntrySatisfying( - "file:///etc/passwd", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) - .hasEntrySatisfying( - "a/relative/subdirectory", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)); - } + CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext)); + Map> result = + storage.validateAccessToLocations( + new FileStorageConfigurationInfo(List.of("file://", "*")), + Set.of(PolarisStorageActions.READ), + Set.of( + "s3://bucket/path/to/warehouse/namespace/table", + "file:///etc/passwd", + "a/relative/subdirectory")); + Assertions.assertThat(result) + .hasSize(3) + .hasEntrySatisfying( + "s3://bucket/path/to/warehouse/namespace/table", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) + .hasEntrySatisfying( + "file:///etc/passwd", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) + .hasEntrySatisfying( + "a/relative/subdirectory", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)); } @Test diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index 0f834bc760..5abda44606 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -104,137 +104,126 @@ protected void testBrowse() { @Test protected void testCreateEntities() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - List createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(2) - .extracting(PolarisEntity::toCore) - .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); - - List listedEntities = - metaStoreManager - .listEntities( - polarisTestMetaStoreManager.polarisCallContext, - null, - PolarisEntityType.TASK, - PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) - .getEntities(); - Assertions.assertThat(listedEntities) - .isNotNull() - .hasSize(2) - .containsExactly( - new EntityNameLookupRecord( - task1.getCatalogId(), - task1.getId(), - task1.getParentId(), - task1.getName(), - task1.getTypeCode(), - task1.getSubTypeCode()), - new EntityNameLookupRecord( - task2.getCatalogId(), - task2.getId(), - task2.getParentId(), - task2.getName(), - task2.getTypeCode(), - task2.getSubTypeCode())); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + List createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(2) + .extracting(PolarisEntity::toCore) + .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); + + List listedEntities = + metaStoreManager + .listEntities( + polarisTestMetaStoreManager.polarisCallContext, + null, + PolarisEntityType.TASK, + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) + .getEntities(); + Assertions.assertThat(listedEntities) + .isNotNull() + .hasSize(2) + .containsExactly( + new EntityNameLookupRecord( + task1.getCatalogId(), + task1.getId(), + task1.getParentId(), + task1.getName(), + task1.getTypeCode(), + task1.getSubTypeCode()), + new EntityNameLookupRecord( + task2.getCatalogId(), + task2.getId(), + task2.getParentId(), + task2.getName(), + task2.getTypeCode(), + task2.getSubTypeCode())); } @Test protected void testCreateEntitiesAlreadyExisting() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - List createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(2) - .extracting(PolarisEntity::toCore) - .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); - - TaskEntity task3 = createTask("task3", 103L); - - // entities task1 and task2 already exist with the same identifier, so the full list is - // returned - createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, - null, - List.of(task1, task2, task3)) - .getEntities(); - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(3) - .extracting(PolarisEntity::toCore) - .containsExactly( - PolarisEntity.toCore(task1), - PolarisEntity.toCore(task2), - PolarisEntity.toCore(task3)); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + List createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(2) + .extracting(PolarisEntity::toCore) + .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); + + TaskEntity task3 = createTask("task3", 103L); + + // entities task1 and task2 already exist with the same identifier, so the full list is + // returned + createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2, task3)) + .getEntities(); + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(3) + .extracting(PolarisEntity::toCore) + .containsExactly( + PolarisEntity.toCore(task1), PolarisEntity.toCore(task2), PolarisEntity.toCore(task3)); } @Test protected void testCreateEntitiesWithConflict() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - TaskEntity task3 = createTask("task3", 103L); - List createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, - null, - List.of(task1, task2, task3)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(3) - .extracting(PolarisEntity::toCore) - .containsExactly( - PolarisEntity.toCore(task1), - PolarisEntity.toCore(task2), - PolarisEntity.toCore(task3)); - - TaskEntity secondTask3 = createTask("task3", 104L); - - TaskEntity task4 = createTask("task4", 105L); - createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(secondTask3, task4)) - .getEntities(); - Assertions.assertThat(createdEntities).isNull(); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + TaskEntity task3 = createTask("task3", 103L); + List createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2, task3)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(3) + .extracting(PolarisEntity::toCore) + .containsExactly( + PolarisEntity.toCore(task1), PolarisEntity.toCore(task2), PolarisEntity.toCore(task3)); + + TaskEntity secondTask3 = createTask("task3", 104L); + + TaskEntity task4 = createTask("task4", 105L); + createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(secondTask3, task4)) + .getEntities(); + Assertions.assertThat(createdEntities).isNull(); } private static TaskEntity createTask(String taskName, long id) { diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 40b79980dc..c4b3a3ae1b 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -138,10 +138,6 @@ public CallContext callContext(RealmContext realmContext, PolarisCallContext pol return CallContext.of(realmContext, polarisCallContext); } - public void closeCallContext(@Disposes CallContext callContext) { - callContext.close(); - } - // Polaris service beans - selected from @Identifier-annotated beans @Produces diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java index fed5d20db0..b7c3ceef45 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java @@ -24,10 +24,7 @@ import com.auth0.jwt.JWTVerifier; import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.interfaces.DecodedJWT; -import java.util.Map; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; @@ -49,23 +46,6 @@ public class JWTSymmetricKeyGeneratorTest { @Test public void testJWTSymmetricKeyGenerator() { PolarisCallContext polarisCallContext = new PolarisCallContext(null, null, null, null); - CallContext.setCurrentContext( - new CallContext() { - @Override - public RealmContext getRealmContext() { - return () -> "realm"; - } - - @Override - public PolarisCallContext getPolarisCallContext() { - return polarisCallContext; - } - - @Override - public Map contextVariables() { - return Map.of(); - } - }); PolarisMetaStoreManager metastoreManager = Mockito.mock(PolarisMetaStoreManager.class); String mainSecret = "test_secret"; String clientId = "test_client_id"; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index 662f88bb05..9259917841 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -94,114 +94,110 @@ public void testMetadataFileCleanup() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId1 = 100L; - ManifestFile manifestFile1 = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); - ManifestFile manifestFile2 = - TaskTestUtils.manifestFile( - fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); - Snapshot snapshot = - TaskTestUtils.newSnapshot( - fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); - StatisticsFile statisticsFile1 = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - PartitionStatisticsFile partitionStatisticsFile1 = - TaskTestUtils.writePartitionStatsFile( - snapshot.snapshotId(), - "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", - fileIO); - String firstMetadataFile = "v1-295495059.metadata.json"; - TableMetadata firstMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - firstMetadataFile, - List.of(statisticsFile1), - List.of(partitionStatisticsFile1), - snapshot); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + long snapshotId1 = 100L; + ManifestFile manifestFile1 = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); + ManifestFile manifestFile2 = + TaskTestUtils.manifestFile( + fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); + Snapshot snapshot = + TaskTestUtils.newSnapshot( + fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); + StatisticsFile statisticsFile1 = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); + String firstMetadataFile = "v1-295495059.metadata.json"; + TableMetadata firstMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + List.of(partitionStatisticsFile1), + snapshot); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - ManifestFile manifestFile3 = - TaskTestUtils.manifestFile( - fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); - Snapshot snapshot2 = - TaskTestUtils.newSnapshot( - fileIO, - "manifestList2.avro", - snapshot.sequenceNumber() + 1, - snapshot.snapshotId() + 1, - snapshot.snapshotId(), - manifestFile1, - manifestFile3); // exclude manifest2 from the new snapshot - StatisticsFile statisticsFile2 = - TaskTestUtils.writeStatsFile( - snapshot2.snapshotId(), - snapshot2.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - PartitionStatisticsFile partitionStatisticsFile2 = - TaskTestUtils.writePartitionStatsFile( - snapshot2.snapshotId(), - "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", - fileIO); - String secondMetadataFile = "v1-295495060.metadata.json"; - TableMetadata secondMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - secondMetadataFile, - firstMetadata, - firstMetadataFile, - List.of(statisticsFile2), - List.of(partitionStatisticsFile2), - snapshot2); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); + ManifestFile manifestFile3 = + TaskTestUtils.manifestFile( + fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); + Snapshot snapshot2 = + TaskTestUtils.newSnapshot( + fileIO, + "manifestList2.avro", + snapshot.sequenceNumber() + 1, + snapshot.snapshotId() + 1, + snapshot.snapshotId(), + manifestFile1, + manifestFile3); // exclude manifest2 from the new snapshot + StatisticsFile statisticsFile2 = + TaskTestUtils.writeStatsFile( + snapshot2.snapshotId(), + snapshot2.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); + String secondMetadataFile = "v1-295495060.metadata.json"; + TableMetadata secondMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + secondMetadataFile, + firstMetadata, + firstMetadataFile, + List.of(statisticsFile2), + List.of(partitionStatisticsFile2), + snapshot2); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); - List cleanupFiles = - Stream.of( - secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), - secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), - secondMetadata.partitionStatisticsFiles().stream() - .map(PartitionStatisticsFile::path)) - .flatMap(s -> s) - .filter(file -> TaskUtils.exists(file, fileIO)) - .toList(); + List cleanupFiles = + Stream.of( + secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), + secondMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) + .filter(file -> TaskUtils.exists(file, fileIO)) + .toList(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, cleanupFiles)) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask(tableIdentifier, cleanupFiles)) + .setName(UUID.randomUUID().toString()) + .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); - for (String cleanupFile : cleanupFiles) { - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); - } + for (String cleanupFile : cleanupFiles) { + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); } } @@ -211,45 +207,43 @@ public void testMetadataFileCleanupIfFileNotExist() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - fileIO.deleteFile(statisticsFile.path()); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + fileIO.deleteFile(statisticsFile.path()); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -258,77 +252,73 @@ public void testCleanupWithRetries() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + Map retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("Simulating failure to test retries"); - } else { - super.deleteFile(location); - } + @Override + public void deleteFile(String location) { + int attempts = + retryCounter.computeIfAbsent(location, k -> new AtomicInteger(0)).incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("Simulating failure to test retries"); + } else { + super.deleteFile(location); } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); - CompletableFuture future = - CompletableFuture.runAsync( - () -> { - CallContext.setCurrentContext(callCtx); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - handler.handleTask(task, callCtx); // this will schedule the batch deletion - }); + CompletableFuture future = + CompletableFuture.runAsync( + () -> { + CallContext.setCurrentContext(callCtx); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + handler.handleTask(task, callCtx); // this will schedule the batch deletion + }); - // Wait for all async tasks to finish - future.join(); + // Wait for all async tasks to finish + future.join(); - // Check if the file was successfully deleted after retries - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + // Check if the file was successfully deleted after retries + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - // Ensure that retries happened as expected - assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); - assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); - } + // Ensure that retries happened as expected + assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); + assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java index 39cd619bd4..58fa14d7ec 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java @@ -93,32 +93,28 @@ public void testCleanupFileNotExists() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", 1L, "dataFile1.parquet", "dataFile2.parquet"); - fileIO.deleteFile(manifestFile.path()); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", 1L, "dataFile1.parquet", "dataFile2.parquet"); + fileIO.deleteFile(manifestFile.path()); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -127,30 +123,27 @@ public void testCleanupFileManifestExistsDataFilesDontExist() throws IOException new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet"); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet"); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -159,47 +152,44 @@ public void testCleanupFiles() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - String dataFile1Path = "dataFile1.parquet"; - OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); - PositionOutputStream out1 = dataFile1.createOrOverwrite(); - out1.write("the data".getBytes(UTF_8)); - out1.close(); - String dataFile2Path = "dataFile2.parquet"; - OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); - PositionOutputStream out2 = dataFile2.createOrOverwrite(); - out2.write("the data".getBytes(UTF_8)); - out2.close(); - ManifestFile manifestFile = - TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + String dataFile1Path = "dataFile1.parquet"; + OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); + PositionOutputStream out1 = dataFile1.createOrOverwrite(); + out1.write("the data".getBytes(UTF_8)); + out1.close(); + String dataFile2Path = "dataFile2.parquet"; + OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); + PositionOutputStream out2 = dataFile2.createOrOverwrite(); + out2.write("the data".getBytes(UTF_8)); + out2.close(); + ManifestFile manifestFile = + TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } @Test @@ -208,62 +198,57 @@ public void testCleanupFilesWithRetries() throws IOException { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + Map retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("I'm failing to test retries"); - } else { - // succeed on the third attempt - super.deleteFile(location); - } + @Override + public void deleteFile(String location) { + int attempts = + retryCounter.computeIfAbsent(location, k -> new AtomicInteger(0)).incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("I'm failing to test retries"); + } else { + // succeed on the third attempt + super.deleteFile(location); } - }; + } + }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - String dataFile1Path = "dataFile1.parquet"; - OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); - PositionOutputStream out1 = dataFile1.createOrOverwrite(); - out1.write("the data".getBytes(UTF_8)); - out1.close(); - String dataFile2Path = "dataFile2.parquet"; - OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); - PositionOutputStream out2 = dataFile2.createOrOverwrite(); - out2.write("the data".getBytes(UTF_8)); - out2.close(); - ManifestFile manifestFile = - TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); - } + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + String dataFile1Path = "dataFile1.parquet"; + OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); + PositionOutputStream out1 = dataFile1.createOrOverwrite(); + out1.write("the data".getBytes(UTF_8)); + out1.close(); + String dataFile2Path = "dataFile2.parquet"; + OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); + PositionOutputStream out2 = dataFile2.createOrOverwrite(); + out2.write("the data".getBytes(UTF_8)); + out2.close(); + ManifestFile manifestFile = + TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java index 259cfc648f..2dc0400bcb 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java @@ -115,13 +115,12 @@ private PolarisPrincipalSecrets fetchAdminSecrets() { PolarisCallContext polarisContext = new PolarisCallContext( metaStoreSession, helper.diagServices, helper.configurationStore, helper.clock); - try (CallContext ctx = CallContext.of(realmContext, polarisContext)) { - CallContext.setCurrentContext(ctx); + try { PolarisMetaStoreManager metaStoreManager = - helper.metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); + helper.metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); EntityResult principal = metaStoreManager.readEntityByName( - ctx.getPolarisCallContext(), + polarisContext, null, PolarisEntityType.PRINCIPAL, PolarisEntitySubType.NULL_SUBTYPE, @@ -129,7 +128,7 @@ private PolarisPrincipalSecrets fetchAdminSecrets() { Map propertiesMap = readInternalProperties(principal); return metaStoreManager - .loadPrincipalSecrets(ctx.getPolarisCallContext(), propertiesMap.get("client_id")) + .loadPrincipalSecrets(polarisContext, propertiesMap.get("client_id")) .getPrincipalSecrets(); } finally { CallContext.unsetCurrentContext(); diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index c972b2b643..5ab88e862a 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -255,7 +255,6 @@ public void initialize(String name, Map properties) { CatalogProperties.FILE_IO_IMPL); } - callContext.closeables().addCloseable(this); this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); diff --git a/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java index 96445f9493..5ee0cea23f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java @@ -99,8 +99,6 @@ public Catalog createCallContextCatalog( fileIOFactory, polarisEventListener); - context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, catalogInstance); - CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity); Map catalogProperties = new HashMap<>(catalog.getPropertiesAsMap()); String defaultBaseLocation = catalog.getDefaultBaseLocation(); diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 64a85df323..28a181ef8b 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -26,7 +26,6 @@ import jakarta.annotation.Nonnull; import java.lang.reflect.Method; import java.time.Clock; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iceberg.Schema; @@ -154,11 +153,6 @@ public PolarisCallContext getPolarisCallContext() { testServices.configurationStore(), Mockito.mock(Clock.class)); } - - @Override - public Map contextVariables() { - return new HashMap<>(); - } }; } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 16f20c1c6d..19de127774 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -27,7 +27,6 @@ import java.time.Clock; import java.time.Instant; import java.util.Date; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -178,13 +177,7 @@ public PolarisCallContext getPolarisCallContext() { configurationStore, Mockito.mock(Clock.class)); } - - @Override - public Map contextVariables() { - return new HashMap<>(); - } }; - CallContext.setCurrentContext(callContext); PolarisEntityManager entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext); PolarisMetaStoreManager metaStoreManager =