Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
return iter;
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
logger.debug("number of spillWriters: {}", spillWriters.size());
int i = 0;
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
if (i + spillWriter.recordsSpilled() > startIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,19 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
// Variables that change with every record read:
private int recordLength;
private long keyPrefix;
private int numRecords;
private final int numRecords;
private int numRecordsRemaining;

private byte[] arr = new byte[1024 * 1024];
private Object baseObject = arr;
private final long baseOffset = Platform.BYTE_ARRAY_OFFSET;
private final TaskContext taskContext = TaskContext.get();

private final long buffSize;
private final File file;
private final BlockId blockId;
private final SerializerManager serializerManager;

public UnsafeSorterSpillReader(
SerializerManager serializerManager,
File file,
Expand All @@ -72,12 +77,27 @@ public UnsafeSorterSpillReader(
bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
}

// There is no need to hold the file open until records actually need to be loaded.
// Closing it here partially mitigates the "too many open files" problem.
try (InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
DataInputStream dataIn = new DataInputStream(serializerManager.wrapStream(blockId, bs))) {
this.numRecords = dataIn.readInt();
this.numRecordsRemaining = numRecords;
}

this.buffSize = bufferSizeBytes;
this.file = file;
this.blockId = blockId;
this.serializerManager = serializerManager;
}

private void initStreams() throws IOException {
final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
new NioBufferedFileInputStream(file, (int) buffSize);
try {
this.in = serializerManager.wrapStream(blockId, bs);
this.din = new DataInputStream(this.in);
numRecords = numRecordsRemaining = din.readInt();
this.numRecordsRemaining = din.readInt();
} catch (IOException e) {
Closeables.close(bs, /* swallowIOException = */ true);
throw e;
Expand All @@ -104,6 +124,12 @@ public void loadNext() throws IOException {
if (taskContext != null) {
taskContext.killTaskIfInterrupted();
}
if (this.din == null) {
// Lazily open the spill file's input stream now that records must be loaded.
// Opening the stream any earlier would very likely run into the
// "too many open files" problem.
initStreams();
}
recordLength = din.readInt();
keyPrefix = din.readLong();
if (recordLength > arr.length) {
Expand Down