Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
Expand Down Expand Up @@ -143,15 +143,20 @@ public long[] writePartitionedFile(
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
File file = partitionWriters[i].fileSegment().file();
if (!file.exists()) {
lengths[i] = 0;
continue;
}
final FileInputStream in = new FileInputStream(file);
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!partitionWriters[i].fileSegment().file().delete()) {
if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
Expand Down