Commit 0f0de4f

HDFS-17561. Make HeartbeatManager.Monitor use up-to-date heartbeatRecheckInterval
1 parent 4ddf19c commit 0f0de4f
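
Background note (not part of the commit message): dfs.namenode.heartbeat.recheck-interval is one of the NameNode settings that can be updated at runtime, and DatanodeManager#setHeartbeatInterval (touched in the diff below) is where the refreshed value is applied. HeartbeatManager.Monitor, however, compared against a final field captured once in its constructor, so a runtime change never reached the monitor thread. The following self-contained sketch uses hypothetical names, not Hadoop classes, to illustrate the pattern the commit adopts: read the live, volatile-backed value through a getter instead of caching it at construction.

// Hypothetical, self-contained illustration of the pattern this commit applies:
// a value cached at construction never sees later updates, while a read through
// a volatile-backed getter does.
public class LiveIntervalDemo {
  // Mirrors DatanodeManager's volatile heartbeatRecheckIntervalForMonitor field.
  private volatile long recheckIntervalMs = 300_000L; // default recheck interval, 5 min

  long getRecheckIntervalMs() {
    return recheckIntervalMs;
  }

  void reconfigure(long newIntervalMs) {
    recheckIntervalMs = newIntervalMs; // e.g. triggered by NameNode reconfiguration
  }

  public static void main(String[] args) {
    LiveIntervalDemo settings = new LiveIntervalDemo();
    long cachedAtStartup = settings.getRecheckIntervalMs(); // the old Monitor behaviour

    settings.reconfigure(30_000L); // operator lowers the interval at runtime

    System.out.println("cached copy still sees " + cachedAtStartup + " ms");            // 300000 ms
    System.out.println("live read now sees " + settings.getRecheckIntervalMs() + " ms"); // 30000 ms
  }
}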

3 files changed, +42 -42 lines changed


hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

Lines changed: 20 additions & 0 deletions
@@ -90,6 +90,8 @@ public class DatanodeManager {
 
   private volatile long heartbeatIntervalSeconds;
   private volatile int heartbeatRecheckInterval;
+  /** Used by {@link HeartbeatManager}. */
+  private volatile long heartbeatRecheckIntervalForMonitor;
   /**
    * Stores the datanode -> block map.
    * <p>
@@ -346,6 +348,7 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
     this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    refreshHeartbeatRecheckIntervalForMonitor();
     this.ratioUseStaleDataNodesForWrite = conf.getFloat(
         DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
         DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
@@ -2210,6 +2213,19 @@ private void setHeartbeatInterval(long intervalSeconds,
     this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
         * intervalSeconds;
     this.blockInvalidateLimit = getBlockInvalidateLimit(blockInvalidateLimit);
+    refreshHeartbeatRecheckIntervalForMonitor();
+  }
+
+  @VisibleForTesting
+  public void refreshHeartbeatRecheckIntervalForMonitor() {
+    if (avoidStaleDataNodesForWrite && staleInterval < heartbeatRecheckInterval) {
+      heartbeatRecheckIntervalForMonitor = staleInterval;
+      LOG.info("Setting heartbeat recheck interval to " + staleInterval
+          + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
+          + " is less than " + heartbeatRecheckInterval);
+    } else {
+      heartbeatRecheckIntervalForMonitor = heartbeatRecheckInterval;
+    }
   }
 
   private int getBlockInvalidateLimitFromHBInterval() {
@@ -2351,4 +2367,8 @@ public boolean isSlowPeerCollectorInitialized() {
   public long getSlowPeerCollectionInterval() {
     return slowPeerCollectionInterval;
   }
+
+  public long getHeartbeatRecheckIntervalForMonitor() {
+    return heartbeatRecheckIntervalForMonitor;
+  }
 }
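
For reference, the selection rule that refreshHeartbeatRecheckIntervalForMonitor re-applies at construction time and on every interval update is small enough to show in isolation. The sketch below is a hypothetical helper, not Hadoop code; the default values in its usage comments (5 min recheck interval, 30 s stale interval) come from the comments in the HeartbeatManager code removed further down.

// Hypothetical helper illustrating the rule DatanodeManager now applies:
// when writes avoid stale datanodes and the stale interval is the shorter of
// the two, the Monitor must recheck at the stale interval; otherwise it uses
// the configured heartbeat recheck interval.
public class RecheckIntervalRule {
  static long monitorInterval(boolean avoidStaleDataNodesForWrite,
      long staleIntervalMs, long heartbeatRecheckIntervalMs) {
    if (avoidStaleDataNodesForWrite && staleIntervalMs < heartbeatRecheckIntervalMs) {
      return staleIntervalMs;
    }
    return heartbeatRecheckIntervalMs;
  }

  public static void main(String[] args) {
    // Defaults noted in the removed HeartbeatManager code: recheck 5 min, stale 30 s.
    System.out.println(monitorInterval(false, 30_000L, 300_000L)); // 300000
    System.out.println(monitorInterval(true, 30_000L, 300_000L));  // 30000
  }
}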

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

Lines changed: 4 additions & 23 deletions
@@ -67,8 +67,6 @@ class HeartbeatManager implements DatanodeStatistics {
   /** Statistics, which are synchronized by the heartbeat manager lock. */
   private final DatanodeStats stats = new DatanodeStats();
 
-  /** The time period to check for expired datanodes. */
-  private final long heartbeatRecheckInterval;
   /** Heartbeat monitor thread. */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
   private final StopWatch heartbeatStopWatch = new StopWatch();
@@ -86,31 +84,12 @@ class HeartbeatManager implements DatanodeStatistics {
       final BlockManager blockManager, final Configuration conf) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
-    boolean avoidStaleDataNodesForWrite = conf.getBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
-    long recheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
-    long staleInterval = conf.getLong(
-        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
-        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
     enableLogStaleNodes = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT);
     this.numOfDeadDatanodesRemove = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY,
         DFSConfigKeys.DFS_NAMENODE_REMOVE_BAD_BATCH_NUM_DEFAULT);
-
-    if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
-      this.heartbeatRecheckInterval = staleInterval;
-      LOG.info("Setting heartbeat recheck interval to " + staleInterval
-          + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
-          + " is less than "
-          + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
-    } else {
-      this.heartbeatRecheckInterval = recheckInterval;
-    }
   }
 
   void activate() {
@@ -355,7 +334,8 @@ void restartHeartbeatStopWatch() {
   @VisibleForTesting
   boolean shouldAbortHeartbeatCheck(long offset) {
     long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS);
-    return elapsed + offset > heartbeatRecheckInterval;
+    return elapsed + offset > blockManager.getDatanodeManager()
+        .getHeartbeatRecheckIntervalForMonitor();
   }
 
   /**
@@ -545,7 +525,8 @@ public void run() {
       restartHeartbeatStopWatch();
       try {
        final long now = Time.monotonicNow();
-        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
+        if (lastHeartbeatCheck + blockManager.getDatanodeManager()
+            .getHeartbeatRecheckIntervalForMonitor() < now) {
          heartbeatCheck();
          lastHeartbeatCheck = now;
        }
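
With the cached field gone, every pass of the Monitor (and of shouldAbortHeartbeatCheck) goes through blockManager.getDatanodeManager().getHeartbeatRecheckIntervalForMonitor(), so an interval lowered at runtime shortens the wait before the next heartbeatCheck(). A minimal stand-alone simulation of that loop shape, using hypothetical names rather than the Hadoop classes, is below.

// Hypothetical simulation of the Monitor's timing check: the interval is read
// fresh on each iteration, so a mid-run change takes effect on the next pass
// instead of being frozen at thread start.
import java.util.concurrent.atomic.AtomicLong;

public class MonitorLoopSketch {
  public static void main(String[] args) throws InterruptedException {
    AtomicLong recheckMs = new AtomicLong(10_000); // stand-in for the live interval
    long lastCheck = System.currentTimeMillis();

    recheckMs.set(250); // "reconfiguration" while the loop runs

    for (int i = 0; i < 5; i++) {
      long now = System.currentTimeMillis();
      // Same shape as Monitor.run(): compare against the freshly read interval.
      if (lastCheck + recheckMs.get() < now) {
        System.out.println("heartbeatCheck() would run after " + (now - lastCheck) + " ms");
        lastCheck = now;
      }
      Thread.sleep(100);
    }
  }
}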

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java

Lines changed: 18 additions & 19 deletions
@@ -259,24 +259,23 @@ public void testHeartbeatBlockRecovery() throws Exception {
 
   @Test
   public void testHeartbeatStopWatch() throws Exception {
-    Namesystem ns = Mockito.mock(Namesystem.class);
-    BlockManager bm = Mockito.mock(BlockManager.class);
-    Configuration conf = new Configuration();
-    long recheck = 2000;
-    conf.setLong(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck);
-    HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf);
-    monitor.restartHeartbeatStopWatch();
-    assertFalse(monitor.shouldAbortHeartbeatCheck(0));
-    // sleep shorter than recheck and verify shouldn't abort
-    Thread.sleep(100);
-    assertFalse(monitor.shouldAbortHeartbeatCheck(0));
-    // sleep longer than recheck and verify should abort unless ignore delay
-    Thread.sleep(recheck);
-    assertTrue(monitor.shouldAbortHeartbeatCheck(0));
-    assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck*3));
-    // ensure it resets properly
-    monitor.restartHeartbeatStopWatch();
-    assertFalse(monitor.shouldAbortHeartbeatCheck(0));
+    Namesystem ns = Mockito.mock(Namesystem.class);
+    Configuration conf = new Configuration();
+    long recheck = 2000;
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck);
+    BlockManager bm = new BlockManager(ns, false, conf);
+    HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf);
+    monitor.restartHeartbeatStopWatch();
+    assertFalse(monitor.shouldAbortHeartbeatCheck(0));
+    // sleep shorter than recheck and verify shouldn't abort
+    Thread.sleep(100);
+    assertFalse(monitor.shouldAbortHeartbeatCheck(0));
+    // sleep longer than recheck and verify should abort unless ignore delay
+    Thread.sleep(recheck);
+    assertTrue(monitor.shouldAbortHeartbeatCheck(0));
+    assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck * 3));
+    // ensure it resets properly
+    monitor.restartHeartbeatStopWatch();
+    assertFalse(monitor.shouldAbortHeartbeatCheck(0));
   }
 }
