Skip to content

Commit bd1fa84

Browse files
committed
HDFS-17372. CommandProcessingThread#queue should use LinkedBlockingDeque to prevent high-priority commands from being blocked by low-priority commands.
1 parent 4f0f5a5 commit bd1fa84

File tree

9 files changed

+53
-16
lines changed

9 files changed

+53
-16
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
178178
}
179179
return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
180180
rollingUpdateStatus, resp.getFullBlockReportLeaseId(),
181-
resp.getIsSlownode());
181+
resp.getIsSlownode(), resp.getIsCommandHighPriority());
182182
}
183183

184184
@Override

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1657,13 +1657,15 @@ public void setBlockToken(final LocatedBlock b,
16571657
}
16581658
}
16591659

1660-
void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
1660+
boolean addKeyUpdateCommand(final List<DatanodeCommand> cmds,
16611661
final DatanodeDescriptor nodeinfo) {
16621662
// check access key update
16631663
if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate()) {
16641664
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
16651665
nodeinfo.setNeedKeyUpdate(false);
1666+
return true;
16661667
}
1668+
return false;
16671669
}
16681670

16691671
public DataEncryptionKey generateDataEncryptionKey() {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.util.*;
7272
import java.util.concurrent.ThreadLocalRandom;
7373
import java.util.concurrent.TimeUnit;
74+
import java.util.concurrent.atomic.AtomicBoolean;
7475
import java.util.function.Consumer;
7576

7677
/**
@@ -1813,7 +1814,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
18131814
int xmitsInProgress, int failedVolumes,
18141815
VolumeFailureSummary volumeFailureSummary,
18151816
@Nonnull SlowPeerReports slowPeers,
1816-
@Nonnull SlowDiskReports slowDisks) throws IOException {
1817+
@Nonnull SlowDiskReports slowDisks,
1818+
AtomicBoolean containsHighPriorityCmd) throws IOException {
18171819
final DatanodeDescriptor nodeinfo;
18181820
try {
18191821
nodeinfo = getDatanode(nodeReg);
@@ -1922,7 +1924,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
19221924
// cache commands
19231925
addCacheCommands(blockPoolId, nodeinfo, cmds);
19241926
// key update command
1925-
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
1927+
boolean isKeyUpdated = blockManager.addKeyUpdateCommand(cmds, nodeinfo);
1928+
containsHighPriorityCmd.set(isKeyUpdated);
19261929

19271930
// check for balancer bandwidth update
19281931
if (nodeinfo.getBalancerBandwidth() > 0) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.util.TreeSet;
3737
import java.util.concurrent.BlockingQueue;
3838
import java.util.concurrent.CountDownLatch;
39-
import java.util.concurrent.LinkedBlockingQueue;
39+
import java.util.concurrent.LinkedBlockingDeque;
4040
import java.util.concurrent.ThreadLocalRandom;
4141
import java.util.concurrent.atomic.AtomicBoolean;
4242
import java.util.concurrent.atomic.AtomicLong;
@@ -738,7 +738,8 @@ private void offerService() throws Exception {
738738
if (state == HAServiceState.ACTIVE) {
739739
handleRollingUpgradeStatus(resp);
740740
}
741-
commandProcessingThread.enqueue(resp.getCommands());
741+
commandProcessingThread.enqueue(resp.getCommands(),
742+
resp.getIsContainsHighPriorityCmds());
742743
isSlownode = resp.getIsSlownode();
743744
}
744745
}
@@ -1389,7 +1390,7 @@ class CommandProcessingThread extends Thread {
13891390
CommandProcessingThread(BPServiceActor actor) {
13901391
super("Command processor");
13911392
this.actor = actor;
1392-
this.queue = new LinkedBlockingQueue<>();
1393+
this.queue = new LinkedBlockingDeque<>();
13931394
setDaemon(true);
13941395
}
13951396

@@ -1468,6 +1469,11 @@ private boolean processCommand(DatanodeCommand[] cmds) {
14681469
return true;
14691470
}
14701471

1472+
/**
1473+
* Only used for cacheReport.
1474+
* @param cmd
1475+
* @throws InterruptedException
1476+
*/
14711477
void enqueue(DatanodeCommand cmd) throws InterruptedException {
14721478
if (cmd == null) {
14731479
return;
@@ -1476,6 +1482,11 @@ void enqueue(DatanodeCommand cmd) throws InterruptedException {
14761482
dn.getMetrics().incrActorCmdQueueLength(1);
14771483
}
14781484

1485+
/**
1486+
* Used for blockReport.
1487+
* @param cmds
1488+
* @throws InterruptedException
1489+
*/
14791490
void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
14801491
if (cmds == null) {
14811492
return;
@@ -1485,8 +1496,18 @@ void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
14851496
dn.getMetrics().incrActorCmdQueueLength(1);
14861497
}
14871498

1488-
void enqueue(DatanodeCommand[] cmds) throws InterruptedException {
1489-
queue.put(() -> processCommand(cmds));
1499+
/**
1500+
* Used for heartbeating.
1501+
* @param cmds
1502+
* @throws InterruptedException
1503+
*/
1504+
void enqueue(DatanodeCommand[] cmds,
1505+
boolean containsHighPriorityCmds) throws InterruptedException {
1506+
if (containsHighPriorityCmds) {
1507+
((LinkedBlockingDeque<Runnable>) queue).putFirst(() -> processCommand(cmds));
1508+
} else {
1509+
queue.put(() -> processCommand(cmds));
1510+
}
14901511
dn.getMetrics().incrActorCmdQueueLength(1);
14911512
}
14921513
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;
9898

9999
import java.nio.charset.StandardCharsets;
100+
import java.util.concurrent.atomic.AtomicBoolean;
100101
import java.util.concurrent.atomic.AtomicLong;
101102

102103
import org.apache.commons.text.CaseUtils;
@@ -4429,11 +4430,12 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
44294430
throws IOException {
44304431
readLock();
44314432
try {
4433+
AtomicBoolean containsHighPriorityCmd = new AtomicBoolean(false);
44324434
//get datanode commands
44334435
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
44344436
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
44354437
xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary,
4436-
slowPeers, slowDisks);
4438+
slowPeers, slowDisks, containsHighPriorityCmd);
44374439
long blockReportLeaseId = 0;
44384440
if (requestFullBlockReportLease) {
44394441
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
@@ -4448,7 +4450,7 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
44484450
boolean isSlownode = slownodes.contains(nodeReg.getDatanodeUuid());
44494451

44504452
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
4451-
blockReportLeaseId, isSlownode);
4453+
blockReportLeaseId, isSlownode, containsHighPriorityCmd.get());
44524454
} finally {
44534455
readUnlock("handleHeartbeat");
44544456
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,24 @@ public class HeartbeatResponse {
3838
private final long fullBlockReportLeaseId;
3939

4040
private final boolean isSlownode;
41+
42+
private final boolean containsHighPriorityCmds;
4143

4244
public HeartbeatResponse(DatanodeCommand[] cmds,
4345
NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
4446
long fullBlockReportLeaseId) {
45-
this(cmds, haStatus, rollingUpdateStatus, fullBlockReportLeaseId, false);
47+
this(cmds, haStatus, rollingUpdateStatus, fullBlockReportLeaseId, false, false);
4648
}
4749

4850
public HeartbeatResponse(DatanodeCommand[] cmds,
4951
NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
50-
long fullBlockReportLeaseId, boolean isSlownode) {
52+
long fullBlockReportLeaseId, boolean isSlownode, boolean containsHighPriorityCmds) {
5153
commands = cmds;
5254
this.haStatus = haStatus;
5355
this.rollingUpdateStatus = rollingUpdateStatus;
5456
this.fullBlockReportLeaseId = fullBlockReportLeaseId;
5557
this.isSlownode = isSlownode;
58+
this.containsHighPriorityCmds = containsHighPriorityCmds;
5659
}
5760

5861
public DatanodeCommand[] getCommands() {
@@ -74,4 +77,8 @@ public long getFullBlockReportLeaseId() {
7477
public boolean getIsSlownode() {
7578
return isSlownode;
7679
}
80+
81+
public boolean getIsContainsHighPriorityCmds() {
82+
return containsHighPriorityCmds;
83+
}
7784
}

hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ message HeartbeatResponseProto {
224224
optional RollingUpgradeStatusProto rollingUpgradeStatusV2 = 4;
225225
optional uint64 fullBlockReportLeaseId = 5 [ default = 0 ];
226226
optional bool isSlownode = 6 [ default = false ];
227+
optional bool isCommandHighPriority = 7 [ default = false ];
227228
}
228229

229230
/**

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Map.Entry;
3636
import java.util.Random;
3737
import java.util.Set;
38+
import java.util.concurrent.atomic.AtomicBoolean;
3839

3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
@@ -1039,7 +1040,7 @@ private void verifyPendingRecoveryTasks(
10391040
Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
10401041
DatanodeCommand[] cmds = dm.handleHeartbeat(
10411042
dnReg, new StorageReport[1], "bp-123", 0, 0, 10, 0, 0, null,
1042-
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
1043+
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, new AtomicBoolean(false));
10431044

10441045
long expectedNumCmds = Arrays.stream(
10451046
new int[]{numReplicationTasks + numECTasksToBeReplicated, numECTasksToBeErasureCoded})
@@ -1171,7 +1172,7 @@ public void verifyComputeReconstructedTaskNum(int xmitsInProgress, int numReplic
11711172

11721173
dm.handleHeartbeat(nodeReg, new StorageReport[1], "bp-123", 0, 0,
11731174
10, xmitsInProgress, 0, null, SlowPeerReports.EMPTY_REPORT,
1174-
SlowDiskReports.EMPTY_REPORT);
1175+
SlowDiskReports.EMPTY_REPORT, new AtomicBoolean(false));
11751176

11761177
Mockito.verify(nodeInfo).getReplicationCommand(captor.capture());
11771178
int numReplicationTasks = captor.getValue();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
245245
throws Throwable {
246246
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
247247
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
248-
0, isSlownode);
248+
0, isSlownode, false);
249249

250250
return heartbeatResponse;
251251
}

0 commit comments

Comments (0)