Skip to content

Commit 02bc8ad

Browse files
committed
Add regression test for HDFS-16473
1 parent 45f926d commit 02bc8ad

File tree

1 file changed

+78
-0
lines changed

1 file changed

+78
-0
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
6464
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
6565
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
66+
import org.apache.hadoop.hdfs.server.datanode.DataNode;
6667
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
6768
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
6869
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
@@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception {
957958
}
958959
}
959960

961+
// Regression test for HDFS-16473
962+
@Test(timeout = 60000)
963+
public void testDomainSocketClosedByMultipleDNs() throws Exception {
964+
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
965+
String testName = "testDomainSocketClosedByMultipleDNs";
966+
Configuration conf = createShortCircuitConf(testName, sockDir);
967+
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
968+
testName + "._PORT").getAbsolutePath());
969+
MiniDFSCluster cluster =
970+
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
971+
972+
try {
973+
cluster.waitActive();
974+
DistributedFileSystem fs = cluster.getFileSystem();
975+
final ShortCircuitCache cache =
976+
fs.getClient().getClientContext().getShortCircuitCache();
977+
978+
ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
979+
ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");
980+
981+
DataNode dn0 = cluster.getDataNodes().get(0);
982+
DataNode dn1 = cluster.getDataNodes().get(1);
983+
984+
DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
985+
sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
986+
DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
987+
sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));
988+
989+
final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
990+
.setNodeID(dn0.getDatanodeId()).build();
991+
final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
992+
.setNodeID(dn1.getDatanodeId()).build();
993+
994+
// Allocate 2 shm slots from DataNode-0
995+
MutableBoolean usedPeer = new MutableBoolean(false);
996+
Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
997+
"testDomainSocketClosedByMultipleDNs_client");
998+
dn0.getShortCircuitRegistry()
999+
.registerSlot(blockId0, slot1.getSlotId(), false);
1000+
1001+
Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
1002+
"testDomainSocketClosedByMultipleDNs_client");
1003+
dn0.getShortCircuitRegistry()
1004+
.registerSlot(blockId0, slot2.getSlotId(), false);
1005+
1006+
// Allocate 1 shm slot from DataNode-1
1007+
Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
1008+
"testDomainSocketClosedByMultipleDNs_client");
1009+
dn1.getShortCircuitRegistry()
1010+
.registerSlot(blockId1, slot3.getSlotId(), false);
1011+
1012+
Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
1013+
Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
1014+
Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
1015+
1016+
// Release the slot of DataNode-1 first.
1017+
cache.scheduleSlotReleaser(slot3);
1018+
Thread.sleep(2000);
1019+
Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
1020+
1021+
// Release the slots of DataNode-0.
1022+
cache.scheduleSlotReleaser(slot1);
1023+
Thread.sleep(2000);
1024+
Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
1025+
" due to slot release failures.",
1026+
1, cache.getDfsClientShmManager().getShmNum());
1027+
cache.scheduleSlotReleaser(slot2);
1028+
Thread.sleep(2000);
1029+
1030+
Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
1031+
Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
1032+
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
1033+
} finally {
1034+
cluster.shutdown();
1035+
}
1036+
}
1037+
9601038
@Test(timeout = 60000)
9611039
public void testDNRestart() throws Exception {
9621040
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();

0 commit comments

Comments
 (0)