Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 9c3af6e

Browse files
authored
Merge pull request #484 from jkff/file-based-sink
FileBasedSink removes temp files from successful bundles
2 parents 6706611 + fdbb51f commit 9c3af6e

File tree

2 files changed

+39
-9
lines changed

2 files changed

+39
-9
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@
4444
import java.util.ArrayList;
4545
import java.util.Collection;
4646
import java.util.Collections;
47+
import java.util.HashSet;
4748
import java.util.List;
49+
import java.util.Set;
4850

4951
/**
5052
* Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
@@ -312,7 +314,7 @@ public void finalize(Iterable<FileResult> writerResults, PipelineOptions options
312314

313315
// Optionally remove temporary files.
314316
if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
315-
removeTemporaryFiles(options);
317+
removeTemporaryFiles(files, options);
316318
}
317319
}
318320

@@ -369,22 +371,44 @@ protected final List<String> generateDestinationFilenames(int numFiles) {
369371
return destFilenames;
370372
}
371373

374+
/**
375+
* Use {@link #removeTemporaryFiles(Collection, PipelineOptions)} instead.
376+
*/
377+
@Deprecated
378+
protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
379+
removeTemporaryFiles(Collections.<String>emptyList(), options);
380+
}
381+
372382
/**
373383
* Removes temporary output files. Uses the temporary filename to find files to remove.
374384
*
385+
* <p>Additionally, to partially mitigate the effects of filesystems with eventually-consistent
386+
* directory matching APIs, takes a list of files that are known to exist - i.e. removes the
387+
* union of the known files and files that the filesystem says exist in the directory.
388+
*
389+
* <p>Assumes that, if globbing had been strongly consistent, it would have matched all
390+
* of knownFiles - i.e. on a strongly consistent filesystem, knownFiles can be ignored.
391+
*
375392
* <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
376393
* <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
377394
* temporary files, this method will remove them.
378395
*/
379-
protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
396+
protected final void removeTemporaryFiles(
397+
Collection<String> knownFiles, PipelineOptions options) throws IOException {
380398
String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
381399
LOG.debug("Finding temporary bundle output files matching {}.", pattern);
382400
FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
383401
IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
384402
Collection<String> matches = factory.match(pattern);
385-
LOG.debug("{} temporary files matched {}", matches.size(), pattern);
386-
LOG.debug("Removing {} files.", matches.size());
387-
fileOperations.remove(matches);
403+
Set<String> allMatches = new HashSet<>(matches);
404+
allMatches.addAll(knownFiles);
405+
LOG.debug(
406+
"Removing {} temporary files matching {} ({} matched glob, {} additional known files)",
407+
allMatches.size(),
408+
pattern,
409+
matches.size(),
410+
allMatches.size() - matches.size());
411+
fileOperations.remove(allMatches);
388412
}
389413

390414
/**

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/FileBasedSinkTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,22 +248,28 @@ private void testRemoveTemporaryFiles(int numFiles, String baseTemporaryFilename
248248
PipelineOptions options = PipelineOptionsFactory.create();
249249
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(baseTemporaryFilename);
250250

251-
List<File> temporaryFiles = new ArrayList<>();
251+
List<String> temporaryFiles = new ArrayList<>();
252252
List<File> outputFiles = new ArrayList<>();
253253
for (int i = 0; i < numFiles; i++) {
254254
File tmpFile = tmpFolder.newFile(
255255
FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i));
256-
temporaryFiles.add(tmpFile);
256+
temporaryFiles.add(tmpFile.getAbsolutePath());
257257
File outputFile = tmpFolder.newFile(baseOutputFilename + i);
258258
outputFiles.add(outputFile);
259259
}
260260

261-
writeOp.removeTemporaryFiles(options);
261+
File extraFile = new File(tmpFolder.newFolder("extra"), "foo");
262+
assertTrue(extraFile.createNewFile());
263+
264+
temporaryFiles.add(extraFile.getAbsolutePath());
265+
266+
writeOp.removeTemporaryFiles(temporaryFiles, options);
262267

263268
for (int i = 0; i < numFiles; i++) {
264-
assertFalse(temporaryFiles.get(i).exists());
269+
assertFalse(new File(temporaryFiles.get(i)).exists());
265270
assertTrue(outputFiles.get(i).exists());
266271
}
272+
assertFalse(extraFile.exists());
267273
}
268274

269275
/**

0 commit comments

Comments
 (0)