
Commit 360a96f

HDFS-13709. Report bad block to NN when transfer block encounter EIO exception. Contributed by Chen Zhang.
1 parent abae6ff commit 360a96f

7 files changed: +208, -18 lines

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 17 additions & 1 deletion
@@ -183,6 +183,8 @@ class BlockSender implements java.io.Closeable {
   // would risk sending too much unnecessary data. 512 (1 disk sector)
   // is likely to result in minimal extra IO.
   private static final long CHUNK_SIZE = 512;
+
+  private static final String EIO_ERROR = "Input/output error";
   /**
    * Constructor
    *
@@ -576,7 +578,14 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
 
     int dataOff = checksumOff + checksumDataLen;
     if (!transferTo) { // normal transfer
-      ris.readDataFully(buf, dataOff, dataLen);
+      try {
+        ris.readDataFully(buf, dataOff, dataLen);
+      } catch (IOException ioe) {
+        if (ioe.getMessage().startsWith(EIO_ERROR)) {
+          throw new DiskFileCorruptException("A disk IO error occurred", ioe);
+        }
+        throw ioe;
+      }
 
       if (verifyChecksum) {
         verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@@ -623,6 +632,13 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
        * It was done here because the NIO throws an IOException for EPIPE.
        */
       String ioem = e.getMessage();
+      /*
+       * If we got an EIO when reading files or transferTo the client socket,
+       * it's very likely caused by bad disk track or other file corruptions.
+       */
+      if (ioem.startsWith(EIO_ERROR)) {
+        throw new DiskFileCorruptException("A disk IO error occurred", e);
+      }
       if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
         LOG.error("BlockSender.sendChunks() exception: ", e);
         datanode.getBlockScanner().markSuspectBlock(
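
The change above keys off nothing more than the exception message, so the pattern is easy to restate in isolation. Below is a minimal, self-contained sketch of the same idea (the helper class and method names are assumptions, not part of the commit, and it needs hadoop-hdfs on the classpath for the new exception type); unlike the committed hunk it also guards against a null message:

import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DiskFileCorruptException;

final class EioRethrowSketch {
  // Linux surfaces EIO to Java as an IOException whose message starts with
  // this text; the constant mirrors EIO_ERROR added to BlockSender above.
  private static final String EIO_ERROR = "Input/output error";

  // Rethrows a suspected media failure as DiskFileCorruptException so callers
  // can tell it apart from network errors such as "Broken pipe".
  static void rethrowIfEio(IOException ioe) throws IOException {
    String msg = ioe.getMessage();
    if (msg != null && msg.startsWith(EIO_ERROR)) {
      throw new DiskFileCorruptException("A disk IO error occurred", ioe);
    }
    throw ioe;
  }

  private EioRethrowSketch() {
  }
}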

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 36 additions & 7 deletions
@@ -2610,13 +2610,7 @@ public void run() {
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        if (ie instanceof InvalidChecksumSizeException) {
-          // Add the block to the front of the scanning queue if metadata file
-          // is corrupt. We already add the block to front of scanner if the
-          // peer disconnects.
-          LOG.info("Adding block: {} for scanning", b);
-          blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
-        }
+        handleBadBlock(b, ie, false);
         LOG.warn("{}:Failed to transfer {} to {} got",
             bpReg, b, targets[0], ie);
       } finally {
@@ -3462,6 +3456,41 @@ private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
     handleDiskError(sb.toString());
   }
 
+  /**
+   * A bad block need to be handled, either to add to blockScanner suspect queue
+   * or report to NameNode directly.
+   *
+   * If the method is called by scanner, then the block must be a bad block, we
+   * report it to NameNode directly. Otherwise if we judge it as a bad block
+   * according to exception type, then we try to add the bad block to
+   * blockScanner suspect queue if blockScanner is enabled, or report to
+   * NameNode directly otherwise.
+   *
+   * @param block The suspicious block
+   * @param e The exception encountered when accessing the block
+   * @param fromScanner Is it from blockScanner. The blockScanner will call this
+   *                    method only when it's sure that the block is corrupt.
+   */
+  void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
+
+    boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
+        || e instanceof InvalidChecksumSizeException);
+
+    if (!isBadBlock) {
+      return;
+    }
+    if (!fromScanner && blockScanner.isEnabled()) {
+      blockScanner.markSuspectBlock(data.getVolume(block).getStorageID(),
+          block);
+    } else {
+      try {
+        reportBadBlocks(block);
+      } catch (IOException ie) {
+        LOG.warn("report bad block {} failed", block, ie);
+      }
+    }
+  }
+
   @VisibleForTesting
   public long getLastDiskErrorCheck() {
     return lastDiskErrorCheck;
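
The javadoc above describes a small decision tree. The sketch below restates that routing outside of DataNode; SuspectQueue and BadBlockReporter are hypothetical stand-ins for BlockScanner and the reportBadBlocks RPC, and looksCorrupt stands for the instanceof checks in the committed method:

/**
 * Minimal sketch of the routing performed by DataNode#handleBadBlock.
 * All names here are illustrative, not from the commit.
 */
final class BadBlockRoutingSketch {

  interface SuspectQueue {
    boolean isEnabled();
    void markSuspect(String storageId, String blockId);
  }

  interface BadBlockReporter {
    void reportBadBlock(String blockId) throws java.io.IOException;
  }

  static void route(String storageId, String blockId, boolean looksCorrupt,
      boolean fromScanner, SuspectQueue scanner, BadBlockReporter reporter) {
    // In the committed code, looksCorrupt is derived from the exception type:
    // DiskFileCorruptException or InvalidChecksumSizeException.
    if (!fromScanner && !looksCorrupt) {
      return; // not a corruption-type failure; nothing to do
    }
    if (!fromScanner && scanner.isEnabled()) {
      // Let the VolumeScanner verify the replica before involving the NameNode.
      scanner.markSuspect(storageId, blockId);
    } else {
      try {
        // Scanner-confirmed, or no scanner available: report directly.
        reporter.reportBadBlock(blockId);
      } catch (java.io.IOException ie) {
        // The committed code logs and moves on; a failed report is non-fatal.
      }
    }
  }

  private BadBlockRoutingSketch() {
  }
}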
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * When kernel report a "Input/output error", we use this exception to
+ * represents some corruption(e.g. bad disk track) happened on some disk file.
+ */
+public class DiskFileCorruptException extends IOException {
+  /**
+   * Instantiate.
+   * @param msg the exception message
+   * @param cause the underlying cause
+   */
+  public DiskFileCorruptException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public DiskFileCorruptException(String msg) {
+    super(msg);
+  }
+}
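
Since the new type extends IOException, existing catch blocks keep working, and callers that care can catch it first. A small hypothetical demo (class and method names are not from the commit):

import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DiskFileCorruptException;

class CatchOrderDemo {
  // Because DiskFileCorruptException extends IOException, the more specific
  // catch clause must come before the generic one.
  static String classify(IOException e) {
    try {
      throw e;
    } catch (DiskFileCorruptException corrupt) {
      return "suspected media failure: " + corrupt.getMessage();
    } catch (IOException other) {
      return "ordinary I/O failure: " + other.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(classify(
        new DiskFileCorruptException("A disk IO error occurred")));
    System.out.println(classify(new IOException("Broken pipe")));
  }
}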

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java

Lines changed: 1 addition & 6 deletions
@@ -290,12 +290,7 @@ public void handle(ExtendedBlock block, IOException e) {
         return;
       }
       LOG.warn("Reporting bad {} on {}", block, volume);
-      try {
-        scanner.datanode.reportBadBlocks(block, volume);
-      } catch (IOException ie) {
-        // This is bad, but not bad enough to shut down the scanner.
-        LOG.warn("Cannot report bad block " + block, ie);
-      }
+      scanner.datanode.handleBadBlock(block, e, true);
     }
   }
 

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java

Lines changed: 111 additions & 0 deletions
@@ -27,6 +27,7 @@
 import com.google.common.base.Supplier;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -53,7 +54,9 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -154,6 +157,67 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(!fileSys.exists(name));
   }
 
+  private static class CorruptFileSimulatedFSDataset
+      extends SimulatedFSDataset {
+    /**
+     * Simulated input and output streams.
+     *
+     */
+    static private class CorruptFileSimulatedInputStream
+        extends java.io.InputStream {
+      private InputStream inputStream;
+
+      CorruptFileSimulatedInputStream(InputStream is) {
+        inputStream = is;
+      }
+
+      @Override
+      public int read() throws IOException {
+        int ret = inputStream.read();
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+
+      @Override
+      public int read(byte[] b) throws IOException {
+        int ret = inputStream.read(b);
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+    }
+
+    CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return new CorruptFileSimulatedInputStream(result);
+    }
+
+    static class Factory
+        extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
+      @Override
+      public CorruptFileSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
+
   private void testBadBlockReportOnTransfer(
       boolean corruptBlockByDeletingBlockFile) throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -205,6 +269,53 @@ private void testBadBlockReportOnTransfer(
     cluster.shutdown();
   }
 
+  @Test(timeout = 30000)
+  public void testBadBlockReportOnTransferCorruptFile() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        CorruptFileSimulatedFSDataset.Factory.class.getName());
+    // Disable BlockScanner to trigger reportBadBlocks
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
+    FileSystem fs;
+    int replicaCount = 0;
+    short replFactor = 1;
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    try {
+      fs = cluster.getFileSystem();
+      final DFSClient dfsClient = new DFSClient(
+          new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+
+      // Create file with replication factor of 1
+      Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+      DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+      DFSTestUtil.waitReplication(fs, file1, replFactor);
+
+      // Increase replication factor, this should invoke transfer request
+      // Receiving datanode fails on checksum and reports it to namenode
+      replFactor = 2;
+      fs.setReplication(file1, replFactor);
+
+      // Now get block details and check if the block is corrupt
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return dfsClient.getNamenode()
              .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
              .isCorrupt();
+        } catch (IOException ie) {
+          return false;
+        }
+      }, 1000, 15000);
+      replicaCount = dfsClient.getNamenode()
          .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
          .getLocations().length;
+      assertEquals("replication should not success", 1, replicaCount);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /*
    * Test if Datanode reports bad blocks during replication request
    */

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Lines changed: 2 additions & 2 deletions
@@ -598,7 +598,7 @@ public long getAvailable() throws IOException {
 
     @Override
     public StorageType getStorageType() {
-      return null;
+      return StorageType.DISK;
    }
 
     @Override
@@ -1178,7 +1178,7 @@ public synchronized ReplicaHandler createTemporary(StorageType storageType,
     return new ReplicaHandler(binfo, null);
   }
 
-  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
     BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Lines changed: 2 additions & 2 deletions
@@ -240,10 +240,10 @@ public Boolean get() {
   @Test
   public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
     DataNode dn0 = cluster.getDataNodes().get(0);
-    // Make a mock blockScanner class and return false whenever isEnabled is
+    // Make a mock blockScanner class and return true whenever isEnabled is
     // called on blockScanner
     BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
-    Mockito.when(mockScanner.isEnabled()).thenReturn(false);
+    Mockito.when(mockScanner.isEnabled()).thenReturn(true);
     dn0.setBlockScanner(mockScanner);
     Path filePath = new Path("test.dat");
     FSDataOutputStream out = fs.create(filePath, (short) 1);
