Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,18 @@ public void run() {
final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
final String path = shmSock.getPath();
DomainSocket domainSocket = pathToDomainSocket.get(path);
DataOutputStream out = null;
boolean success = false;
int retries = 2;
try {
while (retries > 0) {
try {
if (domainSocket == null || !domainSocket.isOpen()) {
// we are running in single thread mode, no protection needed for
// domainSocket
domainSocket = DomainSocket.connect(path);
// we are running in single thread mode, no protection needed for
// pathToDomainSocket
pathToDomainSocket.put(path, domainSocket);
}

out = new DataOutputStream(
Expand All @@ -221,13 +223,16 @@ public void run() {
} catch (SocketException se) {
// the domain socket on datanode may be timed out, we retry once
retries--;
domainSocket.close();
domainSocket = null;
if (domainSocket != null) {
domainSocket.close();
domainSocket = null;
pathToDomainSocket.remove(path);
}
if (retries == 0) {
throw new SocketException("Create domain socket failed");
}
}
}
} // end of while block
} catch (IOException e) {
LOG.warn(ShortCircuitCache.this + ": failed to release "
+ "short-circuit shared memory slot " + slot + " by sending "
Expand All @@ -240,10 +245,10 @@ public void run() {
} else {
shm.getEndpointShmManager().shutdown(shm);
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
domainSocket = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you remove the line 243? If remove it, the line 228 looks redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you remove the line 243?

domainSocket is now a local variable (defined at line 192). We don't need to update it at the end of this method. Note that this finally clause belongs to the try-clause starts at line 195.

If remove it, the line 228 looks redundant.

For line 228, it's needed in the next retry of the while-loop. The check at line 199 will catch it and trigger a re-connect.

pathToDomainSocket.remove(path);
}
}
}
} // end of run()
}

public interface ShortCircuitReplicaCreator {
Expand Down Expand Up @@ -354,7 +359,11 @@ public interface ShortCircuitReplicaCreator {
*/
private final DfsClientShmManager shmManager;

private DomainSocket domainSocket = null;
/**
* A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket
* paths of short-circuit shared memory segments.
*/
private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();

public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
Expand Down Expand Up @@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception {
}
}

// Regression test for HDFS-16535
@Test(timeout = 60000)
public void testDomainSocketClosedByMultipleDNs() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
String testName = "testDomainSocketClosedByMultipleDNs";
Configuration conf = createShortCircuitConf(testName, sockDir);
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
testName + "._PORT").getAbsolutePath());
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();

try {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();

ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");

DataNode dn0 = cluster.getDataNodes().get(0);
DataNode dn1 = cluster.getDataNodes().get(1);

DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));

final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
.setNodeID(dn0.getDatanodeId()).build();
final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
.setNodeID(dn1.getDatanodeId()).build();

// Allocate 2 shm slots from DataNode-0
MutableBoolean usedPeer = new MutableBoolean(false);
Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
"testDomainSocketClosedByMultipleDNs_client");
dn0.getShortCircuitRegistry()
.registerSlot(blockId0, slot1.getSlotId(), false);

Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
"testDomainSocketClosedByMultipleDNs_client");
dn0.getShortCircuitRegistry()
.registerSlot(blockId0, slot2.getSlotId(), false);

// Allocate 1 shm slot from DataNode-1
Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
"testDomainSocketClosedByMultipleDNs_client");
dn1.getShortCircuitRegistry()
.registerSlot(blockId1, slot3.getSlotId(), false);

Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());

// Release the slot of DataNode-1 first.
cache.scheduleSlotReleaser(slot3);
Thread.sleep(2000);
Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());

// Release the slots of DataNode-0.
cache.scheduleSlotReleaser(slot1);
Thread.sleep(2000);
Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
" due to slot release failures.",
1, cache.getDfsClientShmManager().getShmNum());
cache.scheduleSlotReleaser(slot2);
Thread.sleep(2000);

Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
} finally {
cluster.shutdown();
}
}

@Test(timeout = 60000)
public void testDNRestart() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Expand Down