Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -53,39 +56,89 @@ public ITestReadBufferManager() throws Exception {
/*
 * Checks the state of the ReadBufferManager lists after several input
 * streams, each opened and read on its own pool thread, have been closed.
 *
 * Four tasks are submitted to a fixed pool of four threads. Each task creates
 * a ONE_MB file, opens it, reads twice, records the wrapped AbfsInputStream
 * and a snapshot of the manager's in-progress list, and closes the stream
 * through try-with-resources before setting its completion flag.
 *
 * NOTE(review): this method text looks like a diff rendering in which removed
 * and added lines are shown together (see the second bufferManager
 * declaration below) - confirm against the real file before relying on it.
 */
public void testPurgeBufferManagerForParallelStreams() throws Exception {
describe("Testing purging of buffers from ReadBufferManager for "
+ "parallel input streams");
// Interval, in milliseconds, between polls of the completion flags.
final long checkExecutionWaitTime = 1_000L;
final int numBuffers = 16;
// Expected free-list contents: the indices 0 .. numBuffers - 1.
final LinkedList<Integer> freeList = new LinkedList<>();
for (int i=0; i < numBuffers; i++) {
freeList.add(i);
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
// NOTE(review): both sets are plain HashSets that the four pool threads
// mutate concurrently with no visible synchronization - confirm this is safe.
final Set<ReadBuffer> inProgressBuffers = new HashSet<>();
final Set<AbfsInputStream> streamsInTest = new HashSet<>();
final ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
// One flag per task; a task sets its own slot to true as its last step.
// NOTE(review): slots are written by pool threads and polled by the test
// thread without volatile/atomic access - visibility is not guaranteed.
final Boolean[] executionCompletion = new Boolean[4];
Arrays.fill(executionCompletion, false);
try {
for (int i = 0; i < 4; i++) {
final String fileName = methodName.getMethodName() + i;
// Effectively-final copy of the loop index for use inside the lambda.
final int iteration = i;
executorService.submit((Callable<Void>) () -> {
byte[] fileContent = getRandomBytesArray(ONE_MB);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
try (FSDataInputStream iStream = fs.open(testFilePath)) {
iStream.read();
streamsInTest.add(
(AbfsInputStream) iStream.getWrappedStream());
iStream.read();
// Snapshot the in-progress list while the stream is still open.
inProgressBuffers.addAll(
bufferManager.getInProgressCopiedList());
}
// Reached only if everything above succeeded; the Future returned by
// submit() is discarded, so a failure here is not reported to the test.
executionCompletion[iteration] = true;
return null;
});
}
} finally {
// shutdown() stops new submissions; it does not wait for running tasks.
executorService.shutdown();
}

// NOTE(review): bufferManager is already declared earlier in this method;
// this second declaration and the strict assertions that follow it are
// presumably the pre-change lines of the diff - confirm.
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
Assertions.assertThat(bufferManager.getFreeListCopy())
.describedAs("After closing all streams free list contents should match with " + freeList)
.hasSize(numBuffers)
.containsExactlyInAnyOrderElementsOf(freeList);
/*
 * The reads from the input streams happen on parallel threads, so the test
 * thread has to wait for those tasks to finish. Without this wait, the
 * assertions below could run before, during, or after the stream reads.
 */
// NOTE(review): if any task throws before setting its flag, this loop has
// no exit condition and will poll indefinitely.
while (!checkIfAllExecutionCompleted(executionCompletion)) {
Thread.sleep(checkExecutionWaitTime);
}

/*
 * Closing an AbfsInputStream leads to a purge of the completedList.
 * However, the readBufferWorkers run on parallel threads, so between close()
 * and the assertion an in-progress buffer may finish processing and land in
 * the completedList; we therefore cannot assert that the completedList is empty.
 * Instead, we check that the completedList holds no buffer other than those
 * that were in the in-progress list just before the AbfsInputStream was closed.
 */
assertCompletedListContainsSubSetOfCertainSet(
bufferManager.getCompletedReadListCopy(), inProgressBuffers,
streamsInTest);
// No read-ahead request of any stream opened by this test may still be queued.
for (AbfsInputStream stream : streamsInTest) {
assertListDoesnotContainBuffersForIstream(
bufferManager.getReadAheadQueueCopy(), stream);
}
}

/**
 * Asserts that every buffer in {@code completedList} which belongs to one of
 * the streams in {@code streamsInTest} is also contained in {@code bufferSet}.
 *
 * <p>Buffers whose stream is not in {@code streamsInTest} are skipped, and the
 * remaining buffers are still checked. An earlier form of this method returned
 * at the first such buffer, which silently skipped verification of every
 * buffer that came after it in the list.
 *
 * @param completedList snapshot of the completed-read list to inspect
 * @param bufferSet buffers that are allowed to appear in the completed list
 * @param streamsInTest streams whose buffers are subject to the check
 */
private void assertCompletedListContainsSubSetOfCertainSet(final List<ReadBuffer> completedList,
    Set<ReadBuffer> bufferSet, final Set<AbfsInputStream> streamsInTest) {
  for (ReadBuffer buffer : completedList) {
    if (!streamsInTest.contains(buffer.getStream())) {
      // Not one of our streams: ignore this buffer, keep checking the rest.
      continue;
    }
    Assertions.assertThat(bufferSet)
        .describedAs(
            "CompletedList contains a buffer which is not part of bufferSet.")
        .contains(buffer);
  }
}

/**
 * Reports whether every task has flagged itself as finished.
 *
 * @param completionFlagArray one flag per task; {@code true} means finished
 * @return {@code true} when no flag is {@code false} (also for an empty array)
 */
private Boolean checkIfAllExecutionCompleted(Boolean[] completionFlagArray) {
  // allMatch short-circuits on the first false flag, like an early return.
  return Arrays.stream(completionFlagArray).allMatch(Boolean::booleanValue);
}

private void assertListEmpty(String listName, List<ReadBuffer> list) {
Assertions.assertThat(list)
Expand All @@ -101,39 +154,47 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
final String fileName = methodName.getMethodName();
byte[] fileContent = getRandomBytesArray(ONE_MB);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
Set<ReadBuffer> inProgressBufferSet = new HashSet<>();
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
final Set<AbfsInputStream> streamsInTest = new HashSet<>();

AbfsInputStream iStream1 = null;
// stream1 will be closed right away.
try {
iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
streamsInTest.add(iStream1);
// Just reading one byte will trigger all read ahead calls.
iStream1.read();
} finally {
IOUtils.closeStream(iStream1);
inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList());
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
AbfsInputStream iStream2 = null;
try {
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
streamsInTest.add(iStream2);
iStream2.read();
// After closing stream1, none of the buffers associated with stream1 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
// After closing stream1, no queued buffers of stream1 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
} finally {
// closing the stream later.
IOUtils.closeStream(iStream2);
inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList());
}
// After closing stream2, none of the buffers associated with stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
// After closing stream2, no queued buffers of stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);

// After closing both the streams, all lists should be empty.
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());

/*
* Closing an AbfsInputStream leads to a purge of the completedList.
* However, the readBufferWorkers run on parallel threads, so between close()
* and the assertion an in-progress buffer may finish processing and land in
* the completedList; we therefore cannot assert that the completedList is empty.
* Instead, we check that the completedList holds no buffer other than those
* captured from the in-progress list when each AbfsInputStream was closed.
*/
assertCompletedListContainsSubSetOfCertainSet(
bufferManager.getCompletedReadListCopy(), inProgressBufferSet,
streamsInTest);
}


Expand Down