|
63 | 63 | import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
64 | 64 | import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
65 | 65 | import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; |
| 66 | +import org.apache.hadoop.hdfs.server.datanode.DataNode; |
66 | 67 | import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; |
67 | 68 | import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; |
68 | 69 | import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; |
@@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception { |
957 | 958 | } |
958 | 959 | } |
959 | 960 |
|
| 961 | + // Regression test for HDFS-16535 |
| 962 | + @Test(timeout = 60000) |
| 963 | + public void testDomainSocketClosedByMultipleDNs() throws Exception { |
| 964 | + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); |
| 965 | + String testName = "testDomainSocketClosedByMultipleDNs"; |
| 966 | + Configuration conf = createShortCircuitConf(testName, sockDir); |
| 967 | + conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), |
| 968 | + testName + "._PORT").getAbsolutePath()); |
| 969 | + MiniDFSCluster cluster = |
| 970 | + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); |
| 971 | + |
| 972 | + try { |
| 973 | + cluster.waitActive(); |
| 974 | + DistributedFileSystem fs = cluster.getFileSystem(); |
| 975 | + final ShortCircuitCache cache = |
| 976 | + fs.getClient().getClientContext().getShortCircuitCache(); |
| 977 | + |
| 978 | + ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz"); |
| 979 | + ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz"); |
| 980 | + |
| 981 | + DataNode dn0 = cluster.getDataNodes().get(0); |
| 982 | + DataNode dn1 = cluster.getDataNodes().get(1); |
| 983 | + |
| 984 | + DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File( |
| 985 | + sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath())); |
| 986 | + DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File( |
| 987 | + sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath())); |
| 988 | + |
| 989 | + final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder() |
| 990 | + .setNodeID(dn0.getDatanodeId()).build(); |
| 991 | + final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder() |
| 992 | + .setNodeID(dn1.getDatanodeId()).build(); |
| 993 | + |
| 994 | + // Allocate 2 shm slots from DataNode-0 |
| 995 | + MutableBoolean usedPeer = new MutableBoolean(false); |
| 996 | + Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0, |
| 997 | + "testDomainSocketClosedByMultipleDNs_client"); |
| 998 | + dn0.getShortCircuitRegistry() |
| 999 | + .registerSlot(blockId0, slot1.getSlotId(), false); |
| 1000 | + |
| 1001 | + Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0, |
| 1002 | + "testDomainSocketClosedByMultipleDNs_client"); |
| 1003 | + dn0.getShortCircuitRegistry() |
| 1004 | + .registerSlot(blockId0, slot2.getSlotId(), false); |
| 1005 | + |
| 1006 | + // Allocate 1 shm slot from DataNode-1 |
| 1007 | + Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1, |
| 1008 | + "testDomainSocketClosedByMultipleDNs_client"); |
| 1009 | + dn1.getShortCircuitRegistry() |
| 1010 | + .registerSlot(blockId1, slot3.getSlotId(), false); |
| 1011 | + |
| 1012 | + Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum()); |
| 1013 | + Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum()); |
| 1014 | + Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum()); |
| 1015 | + |
| 1016 | + // Release the slot of DataNode-1 first. |
| 1017 | + cache.scheduleSlotReleaser(slot3); |
| 1018 | + Thread.sleep(2000); |
| 1019 | + Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum()); |
| 1020 | + |
| 1021 | + // Release the slots of DataNode-0. |
| 1022 | + cache.scheduleSlotReleaser(slot1); |
| 1023 | + Thread.sleep(2000); |
| 1024 | + Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" + |
| 1025 | + " due to slot release failures.", |
| 1026 | + 1, cache.getDfsClientShmManager().getShmNum()); |
| 1027 | + cache.scheduleSlotReleaser(slot2); |
| 1028 | + Thread.sleep(2000); |
| 1029 | + |
| 1030 | + Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum()); |
| 1031 | + Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum()); |
| 1032 | + Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum()); |
| 1033 | + } finally { |
| 1034 | + cluster.shutdown(); |
| 1035 | + } |
| 1036 | + } |
| 1037 | + |
960 | 1038 | @Test(timeout = 60000) |
961 | 1039 | public void testDNRestart() throws Exception { |
962 | 1040 | TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); |
|
0 commit comments