Commit 34b1406

Stephen O'Donnell authored and jojochuang committed
HDFS-14333. Datanode fails to start if any disk has errors during Namenode registration. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <[email protected]>
1 parent aab7b77 commit 34b1406

File tree: 7 files changed, +215 -22 lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 27 additions & 2 deletions

@@ -166,6 +166,7 @@
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
@@ -1689,13 +1690,37 @@ void initBlockPool(BPOfferService bpos) throws IOException {
     // Exclude failed disks before initializing the block pools to avoid startup
     // failures.
     checkDiskError();
-
-    data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
+    try {
+      data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
+    } catch (AddBlockPoolException e) {
+      handleAddBlockPoolError(e);
+    }
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(getConf());
     initDiskBalancer(data, getConf());
   }
 
+  /**
+   * Handles an AddBlockPoolException object thrown from
+   * {@link org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList#
+   * addBlockPool}. Will ensure that all volumes that encountered an
+   * AddBlockPoolException are removed from the DataNode and marked as failed
+   * volumes in the same way as a runtime volume failure.
+   *
+   * @param e this exception is a container for all IOException objects caught
+   *          in FsVolumeList#addBlockPool.
+   */
+  private void handleAddBlockPoolError(AddBlockPoolException e)
+      throws IOException {
+    Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        e.getFailingVolumes();
+    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+      handleVolumeFailures(unhealthyDataDirs.keySet());
+    } else {
+      LOG.debug("HandleAddBlockPoolError called with empty exception list");
+    }
+  }
+
   List<BPOfferService> getAllBpOs() {
     return blockPoolManager.getAllNamenodeThreads();
   }
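
The change above is easiest to see in isolation. Below is a minimal, self-contained sketch of the catch-and-handle pattern that initBlockPool() now follows: collect per-volume failures into one unchecked exception, then turn them into ordinary failed-volume handling instead of aborting startup. The Volume and PoolInitException names here are simplified stand-ins for illustration, not the real HDFS classes.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Stand-in for the pattern DataNode.initBlockPool() now uses: collect
// per-volume failures into one unchecked exception, then convert them into
// "failed volume" handling instead of aborting startup.
public class AddBlockPoolPatternSketch {

  // Hypothetical simplified volume type (the real code uses FsVolumeSpi).
  static class Volume {
    final String path;
    Volume(String path) { this.path = path; }
    @Override
    public String toString() { return path; }
  }

  // Mirrors AddBlockPoolException: a container for per-volume IOExceptions.
  static class PoolInitException extends RuntimeException {
    private final Map<Volume, IOException> failingVolumes;
    PoolInitException(Map<Volume, IOException> failingVolumes) {
      this.failingVolumes = failingVolumes;
    }
    Map<Volume, IOException> getFailingVolumes() { return failingVolumes; }
  }

  static void addBlockPool() {
    // Pretend one volume failed while scanning the block pool.
    Map<Volume, IOException> unhealthy = new HashMap<>();
    unhealthy.put(new Volume("/data/disk1"), new IOException("corrupt subdir"));
    throw new PoolInitException(unhealthy);
  }

  static void handleVolumeFailures(Set<Volume> failed) {
    // The real DataNode removes these volumes and reports them to the NameNode.
    System.out.println("Marking volumes as failed: " + failed);
  }

  public static void main(String[] args) {
    try {
      addBlockPool();
    } catch (PoolInitException e) {
      // Startup continues; only the failing volumes are taken out of service.
      handleVolumeFailures(e.getFailingVolumes().keySet());
    }
    System.out.println("Startup continues with the remaining volumes");
  }
}
```
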
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java (new file)

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+/**
+ * This exception collects all IOExceptions thrown when adding block pools and
+ * scanning volumes. It keeps the information about which volume is associated
+ * with an exception.
+ *
+ */
+public class AddBlockPoolException extends RuntimeException {
+  private Map<FsVolumeSpi, IOException> unhealthyDataDirs;
+  public AddBlockPoolException(Map<FsVolumeSpi, IOException>
+      unhealthyDataDirs) {
+    this.unhealthyDataDirs = unhealthyDataDirs;
+  }
+
+  public Map<FsVolumeSpi, IOException> getFailingVolumes() {
+    return unhealthyDataDirs;
+  }
+  @Override
+  public String toString() {
+    return getClass().getName() + ": " + unhealthyDataDirs.toString();
+  }
+}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

Lines changed: 14 additions & 15 deletions

@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Iterator;
 import java.util.List;
@@ -188,8 +189,8 @@ void getAllVolumesMap(final String bpid,
       final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     long totalStartTime = Time.monotonicNow();
-    final List<IOException> exceptions = Collections.synchronizedList(
-        new ArrayList<IOException>());
+    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        new ConcurrentHashMap<FsVolumeSpi, IOException>();
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
@@ -208,7 +209,7 @@ public void run() {
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
                "from " + v + ". Will throw later.", ioe);
-           exceptions.add(ioe);
+           unhealthyDataDirs.put(v, ioe);
          }
        }
      };
@@ -222,13 +223,13 @@ public void run() {
        throw new IOException(ie);
      }
    }
-    if (!exceptions.isEmpty()) {
-      throw exceptions.get(0);
-    }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG
         .info("Total time to add all replicas to map for block pool " + bpid
             + ": " + totalTimeTaken + "ms");
+    if (!unhealthyDataDirs.isEmpty()) {
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
   }
 
   /**
@@ -398,9 +399,8 @@ void removeVolumeFailureInfo(StorageLocation location) {
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();
-
-    final List<IOException> exceptions = Collections.synchronizedList(
-        new ArrayList<IOException>());
+    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        new ConcurrentHashMap<FsVolumeSpi, IOException>();
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
@@ -418,7 +418,7 @@ public void run() {
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
-           exceptions.add(ioe);
+           unhealthyDataDirs.put(v, ioe);
          }
        }
      };
@@ -432,15 +432,14 @@ public void run() {
        throw new IOException(ie);
      }
    }
-    if (!exceptions.isEmpty()) {
-      throw exceptions.get(0);
-    }
-
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
         bpid + ": " + totalTimeTaken + "ms");
+    if (!unhealthyDataDirs.isEmpty()) {
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
   }
-
+
   void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
       blocksPerVolume) {
     for (FsVolumeImpl v : volumes) {

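The key behavioural change in FsVolumeList is that a failure on one volume no longer aborts the whole operation: each worker thread records its own failure against its volume in a ConcurrentHashMap, and a single aggregate exception is thrown only after every thread has been joined. A rough, self-contained sketch of that pattern, with illustrative names rather than the Hadoop implementation:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// One scanning thread per volume; each failure is recorded against its volume
// in a ConcurrentHashMap, and a single aggregate exception is thrown only
// after every thread has been joined.
public class PerVolumeScanSketch {

  static class VolumeScanException extends RuntimeException {
    final Map<String, IOException> failures;
    VolumeScanException(Map<String, IOException> failures) {
      super("Failed volumes: " + failures.keySet());
      this.failures = failures;
    }
  }

  static void scan(String volume) throws IOException {
    if (volume.contains("bad")) {
      throw new IOException("cannot read " + volume);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> volumes =
        Arrays.asList("/data/disk0", "/data/bad-disk1", "/data/disk2");
    Map<String, IOException> unhealthy = new ConcurrentHashMap<>();
    List<Thread> threads = new ArrayList<>();
    for (String v : volumes) {
      Thread t = new Thread(() -> {
        try {
          scan(v);
        } catch (IOException ioe) {
          unhealthy.put(v, ioe);  // record the failure, do not fail fast
        }
      });
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      t.join();                   // let every volume finish scanning
    }
    if (!unhealthy.isEmpty()) {
      // One exception carrying every failing volume, thrown only at the end.
      throw new VolumeScanException(unhealthy);
    }
  }
}
```
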
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

Lines changed: 7 additions & 2 deletions

@@ -2403,14 +2403,19 @@ public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
     return restartDataNode(dnprop, false);
   }
 
-  private void waitDataNodeFullyStarted(final DataNode dn)
+  public void waitDatanodeFullyStarted(DataNode dn, int timeout)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         return dn.isDatanodeFullyStarted();
       }
-    }, 100, 60000);
+    }, 100, timeout);
+  }
+
+  private void waitDataNodeFullyStarted(final DataNode dn)
+      throws TimeoutException, InterruptedException {
+    waitDatanodeFullyStarted(dn, 60000);
   }
 
   /**

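This refactoring simply turns the hard-coded 60-second timeout into a parameter on a new public overload so tests can wait with a shorter bound. A generic sketch of the same overload-with-default pattern, using a plain polling wait rather than Hadoop's GenericTestUtils (names here are illustrative):

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Mirrors the MiniDFSCluster refactoring: the timeout becomes a parameter on a
// public overload, and the old helper just forwards a 60-second default.
public class WaitForSketch {

  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Condition not met within " + timeoutMs + "ms");
      }
      Thread.sleep(intervalMs);
    }
  }

  // Public overload: a test that expects a partially failed startup can use a
  // short timeout instead of waiting the full default.
  public static void waitFullyStarted(Supplier<Boolean> started, int timeoutMs)
      throws TimeoutException, InterruptedException {
    waitFor(started, 100, timeoutMs);
  }

  public static void waitFullyStarted(Supplier<Boolean> started)
      throws TimeoutException, InterruptedException {
    waitFullyStarted(started, 60000);
  }

  public static void main(String[] args) throws Exception {
    long begin = System.currentTimeMillis();
    // "Fully started" after roughly half a second in this toy example.
    waitFullyStarted(() -> System.currentTimeMillis() - begin > 500, 3000);
    System.out.println("started");
  }
}
```
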
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Lines changed: 10 additions & 2 deletions

@@ -450,7 +450,7 @@ Map<Block, BInfo> getBlockMap() {
    * Class used for tracking datanode level storage utilization similar
    * to {@link FSVolumeSet}
    */
-  private static class SimulatedStorage {
+  static class SimulatedStorage {
     private final Map<String, SimulatedBPStorage> map =
         new ConcurrentHashMap<>();
 
@@ -635,7 +635,11 @@ public FsDatasetSpi getDataset() {
 
     @Override
     public StorageLocation getStorageLocation() {
-      return null;
+      try {
+        return StorageLocation.parse("[DISK]file:///simulated");
+      } catch (Exception e) {
+        return null;
+      }
     }
 
     @Override
@@ -681,6 +685,10 @@ public VolumeCheckResult check(VolumeCheckContext context)
   private final String datanodeUuid;
   private final DataNode datanode;
 
+  public List<SimulatedStorage> getStorages() {
+    return storages;
+  }
+
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
     this(null, storage, conf);
   }

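Returning a parseable location instead of null matters because failed-volume reporting reads the location's URI path. A simplified stand-in for what a string like "[DISK]file:///simulated" encodes; this is not Hadoop's StorageLocation implementation, only an illustration of the idea:

```java
import java.net.URI;

// Simplified stand-in: a storage location string bundles a storage type with
// a URI. The real parsing lives in Hadoop's StorageLocation.parse(); this
// only shows why a non-null, parseable location gives failure reporting a
// usable path.
public class StorageLocationSketch {

  enum StorageType { DISK, SSD, ARCHIVE, RAM_DISK }

  final StorageType type;
  final URI uri;

  StorageLocationSketch(StorageType type, URI uri) {
    this.type = type;
    this.uri = uri;
  }

  static StorageLocationSketch parse(String raw) {
    StorageType type = StorageType.DISK;   // default when no [TYPE] prefix
    String rest = raw;
    if (raw.startsWith("[")) {
      int end = raw.indexOf(']');
      type = StorageType.valueOf(raw.substring(1, end));
      rest = raw.substring(end + 1);
    }
    return new StorageLocationSketch(type, URI.create(rest));
  }

  public static void main(String[] args) {
    StorageLocationSketch loc = parse("[DISK]file:///simulated");
    // A non-null location means failure handling can report a real path.
    System.out.println(loc.type + " " + loc.uri.getPath());  // DISK /simulated
  }
}
```
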
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Lines changed: 110 additions & 0 deletions

@@ -24,19 +24,24 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -64,6 +69,7 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -219,6 +225,50 @@ public Boolean get() {
         " is created and replicated");
   }
 
+  /*
+   * If one of the sub-folders under the finalized directory is unreadable,
+   * either due to permissions or a filesystem corruption, the DN will fail
+   * to read it when scanning it for blocks to load into the replica map. This
+   * test ensures the DN does not exit and reports the failed volume to the
+   * NN (HDFS-14333). This is done by using a simulated FsDataset that throws
+   * an exception for a failed volume when the block pool is initialized.
+   */
+  @Test(timeout=15000)
+  public void testDnStartsAfterDiskErrorScanningBlockPool() throws Exception {
+    // Don't use the cluster configured in the setup() method for this test.
+    cluster.shutdown(true);
+    cluster.close();
+
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        BadDiskFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster localCluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+
+    try {
+      localCluster.waitActive();
+      DataNode dn = localCluster.getDataNodes().get(0);
+
+      try {
+        localCluster.waitDatanodeFullyStarted(dn, 3000);
+      } catch (TimeoutException e) {
+        fail("Datanode did not get fully started");
+      }
+      assertTrue(dn.isDatanodeUp());
+
+      // trigger DN to send heartbeat
+      DataNodeTestUtils.triggerHeartbeat(dn);
+      final BlockManager bm = localCluster.getNamesystem().getBlockManager();
+      // trigger NN handle heartbeat
+      BlockManagerTestUtil.checkHeartbeat(bm);
+
+      // NN now should have the failed volume
+      assertEquals(1, localCluster.getNamesystem().getVolumeFailuresTotal());
+    } finally {
+      localCluster.close();
+    }
+  }
+
   /**
    * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
    * after failure.
@@ -760,4 +810,64 @@ private int countRealBlocks(Map<String, BlockLocs> map) {
     }
     return total;
   }
+
+  private static class BadDiskFSDataset extends SimulatedFSDataset {
+
+    BadDiskFSDataset(DataStorage storage, Configuration conf) {
+      super(storage, conf);
+    }
+
+    private String[] failedStorageLocations = null;
+
+    @Override
+    public void addBlockPool(String bpid, Configuration conf) {
+      super.addBlockPool(bpid, conf);
+      Map<FsVolumeSpi, IOException>
+          unhealthyDataDirs = new HashMap<>();
+      unhealthyDataDirs.put(this.getStorages().get(0).getVolume(),
+          new IOException());
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
+
+    @Override
+    public synchronized void removeVolumes(Collection<StorageLocation> volumes,
+        boolean clearFailure) {
+      Iterator<StorageLocation> itr = volumes.iterator();
+      String[] failedLocations = new String[volumes.size()];
+      int index = 0;
+      while(itr.hasNext()) {
+        StorageLocation s = itr.next();
+        failedLocations[index] = s.getUri().getPath();
+        index += 1;
+      }
+      failedStorageLocations = failedLocations;
+    }
+
+    @Override
+    public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+      // do nothing
+    }
+
+    @Override
+    public VolumeFailureSummary getVolumeFailureSummary() {
+      if (failedStorageLocations != null) {
+        return new VolumeFailureSummary(failedStorageLocations, 0, 0);
+      } else {
+        return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
+      }
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<BadDiskFSDataset> {
+      @Override
+      public BadDiskFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new BadDiskFSDataset(storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }

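The test works by swapping in a dataset implementation whose class name is supplied through configuration, so the DataNode code under test stays untouched. A self-contained sketch of that injection pattern; the class names and config key below are illustrative, not the HDFS ones:

```java
import java.util.HashMap;
import java.util.Map;

// The implementation is chosen by class name from configuration, so a test
// can plug in a misbehaving implementation (FaultyImpl) without changing the
// caller, much like conf.set(DFS_DATANODE_FSDATASET_FACTORY_KEY, ...) above.
public class FactoryInjectionSketch {

  interface Dataset { void addBlockPool(String bpid); }

  public static class DefaultImpl implements Dataset {
    public void addBlockPool(String bpid) { System.out.println("ok: " + bpid); }
  }

  public static class FaultyImpl implements Dataset {
    public void addBlockPool(String bpid) {
      throw new RuntimeException("simulated bad disk in " + bpid);
    }
  }

  static final String FACTORY_KEY = "test.dataset.impl";

  static Dataset create(Map<String, String> conf) throws Exception {
    String className = conf.getOrDefault(FACTORY_KEY, DefaultImpl.class.getName());
    return (Dataset) Class.forName(className).getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> conf = new HashMap<>();
    conf.put(FACTORY_KEY, FaultyImpl.class.getName());  // inject the faulty impl
    try {
      create(conf).addBlockPool("BP-1");
    } catch (RuntimeException e) {
      System.out.println("handled instead of aborting: " + e.getMessage());
    }
  }
}
```
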
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java

Lines changed: 2 additions & 1 deletion

@@ -500,6 +500,7 @@ public void verifyBlockPoolMissing(String bpid) throws IOException {
    * @param level the level to set
    */
   public static void setFsDatasetImplLogLevel(Level level) {
-    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
+    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG,
+        org.slf4j.event.Level.valueOf(level.toString()));
   }
 }

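The fix converts the log4j Level to slf4j's Level enum by name. A small sketch, assuming log4j 1.x and slf4j-api (1.7.15 or later) are on the classpath:

```java
// Maps a log4j Level to the slf4j enum purely by name. This works for the
// common levels (TRACE/DEBUG/INFO/WARN/ERROR); a name with no slf4j
// equivalent (e.g. FATAL) would make valueOf throw IllegalArgumentException.
public class LevelBridgeSketch {
  public static void main(String[] args) {
    org.apache.log4j.Level log4jLevel = org.apache.log4j.Level.DEBUG;
    org.slf4j.event.Level slf4jLevel =
        org.slf4j.event.Level.valueOf(log4jLevel.toString());
    System.out.println(slf4jLevel);  // DEBUG
  }
}
```
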