DFSConfigKeys.java
@@ -315,6 +315,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = 300;

public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY =
"dfs.namenode.excess.redundancy.timeout-sec";
public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT = 3600;
public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT
= "dfs.namenode.excess.redundancy.timeout.check.limit";
public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT = 1000;

public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
"dfs.namenode.maintenance.replication.min";
public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
BlockManager.java
@@ -30,6 +30,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -50,6 +51,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.management.ObjectName;

import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -86,6 +88,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -116,6 +119,7 @@

import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;

import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
@@ -482,6 +486,16 @@ public int getPendingSPSPaths() {
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;

/**
* Timeout, in milliseconds, for excess redundancy blocks.
*/
private long excessRedundancyTimeout;

/**
* Maximum number of blocks examined per pass when checking for excess redundancy timeouts.
*/
private long excessRedundancyTimeoutCheckLimit;

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -589,6 +603,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT));
setExcessRedundancyTimeoutCheckLimit(conf.getLong(
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));

printInitialConfigs();
}

@@ -3040,6 +3060,100 @@ void rescanPostponedMisreplicatedBlocks() {
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
}
}

/**
* Sets the timeout (in seconds) for excess redundancy blocks. If the provided timeout is
* less than or equal to 0, the default value is used. The timeout is stored in milliseconds.
* @param timeout The time (in seconds) to set as the excess redundancy block timeout.
*/
public void setExcessRedundancyTimeout(long timeout) {
if (timeout <= 0) {
this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT * 1000L;
} else {
this.excessRedundancyTimeout = timeout * 1000L;
}
}

/**
* Sets the maximum number of blocks examined per pass when checking for excess redundancy timeouts.
* If the provided limit is less than or equal to 0, the default limit is used.
*
* @param limit The maximum number of blocks to examine per timeout check.
*/
public void setExcessRedundancyTimeoutCheckLimit(long limit) {
if (limit <= 0) {
this.excessRedundancyTimeoutCheckLimit =
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
} else {
this.excessRedundancyTimeoutCheckLimit = limit;
}
}

/**
* Process timed-out blocks in the excess redundancy map.
*/
void processTimedOutExcessBlocks() {
if (excessRedundancyMap.size() == 0) {
return;
}
namesystem.writeLock();
long now = Time.monotonicNow();
int processed = 0;
try {
Iterator<Map.Entry<String, LightWeightHashSet<Block>>> iter =
excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
zhangshuyan0 (Contributor) commented:

If the size of excessRedundancyMap is large and only a few items have timed out, this method may hold the lock for a very long time. It is recommended to avoid this situation, for example by incrementing the processed counter for every block examined, rather than only for blocks that have timed out.

Author (Contributor) replied:

Got it, I will update it later. Thanks @zhangshuyan0 for your comment.

Author (Contributor) replied:

Updated the PR. Hi @zhangshuyan0, please review it again when you have free time. Thanks~
Map.Entry<String, LightWeightHashSet<Block>> entry = iter.next();
String datanodeUuid = entry.getKey();
LightWeightHashSet<Block> blocks = entry.getValue();
// Sort blocks by timestamp in ascending order (oldest first).
List<ExcessBlockInfo> sortedBlocks = blocks.stream()
.filter(block -> block instanceof ExcessBlockInfo)
.map(block -> (ExcessBlockInfo) block)
.sorted(Comparator.comparingLong(ExcessBlockInfo::getTimeStamp))
.collect(Collectors.toList());

for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
if (processed >= excessRedundancyTimeoutCheckLimit) {
break;
}

processed++;
// Blocks are sorted oldest first, so if this block is still within the
// timeout window, no later block can have expired; exit the loop.
if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
break;
}

BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
if (bi == null || bi.isDeleted()) {
continue;
}

Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) &&
datanodeStorageInfo.getState().equals(State.NORMAL)) {
final Block b = getBlockOnStorage(blockInfo, datanodeStorageInfo);
if (!containsInvalidateBlock(datanodeDescriptor, b)) {
addToInvalidates(b, datanodeDescriptor);
LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
b, datanodeDescriptor);
}
excessBlockInfo.setTimeStamp();
break;
}
}
}
}
} finally {
namesystem.writeUnlock("processTimedOutExcessBlocks");
LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
}
}

Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
@@ -5231,6 +5345,7 @@ public void run() {
computeDatanodeWork();
processPendingReconstructions();
rescanPostponedMisreplicatedBlocks();
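// Invalidate excess replicas that have lingered past the configured timeout.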
processTimedOutExcessBlocks();
lastRedundancyCycleTS.set(Time.monotonicNow());
}
TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
ExcessRedundancyMap.java
@@ -21,12 +21,15 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.slf4j.Logger;

import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.util.Time.monotonicNow;

/**
* Maps a datanode to the set of excess redundancy details.
*
@@ -35,7 +38,7 @@
class ExcessRedundancyMap {
public static final Logger blockLog = NameNode.blockStateChangeLog;

private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>();
private final Map<String, LightWeightHashSet<Block>> map = new HashMap<>();
private final AtomicLong size = new AtomicLong(0L);

/**
@@ -50,7 +53,7 @@ long size() {
*/
@VisibleForTesting
synchronized int getSize4Testing(String dnUuid) {
final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
final LightWeightHashSet<Block> set = map.get(dnUuid);
return set == null? 0: set.size();
}

@@ -64,7 +67,7 @@ synchronized void clear() {
* datanode and the given block?
*/
synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
return set != null && set.contains(blk);
}

@@ -75,12 +78,12 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
* @return true if the block is added.
*/
synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
if (set == null) {
set = new LightWeightHashSet<>();
map.put(dn.getDatanodeUuid(), set);
}
final boolean added = set.add(blk);
final boolean added = set.add(new ExcessBlockInfo(blk));
if (added) {
size.incrementAndGet();
blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk);
@@ -95,11 +98,10 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
* @return true if the block is removed.
*/
synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
if (set == null) {
return false;
}

final boolean removed = set.remove(blk);
if (removed) {
size.decrementAndGet();
@@ -111,4 +113,45 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
}
return removed;
}

synchronized Map<String, LightWeightHashSet<Block>> getExcessRedundancyMap() {
return map;
}

/**
* Holds information about a block with excess redundancy. It records the
* timestamp at which the block was added to the excess redundancy map.
*/
static class ExcessBlockInfo extends Block {
private long timeStamp;
private final BlockInfo blockInfo;

ExcessBlockInfo(BlockInfo blockInfo) {
super(blockInfo.getBlockId(), blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
this.timeStamp = monotonicNow();
this.blockInfo = blockInfo;
}

public BlockInfo getBlockInfo() {
return blockInfo;
}

long getTimeStamp() {
return timeStamp;
}

void setTimeStamp() {
timeStamp = monotonicNow();
}

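// Equality and hashing delegate to Block (block-id based), so lookups and
// removals that pass a plain Block or BlockInfo still match this wrapper.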
@Override
public int hashCode() {
return super.hashCode();
}

@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
}
}
hdfs-default.xml
@@ -5409,6 +5409,24 @@
</description>
</property>

<property>
<name>dfs.namenode.excess.redundancy.timeout-sec</name>
<value>3600</value>
<description>
Timeout in seconds for excess redundancy blocks. If this value is 0 or less,
it defaults to 3600 seconds.
</description>
</property>

<property>
<name>dfs.namenode.excess.redundancy.timeout.check.limit</name>
<value>1000</value>
<description>
Limits the number of blocks examined per pass when checking for excess
redundancy timeouts. If this value is 0 or less, it defaults to 1000.
</description>
</property>

<property>
<name>dfs.namenode.stale.datanode.minimum.interval</name>
<value>3</value>
MiniDFSCluster.java
@@ -2092,6 +2092,25 @@ public FsDatasetTestUtils getFsDatasetTestUtils(DataNode dn) {
.newInstance(dn);
}

/**
* Wait for the datanodes in the cluster to process any block
* deletions that have already been asynchronously queued.
*/
public void waitForDNDeletions()
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
for (DataNode dn : getDataNodes()) {
if (getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0) {
return false;
}
}
return true;
}
}, 1000, 10000);
}
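A hypothetical use in a test, assuming a running MiniDFSCluster named cluster and a file at path written with replication 3; all names here are illustrative.

// Hypothetical test fragment: shrink replication, then wait until the excess
// replicas have actually been deleted on the datanodes.
DistributedFileSystem fs = cluster.getFileSystem();
fs.setReplication(path, (short) 2);   // one replica becomes excess
cluster.triggerHeartbeats();          // deliver the invalidation commands
cluster.waitForDNDeletions();         // block until async deletions drain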

/**
* Gets the rpc port used by the NameNode, because the caller
* supplied port is not necessarily the actual port used.