Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;

Expand Down Expand Up @@ -532,7 +533,7 @@ private boolean shouldVerifyChecksum() {
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
private int receivePacket(final Semaphore ackSema) throws IOException {
// read the next packet
packetReceiver.receiveNextPacket(in);

Expand Down Expand Up @@ -616,6 +617,9 @@ private int receivePacket() throws IOException {
handleMirrorOutError(e);
}
}
if (ackSema != null) {
ackSema.release();
}

ByteBuffer dataBuf = packetReceiver.getDataSlice();
ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
Expand Down Expand Up @@ -984,13 +988,15 @@ void receiveBlock(
this.isReplaceBlock = isReplaceBlock;

try {
Semaphore ackSema = null;
if (isClient && !isTransfer) {
ackSema = new Semaphore(0);
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
new PacketResponder(replyOut, mirrIn, downstreams, ackSema));
responder.start(); // start thread to processes responses
}

while (receivePacket() >= 0) { /* Receive until the last packet */ }
while (receivePacket(ackSema) >= 0) { /* Receive until the last packet */ }

// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
Expand Down Expand Up @@ -1246,16 +1252,20 @@ class PacketResponder implements Runnable, Closeable {
/** for log and error messages */
private final String myString;
private boolean sending = false;
/** for synchronization with BlockReceiver */
private final Semaphore ackSema;

@Override
public String toString() {
return myString;
}

PacketResponder(final DataOutputStream upstreamOut,
final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) {
final DataInputStream downstreamIn, final DatanodeInfo[] downstreams,
final Semaphore ackSema) {
this.downstreamIn = downstreamIn;
this.upstreamOut = upstreamOut;
this.ackSema = ackSema;

this.type = downstreams == null? PacketResponderType.NON_PIPELINE
: downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
Expand Down Expand Up @@ -1395,6 +1405,9 @@ public void run() {
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
if (ackSema != null) {
ackSema.acquire();
}
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr);
// read an ack from downstream datanode
Expand Down