diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index e107eb830e..06901b0f13 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -167,7 +167,8 @@ public void testTableCleanup() throws IOException { taskEntity2 -> taskEntity2.getTaskType()) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, List.of(statisticsFile.path())), + tableIdentifier, + List.of(snapshot.manifestListLocation(), statisticsFile.path())), entity -> entity.readData( ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); @@ -224,7 +225,7 @@ public void close() { .getOrCreateMetaStoreManager(realmContext) .loadTasks(callContext.getPolarisCallContext(), "test", 5) .getEntities()) - .hasSize(1); + .hasSize(2); } @Test @@ -285,15 +286,41 @@ public void close() { .getOrCreateMetaStoreManager(realmContext) .loadTasks(callContext.getPolarisCallContext(), "test", 5) .getEntities()) - .hasSize(2) + .hasSize(4) .satisfiesExactly( taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) .returns( - AsyncTaskType.MANIFEST_FILE_CLEANUP, + AsyncTaskType.METADATA_FILE_BATCH_CLEANUP, taskEntity1 -> taskEntity1.getTaskType()) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, List.of(snapshot.manifestListLocation())), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns( + AsyncTaskType.METADATA_FILE_BATCH_CLEANUP, + taskEntity2 -> taskEntity2.getTaskType()) + .returns( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, List.of(snapshot.manifestListLocation())), + entity -> + entity.readData( + ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + taskEntity -> + assertThat(taskEntity) + .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) + .extracting(TaskEntity::of) + .returns( + AsyncTaskType.MANIFEST_FILE_CLEANUP, + taskEntity3 -> taskEntity3.getTaskType()) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, @@ -307,7 +334,7 @@ public void close() { .extracting(TaskEntity::of) .returns( AsyncTaskType.MANIFEST_FILE_CLEANUP, - taskEntity2 -> taskEntity2.getTaskType()) + taskEntity4 -> taskEntity4.getTaskType()) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, @@ -413,7 +440,11 @@ public void testTableCleanupMultipleSnapshots() throws IOException { .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, - List.of(statisticsFile1.path(), statisticsFile2.path())), + List.of( + snapshot.manifestListLocation(), + snapshot2.manifestListLocation(), + statisticsFile1.path(), + statisticsFile2.path())), entity -> entity.readData( ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); @@ -569,7 +600,11 @@ public void testTableCleanupMultipleMetadata() throws IOException { new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, List.of( - firstMetadataFile, statisticsFile1.path(), statisticsFile2.path())), + firstMetadataFile, + snapshot.manifestListLocation(), + snapshot2.manifestListLocation(), + statisticsFile1.path(), + statisticsFile2.path())), entity -> entity.readData( ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index 3e5dd23f58..3a9cab8f15 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; @@ -243,7 +244,10 @@ private List> getMetadataFileBatches(TableMetadata tableMetadata, i List> result = new ArrayList<>(); List metadataFiles = Stream.concat( - tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + Stream.concat( + tableMetadata.previousFiles().stream() + .map(TableMetadata.MetadataLogEntry::file), + tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)), tableMetadata.statisticsFiles().stream().map(StatisticsFile::path)) .toList();