diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index c20fab83c3460..ea9b3ce4e3522 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -18,12 +18,13 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -93,9 +94,9 @@ public ByteBuffer nioByteBuffer() throws IOException {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    FileInputStream is = null;
+    InputStream is = null;
     try {
-      is = new FileInputStream(file);
+      is = Files.newInputStream(file.toPath());
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -132,7 +133,7 @@ public Object convertToNetty() throws IOException {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 2f160d12af22b..66b67e282c80d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback {
 
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempShuffleFileManager.createTempShuffleFile();
-      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
       this.chunkIndex = chunkIndex;
     }
 
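The two files above establish the pattern the rest of the patch follows: `new FileInputStream(file)` / `new FileOutputStream(file)` become `Files.newInputStream(file.toPath())` / `Files.newOutputStream(file.toPath())`, and a raw `FileChannel` is obtained from `FileChannel.open(...)` rather than from a stream's `getChannel()`. As background not stated in the diff itself: on the JDK 8 era runtimes Spark targeted, `java.io.FileInputStream` and `java.io.FileOutputStream` declare `finalize()`, so every instance adds finalization work for the garbage collector, while the streams returned by `java.nio.file.Files` do not. A minimal, self-contained sketch of the replacement (the file name is hypothetical):

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class NioStreamSketch {
  public static void main(String[] args) throws IOException {
    Path path = Paths.get("example.dat");  // hypothetical file, created only for the demo
    Files.write(path, new byte[] {1, 2, 3, 4});

    // Equivalent to `new FileInputStream(file)`, but without the finalize() overhead
    // that the java.io stream classes carried on the JDKs of that era.
    try (InputStream in = Files.newInputStream(path)) {
      System.out.println(in.read());  // prints 1
    }
    Files.delete(path);
  }
}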
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index ec57f0259d55c..39ca9ba574853 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -19,10 +19,10 @@
 
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
+import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -38,7 +38,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(new FileInputStream(indexFile));
+      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 323a5d3c52831..a9b5236ab8173 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.FileChannel;
+import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,7 +75,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
-  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -107,7 +106,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -188,17 +186,21 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       return lengths;
     }
 
-    final FileOutputStream out = new FileOutputStream(outputFile, true);
+    // This file needs to be opened in append mode in order to work around a Linux kernel bug that
+    // affects transferTo; see SPARK-3948 for more details.
+    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileInputStream in = new FileInputStream(file);
+          final FileChannel in = FileChannel.open(file.toPath(), READ);
           boolean copyThrewException = true;
           try {
-            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+            long size = in.size();
+            Utils.copyFileStreamNIO(in, out, 0, size);
+            lengths[i] = size;
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);
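`BypassMergeSortShuffleWriter` now merges the per-partition files through `FileChannel`s with `Utils.copyFileStreamNIO`, instead of `Utils.copyStream` guarded by the `spark.file.transferTo` flag. The sketch below is not Spark's `Utils.copyFileStreamNIO`; it is only a minimal, standalone illustration of the same idea under assumed file names: loop on `transferTo` until the requested byte count has been moved, with the destination channel opened in append mode, mirroring the SPARK-3948 workaround the added comment refers to.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import static java.nio.file.StandardOpenOption.*;

public class ChannelCopySketch {
  // Copies `length` bytes of `input`, starting at `position`, into `output`.
  // A single transferTo call may move fewer bytes than requested, so loop until done.
  static void copy(FileChannel input, FileChannel output, long position, long length)
      throws IOException {
    long copied = 0L;
    while (copied < length) {
      copied += input.transferTo(position + copied, length - copied, output);
    }
  }

  public static void main(String[] args) throws IOException {
    Path src = Paths.get("partition-0.data");  // hypothetical per-partition spill file
    Path dst = Paths.get("merged.data");       // hypothetical merged output
    Files.write(src, new byte[] {1, 2, 3, 4, 5, 6, 7, 8});

    // The output channel is opened in append mode, as in the patch above.
    try (FileChannel in = FileChannel.open(src, READ);
         FileChannel out = FileChannel.open(dst, WRITE, APPEND, CREATE)) {
      copy(in, out, 0L, in.size());
    }
    System.out.println(Files.size(dst));       // prints 8
    Files.delete(src);
    Files.delete(dst);
  }
}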
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 1b578491b81d7..c0ebe3cc9b792 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -20,6 +20,7 @@
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
+import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -290,7 +291,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        new FileOutputStream(outputFile).close(); // Create an empty file
+        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -367,7 +368,7 @@ private long[] mergeSpillsWithFileStream(
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-      new FileOutputStream(outputFile),
+      java.nio.file.Files.newOutputStream(outputFile.toPath()),
       outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -442,11 +443,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
+      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 15540485170d0..94a3a78e94165 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle
 
 import java.io._
+import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
 
@@ -141,7 +142,8 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+      val out = new DataOutputStream(
+        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -196,7 +198,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(new FileInputStream(indexFile))
+    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8aafda5e45d52..a08563562b874 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.util.collection
 
 import java.io._
+import java.nio.channels.{Channels, FileChannel}
+import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -460,7 +462,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileStream: FileInputStream = null
+    private var fileChannel: FileChannel = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -477,14 +479,14 @@ class ExternalAppendOnlyMap[K, V, C](
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileStream.close()
+          fileChannel.close()
           deserializeStream = null
-          fileStream = null
+          fileChannel = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileStream = new FileInputStream(file)
-        fileStream.getChannel.position(start)
+        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
+        fileChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
@@ -492,7 +494,8 @@ class ExternalAppendOnlyMap[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val bufferedStream = new BufferedInputStream(
+          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -552,9 +555,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileStream != null) {
-        fileStream.close()
-        fileStream = null
+      if (fileChannel != null) {
+        fileChannel.close()
+        fileChannel = null
       }
       if (file.exists()) {
         if (!file.delete()) {
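In both spill readers (`ExternalAppendOnlyMap` above and `ExternalSorter` below), the batch iterator now keeps a positioned `FileChannel` and re-exposes it as an `InputStream` via `Channels.newInputStream`, capped to a single batch with Guava's `ByteStreams.limit`. A small standalone sketch of that wrapping, with a hypothetical file and made-up batch offsets (Guava on the classpath is assumed):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import com.google.common.io.ByteStreams;

public class BatchStreamSketch {
  public static void main(String[] args) throws IOException {
    Path spill = Paths.get("spill.tmp");  // hypothetical spill file
    Files.write(spill, "0123456789".getBytes(StandardCharsets.UTF_8));

    long start = 4L;  // pretend batch boundaries, like batchOffsets(batchIndex)
    long end = 8L;
    FileChannel channel = FileChannel.open(spill, StandardOpenOption.READ);
    channel.position(start);  // seek to the start of the batch

    // Channels.newInputStream reads from the channel's current position, and
    // ByteStreams.limit stops the stream after (end - start) bytes, as in the patch.
    InputStream batch = new BufferedInputStream(
      ByteStreams.limit(Channels.newInputStream(channel), end - start));
    byte[] buf = new byte[16];
    int n = batch.read(buf);
    System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));  // prints "4567"
    batch.close();  // also closes the underlying channel
    Files.delete(spill);
  }
}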
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 176f84fa2a0d2..3593cfd507783 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.util.collection
 
 import java.io._
+import java.nio.channels.{Channels, FileChannel}
+import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -492,7 +494,7 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileStream: FileInputStream = null
+    var fileChannel: FileChannel = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -505,14 +507,14 @@ private[spark] class ExternalSorter[K, V, C](
       if (batchId < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileStream.close()
+          fileChannel.close()
           deserializeStream = null
-          fileStream = null
+          fileChannel = null
         }
 
         val start = batchOffsets(batchId)
-        fileStream = new FileInputStream(spill.file)
-        fileStream.getChannel.position(start)
+        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
+        fileChannel.position(start)
         batchId += 1
 
         val end = batchOffsets(batchId)
@@ -520,7 +522,8 @@ private[spark] class ExternalSorter[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val bufferedStream = new BufferedInputStream(
+          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
 
         val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
         serInstance.deserializeStream(wrappedStream)
@@ -610,7 +613,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileStream = null
+      fileChannel = null
       if (ds != null) {
         ds.close()
       }