Skip to content

Commit a41c004

Browse files
cleaning up partition stats
1 parent 328c2e5 commit a41c004

File tree

4 files changed

+103
-25
lines changed

4 files changed

+103
-25
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.atomic.AtomicInteger;
3636
import java.util.stream.Stream;
3737
import org.apache.iceberg.ManifestFile;
38+
import org.apache.iceberg.PartitionStatisticsFile;
3839
import org.apache.iceberg.Snapshot;
3940
import org.apache.iceberg.StatisticsFile;
4041
import org.apache.iceberg.TableMetadata;
@@ -124,10 +125,20 @@ public void close() {
124125
snapshot.sequenceNumber(),
125126
"/metadata/" + UUID.randomUUID() + ".stats",
126127
fileIO);
128+
PartitionStatisticsFile partitionStatisticsFile1 =
129+
TaskTestUtils.writePartitionStatsFile(
130+
snapshot.snapshotId(),
131+
snapshot.sequenceNumber(),
132+
"/metadata/" + UUID.randomUUID() + ".partition_stats",
133+
fileIO);
127134
String firstMetadataFile = "v1-295495059.metadata.json";
128135
TableMetadata firstMetadata =
129136
TaskTestUtils.writeTableMetadata(
130-
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
137+
fileIO,
138+
firstMetadataFile,
139+
List.of(statisticsFile1),
140+
List.of(partitionStatisticsFile1),
141+
snapshot);
131142
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
132143

133144
ManifestFile manifestFile3 =
@@ -148,6 +159,12 @@ public void close() {
148159
snapshot2.sequenceNumber(),
149160
"/metadata/" + UUID.randomUUID() + ".stats",
150161
fileIO);
162+
PartitionStatisticsFile partitionStatisticsFile2 =
163+
TaskTestUtils.writePartitionStatsFile(
164+
snapshot2.snapshotId(),
165+
snapshot2.sequenceNumber(),
166+
"/metadata/" + UUID.randomUUID() + ".partition_stats",
167+
fileIO);
151168
String secondMetadataFile = "v1-295495060.metadata.json";
152169
TableMetadata secondMetadata =
153170
TaskTestUtils.writeTableMetadata(
@@ -156,18 +173,21 @@ public void close() {
156173
firstMetadata,
157174
firstMetadataFile,
158175
List.of(statisticsFile2),
176+
List.of(partitionStatisticsFile2),
159177
snapshot2);
160178
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
161179
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
162180

163181
List<String> cleanupFiles =
164-
Stream.concat(
165-
secondMetadata.previousFiles().stream()
166-
.map(TableMetadata.MetadataLogEntry::file)
167-
.filter(file -> TaskUtils.exists(file, fileIO)),
168-
secondMetadata.statisticsFiles().stream()
169-
.map(StatisticsFile::path)
170-
.filter(file -> TaskUtils.exists(file, fileIO)))
182+
Stream.of(
183+
secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
184+
secondMetadata.statisticsFiles().stream().map(StatisticsFile::path),
185+
firstMetadata.partitionStatisticsFiles().stream()
186+
.map(PartitionStatisticsFile::path),
187+
secondMetadata.partitionStatisticsFiles().stream()
188+
.map(PartitionStatisticsFile::path))
189+
.flatMap(s -> s)
190+
.filter(file -> TaskUtils.exists(file, fileIO))
171191
.toList();
172192

173193
TaskEntity task =
@@ -183,12 +203,9 @@ public void close() {
183203
assertThatPredicate(handler::canHandleTask).accepts(task);
184204
assertThat(handler.handleTask(task, callCtx)).isTrue();
185205

186-
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
187-
.rejects(firstMetadataFile);
188-
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
189-
.rejects(statisticsFile1.path());
190-
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
191-
.rejects(statisticsFile2.path());
206+
for (String cleanupFile : cleanupFiles) {
207+
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile);
208+
}
192209
}
193210
}
194211

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.commons.codec.binary.Base64;
3333
import org.apache.iceberg.ManifestFile;
3434
import org.apache.iceberg.ManifestFiles;
35+
import org.apache.iceberg.PartitionStatisticsFile;
3536
import org.apache.iceberg.Snapshot;
3637
import org.apache.iceberg.StatisticsFile;
3738
import org.apache.iceberg.TableMetadata;
@@ -506,10 +507,20 @@ public void testTableCleanupMultipleMetadata() throws IOException {
506507
snapshot.sequenceNumber(),
507508
"/metadata/" + UUID.randomUUID() + ".stats",
508509
fileIO);
510+
PartitionStatisticsFile partitionStatisticsFile1 =
511+
TaskTestUtils.writePartitionStatsFile(
512+
snapshot.snapshotId(),
513+
snapshot.sequenceNumber(),
514+
"/metadata/" + UUID.randomUUID() + ".partition_stats",
515+
fileIO);
509516
String firstMetadataFile = "v1-295495059.metadata.json";
510517
TableMetadata firstMetadata =
511518
TaskTestUtils.writeTableMetadata(
512-
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
519+
fileIO,
520+
firstMetadataFile,
521+
List.of(statisticsFile1),
522+
List.of(partitionStatisticsFile1),
523+
snapshot);
513524
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
514525

515526
ManifestFile manifestFile3 =
@@ -530,13 +541,20 @@ public void testTableCleanupMultipleMetadata() throws IOException {
530541
snapshot2.sequenceNumber(),
531542
"/metadata/" + UUID.randomUUID() + ".stats",
532543
fileIO);
544+
PartitionStatisticsFile partitionStatisticsFile2 =
545+
TaskTestUtils.writePartitionStatsFile(
546+
snapshot2.snapshotId(),
547+
snapshot2.sequenceNumber(),
548+
"/metadata/" + UUID.randomUUID() + ".partition_stats",
549+
fileIO);
533550
String secondMetadataFile = "v1-295495060.metadata.json";
534551
TaskTestUtils.writeTableMetadata(
535552
fileIO,
536553
secondMetadataFile,
537554
firstMetadata,
538555
firstMetadataFile,
539556
List.of(statisticsFile2),
557+
List.of(partitionStatisticsFile2),
540558
snapshot2);
541559
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
542560
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
@@ -596,7 +614,9 @@ public void testTableCleanupMultipleMetadata() throws IOException {
596614
snapshot.manifestListLocation(),
597615
snapshot2.manifestListLocation(),
598616
statisticsFile1.path(),
599-
statisticsFile2.path())),
617+
statisticsFile2.path(),
618+
partitionStatisticsFile1.path(),
619+
partitionStatisticsFile2.path())),
600620
entity ->
601621
entity.readData(
602622
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import org.apache.iceberg.FileFormat;
3131
import org.apache.iceberg.GenericBlobMetadata;
3232
import org.apache.iceberg.GenericStatisticsFile;
33+
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
3334
import org.apache.iceberg.ManifestFile;
3435
import org.apache.iceberg.ManifestFiles;
3536
import org.apache.iceberg.ManifestWriter;
3637
import org.apache.iceberg.PartitionSpec;
38+
import org.apache.iceberg.PartitionStatisticsFile;
3739
import org.apache.iceberg.Schema;
3840
import org.apache.iceberg.Snapshot;
3941
import org.apache.iceberg.SortOrder;
@@ -71,7 +73,7 @@ static ManifestFile manifestFile(
7173

7274
static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
7375
throws IOException {
74-
return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots);
76+
return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots);
7577
}
7678

7779
static TableMetadata writeTableMetadata(
@@ -80,7 +82,18 @@ static TableMetadata writeTableMetadata(
8082
List<StatisticsFile> statisticsFiles,
8183
Snapshot... snapshots)
8284
throws IOException {
83-
return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots);
85+
return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots);
86+
}
87+
88+
/**
 * Convenience overload for writing table metadata that has no predecessor: forwards to the
 * full {@code writeTableMetadata} overload, passing {@code null} for both the previous
 * metadata and the previous metadata file.
 *
 * <p>The full overload reads one entry from each non-null list per snapshot, by position, so
 * each list passed here should hold at least as many entries as there are snapshots.
 *
 * @param fileIO file IO handed through to the full overload
 * @param metadataFile location of the metadata file to write
 * @param statisticsFiles statistics files to register, or {@code null} for none
 * @param partitionStatsFiles partition statistics files to register, or {@code null} for none
 * @param snapshots snapshots to add to the metadata
 * @return whatever the full overload returns
 * @throws IOException propagated from the full overload
 */
static TableMetadata writeTableMetadata(
89+
FileIO fileIO,
90+
String metadataFile,
91+
List<StatisticsFile> statisticsFiles,
92+
List<PartitionStatisticsFile> partitionStatsFiles,
93+
Snapshot... snapshots)
94+
throws IOException {
95+
return writeTableMetadata(
96+
fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots);
8497
}
8598

8699
static TableMetadata writeTableMetadata(
@@ -89,6 +102,7 @@ static TableMetadata writeTableMetadata(
89102
TableMetadata prevMetadata,
90103
String prevMetadataFile,
91104
List<StatisticsFile> statisticsFiles,
105+
List<PartitionStatisticsFile> partitionStatsFiles,
92106
Snapshot... snapshots)
93107
throws IOException {
94108
TableMetadata.Builder tmBuilder;
@@ -106,11 +120,15 @@ static TableMetadata writeTableMetadata(
106120
.addPartitionSpec(PartitionSpec.unpartitioned());
107121

108122
int statisticsFileIndex = 0;
123+
int partitionStatsFileIndex = 0;
109124
for (Snapshot snapshot : snapshots) {
110125
tmBuilder.addSnapshot(snapshot);
111126
if (statisticsFiles != null) {
112127
tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++));
113128
}
129+
if (partitionStatsFiles != null) {
130+
tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++));
131+
}
114132
}
115133
TableMetadata tableMetadata = tmBuilder.build();
116134
PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
@@ -161,4 +179,26 @@ public static StatisticsFile writeStatsFile(
161179
puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
162180
}
163181
}
182+
183+
/**
 * Test helper that writes a small file at {@code statsLocation} and returns a
 * {@link PartitionStatisticsFile} descriptor pointing at it.
 *
 * <p>The file is produced with a {@code PuffinWriter} and holds one blob of type
 * {@code "some-blob-type"} with the fixed payload {@code "blob content"}. The returned
 * descriptor carries the snapshot id, the path, and the size reported by the writer;
 * {@code snapshotSequenceNumber} is used only inside the blob and is not part of the
 * descriptor.
 *
 * <p>NOTE(review): the Puffin content looks like filler so that a real file exists at the
 * path; confirm that no caller tries to parse it as partition statistics.
 *
 * @param snapshotId snapshot id recorded in the blob and in the returned descriptor
 * @param snapshotSequenceNumber sequence number recorded in the blob
 * @param statsLocation location the file is written to and the path of the descriptor
 * @param fileIO file IO used to create the output file
 * @return descriptor for the file that was written
 * @throws IOException declared for failures while writing or closing the file
 */
public static PartitionStatisticsFile writePartitionStatsFile(
184+
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
185+
throws IOException {
186+
187+
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
188+
puffinWriter.add(
189+
new Blob(
190+
"some-blob-type",
191+
List.of(1),
192+
snapshotId,
193+
snapshotSequenceNumber,
194+
ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
// finish() is called before fileSize() is read for the descriptor below.
195+
puffinWriter.finish();
196+
197+
return ImmutableGenericPartitionStatisticsFile.builder()
198+
.snapshotId(snapshotId)
199+
.path(statsLocation)
200+
.fileSizeInBytes(puffinWriter.fileSize())
201+
.build();
202+
}
203+
}
164204
}

service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.stream.Collectors;
2626
import java.util.stream.Stream;
2727
import org.apache.iceberg.ManifestFile;
28+
import org.apache.iceberg.PartitionStatisticsFile;
2829
import org.apache.iceberg.Snapshot;
2930
import org.apache.iceberg.StatisticsFile;
3031
import org.apache.iceberg.TableMetadata;
@@ -112,7 +113,6 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
112113
metaStoreManager,
113114
polarisCallContext);
114115

115-
// TODO: handle partition statistics files
116116
Stream<TaskEntity> metadataFileCleanupTasks =
117117
getMetadataTaskStream(
118118
cleanupTask,
@@ -243,12 +243,13 @@ private Stream<TaskEntity> getMetadataTaskStream(
243243
private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) {
244244
List<List<String>> result = new ArrayList<>();
245245
List<String> metadataFiles =
246-
Stream.concat(
247-
Stream.concat(
248-
tableMetadata.previousFiles().stream()
249-
.map(TableMetadata.MetadataLogEntry::file),
250-
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)),
251-
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
246+
Stream.of(
247+
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
248+
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation),
249+
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
250+
tableMetadata.partitionStatisticsFiles().stream()
251+
.map(PartitionStatisticsFile::path))
252+
.flatMap(s -> s)
252253
.toList();
253254

254255
for (int i = 0; i < metadataFiles.size(); i += batchSize) {

0 commit comments

Comments
 (0)