Skip to content

Commit 44ca2fe

Browse files
zhaoyimjojochuang
authored andcommitted
HDFS-14308. DFSStripedInputStream curStripeBuf is not freed by unbuffer() (#1667)
Reviewed-by: Aravindan Vijayan <[email protected]> Reviewed-by: Wei-Chiu Chuang <[email protected]> (cherry picked from commit 30db895) (cherry picked from commit 9316ca1)
1 parent f5a3a9e commit 44ca2fe

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public class DFSStripedInputStream extends DFSInputStream {
6969
private final int groupSize;
7070
/** the buffer for a complete stripe. */
7171
private ByteBuffer curStripeBuf;
72-
private ByteBuffer parityBuf;
72+
@VisibleForTesting
73+
protected ByteBuffer parityBuf;
7374
private final ErasureCodingPolicy ecPolicy;
7475
private RawErasureDecoder decoder;
7576

@@ -127,7 +128,7 @@ private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
127128
curStripeRange = new StripeRange(0, 0);
128129
}
129130

130-
protected ByteBuffer getParityBuffer() {
131+
protected synchronized ByteBuffer getParityBuffer() {
131132
if (parityBuf == null) {
132133
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
133134
cellSize * parityBlkNum);
@@ -550,4 +551,17 @@ public synchronized void releaseBuffer(ByteBuffer buffer) {
550551
throw new UnsupportedOperationException(
551552
"Not support enhanced byte buffer access.");
552553
}
554+
555+
@Override
556+
public synchronized void unbuffer() {
557+
super.unbuffer();
558+
if (curStripeBuf != null) {
559+
BUFFER_POOL.putBuffer(curStripeBuf);
560+
curStripeBuf = null;
561+
}
562+
if (parityBuf != null) {
563+
BUFFER_POOL.putBuffer(parityBuf);
564+
parityBuf = null;
565+
}
566+
}
553567
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,4 +627,41 @@ private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
627627
}
628628
}
629629
}
630+
631+
@Test
632+
public void testUnbuffer() throws Exception {
633+
final int numBlocks = 2;
634+
final int fileSize = numBlocks * blockGroupSize;
635+
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
636+
stripesPerBlock, false, ecPolicy);
637+
LocatedBlocks lbs = fs.getClient().namenode.
638+
getBlockLocations(filePath.toString(), 0, fileSize);
639+
640+
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
641+
assert lb instanceof LocatedStripedBlock;
642+
LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
643+
for (int i = 0; i < dataBlocks; i++) {
644+
Block blk = new Block(bg.getBlock().getBlockId() + i,
645+
stripesPerBlock * cellSize,
646+
bg.getBlock().getGenerationStamp());
647+
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
648+
cluster.injectBlocks(i, Arrays.asList(blk),
649+
bg.getBlock().getBlockPoolId());
650+
}
651+
}
652+
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
653+
filePath.toString(), false, ecPolicy, null);
654+
ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
655+
int done = 0;
656+
while (done < fileSize) {
657+
int ret = in.read(readBuffer);
658+
assertTrue(ret > 0);
659+
done += ret;
660+
}
661+
in.unbuffer();
662+
ByteBuffer curStripeBuf = (in.getCurStripeBuf());
663+
assertNull(curStripeBuf);
664+
assertNull(in.parityBuf);
665+
in.close();
666+
}
630667
}

0 commit comments

Comments
 (0)