Skip to content

Commit cdc5de6

Browse files
Santosh MarellaHexiaoqiao
authored andcommitted
HDFS-12914. Addendum patch. Block report leases cause missing blocks until next report. Contributed by Santosh Marella, He Xiaoqiao.
Signed-off-by: Wei-Chiu Chuang <[email protected]> Co-authored-by: He Xiaoqiao <[email protected]>
1 parent ae4143a commit cdc5de6

File tree

1 file changed

+156
-0
lines changed

1 file changed

+156
-0
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.blockmanagement;
20+
21+
import org.apache.hadoop.hdfs.HdfsConfiguration;
22+
import org.apache.hadoop.hdfs.MiniDFSCluster;
23+
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
24+
import org.apache.hadoop.hdfs.server.datanode.DataNode;
25+
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
26+
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
27+
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
28+
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
29+
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
30+
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
31+
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
32+
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
33+
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
34+
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
35+
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
36+
import org.junit.Test;
37+
38+
import java.util.ArrayList;
39+
import java.util.List;
40+
import java.util.Random;
41+
import java.util.concurrent.ExecutorService;
42+
import java.util.concurrent.Executors;
43+
import java.util.concurrent.Future;
44+
45+
import static org.junit.Assert.assertEquals;
46+
import static org.junit.Assert.assertTrue;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.Mockito.doAnswer;
49+
import static org.mockito.Mockito.spy;
50+
51+
/**
52+
* Tests that BlockReportLease in BlockManager.
53+
*/
54+
public class TestBlockReportLease {
55+
56+
/**
57+
* Test check lease about one BlockReport with many StorageBlockReport.
58+
* Before HDFS-12914, when batch storage report to NameNode, it will check
59+
* less for one storage by one, So it could part storage report can
60+
* be process normally, however, the rest storage report can not be process
61+
* since check lease failed.
62+
* After HDFS-12914, NameNode check lease once for every blockreport request,
63+
* So this issue will not exist anymore.
64+
*/
65+
@Test
66+
public void testCheckBlockReportLease() throws Exception {
67+
HdfsConfiguration conf = new HdfsConfiguration();
68+
Random rand = new Random();
69+
70+
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
71+
.numDataNodes(1).build()) {
72+
cluster.waitActive();
73+
74+
FSNamesystem fsn = cluster.getNamesystem();
75+
BlockManager blockManager = fsn.getBlockManager();
76+
BlockManager spyBlockManager = spy(blockManager);
77+
fsn.setBlockManagerForTesting(spyBlockManager);
78+
String poolId = cluster.getNamesystem().getBlockPoolId();
79+
80+
NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
81+
82+
// Test based on one DataNode report to Namenode
83+
DataNode dn = cluster.getDataNodes().get(0);
84+
DatanodeDescriptor datanodeDescriptor = spyBlockManager
85+
.getDatanodeManager().getDatanode(dn.getDatanodeId());
86+
87+
DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
88+
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);
89+
90+
// Send heartbeat and request full block report lease
91+
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
92+
dnRegistration, storages, 0, 0, 0, 0, 0, null, true, null, null);
93+
94+
DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
95+
doAnswer(delayer).when(spyBlockManager).processReport(
96+
any(DatanodeStorageInfo.class),
97+
any(BlockListAsLongs.class),
98+
any(BlockReportContext.class));
99+
100+
ExecutorService pool = Executors.newFixedThreadPool(1);
101+
102+
// Trigger sendBlockReport
103+
BlockReportContext brContext = new BlockReportContext(1, 0,
104+
rand.nextLong(), hbResponse.getFullBlockReportLeaseId(), true);
105+
Future<DatanodeCommand> sendBRfuturea = pool.submit(() -> {
106+
// Build every storage with 100 blocks for sending report
107+
DatanodeStorage[] datanodeStorages
108+
= new DatanodeStorage[storages.length];
109+
for (int i = 0; i < storages.length; i++) {
110+
datanodeStorages[i] = storages[i].getStorage();
111+
}
112+
StorageBlockReport[] reports = createReports(datanodeStorages, 100);
113+
114+
// Send blockReport
115+
return rpcServer.blockReport(dnRegistration, poolId, reports,
116+
brContext);
117+
});
118+
119+
// Wait until BlockManager calls processReport
120+
delayer.waitForCall();
121+
122+
// Remove full block report lease about dn
123+
spyBlockManager.getBlockReportLeaseManager()
124+
.removeLease(datanodeDescriptor);
125+
126+
// Allow blockreport to proceed
127+
delayer.proceed();
128+
129+
// Get result, it will not null if process successfully
130+
DatanodeCommand datanodeCommand = sendBRfuturea.get();
131+
assertTrue(datanodeCommand instanceof FinalizeCommand);
132+
assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
133+
.getBlockPoolId());
134+
}
135+
}
136+
137+
private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
138+
int numBlocks) {
139+
int longsPerBlock = 3;
140+
int blockListSize = 2 + numBlocks * longsPerBlock;
141+
int numStorages = dnStorages.length;
142+
StorageBlockReport[] storageBlockReports
143+
= new StorageBlockReport[numStorages];
144+
for (int i = 0; i < numStorages; i++) {
145+
List<Long> longs = new ArrayList<Long>(blockListSize);
146+
longs.add(Long.valueOf(numBlocks));
147+
longs.add(0L);
148+
for (int j = 0; j < blockListSize; ++j) {
149+
longs.add(Long.valueOf(j));
150+
}
151+
BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
152+
storageBlockReports[i] = new StorageBlockReport(dnStorages[i], blockList);
153+
}
154+
return storageBlockReports;
155+
}
156+
}

0 commit comments

Comments
 (0)