3636import java .util .TreeSet ;
3737import java .util .concurrent .BlockingQueue ;
3838import java .util .concurrent .CountDownLatch ;
39- import java .util .concurrent .LinkedBlockingQueue ;
39+ import java .util .concurrent .LinkedBlockingDeque ;
4040import java .util .concurrent .ThreadLocalRandom ;
4141import java .util .concurrent .atomic .AtomicBoolean ;
4242import java .util .concurrent .atomic .AtomicLong ;
6262import org .apache .hadoop .hdfs .server .protocol .DisallowedDatanodeException ;
6363import org .apache .hadoop .hdfs .server .protocol .HeartbeatResponse ;
6464import org .apache .hadoop .hdfs .server .protocol .InvalidBlockReportLeaseException ;
65+ import org .apache .hadoop .hdfs .server .protocol .KeyUpdateCommand ;
6566import org .apache .hadoop .hdfs .server .protocol .NamespaceInfo ;
6667import org .apache .hadoop .hdfs .server .protocol .SlowDiskReports ;
6768import org .apache .hadoop .hdfs .server .protocol .SlowPeerReports ;
@@ -738,7 +739,19 @@ private void offerService() throws Exception {
738739 if (state == HAServiceState .ACTIVE ) {
739740 handleRollingUpgradeStatus (resp );
740741 }
741- commandProcessingThread .enqueue (resp .getCommands ());
742+ // Note: effect only when the KeyUpdateCommand was the last
743+ // or penultimate command in DatanodeCommand[].
744+ DatanodeCommand [] cmds = resp .getCommands ();
745+ boolean isContaisHighPriorityCmd = false ;
746+ if (cmds != null ) {
747+ int length = cmds .length ;
748+ for (int iter = length - 1 ; iter >= 0 && iter >= length - 2 ; iter --) {
749+ isContaisHighPriorityCmd = isContaisHighPriorityCmd ||
750+ cmds [iter ] instanceof KeyUpdateCommand ;
751+ }
752+ }
753+ commandProcessingThread .enqueue (cmds ,
754+ isContaisHighPriorityCmd );
742755 isSlownode = resp .getIsSlownode ();
743756 }
744757 }
@@ -1389,7 +1402,7 @@ class CommandProcessingThread extends Thread {
13891402 CommandProcessingThread (BPServiceActor actor ) {
13901403 super ("Command processor" );
13911404 this .actor = actor ;
1392- this .queue = new LinkedBlockingQueue <>();
1405+ this .queue = new LinkedBlockingDeque <>();
13931406 setDaemon (true );
13941407 }
13951408
@@ -1468,6 +1481,11 @@ private boolean processCommand(DatanodeCommand[] cmds) {
14681481 return true ;
14691482 }
14701483
1484+ /**
1485+ * Used for cacheReport.
1486+ * @param cmd
1487+ * @throws InterruptedException
1488+ */
14711489 void enqueue (DatanodeCommand cmd ) throws InterruptedException {
14721490 if (cmd == null ) {
14731491 return ;
@@ -1476,6 +1494,11 @@ void enqueue(DatanodeCommand cmd) throws InterruptedException {
14761494 dn .getMetrics ().incrActorCmdQueueLength (1 );
14771495 }
14781496
1497+ /**
1498+ * Used for full block report.
1499+ * @param cmds
1500+ * @throws InterruptedException
1501+ */
14791502 void enqueue (List <DatanodeCommand > cmds ) throws InterruptedException {
14801503 if (cmds == null ) {
14811504 return ;
@@ -1485,8 +1508,18 @@ void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
14851508 dn .getMetrics ().incrActorCmdQueueLength (1 );
14861509 }
14871510
1488- void enqueue (DatanodeCommand [] cmds ) throws InterruptedException {
1489- queue .put (() -> processCommand (cmds ));
1511+ /**
1512+ * Used for regular heartbeat.
1513+ * @param cmds
1514+ * @throws InterruptedException
1515+ */
1516+ void enqueue (DatanodeCommand [] cmds ,
1517+ boolean containsHighPriorityCmds ) throws InterruptedException {
1518+ if (containsHighPriorityCmds ) {
1519+ ((LinkedBlockingDeque <Runnable >) queue ).putFirst (() -> processCommand (cmds ));
1520+ } else {
1521+ queue .put (() -> processCommand (cmds ));
1522+ }
14901523 dn .getMetrics ().incrActorCmdQueueLength (1 );
14911524 }
14921525 }
0 commit comments