4242import org .slf4j .LoggerFactory ;
4343
4444/**
45- * {@link TaskHandler} responsible for deleting all of the files in a manifest and the manifest
46- * itself. Since data files may be present in multiple manifests across different snapshots, we
47- * assume a data file that doesn't exist is missing because it was already deleted by another task.
45+ * {@link TaskHandler} responsible for deleting table files: 1. Manifest files: deletes all the
46+ * files in a manifest and the manifest itself. Since data files may be present in multiple
47+ * manifests across different snapshots, we assume a data file that doesn't exist is missing because
48+ * it was already deleted by another task. 2. Table metadata files: deletes previous metadata
49+ * and statistics files, which are grouped and deleted in batches.
4850 */
51+ // TODO: Rename this class since we are introducing metadata cleanup here
4952public class ManifestFileCleanupTaskHandler implements TaskHandler {
5053 public static final int MAX_ATTEMPTS = 3 ;
5154 public static final int FILE_DELETION_RETRY_MILLIS = 100 ;
@@ -62,66 +65,119 @@ public ManifestFileCleanupTaskHandler(
6265
6366 @ Override
6467 public boolean canHandleTask (TaskEntity task ) {
65- return task .getTaskType () == AsyncTaskType .FILE_CLEANUP ;
68+ return task .getTaskType () == AsyncTaskType .MANIFEST_FILE_CLEANUP
69+ || task .getTaskType () == AsyncTaskType .METADATA_FILE_BATCH_CLEANUP ;
6670 }
6771
6872 @ Override
6973 public boolean handleTask (TaskEntity task ) {
7074 ManifestCleanupTask cleanupTask = task .readData (ManifestCleanupTask .class );
71- ManifestFile manifestFile = decodeManifestData (cleanupTask .getManifestFileData ());
7275 TableIdentifier tableId = cleanupTask .getTableId ();
7376 try (FileIO authorizedFileIO = fileIOSupplier .apply (task )) {
74-
75- // if the file doesn't exist, we assume that another task execution was successful, but failed
76- // to drop the task entity. Log a warning and return success
77- if (!TaskUtils .exists (manifestFile .path (), authorizedFileIO )) {
77+ if (task .getTaskType () == AsyncTaskType .MANIFEST_FILE_CLEANUP ) {
78+ ManifestFile manifestFile = decodeManifestData (cleanupTask .getManifestFileData ());
79+ return cleanUpManifestFile (manifestFile , authorizedFileIO , tableId );
80+ } else if (task .getTaskType () == AsyncTaskType .METADATA_FILE_BATCH_CLEANUP ) {
81+ return cleanUpMetadataFiles (cleanupTask .getMetadataFiles (), authorizedFileIO , tableId );
82+ } else {
7883 LOGGER
7984 .atWarn ()
80- .addKeyValue ("manifestFile" , manifestFile .path ())
8185 .addKeyValue ("tableId" , tableId )
82- .log ("Manifest cleanup task scheduled, but manifest file doesn't exist" );
83- return true ;
84- }
85-
86- ManifestReader <DataFile > dataFiles = ManifestFiles .read (manifestFile , authorizedFileIO );
87- List <CompletableFuture <Void >> dataFileDeletes =
88- StreamSupport .stream (
89- Spliterators .spliteratorUnknownSize (dataFiles .iterator (), Spliterator .IMMUTABLE ),
90- false )
91- .map (
92- file ->
93- tryDelete (
94- tableId , authorizedFileIO , manifestFile , file .path ().toString (), null , 1 ))
95- .toList ();
96- LOGGER .debug (
97- "Scheduled {} data files to be deleted from manifest {}" ,
98- dataFileDeletes .size (),
99- manifestFile .path ());
100- try {
101- // wait for all data files to be deleted, then wait for the manifest itself to be deleted
102- CompletableFuture .allOf (dataFileDeletes .toArray (CompletableFuture []::new ))
103- .thenCompose (
104- (v ) -> {
105- LOGGER
106- .atInfo ()
107- .addKeyValue ("manifestFile" , manifestFile .path ())
108- .log ("All data files in manifest deleted - deleting manifest" );
109- return tryDelete (
110- tableId , authorizedFileIO , manifestFile , manifestFile .path (), null , 1 );
111- })
112- .get ();
113- return true ;
114- } catch (InterruptedException e ) {
115- LOGGER .error (
116- "Interrupted exception deleting data files from manifest {}" , manifestFile .path (), e );
117- throw new RuntimeException (e );
118- } catch (ExecutionException e ) {
119- LOGGER .error ("Unable to delete data files from manifest {}" , manifestFile .path (), e );
86+ .log ("Unknown task type {}" , task .getTaskType ());
12087 return false ;
12188 }
12289 }
12390 }
12491
92+ private boolean cleanUpManifestFile (
93+ ManifestFile manifestFile , FileIO fileIO , TableIdentifier tableId ) {
94+ // if the file doesn't exist, we assume that another task execution was successful, but
95+ // failed to drop the task entity. Log a warning and return success
96+ if (!TaskUtils .exists (manifestFile .path (), fileIO )) {
97+ LOGGER
98+ .atWarn ()
99+ .addKeyValue ("manifestFile" , manifestFile .path ())
100+ .addKeyValue ("tableId" , tableId )
101+ .log ("Manifest cleanup task scheduled, but manifest file doesn't exist" );
102+ return true ;
103+ }
104+
105+ ManifestReader <DataFile > dataFiles = ManifestFiles .read (manifestFile , fileIO );
106+ List <CompletableFuture <Void >> dataFileDeletes =
107+ StreamSupport .stream (
108+ Spliterators .spliteratorUnknownSize (dataFiles .iterator (), Spliterator .IMMUTABLE ),
109+ false )
110+ .map (file -> tryDelete (tableId , fileIO , manifestFile , file .path ().toString (), null , 1 ))
111+ .toList ();
112+ LOGGER .debug (
113+ "Scheduled {} data files to be deleted from manifest {}" ,
114+ dataFileDeletes .size (),
115+ manifestFile .path ());
116+ try {
117+ // wait for all data files to be deleted, then wait for the manifest itself to be deleted
118+ CompletableFuture .allOf (dataFileDeletes .toArray (CompletableFuture []::new ))
119+ .thenCompose (
120+ (v ) -> {
121+ LOGGER
122+ .atInfo ()
123+ .addKeyValue ("manifestFile" , manifestFile .path ())
124+ .log ("All data files in manifest deleted - deleting manifest" );
125+ return tryDelete (tableId , fileIO , manifestFile , manifestFile .path (), null , 1 );
126+ })
127+ .get ();
128+ return true ;
129+ } catch (InterruptedException e ) {
130+ LOGGER .error (
131+ "Interrupted exception deleting data files from manifest {}" , manifestFile .path (), e );
132+ throw new RuntimeException (e );
133+ } catch (ExecutionException e ) {
134+ LOGGER .error ("Unable to delete data files from manifest {}" , manifestFile .path (), e );
135+ return false ;
136+ }
137+ }
138+
139+ private boolean cleanUpMetadataFiles (
140+ List <String > metadataFiles , FileIO fileIO , TableIdentifier tableId ) {
141+ List <String > validFiles =
142+ metadataFiles .stream ().filter (file -> TaskUtils .exists (file , fileIO )).toList ();
143+ if (validFiles .isEmpty ()) {
144+ LOGGER
145+ .atWarn ()
146+ .addKeyValue ("metadataFiles" , metadataFiles .toString ())
147+ .addKeyValue ("tableId" , tableId )
148+ .log ("Table metadata cleanup task scheduled, but the none of the file in batch exists" );
149+ return true ;
150+ }
151+ if (validFiles .size () < metadataFiles .size ()) {
152+ List <String > missingFiles =
153+ metadataFiles .stream ().filter (file -> !TaskUtils .exists (file , fileIO )).toList ();
154+ LOGGER
155+ .atWarn ()
156+ .addKeyValue ("metadataFiles" , metadataFiles .toString ())
157+ .addKeyValue ("missingFiles" , missingFiles )
158+ .addKeyValue ("tableId" , tableId )
159+ .log (
160+ "Table metadata cleanup task scheduled, but {} files in the batch are missing" ,
161+ missingFiles .size ());
162+ }
163+
164+ // Schedule the deletion for each file asynchronously
165+ List <CompletableFuture <Void >> deleteFutures =
166+ validFiles .stream ().map (file -> tryDelete (tableId , fileIO , null , file , null , 1 )).toList ();
167+
168+ try {
169+ // Wait for all delete operations to finish
170+ CompletableFuture <Void > allDeletes =
171+ CompletableFuture .allOf (deleteFutures .toArray (new CompletableFuture [0 ]));
172+ allDeletes .join ();
173+ } catch (Exception e ) {
174+ LOGGER .error ("Exception detected during metadata file deletion" , e );
175+ return false ;
176+ }
177+
178+ return true ;
179+ }
180+
125181 private static ManifestFile decodeManifestData (String manifestFileData ) {
126182 try {
127183 return ManifestFiles .decode (Base64 .decodeBase64 (manifestFileData ));
@@ -134,16 +190,16 @@ private CompletableFuture<Void> tryDelete(
134190 TableIdentifier tableId ,
135191 FileIO fileIO ,
136192 ManifestFile manifestFile ,
137- String dataFile ,
193+ String file ,
138194 Throwable e ,
139195 int attempt ) {
140196 if (e != null && attempt <= MAX_ATTEMPTS ) {
141197 LOGGER
142198 .atWarn ()
143- .addKeyValue ("dataFile " , dataFile )
199+ .addKeyValue ("file " , file )
144200 .addKeyValue ("attempt" , attempt )
145201 .addKeyValue ("error" , e .getMessage ())
146- .log ("Error encountered attempting to delete data file" );
202+ .log ("Error encountered attempting to delete file" );
147203 }
148204 if (attempt > MAX_ATTEMPTS && e != null ) {
149205 return CompletableFuture .failedFuture (e );
@@ -155,27 +211,27 @@ private CompletableFuture<Void> tryDelete(
155211 // file's existence, but then it is deleted before we have a chance to
156212 // send the delete request. In such a case, we <i>should</i> retry
157213 // and find
158- if (TaskUtils .exists (dataFile , fileIO )) {
159- fileIO .deleteFile (dataFile );
214+ if (TaskUtils .exists (file , fileIO )) {
215+ fileIO .deleteFile (file );
160216 } else {
161217 LOGGER
162218 .atInfo ()
163- .addKeyValue ("dataFile " , dataFile )
164- .addKeyValue ("manifestFile" , manifestFile .path ())
219+ .addKeyValue ("file " , file )
220+ .addKeyValue ("manifestFile" , manifestFile != null ? manifestFile .path () : "" )
165221 .addKeyValue ("tableId" , tableId )
166- .log ("Manifest cleanup task scheduled, but data file doesn't exist" );
222+ .log ("table file cleanup task scheduled, but data file doesn't exist" );
167223 }
168224 },
169225 executorService )
170226 .exceptionallyComposeAsync (
171227 newEx -> {
172228 LOGGER
173229 .atWarn ()
174- .addKeyValue ("dataFile" , dataFile )
175- .addKeyValue ("tableIdentifer " , tableId )
176- .addKeyValue ("manifestFile" , manifestFile .path ())
230+ .addKeyValue ("dataFile" , file )
231+ .addKeyValue ("tableIdentifier " , tableId )
232+ .addKeyValue ("manifestFile" , manifestFile != null ? manifestFile .path () : "" )
177233 .log ("Exception caught deleting data file from manifest" , newEx );
178- return tryDelete (tableId , fileIO , manifestFile , dataFile , newEx , attempt + 1 );
234+ return tryDelete (tableId , fileIO , manifestFile , file , newEx , attempt + 1 );
179235 },
180236 CompletableFuture .delayedExecutor (
181237 FILE_DELETION_RETRY_MILLIS , TimeUnit .MILLISECONDS , executorService ));
@@ -185,12 +241,18 @@ private CompletableFuture<Void> tryDelete(
185241 public static final class ManifestCleanupTask {
186242 private TableIdentifier tableId ;
187243 private String manifestFileData ;
244+ private List <String > metadataFiles ;
188245
189246 public ManifestCleanupTask (TableIdentifier tableId , String manifestFileData ) {
190247 this .tableId = tableId ;
191248 this .manifestFileData = manifestFileData ;
192249 }
193250
251+ public ManifestCleanupTask (TableIdentifier tableId , List <String > metadataFiles ) {
252+ this .tableId = tableId ;
253+ this .metadataFiles = metadataFiles ;
254+ }
255+
194256 public ManifestCleanupTask () {}
195257
196258 public TableIdentifier getTableId () {
@@ -209,17 +271,26 @@ public void setManifestFileData(String manifestFileData) {
209271 this .manifestFileData = manifestFileData ;
210272 }
211273
274+ public List <String > getMetadataFiles () {
275+ return metadataFiles ;
276+ }
277+
278+ public void setMetadataFiles (List <String > metadataFiles ) {
279+ this .metadataFiles = metadataFiles ;
280+ }
281+
212282 @ Override
213283 public boolean equals (Object object ) {
214284 if (this == object ) return true ;
215285 if (!(object instanceof ManifestCleanupTask that )) return false ;
216286 return Objects .equals (tableId , that .tableId )
217- && Objects .equals (manifestFileData , that .manifestFileData );
287+ && Objects .equals (manifestFileData , that .manifestFileData )
288+ && Objects .equals (metadataFiles , that .metadataFiles );
218289 }
219290
220291 @ Override
221292 public int hashCode () {
222- return Objects .hash (tableId , manifestFileData );
293+ return Objects .hash (tableId , manifestFileData , metadataFiles );
223294 }
224295 }
225296}
0 commit comments