diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 705cc2530d335..f7d775eda52c9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -19,9 +19,12 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -53,6 +56,7 @@ public ITestReadBufferManager() throws Exception { public void testPurgeBufferManagerForParallelStreams() throws Exception { describe("Testing purging of buffers from ReadBufferManager for " + "parallel input streams"); + final long checkExecutionWaitTime = 1_000L; final int numBuffers = 16; final LinkedList freeList = new LinkedList<>(); for (int i=0; i < numBuffers; i++) { @@ -60,15 +64,26 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } ExecutorService executorService = Executors.newFixedThreadPool(4); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + final Set inProgressBuffers = new HashSet<>(); + final Set streamsInTest = new HashSet<>(); + final ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + final Boolean[] executionCompletion = new Boolean[4]; + Arrays.fill(executionCompletion, false); try { for (int i = 0; i < 4; i++) { final String fileName = methodName.getMethodName() + i; + final int iteration = i; executorService.submit((Callable) () -> { byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); try (FSDataInputStream iStream = fs.open(testFilePath)) { - iStream.read(); + streamsInTest.add( + (AbfsInputStream) iStream.getWrappedStream()); + iStream.read(); + inProgressBuffers.addAll( + bufferManager.getInProgressCopiedList()); } + executionCompletion[iteration] = true; return null; }); } @@ -76,16 +91,54 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { executorService.shutdown(); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); - assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); - Assertions.assertThat(bufferManager.getFreeListCopy()) - .describedAs("After closing all streams free list contents should match with " + freeList) - .hasSize(numBuffers) - .containsExactlyInAnyOrderElementsOf(freeList); + /* + * Since, the read from inputStream is happening in parallel thread, the + * test has to wait for the execution to get over. If we don't wait, test + * main thread will go on to do assertion where the stream execution may or + * may not happen. + */ + while (!checkIfAllExecutionCompleted(executionCompletion)) { + Thread.sleep(checkExecutionWaitTime); + } + + /* + * The close() method of AbfsInputStream would lead to purge of completedList. + * Because the readBufferWorkers are running in parallel thread, due to race condition, + * after close and before assert, it can happen that processing of inProgress buffer + * can get completed and hence we cannot assert on completedList to be empty. + * That is why completedList are checked to not have a buffer other than the + * buffers in inProgressQueue just before the closure of AbfsInputStream object. + */ + assertCompletedListContainsSubSetOfCertainSet( + bufferManager.getCompletedReadListCopy(), inProgressBuffers, + streamsInTest); + for (AbfsInputStream stream : streamsInTest) { + assertListDoesnotContainBuffersForIstream( + bufferManager.getReadAheadQueueCopy(), stream); + } + } + private void assertCompletedListContainsSubSetOfCertainSet(final List completedList, + Set bufferSet, final Set streamsInTest) { + for (ReadBuffer buffer : completedList) { + if (!streamsInTest.contains(buffer.getStream())) { + return; + } + Assertions.assertThat(bufferSet) + .describedAs( + "CompletedList contains a buffer which is not part of bufferSet.") + .contains(buffer); } + } + + private Boolean checkIfAllExecutionCompleted(Boolean[] completionFlagArray) { + for (Boolean completionFlag : completionFlagArray) { + if (!completionFlag) { + return false; + } + } + return true; + } private void assertListEmpty(String listName, List list) { Assertions.assertThat(list) @@ -101,39 +154,47 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { final String fileName = methodName.getMethodName(); byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); + Set inProgressBufferSet = new HashSet<>(); + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + final Set streamsInTest = new HashSet<>(); AbfsInputStream iStream1 = null; // stream1 will be closed right away. try { iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + streamsInTest.add(iStream1); // Just reading one byte will trigger all read ahead calls. iStream1.read(); } finally { IOUtils.closeStream(iStream1); + inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList()); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); AbfsInputStream iStream2 = null; try { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + streamsInTest.add(iStream2); iStream2.read(); - // After closing stream1, none of the buffers associated with stream1 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); + // After closing stream1, no queued buffers of stream1 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); } finally { // closing the stream later. IOUtils.closeStream(iStream2); + inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList()); } - // After closing stream2, none of the buffers associated with stream2 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2); + // After closing stream2, no queued buffers of stream2 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); - // After closing both the streams, all lists should be empty. - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); - assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); - + /* + * The close() method of AbfsInputStream would lead to purge of completedList. + * Because the readBufferWorkers are running in parallel thread, due to race condition, + * after close and before assert, it can happen that processing of inProgress buffer + * can get completed and hence we cannot assert on completedList to be empty. + * That is why completedList are checked to not have a buffer other than the + * buffers in inProgressQueue just before the closure of AbfsInputStream object. + */ + assertCompletedListContainsSubSetOfCertainSet( + bufferManager.getCompletedReadListCopy(), inProgressBufferSet, + streamsInTest); }