|
25 | 25 | import java.util.concurrent.Callable; |
26 | 26 | import java.util.concurrent.ExecutorService; |
27 | 27 | import java.util.concurrent.Executors; |
| 28 | +import java.util.concurrent.TimeUnit; |
28 | 29 |
|
29 | 30 | import org.apache.hadoop.conf.Configuration; |
30 | 31 | import org.apache.hadoop.fs.FSDataInputStream; |
@@ -74,17 +75,14 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { |
74 | 75 | } |
75 | 76 | } finally { |
76 | 77 | executorService.shutdown(); |
| 78 | + // wait for all tasks to finish |
| 79 | + executorService.awaitTermination(1, TimeUnit.MINUTES); |
77 | 80 | } |
78 | 81 |
|
79 | 82 | ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); |
80 | | - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); |
| 83 | + // verify there is no work in progress or the readahead queue. |
81 | 84 | assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); |
82 | 85 | assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); |
83 | | - Assertions.assertThat(bufferManager.getFreeListCopy()) |
84 | | - .describedAs("After closing all streams free list contents should match with " + freeList) |
85 | | - .hasSize(numBuffers) |
86 | | - .containsExactlyInAnyOrderElementsOf(freeList); |
87 | | - |
88 | 86 | } |
89 | 87 |
|
90 | 88 | private void assertListEmpty(String listName, List<ReadBuffer> list) { |
@@ -116,22 +114,18 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { |
116 | 114 | try { |
117 | 115 | iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); |
118 | 116 | iStream2.read(); |
119 | | - // After closing stream1, none of the buffers associated with stream1 should be present. |
120 | | - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); |
121 | | - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); |
| 117 | + // After closing stream1, no queued buffers of stream1 should be present |
| 118 | + // assertions can't be made about the state of the other lists as it is |
| 119 | + // too prone to race conditions. |
122 | 120 | assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); |
123 | 121 | } finally { |
124 | 122 | // closing the stream later. |
125 | 123 | IOUtils.closeStream(iStream2); |
126 | 124 | } |
127 | | - // After closing stream2, none of the buffers associated with stream2 should be present. |
128 | | - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2); |
129 | | - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2); |
| 125 | + // After closing stream2, no queued buffers of stream2 should be present. |
130 | 126 | assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); |
131 | 127 |
|
132 | | - // After closing both the streams, all lists should be empty. |
133 | | - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); |
134 | | - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); |
| 128 | + // After closing both the streams, read queue should be empty. |
135 | 129 | assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); |
136 | 130 |
|
137 | 131 | } |
|
0 commit comments