diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 3f1773ccc79e3..06b9ecb1790bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -32,6 +32,7 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Queue; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Checksum; @@ -532,7 +533,7 @@ private boolean shouldVerifyChecksum() { * Receives and processes a packet. It can contain many chunks. * returns the number of data bytes that the packet has. */ - private int receivePacket() throws IOException { + private int receivePacket(final Semaphore ackSema) throws IOException { // read the next packet packetReceiver.receiveNextPacket(in); @@ -616,6 +617,9 @@ private int receivePacket() throws IOException { handleMirrorOutError(e); } } + if (ackSema != null) { + ackSema.release(); + } ByteBuffer dataBuf = packetReceiver.getDataSlice(); ByteBuffer checksumBuf = packetReceiver.getChecksumSlice(); @@ -984,13 +988,15 @@ void receiveBlock( this.isReplaceBlock = isReplaceBlock; try { + Semaphore ackSema = null; if (isClient && !isTransfer) { + ackSema = new Semaphore(0); responder = new Daemon(datanode.threadGroup, - new PacketResponder(replyOut, mirrIn, downstreams)); + new PacketResponder(replyOut, mirrIn, downstreams, ackSema)); responder.start(); // start thread to processes responses } - while (receivePacket() >= 0) { /* Receive until the last packet */ } + while (receivePacket(ackSema) >= 0) { /* Receive until the last packet */ } // wait for all outstanding packet responses. And then // indicate responder to gracefully shutdown. @@ -1246,6 +1252,8 @@ class PacketResponder implements Runnable, Closeable { /** for log and error messages */ private final String myString; private boolean sending = false; + /** for synchronization with BlockReceiver */ + private final Semaphore ackSema; @Override public String toString() { @@ -1253,9 +1261,11 @@ public String toString() { } PacketResponder(final DataOutputStream upstreamOut, - final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) { + final DataInputStream downstreamIn, final DatanodeInfo[] downstreams, + final Semaphore ackSema) { this.downstreamIn = downstreamIn; this.upstreamOut = upstreamOut; + this.ackSema = ackSema; this.type = downstreams == null? PacketResponderType.NON_PIPELINE : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE @@ -1395,6 +1405,9 @@ public void run() { long seqno = PipelineAck.UNKOWN_SEQNO; long ackRecvNanoTime = 0; try { + if (ackSema != null) { + ackSema.acquire(); + } if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) { DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr); // read an ack from downstream datanode