Skip to content

Commit 4b51258

Browse files
stiga-huangHarshitGupta11
authored andcommitted
HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (apache#4158)
Reviewed-by: Lisheng Sun <[email protected]>
1 parent 5ff3b9d commit 4b51258

File tree

2 files changed

+95
-8
lines changed

2 files changed

+95
-8
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,18 @@ public void run() {
189189
final DfsClientShm shm = (DfsClientShm)slot.getShm();
190190
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
191191
final String path = shmSock.getPath();
192+
DomainSocket domainSocket = pathToDomainSocket.get(path);
192193
DataOutputStream out = null;
193194
boolean success = false;
194195
int retries = 2;
195196
try {
196197
while (retries > 0) {
197198
try {
198199
if (domainSocket == null || !domainSocket.isOpen()) {
199-
// we are running in single thread mode, no protection needed for
200-
// domainSocket
201200
domainSocket = DomainSocket.connect(path);
201+
// we are running in single thread mode, no protection needed for
202+
// pathToDomainSocket
203+
pathToDomainSocket.put(path, domainSocket);
202204
}
203205

204206
out = new DataOutputStream(
@@ -221,13 +223,16 @@ public void run() {
221223
} catch (SocketException se) {
222224
// the domain socket on datanode may be timed out, we retry once
223225
retries--;
224-
domainSocket.close();
225-
domainSocket = null;
226+
if (domainSocket != null) {
227+
domainSocket.close();
228+
domainSocket = null;
229+
pathToDomainSocket.remove(path);
230+
}
226231
if (retries == 0) {
227232
throw new SocketException("Create domain socket failed");
228233
}
229234
}
230-
}
235+
} // end of while block
231236
} catch (IOException e) {
232237
LOG.warn(ShortCircuitCache.this + ": failed to release "
233238
+ "short-circuit shared memory slot " + slot + " by sending "
@@ -240,10 +245,10 @@ public void run() {
240245
} else {
241246
shm.getEndpointShmManager().shutdown(shm);
242247
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
243-
domainSocket = null;
248+
pathToDomainSocket.remove(path);
244249
}
245250
}
246-
}
251+
} // end of run()
247252
}
248253

249254
public interface ShortCircuitReplicaCreator {
@@ -354,7 +359,11 @@ public interface ShortCircuitReplicaCreator {
354359
*/
355360
private final DfsClientShmManager shmManager;
356361

357-
private DomainSocket domainSocket = null;
362+
/**
363+
* A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket
364+
* paths of short-circuit shared memory segments.
365+
*/
366+
private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();
358367

359368
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
360369
return new ShortCircuitCache(

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
6464
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
6565
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
66+
import org.apache.hadoop.hdfs.server.datanode.DataNode;
6667
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
6768
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
6869
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
@@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception {
957958
}
958959
}
959960

961+
// Regression test for HDFS-16535
962+
@Test(timeout = 60000)
963+
public void testDomainSocketClosedByMultipleDNs() throws Exception {
964+
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
965+
String testName = "testDomainSocketClosedByMultipleDNs";
966+
Configuration conf = createShortCircuitConf(testName, sockDir);
967+
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
968+
testName + "._PORT").getAbsolutePath());
969+
MiniDFSCluster cluster =
970+
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
971+
972+
try {
973+
cluster.waitActive();
974+
DistributedFileSystem fs = cluster.getFileSystem();
975+
final ShortCircuitCache cache =
976+
fs.getClient().getClientContext().getShortCircuitCache();
977+
978+
ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
979+
ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");
980+
981+
DataNode dn0 = cluster.getDataNodes().get(0);
982+
DataNode dn1 = cluster.getDataNodes().get(1);
983+
984+
DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
985+
sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
986+
DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
987+
sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));
988+
989+
final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
990+
.setNodeID(dn0.getDatanodeId()).build();
991+
final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
992+
.setNodeID(dn1.getDatanodeId()).build();
993+
994+
// Allocate 2 shm slots from DataNode-0
995+
MutableBoolean usedPeer = new MutableBoolean(false);
996+
Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
997+
"testDomainSocketClosedByMultipleDNs_client");
998+
dn0.getShortCircuitRegistry()
999+
.registerSlot(blockId0, slot1.getSlotId(), false);
1000+
1001+
Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
1002+
"testDomainSocketClosedByMultipleDNs_client");
1003+
dn0.getShortCircuitRegistry()
1004+
.registerSlot(blockId0, slot2.getSlotId(), false);
1005+
1006+
// Allocate 1 shm slot from DataNode-1
1007+
Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
1008+
"testDomainSocketClosedByMultipleDNs_client");
1009+
dn1.getShortCircuitRegistry()
1010+
.registerSlot(blockId1, slot3.getSlotId(), false);
1011+
1012+
Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
1013+
Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
1014+
Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
1015+
1016+
// Release the slot of DataNode-1 first.
1017+
cache.scheduleSlotReleaser(slot3);
1018+
Thread.sleep(2000);
1019+
Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
1020+
1021+
// Release the slots of DataNode-0.
1022+
cache.scheduleSlotReleaser(slot1);
1023+
Thread.sleep(2000);
1024+
Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
1025+
" due to slot release failures.",
1026+
1, cache.getDfsClientShmManager().getShmNum());
1027+
cache.scheduleSlotReleaser(slot2);
1028+
Thread.sleep(2000);
1029+
1030+
Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
1031+
Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
1032+
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
1033+
} finally {
1034+
cluster.shutdown();
1035+
}
1036+
}
1037+
9601038
@Test(timeout = 60000)
9611039
public void testDNRestart() throws Exception {
9621040
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();

0 commit comments

Comments
 (0)