
Commit 188f0c3

kihwal authored and Hexiaoqiao committed
HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein.
(cherry picked from commit da1b6e3)
1 parent f0b8980 commit 188f0c3

File tree

3 files changed: +81 -13 lines changed


hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java

Lines changed: 22 additions & 8 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -500,6 +501,8 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
  * the block count is met or iteration reaches the end of the lowest priority
  * list, in which case bookmarks for each block list are reset to the heads
  * of their respective lists.
+ * If a block is deleted (has invalid bcId), it will be removed from the low
+ * redundancy queues.
  *
  * @param blocksToProcess - number of blocks to fetch from low redundancy
  * blocks.
@@ -515,21 +518,32 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
 
     int count = 0;
     int priority = 0;
+    HashSet<BlockInfo> toRemove = new HashSet<>();
     for (; count < blocksToProcess && priority < LEVEL; priority++) {
-      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
-        // do not choose corrupted blocks.
-        continue;
-      }
-
       // Go through all blocks that need reconstructions with current priority.
       // Set the iterator to the first unprocessed block at this priority level
+      // We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
+      // to look for deleted blocks if any.
+      final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
       final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
       final List<BlockInfo> blocks = new LinkedList<>();
-      blocksToReconstruct.add(blocks);
-      // Loop through all remaining blocks in the list.
+      if (!inCorruptLevel) {
+        blocksToReconstruct.add(blocks);
+      }
       for(; count < blocksToProcess && i.hasNext(); count++) {
-        blocks.add(i.next());
+        BlockInfo block = i.next();
+        if (block.isDeleted()) {
+          toRemove.add(block);
+          continue;
+        }
+        if (!inCorruptLevel) {
+          blocks.add(block);
+        }
+      }
+      for (BlockInfo bInfo : toRemove) {
+        remove(bInfo, priority);
       }
+      toRemove.clear();
     }
 
     if (priority == LEVEL || resetIterators) {
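In short, chooseLowRedundancyBlocks() no longer skips QUEUE_WITH_CORRUPT_BLOCKS entirely: every priority level is now scanned so that blocks whose block collection has been deleted can be purged from the queues, while corrupt-level blocks are still never handed out for reconstruction. A rough, self-contained sketch of that scan-and-purge pattern follows; it uses simplified stand-in types (a toy SketchBlock and plain lists) rather than the real BlockInfo / LowRedundancyBlocks classes, so all names and signatures here are illustrative only.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Rough sketch of the scan-and-purge pattern; stand-in types only,
// not the actual Hadoop BlockInfo / LowRedundancyBlocks classes.
class LowRedundancyQueueSketch {
  static final int LEVEL = 5;                     // number of priority levels
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; // last level holds corrupt blocks

  // Minimal stand-in for BlockInfo: just an id and a "deleted" flag.
  static class SketchBlock {
    final long id;
    final boolean deleted;
    SketchBlock(long id, boolean deleted) { this.id = id; this.deleted = deleted; }
  }

  private final List<List<SketchBlock>> priorityQueues = new ArrayList<>();

  LowRedundancyQueueSketch() {
    for (int i = 0; i < LEVEL; i++) {
      priorityQueues.add(new LinkedList<>());
    }
  }

  void add(SketchBlock block, int priority) {
    priorityQueues.get(priority).add(block);
  }

  // Pick up to blocksToProcess blocks for reconstruction. Every level is
  // scanned (including the corrupt level) so deleted blocks can be purged,
  // but corrupt-level blocks are never returned.
  List<List<SketchBlock>> choose(int blocksToProcess) {
    List<List<SketchBlock>> chosen = new ArrayList<>();
    HashSet<SketchBlock> toRemove = new HashSet<>();
    int count = 0;
    for (int priority = 0; count < blocksToProcess && priority < LEVEL; priority++) {
      boolean inCorruptLevel = (priority == QUEUE_WITH_CORRUPT_BLOCKS);
      List<SketchBlock> picked = new LinkedList<>();
      if (!inCorruptLevel) {
        chosen.add(picked);
      }
      Iterator<SketchBlock> it = priorityQueues.get(priority).iterator();
      for (; count < blocksToProcess && it.hasNext(); count++) {
        SketchBlock b = it.next();
        if (b.deleted) {
          toRemove.add(b);     // deleted blocks are dropped, never handed out
          continue;
        }
        if (!inCorruptLevel) {
          picked.add(b);       // corrupt blocks are examined but not chosen
        }
      }
      priorityQueues.get(priority).removeAll(toRemove);
      toRemove.clear();
    }
    return chosen;
  }
}

With this sketch, adding five blocks where the first one is flagged deleted and then calling choose(2) yields only one block and silently drops the deleted one, which mirrors the testDeletedBlocks case added in the test file below.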

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java

Lines changed: 46 additions & 1 deletion
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -41,6 +42,7 @@
 public class TestLowRedundancyBlockQueues {
 
   private final ErasureCodingPolicy ecPolicy;
+  private static AtomicLong mockINodeId = new AtomicLong(0);
 
   public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
     ecPolicy = policy;
@@ -52,7 +54,15 @@ public static Collection<Object[]> policies() {
   }
 
   private BlockInfo genBlockInfo(long id) {
-    return new BlockInfoContiguous(new Block(id), (short) 3);
+    return genBlockInfo(id, false);
+  }
+
+  private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
+    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
+    if (!isCorruptBlock) {
+      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
+    }
+    return bInfo;
   }
 
   private BlockInfo genStripedBlockInfo(long id, long numBytes) {
@@ -93,6 +103,41 @@ private void verifyBlockStats(LowRedundancyBlocks queues,
       queues.getHighestPriorityECBlockCount());
   }
 
+  /**
+   * Tests that deleted blocks should not be returned by
+   * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
+   * @throws Exception
+   */
+  @Test
+  public void testDeletedBlocks() throws Exception {
+    int numBlocks = 5;
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    // create 5 blockinfos. The first one is corrupt.
+    for (int ind = 0; ind < numBlocks; ind++) {
+      BlockInfo blockInfo = genBlockInfo(ind, ind == 0);
+      queues.add(blockInfo, 2, 0, 0, 3);
+    }
+    List<List<BlockInfo>> blocks;
+    // Get two blocks from the queue, but we should only get one because first
+    // block is deleted.
+    blocks = queues.chooseLowRedundancyBlocks(2, false);
+
+    assertEquals(1, blocks.get(2).size());
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+
+    // Get the next blocks - should be ID 2
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(2, blocks.get(2).get(0).getBlockId());
+
+    // Get the next block, but also reset this time - should be ID 3 returned
+    blocks = queues.chooseLowRedundancyBlocks(1, true);
+    assertEquals(3, blocks.get(2).get(0).getBlockId());
+
+    // Get one more block and due to resetting the queue it will be block id 1
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+  }
+
   @Test
   public void testQueuePositionCanBeReset() throws Throwable {
     LowRedundancyBlocks queues = new LowRedundancyBlocks();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

Lines changed: 13 additions & 4 deletions
@@ -38,6 +38,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.AddBlockFlag;
@@ -82,7 +83,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   // The interval for marking a datanode as stale,
   private static final long staleInterval =
       DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
-
+  private static AtomicLong mockINodeId = new AtomicLong(0);
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
@@ -825,7 +826,15 @@ public void testRereplicate3() throws Exception {
   }
 
   private BlockInfo genBlockInfo(long id) {
-    return new BlockInfoContiguous(new Block(id), (short) 3);
+    return genBlockInfo(id, false);
+  }
+
+  private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
+    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
+    if (!isBlockCorrupted) {
+      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
+    }
+    return bInfo;
   }
 
   /**
@@ -848,15 +857,15 @@ public void testReplicationWithPriority() throws Exception {
       // Adding the blocks directly to normal priority
 
       neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 2, 0, 0, 3);
+          nextLong(), true), 2, 0, 0, 3);
     }
     // Lets wait for the replication interval, to start process normal
     // priority blocks
     Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
 
     // Adding the block directly to high priority list
     neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
-        nextLong()), 1, 0, 0, 3);
+        nextLong(), true), 1, 0, 0, 3);
 
     // Lets wait for the replication interval
     Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
