From 31e59ee0cb59fd4e61f75397e0dcf7c2ae93ec9a Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 8 Dec 2022 20:43:25 -0800 Subject: [PATCH 1/5] testPurgeBufferManagerForParallelStreams fix --- .../services/ITestReadBufferManager.java | 43 ++++++++++++++++--- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 705cc2530d335..be2610a7fba91 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -19,9 +19,12 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -60,15 +63,25 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } ExecutorService executorService = Executors.newFixedThreadPool(4); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + final Set inProgressBuffers = new HashSet<>(); + final ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + final Boolean[] executionCompletion = new Boolean[4]; + Arrays.fill(executionCompletion, false); try { for (int i = 0; i < 4; i++) { final String fileName = methodName.getMethodName() + i; + final int iteration = i; executorService.submit((Callable) () -> { byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); try (FSDataInputStream iStream = fs.open(testFilePath)) { iStream.read(); + for(ReadBuffer buffer : bufferManager.getInProgressCopiedList()) { + inProgressBuffers.add(buffer); + } } + executionCompletion[iteration] = true; + return null; }); } @@ -76,16 +89,32 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { executorService.shutdown(); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); + while(!checkIfAllExecutionCompleted(executionCompletion)) { + Thread.sleep(1_000L); + } + + assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBuffers); assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); - Assertions.assertThat(bufferManager.getFreeListCopy()) - .describedAs("After closing all streams free list contents should match with " + freeList) - .hasSize(numBuffers) - .containsExactlyInAnyOrderElementsOf(freeList); + } + + private void assertCompletedListContainsSubSetOfCertainSet(final List completedList, + Set bufferSet) { + for (ReadBuffer buffer : completedList) { + Assertions.assertThat(bufferSet) + .describedAs( + "CompletedList contains a buffer which is not part of bufferSet.") + .contains(buffer); + } + } + private Boolean checkIfAllExecutionCompleted(Boolean[] completionFlagArray) { + for (Boolean completionFlag : completionFlagArray) { + if (!completionFlag) { + return false; + } } + return true; + } private void assertListEmpty(String listName, List list) { Assertions.assertThat(list) From d517425d52fa429b14c705d947bb709628685308 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 8 Dec 2022 20:58:01 -0800 Subject: [PATCH 2/5] testPurgeBufferManagerForSequentialStream fix --- .../services/ITestReadBufferManager.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index be2610a7fba91..69aa83cebc349 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -56,6 +56,7 @@ public ITestReadBufferManager() throws Exception { public void testPurgeBufferManagerForParallelStreams() throws Exception { describe("Testing purging of buffers from ReadBufferManager for " + "parallel input streams"); + final long checkExecutionWaitTime = 1_000L; final int numBuffers = 16; final LinkedList freeList = new LinkedList<>(); for (int i=0; i < numBuffers; i++) { @@ -81,7 +82,6 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } } executionCompletion[iteration] = true; - return null; }); } @@ -90,7 +90,7 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } while(!checkIfAllExecutionCompleted(executionCompletion)) { - Thread.sleep(1_000L); + Thread.sleep(checkExecutionWaitTime); } assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBuffers); @@ -130,6 +130,8 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { final String fileName = methodName.getMethodName(); byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); + Set inProgressBufferSet = new HashSet<>(); + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); AbfsInputStream iStream1 = null; // stream1 will be closed right away. @@ -139,30 +141,27 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { iStream1.read(); } finally { IOUtils.closeStream(iStream1); + inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList()); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); AbfsInputStream iStream2 = null; try { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); iStream2.read(); // After closing stream1, none of the buffers associated with stream1 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); } finally { // closing the stream later. IOUtils.closeStream(iStream2); + inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList()); } // After closing stream2, none of the buffers associated with stream2 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2); assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); // After closing both the streams, all lists should be empty. - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBufferSet); + } From d9a5f3a5579bf0156f841b479dc70dd725de2e1e Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 8 Dec 2022 21:17:31 -0800 Subject: [PATCH 3/5] checks for certain streams as many tests can run in parallel as per pom.xml --- .../services/ITestReadBufferManager.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 69aa83cebc349..cb7dd67473d80 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -65,6 +65,7 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(4); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); final Set inProgressBuffers = new HashSet<>(); + final Set streamsInTest = new HashSet<>(); final ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); final Boolean[] executionCompletion = new Boolean[4]; Arrays.fill(executionCompletion, false); @@ -76,10 +77,9 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); try (FSDataInputStream iStream = fs.open(testFilePath)) { + streamsInTest.add((AbfsInputStream) iStream.getWrappedStream()); iStream.read(); - for(ReadBuffer buffer : bufferManager.getInProgressCopiedList()) { - inProgressBuffers.add(buffer); - } + inProgressBuffers.addAll(bufferManager.getInProgressCopiedList()); } executionCompletion[iteration] = true; return null; @@ -93,13 +93,18 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { Thread.sleep(checkExecutionWaitTime); } - assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBuffers); - assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBuffers, streamsInTest); + for(AbfsInputStream stream : streamsInTest) { + assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), stream); + } } private void assertCompletedListContainsSubSetOfCertainSet(final List completedList, - Set bufferSet) { + Set bufferSet, final Set streamsInTest) { for (ReadBuffer buffer : completedList) { + if(!streamsInTest.contains(buffer.getStream())) { + return; + } Assertions.assertThat(bufferSet) .describedAs( "CompletedList contains a buffer which is not part of bufferSet.") @@ -132,11 +137,13 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { Path testFilePath = createFileWithContent(fs, fileName, fileContent); Set inProgressBufferSet = new HashSet<>(); ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + final Set streamsInTest = new HashSet<>(); AbfsInputStream iStream1 = null; // stream1 will be closed right away. try { iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + streamsInTest.add(iStream1); // Just reading one byte will trigger all read ahead calls. iStream1.read(); } finally { @@ -146,6 +153,7 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { AbfsInputStream iStream2 = null; try { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + streamsInTest.add(iStream2); iStream2.read(); // After closing stream1, none of the buffers associated with stream1 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); @@ -157,10 +165,8 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { // After closing stream2, none of the buffers associated with stream2 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); - // After closing both the streams, all lists should be empty. - assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); - - assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBufferSet); + assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBufferSet, + streamsInTest); } From 18e63f680424657bc75ad14d845c44886a80f6fb Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 8 Dec 2022 21:59:46 -0800 Subject: [PATCH 4/5] Added comments for reference in ITestReadBufferManager --- .../services/ITestReadBufferManager.java | 59 ++++++++++++++----- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index cb7dd67473d80..90534396e987e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -77,9 +77,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); try (FSDataInputStream iStream = fs.open(testFilePath)) { - streamsInTest.add((AbfsInputStream) iStream.getWrappedStream()); - iStream.read(); - inProgressBuffers.addAll(bufferManager.getInProgressCopiedList()); + streamsInTest.add( + (AbfsInputStream) iStream.getWrappedStream()); + iStream.read(); + inProgressBuffers.addAll( + bufferManager.getInProgressCopiedList()); } executionCompletion[iteration] = true; return null; @@ -89,20 +91,37 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { executorService.shutdown(); } - while(!checkIfAllExecutionCompleted(executionCompletion)) { - Thread.sleep(checkExecutionWaitTime); - } + /* + * Since, the read from inputStream is happening in parallel thread, the + * test has to wait for the execution to get over. If we don't wait, test + * main thread will go on to do assertion where the stream execution may or + * may not happen. + */ + while (!checkIfAllExecutionCompleted(executionCompletion)) { + Thread.sleep(checkExecutionWaitTime); + } - assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBuffers, streamsInTest); - for(AbfsInputStream stream : streamsInTest) { - assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), stream); - } + /* + * The close() method of AbfsInputStream would lead to purge of completedList. + * Because the readBufferWorkers are running in parallel thread, due to race condition, + * after close and before assert, it can happen that processing of inProgress buffer + * can get completed and hence we cannot assert on completedList to be empty. + * That is why completedList are checked to not have a buffer other than the + * buffers in inProgressQueue just before the closure of AbfsInputStream object. + */ + assertCompletedListContainsSubSetOfCertainSet( + bufferManager.getCompletedReadListCopy(), inProgressBuffers, + streamsInTest); + for (AbfsInputStream stream : streamsInTest) { + assertListDoesnotContainBuffersForIstream( + bufferManager.getReadAheadQueueCopy(), stream); + } } private void assertCompletedListContainsSubSetOfCertainSet(final List completedList, Set bufferSet, final Set streamsInTest) { for (ReadBuffer buffer : completedList) { - if(!streamsInTest.contains(buffer.getStream())) { + if (!streamsInTest.contains(buffer.getStream())) { return; } Assertions.assertThat(bufferSet) @@ -155,19 +174,27 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); streamsInTest.add(iStream2); iStream2.read(); - // After closing stream1, none of the buffers associated with stream1 should be present. + // After closing stream1, no queued buffers of stream1 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); } finally { // closing the stream later. IOUtils.closeStream(iStream2); inProgressBufferSet.addAll(bufferManager.getInProgressCopiedList()); } - // After closing stream2, none of the buffers associated with stream2 should be present. + // After closing stream2, no queued buffers of stream2 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); - assertCompletedListContainsSubSetOfCertainSet(bufferManager.getCompletedReadListCopy(), inProgressBufferSet, - streamsInTest); - + /* + * The close() method of AbfsInputStream would lead to purge of completedList. + * Because the readBufferWorkers are running in parallel thread, due to race condition, + * after close and before assert, it can happen that processing of inProgress buffer + * can get completed and hence we cannot assert on completedList to be empty. + * That is why completedList are checked to not have a buffer other than the + * buffers in inProgressQueue just before the closure of AbfsInputStream object. + */ + assertCompletedListContainsSubSetOfCertainSet( + bufferManager.getCompletedReadListCopy(), inProgressBufferSet, + streamsInTest); } From b4c374c80502599618d496aef2cae427149138cb Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 8 Dec 2022 22:26:48 -0800 Subject: [PATCH 5/5] fix indentation --- .../services/ITestReadBufferManager.java | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 90534396e987e..f7d775eda52c9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -91,31 +91,31 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { executorService.shutdown(); } - /* - * Since, the read from inputStream is happening in parallel thread, the - * test has to wait for the execution to get over. If we don't wait, test - * main thread will go on to do assertion where the stream execution may or - * may not happen. - */ - while (!checkIfAllExecutionCompleted(executionCompletion)) { - Thread.sleep(checkExecutionWaitTime); - } + /* + * Since, the read from inputStream is happening in parallel thread, the + * test has to wait for the execution to get over. If we don't wait, test + * main thread will go on to do assertion where the stream execution may or + * may not happen. + */ + while (!checkIfAllExecutionCompleted(executionCompletion)) { + Thread.sleep(checkExecutionWaitTime); + } - /* - * The close() method of AbfsInputStream would lead to purge of completedList. - * Because the readBufferWorkers are running in parallel thread, due to race condition, - * after close and before assert, it can happen that processing of inProgress buffer - * can get completed and hence we cannot assert on completedList to be empty. - * That is why completedList are checked to not have a buffer other than the - * buffers in inProgressQueue just before the closure of AbfsInputStream object. - */ - assertCompletedListContainsSubSetOfCertainSet( - bufferManager.getCompletedReadListCopy(), inProgressBuffers, - streamsInTest); - for (AbfsInputStream stream : streamsInTest) { - assertListDoesnotContainBuffersForIstream( - bufferManager.getReadAheadQueueCopy(), stream); - } + /* + * The close() method of AbfsInputStream would lead to purge of completedList. + * Because the readBufferWorkers are running in parallel thread, due to race condition, + * after close and before assert, it can happen that processing of inProgress buffer + * can get completed and hence we cannot assert on completedList to be empty. + * That is why completedList are checked to not have a buffer other than the + * buffers in inProgressQueue just before the closure of AbfsInputStream object. + */ + assertCompletedListContainsSubSetOfCertainSet( + bufferManager.getCompletedReadListCopy(), inProgressBuffers, + streamsInTest); + for (AbfsInputStream stream : streamsInTest) { + assertListDoesnotContainBuffersForIstream( + bufferManager.getReadAheadQueueCopy(), stream); + } } private void assertCompletedListContainsSubSetOfCertainSet(final List completedList, @@ -184,17 +184,17 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { // After closing stream2, no queued buffers of stream2 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); - /* - * The close() method of AbfsInputStream would lead to purge of completedList. - * Because the readBufferWorkers are running in parallel thread, due to race condition, - * after close and before assert, it can happen that processing of inProgress buffer - * can get completed and hence we cannot assert on completedList to be empty. - * That is why completedList are checked to not have a buffer other than the - * buffers in inProgressQueue just before the closure of AbfsInputStream object. - */ - assertCompletedListContainsSubSetOfCertainSet( - bufferManager.getCompletedReadListCopy(), inProgressBufferSet, - streamsInTest); + /* + * The close() method of AbfsInputStream would lead to purge of completedList. + * Because the readBufferWorkers are running in parallel thread, due to race condition, + * after close and before assert, it can happen that processing of inProgress buffer + * can get completed and hence we cannot assert on completedList to be empty. + * That is why completedList are checked to not have a buffer other than the + * buffers in inProgressQueue just before the closure of AbfsInputStream object. + */ + assertCompletedListContainsSubSetOfCertainSet( + bufferManager.getCompletedReadListCopy(), inProgressBufferSet, + streamsInTest); }