Skip to content

Commit f39ec60

Browse files
saxenapranavAnuj Modi
authored andcommitted
Cherry-Pick: HADOOP-18546. ABFS. disable purging list of in progress reads in abfs
1 parent 2e95a90 commit f39ec60

File tree

3 files changed

+80
-1
lines changed

3 files changed

+80
-1
lines changed

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2319,6 +2319,12 @@
23192319
<description>The AbstractFileSystem for gs: uris.</description>
23202320
</property>
23212321

2322+
<property>
2323+
<name>fs.azure.enable.readahead</name>
2324+
<value>true</value>
2325+
<description>Enabled readahead/prefetching in AbfsInputStream.</description>
2326+
</property>
2327+
23222328
<property>
23232329
<name>io.seqfile.compress.blocksize</name>
23242330
<value>1000000</value>

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
544544
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
545545
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
546546
purgeList(stream, completedReadList);
547-
purgeList(stream, inProgressList);
548547
}
549548

550549
/**
@@ -642,4 +641,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
642641
freeList.clear();
643642
completedReadList.add(buf);
644643
}
644+
645+
@VisibleForTesting
646+
int getNumBuffers() {
647+
return NUM_BUFFERS;
648+
}
645649
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ public class TestAbfsInputStream extends
7373
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
7474
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
7575

76+
@Override
77+
public void teardown() throws Exception {
78+
super.teardown();
79+
ReadBufferManager.getBufferManager().testResetReadBufferManager();
80+
}
81+
7682
private AbfsRestOperation getMockRestOp() {
7783
AbfsRestOperation op = mock(AbfsRestOperation.class);
7884
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
@@ -384,6 +390,69 @@ public void testSuccessfulReadAhead() throws Exception {
384390
checkEvictedStatus(inputStream, 0, true);
385391
}
386392

393+
/**
394+
* This test expects InProgressList is not purged by the inputStream close.
395+
*/
396+
@Test
397+
public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
398+
AbfsClient client = getMockAbfsClient();
399+
AbfsRestOperation successOp = getMockRestOp();
400+
final Long serverCommunicationMockLatency = 3_000L;
401+
final Long readBufferTransferToInProgressProbableTime = 1_000L;
402+
final Integer readBufferQueuedCount = 3;
403+
404+
Mockito.doAnswer(invocationOnMock -> {
405+
//sleeping thread to mock the network latency from client to backend.
406+
Thread.sleep(serverCommunicationMockLatency);
407+
return successOp;
408+
})
409+
.when(client)
410+
.read(any(String.class), any(Long.class), any(byte[].class),
411+
any(Integer.class), any(Integer.class), any(String.class),
412+
any(String.class), any(TracingContext.class));
413+
414+
final ReadBufferManager readBufferManager
415+
= ReadBufferManager.getBufferManager();
416+
417+
final int readBufferTotal = readBufferManager.getNumBuffers();
418+
final int expectedFreeListBufferCount = readBufferTotal
419+
- readBufferQueuedCount;
420+
421+
try (AbfsInputStream inputStream = getAbfsInputStream(client,
422+
"testSuccessfulReadAhead.txt")) {
423+
// As this is try-with-resources block, the close() method of the created
424+
// abfsInputStream object shall be called on the end of the block.
425+
queueReadAheads(inputStream);
426+
427+
//Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
428+
Thread.sleep(readBufferTransferToInProgressProbableTime);
429+
430+
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
431+
.describedAs(String.format("InProgressList should have %d elements",
432+
readBufferQueuedCount))
433+
.hasSize(readBufferQueuedCount);
434+
Assertions.assertThat(readBufferManager.getFreeListCopy())
435+
.describedAs(String.format("FreeList should have %d elements",
436+
expectedFreeListBufferCount))
437+
.hasSize(expectedFreeListBufferCount);
438+
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
439+
.describedAs("CompletedList should have 0 elements")
440+
.hasSize(0);
441+
}
442+
443+
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
444+
.describedAs(String.format("InProgressList should have %d elements",
445+
readBufferQueuedCount))
446+
.hasSize(readBufferQueuedCount);
447+
Assertions.assertThat(readBufferManager.getFreeListCopy())
448+
.describedAs(String.format("FreeList should have %d elements",
449+
expectedFreeListBufferCount))
450+
.hasSize(expectedFreeListBufferCount);
451+
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
452+
.describedAs("CompletedList should have 0 elements")
453+
.hasSize(0);
454+
}
455+
387456
/**
388457
* This test expects ReadAheadManager to throw exception if the read ahead
389458
* thread had failed within the last thresholdAgeMilliseconds.

0 commit comments

Comments
 (0)