Skip to content

Commit e6c2729

Browse files
[Enhancement] Refactor Cleanup Task Handler (#516)
* refactor cleanup task handler * format * make base class abstract * make cleanup task record * simplify logger * update test after merge * update task handler register after merge * fix test error after merge * refine call context and ut after merge
1 parent 692602e commit e6c2729

File tree

9 files changed

+570
-449
lines changed

9 files changed

+570
-449
lines changed

polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
public enum AsyncTaskType {
2525
ENTITY_CLEANUP_SCHEDULER(1),
2626
MANIFEST_FILE_CLEANUP(2),
27-
METADATA_FILE_BATCH_CLEANUP(3);
27+
BATCH_FILE_CLEANUP(3);
2828

2929
private final int typeCode;
3030

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.service.quarkus.task;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatPredicate;
23+
24+
import io.quarkus.test.junit.QuarkusTest;
25+
import jakarta.inject.Inject;
26+
import java.io.IOException;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Set;
31+
import java.util.UUID;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.Executors;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.stream.Stream;
36+
import org.apache.iceberg.ManifestFile;
37+
import org.apache.iceberg.Snapshot;
38+
import org.apache.iceberg.StatisticsFile;
39+
import org.apache.iceberg.TableMetadata;
40+
import org.apache.iceberg.catalog.Namespace;
41+
import org.apache.iceberg.catalog.TableIdentifier;
42+
import org.apache.iceberg.inmemory.InMemoryFileIO;
43+
import org.apache.iceberg.io.FileIO;
44+
import org.apache.polaris.core.PolarisCallContext;
45+
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
46+
import org.apache.polaris.core.context.CallContext;
47+
import org.apache.polaris.core.context.RealmContext;
48+
import org.apache.polaris.core.entity.AsyncTaskType;
49+
import org.apache.polaris.core.entity.PolarisBaseEntity;
50+
import org.apache.polaris.core.entity.PolarisTaskConstants;
51+
import org.apache.polaris.core.entity.TaskEntity;
52+
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
53+
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
54+
import org.apache.polaris.core.storage.PolarisStorageActions;
55+
import org.apache.polaris.service.catalog.io.FileIOFactory;
56+
import org.apache.polaris.service.task.BatchFileCleanupTaskHandler;
57+
import org.apache.polaris.service.task.TaskFileIOSupplier;
58+
import org.apache.polaris.service.task.TaskUtils;
59+
import org.jetbrains.annotations.NotNull;
60+
import org.junit.jupiter.api.Test;
61+
62+
@QuarkusTest
63+
public class BatchFileCleanupTaskHandlerTest {
64+
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
65+
private final RealmContext realmContext = () -> "realmName";
66+
67+
private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
68+
return new TaskFileIOSupplier(
69+
new FileIOFactory() {
70+
@Override
71+
public FileIO loadFileIO(
72+
@NotNull CallContext callContext,
73+
@NotNull String ioImplClassName,
74+
@NotNull Map<String, String> properties,
75+
@NotNull TableIdentifier identifier,
76+
@NotNull Set<String> tableLocations,
77+
@NotNull Set<PolarisStorageActions> storageActions,
78+
@NotNull PolarisResolvedPathWrapper resolvedEntityPath) {
79+
return fileIO;
80+
}
81+
});
82+
}
83+
84+
private void addTaskLocation(TaskEntity task) {
85+
Map<String, String> internalPropertiesAsMap = new HashMap<>(task.getInternalPropertiesAsMap());
86+
internalPropertiesAsMap.put(PolarisTaskConstants.STORAGE_LOCATION, "file:///tmp/");
87+
((PolarisBaseEntity) task).setInternalPropertiesAsMap(internalPropertiesAsMap);
88+
}
89+
90+
@Test
91+
public void testMetadataFileCleanup() throws IOException {
92+
PolarisCallContext polarisCallContext =
93+
new PolarisCallContext(
94+
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
95+
new PolarisDefaultDiagServiceImpl());
96+
try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
97+
CallContext.setCurrentContext(callCtx);
98+
FileIO fileIO =
99+
new InMemoryFileIO() {
100+
@Override
101+
public void close() {
102+
// no-op
103+
}
104+
};
105+
TableIdentifier tableIdentifier =
106+
TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
107+
BatchFileCleanupTaskHandler handler =
108+
new BatchFileCleanupTaskHandler(
109+
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
110+
111+
long snapshotId1 = 100L;
112+
ManifestFile manifestFile1 =
113+
TaskTestUtils.manifestFile(
114+
fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet");
115+
ManifestFile manifestFile2 =
116+
TaskTestUtils.manifestFile(
117+
fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet");
118+
Snapshot snapshot =
119+
TaskTestUtils.newSnapshot(
120+
fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2);
121+
StatisticsFile statisticsFile1 =
122+
TaskTestUtils.writeStatsFile(
123+
snapshot.snapshotId(),
124+
snapshot.sequenceNumber(),
125+
"/metadata/" + UUID.randomUUID() + ".stats",
126+
fileIO);
127+
String firstMetadataFile = "v1-295495059.metadata.json";
128+
TableMetadata firstMetadata =
129+
TaskTestUtils.writeTableMetadata(
130+
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
131+
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
132+
133+
ManifestFile manifestFile3 =
134+
TaskTestUtils.manifestFile(
135+
fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet");
136+
Snapshot snapshot2 =
137+
TaskTestUtils.newSnapshot(
138+
fileIO,
139+
"manifestList2.avro",
140+
snapshot.sequenceNumber() + 1,
141+
snapshot.snapshotId() + 1,
142+
snapshot.snapshotId(),
143+
manifestFile1,
144+
manifestFile3); // exclude manifest2 from the new snapshot
145+
StatisticsFile statisticsFile2 =
146+
TaskTestUtils.writeStatsFile(
147+
snapshot2.snapshotId(),
148+
snapshot2.sequenceNumber(),
149+
"/metadata/" + UUID.randomUUID() + ".stats",
150+
fileIO);
151+
String secondMetadataFile = "v1-295495060.metadata.json";
152+
TableMetadata secondMetadata =
153+
TaskTestUtils.writeTableMetadata(
154+
fileIO,
155+
secondMetadataFile,
156+
firstMetadata,
157+
firstMetadataFile,
158+
List.of(statisticsFile2),
159+
snapshot2);
160+
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
161+
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
162+
163+
List<String> cleanupFiles =
164+
Stream.concat(
165+
secondMetadata.previousFiles().stream()
166+
.map(TableMetadata.MetadataLogEntry::file)
167+
.filter(file -> TaskUtils.exists(file, fileIO)),
168+
secondMetadata.statisticsFiles().stream()
169+
.map(StatisticsFile::path)
170+
.filter(file -> TaskUtils.exists(file, fileIO)))
171+
.toList();
172+
173+
TaskEntity task =
174+
new TaskEntity.Builder()
175+
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
176+
.withData(
177+
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
178+
tableIdentifier, cleanupFiles))
179+
.setName(UUID.randomUUID().toString())
180+
.build();
181+
182+
addTaskLocation(task);
183+
assertThatPredicate(handler::canHandleTask).accepts(task);
184+
assertThat(handler.handleTask(task, callCtx)).isTrue();
185+
186+
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
187+
.rejects(firstMetadataFile);
188+
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
189+
.rejects(statisticsFile1.path());
190+
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
191+
.rejects(statisticsFile2.path());
192+
}
193+
}
194+
195+
@Test
196+
public void testMetadataFileCleanupIfFileNotExist() throws IOException {
197+
PolarisCallContext polarisCallContext =
198+
new PolarisCallContext(
199+
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
200+
new PolarisDefaultDiagServiceImpl());
201+
try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
202+
CallContext.setCurrentContext(callCtx);
203+
FileIO fileIO = new InMemoryFileIO();
204+
TableIdentifier tableIdentifier =
205+
TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
206+
BatchFileCleanupTaskHandler handler =
207+
new BatchFileCleanupTaskHandler(
208+
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
209+
long snapshotId = 100L;
210+
ManifestFile manifestFile =
211+
TaskTestUtils.manifestFile(
212+
fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet");
213+
TestSnapshot snapshot =
214+
TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
215+
String metadataFile = "v1-49494949.metadata.json";
216+
StatisticsFile statisticsFile =
217+
TaskTestUtils.writeStatsFile(
218+
snapshot.snapshotId(),
219+
snapshot.sequenceNumber(),
220+
"/metadata/" + UUID.randomUUID() + ".stats",
221+
fileIO);
222+
TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot);
223+
224+
fileIO.deleteFile(statisticsFile.path());
225+
assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse();
226+
227+
TaskEntity task =
228+
new TaskEntity.Builder()
229+
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
230+
.withData(
231+
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
232+
tableIdentifier, List.of(statisticsFile.path())))
233+
.setName(UUID.randomUUID().toString())
234+
.build();
235+
236+
addTaskLocation(task);
237+
assertThatPredicate(handler::canHandleTask).accepts(task);
238+
assertThat(handler.handleTask(task, callCtx)).isTrue();
239+
}
240+
}
241+
242+
@Test
243+
public void testCleanupWithRetries() throws IOException {
244+
PolarisCallContext polarisCallContext =
245+
new PolarisCallContext(
246+
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
247+
new PolarisDefaultDiagServiceImpl());
248+
try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) {
249+
CallContext.setCurrentContext(callCtx);
250+
Map<String, AtomicInteger> retryCounter = new HashMap<>();
251+
FileIO fileIO =
252+
new InMemoryFileIO() {
253+
@Override
254+
public void close() {
255+
// no-op
256+
}
257+
258+
@Override
259+
public void deleteFile(String location) {
260+
int attempts =
261+
retryCounter
262+
.computeIfAbsent(location, k -> new AtomicInteger(0))
263+
.incrementAndGet();
264+
if (attempts < 3) {
265+
throw new RuntimeException("Simulating failure to test retries");
266+
} else {
267+
super.deleteFile(location);
268+
}
269+
}
270+
};
271+
TableIdentifier tableIdentifier =
272+
TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
273+
BatchFileCleanupTaskHandler handler =
274+
new BatchFileCleanupTaskHandler(
275+
buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor());
276+
long snapshotId = 100L;
277+
ManifestFile manifestFile =
278+
TaskTestUtils.manifestFile(
279+
fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet");
280+
TestSnapshot snapshot =
281+
TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
282+
String metadataFile = "v1-49494949.metadata.json";
283+
StatisticsFile statisticsFile =
284+
TaskTestUtils.writeStatsFile(
285+
snapshot.snapshotId(),
286+
snapshot.sequenceNumber(),
287+
"/metadata/" + UUID.randomUUID() + ".stats",
288+
fileIO);
289+
TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot);
290+
assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue();
291+
292+
TaskEntity task =
293+
new TaskEntity.Builder()
294+
.withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP)
295+
.withData(
296+
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
297+
tableIdentifier, List.of(statisticsFile.path())))
298+
.setName(UUID.randomUUID().toString())
299+
.build();
300+
301+
CompletableFuture<Void> future =
302+
CompletableFuture.runAsync(
303+
() -> {
304+
CallContext.setCurrentContext(callCtx);
305+
addTaskLocation(task);
306+
assertThatPredicate(handler::canHandleTask).accepts(task);
307+
handler.handleTask(task, callCtx); // this will schedule the batch deletion
308+
});
309+
310+
// Wait for all async tasks to finish
311+
future.join();
312+
313+
// Check if the file was successfully deleted after retries
314+
assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse();
315+
316+
// Ensure that retries happened as expected
317+
assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue();
318+
assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3);
319+
}
320+
}
321+
}

0 commit comments

Comments
 (0)