From f79f1d2696bc85e863fc0e3c1366abf739411259 Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Sat, 28 Oct 2023 19:19:32 +0800
Subject: [PATCH 1/7] HDFS-17218. NameNode should process time out excess redundancy blocks

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   7 +
 .../server/blockmanagement/BlockManager.java  | 111 +++++++++++++++
 .../blockmanagement/ExcessRedundancyMap.java  |  70 ++++++++--
 .../src/main/resources/hdfs-default.xml       |  18 +++
 .../apache/hadoop/hdfs/MiniDFSCluster.java    |  19 +++
 .../blockmanagement/TestBlockManager.java     | 128 ++++++++++++++++++
 6 files changed, 345 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 88a18d9cf0763..2d56c1a3acf50 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -315,6 +315,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = 300;
 
+  public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY =
+      "dfs.namenode.excess.redundancy.timeout-sec";
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC = 3600;
+  public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT
+      = "dfs.namenode.excess.redundancy.timeout.check.limit";
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT = 1000;
+
   public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
       "dfs.namenode.maintenance.replication.min";
   public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 2351bb4782873..dc34befb313f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -116,6 +117,7 @@
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -482,6 +484,16 @@ public int getPendingSPSPaths() {
   /** Storages accessible from multiple DNs. */
   private final ProvidedStorageMap providedStorageMap;
 
+  /**
+   * Timeout, in milliseconds, for excess redundancy blocks.
+   */
+  private long excessRedundancyTimeout;
+
+  /**
+   * Limits the number of blocks used to check for excess redundancy timeout.
+   */
+  private long excessRedundancyTimeoutCheckLimit;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -589,6 +601,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
         conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
             DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
 
+    setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC));
+    setExcessRedundancyTimeoutCheckLimit(conf.getLong(
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));
+
     printInitialConfigs();
   }
 
@@ -3040,6 +3058,98 @@ void rescanPostponedMisreplicatedBlocks() {
           (Time.monotonicNow() - startTime), endSize, (startSize - endSize));
     }
   }
+
+  /**
+   * Sets the timeout (in seconds) for excess redundancy blocks. If the provided timeout is
+   * less than or equal to 0, the default value is used (converted to milliseconds).
+   * @param timeOut The time (in seconds) to set as the excess redundancy block timeout.
+   */
+  public void setExcessRedundancyTimeout(long timeOut) {
+    if (timeOut <= 0) {
+      this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC * 1000L;
+    } else {
+      this.excessRedundancyTimeout = timeOut * 1000L;
+    }
+  }
+
+  /**
+   * Sets the limit on the number of blocks checked for excess redundancy timeout.
+   * If the provided limit is less than or equal to 0, the default limit is used.
+   *
+   * @param limit The maximum number of blocks used to check for excess redundancy timeout.
+   */
+  public void setExcessRedundancyTimeoutCheckLimit(long limit) {
+    if (limit <= 0) {
+      this.excessRedundancyTimeoutCheckLimit =
+          DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
+    } else {
+      this.excessRedundancyTimeoutCheckLimit = limit;
+    }
+  }
+
+  /**
+   * Process timed-out blocks in the excess redundancy map.
+   */
+  void processTimedOutExcessBlocks() {
+    if (excessRedundancyMap.size() == 0) {
+      return;
+    }
+    namesystem.writeLock();
+    long now = Time.monotonicNow();
+    int processed = 0;
+    try {
+      Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
+          excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
+      while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
+        Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
+        String datanodeUuid = entry.getKey();
+        LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
+        List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
+        // Sort blocks by timestamp in descending order.
+        Collections.sort(sortedBlocks);
+
+        for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
+          if (processed >= excessRedundancyTimeoutCheckLimit) {
+            break;
+          }
+          BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
+          BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
+          if (bi == null || bi.isDeleted()) {
+            continue;
+          }
+
+          // If the datanode doesn't have any excess block that has exceeded the timeout,
+          // can exit this loop.
+          if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
+            break;
+          }
+
+          Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
+          while (iterator.hasNext()) {
+            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+            DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
+            if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) {
+              if (datanodeStorageInfo.getState().equals(State.NORMAL)) {
+                final Block block = getBlockOnStorage(blockInfo,
+                    datanodeStorageInfo);
+                if (!containsInvalidateBlock(datanodeDescriptor, block)) {
+                  addToInvalidates(block, datanodeDescriptor);
+                  LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
+                      block, datanodeDescriptor);
+                }
+                excessBlockInfo.setTimeStamp();
+                processed ++;
+                break;
+              }
+            }
+          }
+        }
+      }
+    } finally {
+      namesystem.writeUnlock("processTimedOutExcessBlocks");
+      LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
+    }
+  }
 
   Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
@@ -5231,6 +5341,7 @@ public void run() {
           computeDatanodeWork();
           processPendingReconstructions();
           rescanPostponedMisreplicatedBlocks();
+          processTimedOutExcessBlocks();
           lastRedundancyCycleTS.set(Time.monotonicNow());
         }
         TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
index 225a962692c51..7583700d1d19f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
@@ -27,6 +27,8 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.util.Time.monotonicNow;
+
 /**
  * Maps a datanode to the set of excess redundancy details.
  *
@@ -35,7 +37,7 @@ class ExcessRedundancyMap {
   public static final Logger blockLog = NameNode.blockStateChangeLog;
 
-  private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>();
+  private final Map<String, LightWeightHashSet<ExcessBlockInfo>> map = new HashMap<>();
   private final AtomicLong size = new AtomicLong(0L);
 
   /**
@@ -50,7 +52,7 @@ long size() {
    */
   @VisibleForTesting
   synchronized int getSize4Testing(String dnUuid) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
+    final LightWeightHashSet<ExcessBlockInfo> set = map.get(dnUuid);
     return set == null? 0: set.size();
   }
 
@@ -64,8 +66,8 @@ synchronized void clear() {
    * datanode and the given block?
    */
   synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
-    return set != null && set.contains(blk);
+    final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
+    return set != null && set.contains(new ExcessBlockInfo(blk));
   }
 
   /**
@@ -75,12 +77,12 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
    * @return true if the block is added.
    */
   synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
-    LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       set = new LightWeightHashSet<>();
       map.put(dn.getDatanodeUuid(), set);
     }
-    final boolean added = set.add(blk);
+    final boolean added = set.add(new ExcessBlockInfo(blk));
     if (added) {
       size.incrementAndGet();
       blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk);
@@ -95,12 +97,12 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
    * @return true if the block is removed.
    */
   synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       return false;
     }
 
-    final boolean removed = set.remove(blk);
+    final boolean removed = set.remove(new ExcessBlockInfo(blk));
     if (removed) {
       size.decrementAndGet();
       blockLog.debug("BLOCK* ExcessRedundancyMap.remove({}, {})", dn, blk);
@@ -111,4 +113,56 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
     }
     return removed;
   }
+
+  synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() {
+    return map;
+  }
+
+  /**
+   * An object that holds information about an excess redundancy block.
+   * It records the timestamp at which the block was added to the excess redundancy map.
+   */
+  static class ExcessBlockInfo implements Comparable<ExcessBlockInfo> {
+    private long timeStamp;
+    private BlockInfo blockInfo;
+
+    ExcessBlockInfo(BlockInfo blockInfo) {
+      this.timeStamp = monotonicNow();
+      this.blockInfo = blockInfo;
+    }
+
+    public BlockInfo getBlockInfo() {
+      return blockInfo;
+    }
+
+    long getTimeStamp() {
+      return timeStamp;
+    }
+
+    void setTimeStamp() {
+      timeStamp = monotonicNow();
+    }
+
+    @Override
+    public int hashCode() {
+      return blockInfo.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof ExcessBlockInfo)) {
+        return false;
+      }
+      ExcessBlockInfo other = (ExcessBlockInfo) obj;
+      return (this.blockInfo.equals(other.blockInfo));
+    }
+
+    @Override
+    public int compareTo(ExcessBlockInfo o) {
+      return Long.compare(o.timeStamp, this.timeStamp);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 52075a24f1e32..89fbddc37fa95 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5409,6 +5409,24 @@
   </property>
 
+  <property>
+    <name>dfs.namenode.excess.redundancy.timeout-sec</name>
+    <value>3600</value>
+    <description>
+      Timeout in seconds for excess redundancy blocks. If this value is 0 or less,
+      then it will default to 3600 seconds.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.namenode.excess.redundancy.timeout.check.limit</name>
+    <value>1000</value>
+    <description>
+      Limits the number of blocks used to check for excess redundancy timeout.
+      If this value is 0 or less, then it will default to 1000.
+    </description>
+  </property>
+
   <property>
     <name>dfs.namenode.stale.datanode.minimum.interval</name>
     <value>3</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index ca8ae04bbf753..6c20c141a15e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2092,6 +2092,25 @@ public FsDatasetTestUtils getFsDatasetTestUtils(DataNode dn) {
         .newInstance(dn);
   }
 
+  /**
+   * Wait for the datanodes in the cluster to process any block
+   * deletions that have already been asynchronously queued.
+   */
+  public void waitForDNDeletions()
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn : getDataNodes()) {
+          if (getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 1000, 10000);
+  }
+
   /**
    * Gets the rpc port used by the NameNode, because the caller
    * supplied port is not necessarily the actual port used.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 3e00491e99335..78971cefbbd85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.LinkedListMultimap;
@@ -112,6 +113,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -2201,4 +2203,130 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
       assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
     }
   }
+
+  /**
+   * Test that the NameNode processes timed-out excess redundancy blocks.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test(timeout = 360000)
+  public void testProcessTimedOutExcessBlocks() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration config = new HdfsConfiguration();
+    // Bump up replication interval.
+    config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10000);
+    // Set the excess redundancy block timeout.
+    long timeOut = 60L;
+    config.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, timeOut);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+    final Semaphore semaphore = new Semaphore(0);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build()) {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
+      cluster.waitActive();
+
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Let's wait for the remove replica process.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore.
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // Set replication as 2, to choose excess.
+      fs.setReplication(path, (short) 2);
+
+      // Check excessRedundancyMap and invalidateBlocks size as 1.
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+      DataNode excessDn = Arrays.stream(loc).
+          filter(datanodeInfo -> blockManager.getExcessSize4Testing(
+              datanodeInfo.getDatanodeUuid()) > 0)
+          .map(datanodeInfo -> cluster.getDataNode(datanodeInfo.getIpcPort()))
+          .findFirst()
+          .orElse(null);
+
+      // Schedule blocks for deletion at excessDn.
+      assertEquals(1, blockManager.computeInvalidateWork(1));
+      // Check excessRedundancyMap size as 1.
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      // Check invalidateBlocks size as 0.
+      assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+      assertNotNull(excessDn);
+
+      // Name node will ask datanode to delete replicas in heartbeat response.
+      cluster.triggerHeartbeats();
+
+      // Wait for the datanode to process any block deletions
+      // that have already been asynchronously queued.
+      DataNode finalExcessDn = excessDn;
+      GenericTestUtils.waitFor(
+          () -> cluster.getFsDatasetTestUtils(finalExcessDn).getPendingAsyncDeletions() == 1,
+          100, 1000);
+
+      // Restart the datanode.
+      int ipcPort = excessDn.getDatanodeId().getIpcPort();
+      MiniDFSCluster.DataNodeProperties dataNodeProperties = cluster.stopDataNode(
+          excessDn.getDatanodeId().getXferAddr());
+      assertTrue(cluster.restartDataNode(dataNodeProperties, true));
+      semaphore.release(1);
+      cluster.waitActive();
+
+      // Check that the replica still exists in excessDn.
+      excessDn = cluster.getDataNode(ipcPort);
+      assertNotNull(cluster.getFsDatasetTestUtils(excessDn).fetchReplica(extendedBlock));
+      assertEquals(0, cluster.getFsDatasetTestUtils(excessDn).getPendingAsyncDeletions());
+
+      // Verify excess redundancy blocks have not timed out.
+      blockManager.processTimedOutExcessBlocks();
+      assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+
+      // Verify excess redundancy blocks time out.
+      Thread.sleep(timeOut * 1000);
+      blockManager.processTimedOutExcessBlocks();
+
+      // Check excessRedundancyMap and invalidateBlocks size as 1.
+      assertEquals(1, blockManager.getExcessSize4Testing(excessDn.getDatanodeUuid()));
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+
+      // Schedule blocks for deletion.
+      assertEquals(1, blockManager.computeInvalidateWork(1));
+
+      cluster.triggerHeartbeats();
+
+      // Make it resume the removeReplicaFromMem method.
+      semaphore.release(1);
+
+      // Wait for the datanodes in the cluster to process any block
+      // deletions that have already been asynchronously queued.
+      cluster.waitForDNDeletions();
+
+      // Trigger immediate deletion report.
+      cluster.triggerDeletionReports();
+
+      // The replica count should be 2.
+      assertEquals(2, DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations().length);
+      assertEquals(0, blockManager.getExcessBlocksCount());
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }
\ No newline at end of file
From 7fa80cc5de2c2b929d1c950e397f6a4d12f0449d Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Sun, 29 Oct 2023 10:08:04 +0800
Subject: [PATCH 2/7] HDFS-17218. Fix checkstyle

---
 .../apache/hadoop/hdfs/server/blockmanagement/BlockManager.java | 2 +-
 .../hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dc34befb313f4..3f33beccb869f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3138,7 +3138,7 @@ void processTimedOutExcessBlocks() {
                       block, datanodeDescriptor);
                 }
                 excessBlockInfo.setTimeStamp();
-                processed ++;
+                processed++;
                 break;
               }
             }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
index 7583700d1d19f..0dc3caeedc240 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
@@ -162,7 +162,7 @@ public boolean equals(Object obj) {
 
     @Override
     public int compareTo(ExcessBlockInfo o) {
-      return Long.compare(o.timeStamp, this.timeStamp);
+      return Long.compare(this.timeStamp, o.timeStamp);
     }
   }
 }
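Besides the checkstyle cleanup, PATCH 2's flip of compareTo to ascending order is what makes the early break in processTimedOutExcessBlocks sound: once the scan reaches an entry that has not yet timed out, no later (newer) entry can have timed out either. A standalone sketch of that invariant, using simplified data rather than patch code:

    import java.util.Arrays;
    import java.util.List;

    public class AscendingScanSketch {
      public static void main(String[] args) {
        // Timestamps sorted ascending, i.e. oldest entries first.
        List<Long> addedAt = Arrays.asList(100L, 250L, 900L);
        long now = 1000L;
        long timeoutMs = 500L;
        for (long ts : addedAt) {
          if (now <= ts + timeoutMs) {
            // First entry still inside the timeout window; every later
            // entry is newer, so the scan can stop here.
            break;
          }
          System.out.println("timed out: " + ts); // prints 100 and 250
        }
      }
    }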
From cafe3c666a53c04f84a6eccaa4d3a2d0423987db Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Tue, 7 Nov 2023 13:11:27 +0800
Subject: [PATCH 3/7] HDFS-17218. Modify patch based on comments

---
 .../hdfs/server/blockmanagement/BlockManager.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 3f33beccb869f..058286e8432d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3112,18 +3112,20 @@ void processTimedOutExcessBlocks() {
           if (processed >= excessRedundancyTimeoutCheckLimit) {
             break;
           }
-          BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
-          BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
-          if (bi == null || bi.isDeleted()) {
-            continue;
-          }
+          processed++;
 
           // If the datanode doesn't have any excess block that has exceeded the timeout,
           // can exit this loop.
           if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
             break;
           }
 
+          BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
+          BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
+          if (bi == null || bi.isDeleted()) {
+            continue;
+          }
+
           Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
           while (iterator.hasNext()) {
             DatanodeStorageInfo datanodeStorageInfo = iterator.next();
@@ -3138,7 +3140,6 @@ void processTimedOutExcessBlocks() {
                       block, datanodeDescriptor);
                 }
                 excessBlockInfo.setTimeStamp();
-                processed++;
                 break;
               }
             }
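PATCH 3's reordering changes what excessRedundancyTimeoutCheckLimit counts: processed is now incremented for every entry examined, not only for entries that are actually invalidated, so one round can no longer scan unboundedly through already-deleted blocks. A standalone sketch of the counting behavior (illustrative values only):

    public class CheckLimitSketch {
      public static void main(String[] args) {
        long limit = 3;            // plays the role of the check limit
        int processed = 0;
        long now = 1000L;
        long timeoutMs = 500L;
        long[] addedAt = {100L, 250L, 900L, 950L, 990L}; // ascending
        for (long ts : addedAt) {
          if (processed >= limit) {
            break;                 // examination quota used up this round
          }
          processed++;             // counted before the timeout test
          if (now <= ts + timeoutMs) {
            break;                 // nothing newer can have timed out
          }
          System.out.println("invalidate entry added at " + ts);
        }
        System.out.println("examined " + processed); // examined 3
      }
    }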
From 77c13421c0fd86b3007c6a1dc828862f090b3acf Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Tue, 7 Nov 2023 20:12:36 +0800
Subject: [PATCH 4/7] HDFS-17218. Modify patch based on comments

---
 .../server/blockmanagement/BlockManager.java  | 22 +++++++++----------
 .../blockmanagement/ExcessRedundancyMap.java  | 17 +++++++++-----
 .../blockmanagement/TestBlockManager.java     |  2 +-
 3 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 058286e8432d4..a06dc4dd02d26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3104,7 +3104,7 @@ void processTimedOutExcessBlocks() {
         Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
         String datanodeUuid = entry.getKey();
         LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
-        List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
+        List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
         // Sort blocks by timestamp in descending order.
         Collections.sort(sortedBlocks);
 
@@ -3130,18 +3130,16 @@ void processTimedOutExcessBlocks() {
           while (iterator.hasNext()) {
             DatanodeStorageInfo datanodeStorageInfo = iterator.next();
             DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
-            if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) {
-              if (datanodeStorageInfo.getState().equals(State.NORMAL)) {
-                final Block block = getBlockOnStorage(blockInfo,
-                    datanodeStorageInfo);
-                if (!containsInvalidateBlock(datanodeDescriptor, block)) {
-                  addToInvalidates(block, datanodeDescriptor);
-                  LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
-                      block, datanodeDescriptor);
-                }
-                excessBlockInfo.setTimeStamp();
-                break;
+            if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) &&
+                datanodeStorageInfo.getState().equals(State.NORMAL)) {
+              final Block block = getBlockOnStorage(blockInfo, datanodeStorageInfo);
+              if (!containsInvalidateBlock(datanodeDescriptor, block)) {
+                addToInvalidates(block, datanodeDescriptor);
+                LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
+                    block, datanodeDescriptor);
               }
+              excessBlockInfo.setTimeStamp();
+              break;
             }
           }
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
index 0dc3caeedc240..e11a112173e0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
@@ -67,7 +67,7 @@ synchronized void clear() {
    */
   synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
     final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
-    return set != null && set.contains(new ExcessBlockInfo(blk));
+    return set != null && set.contains(blk);
   }
 
   /**
@@ -102,7 +102,7 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
       return false;
     }
 
-    final boolean removed = set.remove(new ExcessBlockInfo(blk));
+    final boolean removed = set.remove(blk);
     if (removed) {
       size.decrementAndGet();
       blockLog.debug("BLOCK* ExcessRedundancyMap.remove({}, {})", dn, blk);
@@ -153,11 +153,16 @@ public boolean equals(Object obj) {
       if (this == obj) {
         return true;
       }
-      if (!(obj instanceof ExcessBlockInfo)) {
-        return false;
+
+      if (obj instanceof ExcessBlockInfo) {
+        ExcessBlockInfo other = (ExcessBlockInfo) obj;
+        return this.blockInfo.equals(other.blockInfo);
+      }
+
+      if (obj instanceof BlockInfo) {
+        return this.blockInfo.equals(obj);
       }
-      ExcessBlockInfo other = (ExcessBlockInfo) obj;
-      return (this.blockInfo.equals(other.blockInfo));
+      return false;
     }
 
     @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 78971cefbbd85..8ce69a45ebb77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -2271,7 +2271,7 @@ public void delayDeleteReplica() {
       assertEquals(0, blockManager.getPendingDeletionBlocksCount());
       assertNotNull(excessDn);
 
-      // Name node will ask datanode to delete replicas in heartbeat response.
+      // NameNode will ask datanode to delete replicas in heartbeat response.
       cluster.triggerHeartbeats();
 
       // Wait for the datanode to process any block deletions

From 59e36ccd62f34783f9141ddd343aef6b774272de Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Sat, 11 Nov 2023 17:43:16 +0800
Subject: [PATCH 5/7] HDFS-17218. Modify patch based on comments

---
 .../server/blockmanagement/BlockManager.java  | 23 ++++++-----
 .../blockmanagement/ExcessRedundancyMap.java  | 40 ++++++-------------
 2 files changed, 26 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a06dc4dd02d26..aaf9beded23d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -30,6 +30,7 @@
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,6 +51,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
+import java.util.stream.Collectors;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -3098,15 +3100,18 @@ void processTimedOutExcessBlocks() {
     long now = Time.monotonicNow();
     int processed = 0;
     try {
-      Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
+      Iterator<Map.Entry<String, LightWeightHashSet<Block>>> iter =
           excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
       while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
-        Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
+        Map.Entry<String, LightWeightHashSet<Block>> entry = iter.next();
         String datanodeUuid = entry.getKey();
-        LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
-        List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
+        LightWeightHashSet<Block> blocks = entry.getValue();
         // Sort blocks by timestamp in descending order.
-        Collections.sort(sortedBlocks);
+        List<ExcessBlockInfo> sortedBlocks = blocks.stream()
+            .filter(block -> block instanceof ExcessBlockInfo)
+            .map(block -> (ExcessBlockInfo) block)
+            .sorted(Comparator.comparingLong(ExcessBlockInfo::getTimeStamp))
+            .collect(Collectors.toList());
 
         for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
           if (processed >= excessRedundancyTimeoutCheckLimit) {
@@ -3132,11 +3137,11 @@ void processTimedOutExcessBlocks() {
           while (iterator.hasNext()) {
             DatanodeStorageInfo datanodeStorageInfo = iterator.next();
             DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
             if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) &&
                 datanodeStorageInfo.getState().equals(State.NORMAL)) {
-              final Block block = getBlockOnStorage(blockInfo, datanodeStorageInfo);
-              if (!containsInvalidateBlock(datanodeDescriptor, block)) {
-                addToInvalidates(block, datanodeDescriptor);
+              final Block b = getBlockOnStorage(blockInfo, datanodeStorageInfo);
+              if (!containsInvalidateBlock(datanodeDescriptor, b)) {
+                addToInvalidates(b, datanodeDescriptor);
                 LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
-                    block, datanodeDescriptor);
+                    b, datanodeDescriptor);
               }
               excessBlockInfo.setTimeStamp();
               break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
index e11a112173e0c..92c3c7952410b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.slf4j.Logger;
@@ -37,7 +38,7 @@ class ExcessRedundancyMap {
   public static final Logger blockLog = NameNode.blockStateChangeLog;
 
-  private final Map<String, LightWeightHashSet<ExcessBlockInfo>> map = new HashMap<>();
+  private final Map<String, LightWeightHashSet<Block>> map = new HashMap<>();
   private final AtomicLong size = new AtomicLong(0L);
 
   /**
@@ -52,7 +53,7 @@ long size() {
    */
   @VisibleForTesting
   synchronized int getSize4Testing(String dnUuid) {
-    final LightWeightHashSet<ExcessBlockInfo> set = map.get(dnUuid);
+    final LightWeightHashSet<Block> set = map.get(dnUuid);
     return set == null? 0: set.size();
   }
 
@@ -66,7 +67,7 @@ synchronized void clear() {
    * datanode and the given block?
    */
   synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     return set != null && set.contains(blk);
   }
 
@@ -77,7 +78,7 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
    * @return true if the block is added.
    */
   synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
-    LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
+    LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       set = new LightWeightHashSet<>();
       map.put(dn.getDatanodeUuid(), set);
@@ -97,11 +98,10 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
    * @return true if the block is removed.
    */
   synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       return false;
     }
-
     final boolean removed = set.remove(blk);
     if (removed) {
       size.decrementAndGet();
@@ -114,7 +114,7 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
     return removed;
   }
 
-  synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() {
+  synchronized Map<String, LightWeightHashSet<Block>> getExcessRedundancyMap() {
     return map;
   }
 
@@ -122,11 +122,12 @@ synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() {
    * An object that holds information about an excess redundancy block.
    * It records the timestamp at which the block was added to the excess redundancy map.
    */
-  static class ExcessBlockInfo implements Comparable<ExcessBlockInfo> {
+  static class ExcessBlockInfo extends Block {
     private long timeStamp;
-    private BlockInfo blockInfo;
+    private final BlockInfo blockInfo;
 
     ExcessBlockInfo(BlockInfo blockInfo) {
+      super(blockInfo.getBlockId(), blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
       this.timeStamp = monotonicNow();
       this.blockInfo = blockInfo;
     }
@@ -145,29 +146,12 @@ void setTimeStamp() {
 
     @Override
     public int hashCode() {
-      return blockInfo.hashCode();
+      return super.hashCode();
     }
 
     @Override
     public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-
-      if (obj instanceof ExcessBlockInfo) {
-        ExcessBlockInfo other = (ExcessBlockInfo) obj;
-        return this.blockInfo.equals(other.blockInfo);
-      }
-
-      if (obj instanceof BlockInfo) {
-        return this.blockInfo.equals(obj);
-      }
-      return false;
-    }
-
-    @Override
-    public int compareTo(ExcessBlockInfo o) {
-      return Long.compare(this.timeStamp, o.timeStamp);
+      return super.equals(obj);
     }
   }
 }
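PATCH 5's key move is making ExcessBlockInfo extend Block and defer to Block's equals/hashCode, which compare block IDs: the set can store timestamped entries while contains() and remove() keep taking a plain BlockInfo. A standalone sketch of that lookup pattern, with a deliberately simplified Block rather than the Hadoop classes:

    import java.util.HashSet;
    import java.util.Set;

    public class BlockLookupSketch {
      // Simplified stand-in for Hadoop's Block: identity is the block id.
      static class Block {
        final long id;
        Block(long id) { this.id = id; }
        @Override public boolean equals(Object o) {
          return o instanceof Block && ((Block) o).id == id;
        }
        @Override public int hashCode() { return Long.hashCode(id); }
      }

      // Stand-in for ExcessBlockInfo: adds a timestamp but keeps Block identity.
      static class TimestampedBlock extends Block {
        final long timeStamp;
        TimestampedBlock(long id, long ts) { super(id); this.timeStamp = ts; }
      }

      public static void main(String[] args) {
        Set<Block> set = new HashSet<>();
        set.add(new TimestampedBlock(42L, 1000L));
        // A plain Block with the same id finds the timestamped entry.
        System.out.println(set.contains(new Block(42L))); // true
        System.out.println(set.remove(new Block(42L)));   // true
      }
    }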
From 09eed20fc0df5c71731d279a2b516a4b8db4e6b6 Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Thu, 16 Nov 2023 13:08:33 +0800
Subject: [PATCH 6/7] HDFS-17218. Modify patch based on comments

---
 .../hadoop/hdfs/server/blockmanagement/BlockManager.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index aaf9beded23d5..37dc45d31ba8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3064,13 +3064,13 @@ void rescanPostponedMisreplicatedBlocks() {
   /**
    * Sets the timeout (in seconds) for excess redundancy blocks. If the provided timeout is
    * less than or equal to 0, the default value is used (converted to milliseconds).
-   * @param timeOut The time (in seconds) to set as the excess redundancy block timeout.
+   * @param timeout The time (in seconds) to set as the excess redundancy block timeout.
    */
-  public void setExcessRedundancyTimeout(long timeOut) {
-    if (timeOut <= 0) {
+  public void setExcessRedundancyTimeout(long timeout) {
+    if (timeout <= 0) {
       this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC * 1000L;
     } else {
-      this.excessRedundancyTimeout = timeOut * 1000L;
+      this.excessRedundancyTimeout = timeout * 1000L;
     }
   }
From b7f9b236cd9e38657ac2b64d215ec688320c8f6b Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Mon, 20 Nov 2023 13:05:59 +0800
Subject: [PATCH 7/7] HDFS-17218. Modify patch based on comments

---
 .../src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +-
 .../hadoop/hdfs/server/blockmanagement/BlockManager.java    | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 2d56c1a3acf50..f92a2ad56581b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -317,7 +317,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY =
       "dfs.namenode.excess.redundancy.timeout-sec";
-  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC = 3600;
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT = 3600;
   public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT
       = "dfs.namenode.excess.redundancy.timeout.check.limit";
   public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT = 1000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 37dc45d31ba8f..ac45b082dfeb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -604,7 +604,7 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
         DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
 
     setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
-        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC));
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT));
     setExcessRedundancyTimeoutCheckLimit(conf.getLong(
         DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
         DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));
@@ -3068,7 +3068,7 @@ void rescanPostponedMisreplicatedBlocks() {
    */
   public void setExcessRedundancyTimeout(long timeout) {
     if (timeout <= 0) {
-      this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC * 1000L;
+      this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT * 1000L;
     } else {
       this.excessRedundancyTimeout = timeout * 1000L;
     }
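Taken together, the series makes the NameNode re-queue for invalidation any replica that has sat in the excess redundancy map longer than dfs.namenode.excess.redundancy.timeout-sec. The seconds-to-milliseconds defaulting applied by setExcessRedundancyTimeout can be summarized in a standalone sketch (the constant mirrors the patch's default; the method name is illustrative):

    public class TimeoutDefaultingSketch {
      static final long DEFAULT_TIMEOUT_SEC = 3600;

      // Non-positive input falls back to the default; the stored value
      // is always milliseconds.
      static long toTimeoutMs(long timeoutSec) {
        return (timeoutSec <= 0 ? DEFAULT_TIMEOUT_SEC : timeoutSec) * 1000L;
      }

      public static void main(String[] args) {
        System.out.println(toTimeoutMs(60));  // 60000
        System.out.println(toTimeoutMs(0));   // 3600000 (default)
        System.out.println(toTimeoutMs(-5));  // 3600000 (default)
      }
    }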