From dcc2960d5f60add9bfd9446df59b0d0d06365947 Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Mon, 11 Sep 2017 07:06:12 +0530
Subject: [PATCH 1/2] [SPARK-21971][CORE] Too many open files in Spark due to
 concurrent files being opened

---
 .../unsafe/sort/UnsafeExternalSorter.java          |  1 +
 .../unsafe/sort/UnsafeSorterSpillReader.java       | 30 +++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e2059cec132d2..cf5f05a7010e0 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -623,6 +623,7 @@ public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
       return iter;
     } else {
       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+      logger.debug("number of spillWriters: {}", spillWriters.size());
       int i = 0;
       for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
         if (i + spillWriter.recordsSpilled() > startIndex) {
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 9521ab86a12d5..97710dd432b5e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -46,7 +46,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   // Variables that change with every record read:
   private int recordLength;
   private long keyPrefix;
-  private int numRecords;
+  private final int numRecords;
   private int numRecordsRemaining;
 
   private byte[] arr = new byte[1024 * 1024];
@@ -54,6 +54,11 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   private final long baseOffset = Platform.BYTE_ARRAY_OFFSET;
   private final TaskContext taskContext = TaskContext.get();
 
+  private final long buffSize;
+  private final File file;
+  private final BlockId blockId;
+  private final SerializerManager serializerManager;
+
   public UnsafeSorterSpillReader(
       SerializerManager serializerManager,
       File file,
@@ -72,12 +77,27 @@ public UnsafeSorterSpillReader(
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }
 
+    try (InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
+         DataInputStream dataIn = new DataInputStream(serializerManager.wrapStream(blockId, bs))) {
+      this.numRecords = dataIn.readInt();
+      this.numRecordsRemaining = numRecords;
+    }
+
+    this.buffSize = bufferSizeBytes;
+    this.file = file;
+    this.blockId = blockId;
+    this.serializerManager = serializerManager;
+
+    logger.debug("bufSize: {}, file: {}, records: {}", buffSize, file, this.numRecords);
+  }
+
+  private void initStreams() throws IOException {
     final InputStream bs =
-        new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
+        new NioBufferedFileInputStream(file, (int) buffSize);
     try {
       this.in = serializerManager.wrapStream(blockId, bs);
       this.din = new DataInputStream(this.in);
-      numRecords = numRecordsRemaining = din.readInt();
+      this.numRecordsRemaining = din.readInt();
     } catch (IOException e) {
       Closeables.close(bs, /* swallowIOException = */ true);
       throw e;
@@ -104,6 +124,10 @@ public void loadNext() throws IOException {
     if (taskContext != null) {
       taskContext.killTaskIfInterrupted();
     }
+    if (this.din == null) {
+      // Good time to init (if all files are opened, we can get Too Many files exception)
+      initStreams();
+    }
     recordLength = din.readInt();
     keyPrefix = din.readLong();
     if (recordLength > arr.length) {

From ea5f9d9903185690670da6384dbd6d2b08b8177f Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Mon, 11 Sep 2017 12:58:07 +0530
Subject: [PATCH 2/2] [SPARK-21971][CORE] Too many open files in Spark due to
 concurrent files being opened

---
 .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 97710dd432b5e..4df75de9fc637 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -77,6 +77,8 @@ public UnsafeSorterSpillReader(
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }
 
+    // No need to hold the file open until records need to be loaded.
+    // This is to prevent too many files open issue partially.
     try (InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
          DataInputStream dataIn = new DataInputStream(serializerManager.wrapStream(blockId, bs))) {
       this.numRecords = dataIn.readInt();
@@ -87,8 +89,6 @@ public UnsafeSorterSpillReader(
     this.file = file;
     this.blockId = blockId;
     this.serializerManager = serializerManager;
-
-    logger.debug("bufSize: {}, file: {}, records: {}", buffSize, file, this.numRecords);
   }
 
   private void initStreams() throws IOException {
@@ -125,7 +125,9 @@ public void loadNext() throws IOException {
       taskContext.killTaskIfInterrupted();
     }
     if (this.din == null) {
-      // Good time to init (if all files are opened, we can get Too Many files exception)
+      // It is time to initialize and hold the input stream of the spill file
+      // for loading records. Keeping the input stream open too early will very possibly
+      // encounter too many file open issue.
       initStreams();
     }
     recordLength = din.readInt();
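
Taken together, the two commits apply one pattern: an UnsafeSorterSpillReader no longer holds its spill file open from construction. The constructor opens the file just long enough to read the record count and then closes it, while the long-lived, serializer-wrapped stream is created lazily by initStreams() on the first loadNext() call, so file descriptors are consumed only while records are actually being read. Below is a minimal, self-contained Java sketch of that lazy-open pattern, not Spark's actual code: SpillFileReader, its header layout (a 4-byte record count followed by length-prefixed records), and its method names are illustrative assumptions.

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Illustrative sketch (not Spark code): read the record count eagerly, but
// defer opening the long-lived stream until the first record is requested.
final class SpillFileReader implements Closeable {
  private final File file;
  private final int numRecords;        // read eagerly; the file is then closed
  private int numRecordsRemaining;
  private DataInputStream din;         // opened lazily in loadNext()

  SpillFileReader(File file) throws IOException {
    this.file = file;
    // Open only long enough to read the header; try-with-resources closes the
    // descriptor, so an idle reader sitting in a merge queue holds no file handle.
    try (DataInputStream header =
        new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
      this.numRecords = header.readInt();
      this.numRecordsRemaining = numRecords;
    }
  }

  int getNumRecords() {
    return numRecords;
  }

  boolean hasNext() {
    return numRecordsRemaining > 0;
  }

  // Returns the next record payload, opening the underlying stream on first use.
  byte[] loadNext() throws IOException {
    if (din == null) {
      // Lazy init: the descriptor is taken only once records are actually read.
      din = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
      din.readInt();                   // skip the record-count header
    }
    int recordLength = din.readInt();
    byte[] record = new byte[recordLength];
    din.readFully(record);
    numRecordsRemaining--;
    return record;
  }

  @Override
  public void close() throws IOException {
    if (din != null) {
      din.close();
      din = null;
    }
  }
}

The trade-off is one extra open/read/close per reader up front to learn the record count; in exchange, a queue holding many readers (one per spill file) keeps no descriptors open until each reader is actually consumed.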