From 9bd2ac436a253da0d7248a78e72f5cb409e8a018 Mon Sep 17 00:00:00 2001 From: zhaoym6 Date: Tue, 22 Oct 2019 23:32:12 +0800 Subject: [PATCH 1/4] HDFS-14308. DFSStripedInputStream curStripeBuf is not freed by unbuffer() --- .../hadoop/hdfs/DFSStripedInputStream.java | 18 ++++++++- .../hdfs/TestDFSStripedInputStream.java | 37 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 3f688d410be7e..a0e7d6504d76f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -70,7 +70,8 @@ public class DFSStripedInputStream extends DFSInputStream { private final int groupSize; /** the buffer for a complete stripe. */ private ByteBuffer curStripeBuf; - private ByteBuffer parityBuf; + @VisibleForTesting + protected ByteBuffer parityBuf; private final ErasureCodingPolicy ecPolicy; private RawErasureDecoder decoder; @@ -554,4 +555,19 @@ public synchronized void releaseBuffer(ByteBuffer buffer) { throw new UnsupportedOperationException( "Not support enhanced byte buffer access."); } + + @Override + public synchronized void unbuffer() { + closeCurrentBlockReaders(); + if (curStripeBuf != null) { + curStripeBuf.clear(); + BUFFER_POOL.putBuffer(curStripeBuf); + curStripeBuf = null; + } + if (parityBuf != null) { + parityBuf.clear(); + BUFFER_POOL.putBuffer(parityBuf); + parityBuf = null; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 8c2367d38b706..aedea3c8acde4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -627,4 +627,41 @@ private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp, } } } + + @Test + public void testUnbuffer() throws Exception { + final int numBlocks = 2; + final int fileSize = numBlocks * blockGroupSize; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + stripesPerBlock, false, ecPolicy); + LocatedBlocks lbs = fs.getClient().namenode. + getBlockLocations(filePath.toString(), 0, fileSize); + + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lb); + for (int i = 0; i < dataBlocks; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + stripesPerBlock * cellSize, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + } + DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), + filePath.toString(), false, ecPolicy, null); + ByteBuffer readBuffer = ByteBuffer.allocate(fileSize); + int done = 0; + while (done < fileSize) { + int ret = in.read(readBuffer); + assertTrue(ret > 0); + done += ret; + } + in.unbuffer(); + ByteBuffer curStripeBuf = (in.getCurStripeBuf()); + assertNull(curStripeBuf); + assertNull(in.parityBuf); + in.close(); + } } From 5edfbf86526ff191d5622f2c7c4a9595d77491c1 Mon Sep 17 00:00:00 2001 From: zhaoym6 Date: Wed, 23 Oct 2019 07:37:34 +0800 Subject: [PATCH 2/4] HDFS-14308. DFSStripedInputStream curStripeBuf is not freed by unbuffer() --- .../main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index a0e7d6504d76f..02ff06fa39d84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -130,7 +130,7 @@ private void resetCurStripeBuffer(boolean shouldAllocateBuf) { curStripeRange = new StripeRange(0, 0); } - protected ByteBuffer getParityBuffer() { + protected synchronized ByteBuffer getParityBuffer() { if (parityBuf == null) { parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize * parityBlkNum); From c43e5c251547566bcca29b6588ec556f473c6f4c Mon Sep 17 00:00:00 2001 From: zhaoym6 Date: Wed, 23 Oct 2019 13:47:21 +0800 Subject: [PATCH 3/4] remove the buffer clear in the unbuffer --- .../main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 02ff06fa39d84..a6de7860d7138 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -560,12 +560,10 @@ public synchronized void releaseBuffer(ByteBuffer buffer) { public synchronized void unbuffer() { closeCurrentBlockReaders(); if (curStripeBuf != null) { - curStripeBuf.clear(); BUFFER_POOL.putBuffer(curStripeBuf); curStripeBuf = null; } if (parityBuf != null) { - parityBuf.clear(); BUFFER_POOL.putBuffer(parityBuf); parityBuf = null; } From 23f9c9cc168bd71c822aac7e342b586fe25752a6 Mon Sep 17 00:00:00 2001 From: zhaoym6 Date: Fri, 25 Oct 2019 17:25:36 +0800 Subject: [PATCH 4/4] call super unbuffer() --- .../main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index a6de7860d7138..cf29791fd4d22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -558,7 +558,7 @@ public synchronized void releaseBuffer(ByteBuffer buffer) { @Override public synchronized void unbuffer() { - closeCurrentBlockReaders(); + super.unbuffer(); if (curStripeBuf != null) { BUFFER_POOL.putBuffer(curStripeBuf); curStripeBuf = null;