1717 */
1818package org .apache .hadoop .hdfs .qjournal .client ;
1919
20+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT ;
21+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_KEY ;
22+
2023import java .io .IOException ;
2124import java .net .InetSocketAddress ;
2225import java .net .URI ;
3134import java .util .concurrent .TimeUnit ;
3235import java .util .concurrent .TimeoutException ;
3336
37+ import org .apache .hadoop .hdfs .server .blockmanagement .HostSet ;
3438import org .apache .hadoop .util .Lists ;
3539import org .slf4j .Logger ;
3640import org .slf4j .LoggerFactory ;
@@ -108,6 +112,7 @@ public class QuorumJournalManager implements JournalManager {
108112 private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024 ;
109113 private int outputBufferCapacity ;
110114 private final URLConnectionFactory connectionFactory ;
115+ private int quorumJournalCount ;
111116
112117 /** Limit logging about input stream selection to every 5 seconds max. */
113118 private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000 ;
@@ -144,7 +149,18 @@ public QuorumJournalManager(Configuration conf,
144149 this .uri = uri ;
145150 this .nsInfo = nsInfo ;
146151 this .nameServiceId = nameServiceId ;
147- this .loggers = new AsyncLoggerSet (createLoggers (loggerFactory ));
152+
153+ // createLoggers() will set quorumJournalCount to total number of journal nodes while return a
154+ // list of healthy/good journal nodes.
155+ List <AsyncLogger > asyncLoggerList = createLoggers (loggerFactory );
156+ this .loggers = new AsyncLoggerSet (asyncLoggerList , this .quorumJournalCount );
157+
158+ // Check whether the number of jn maintenance lists is valid
159+ int quorumThreshold = quorumJournalCount / 2 + 1 ;
160+ Preconditions .checkArgument (
161+ this .loggers .size () >= quorumThreshold ,
162+ "The total journalnode minus %s the number of blacklists must be greater than or equal to"
163+ + " %s!" , DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , quorumThreshold );
148164
149165 this .maxTxnsPerRpc =
150166 conf .getInt (QJM_RPC_MAX_TXNS_KEY , QJM_RPC_MAX_TXNS_DEFAULT );
@@ -250,6 +266,10 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
250266
251267 @ Override
252268 public void format (NamespaceInfo nsInfo , boolean force ) throws IOException {
269+ if (isJNInMaintenanceMode ()) {
270+ throw new IOException (
271+ "Formatting a journal node is not support while in jn maintenance mode" );
272+ }
253273 QuorumCall <AsyncLogger , Void > call = loggers .format (nsInfo , force );
254274 try {
255275 call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -406,21 +426,39 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
406426 logToSync .getStartTxId (),
407427 logToSync .getEndTxId ()));
408428 }
409-
410- static List <AsyncLogger > createLoggers (Configuration conf ,
429+
430+ List <AsyncLogger > createLoggers (Configuration conf ,
431+ URI uri ,
432+ NamespaceInfo nsInfo ,
433+ AsyncLogger .Factory factory ,
434+ String nameServiceId )
435+ throws IOException {
436+ String [] skipNodesHostPort = conf .getTrimmedStrings (
437+ DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT );
438+ return createLoggers (conf , uri , nsInfo , factory , nameServiceId , skipNodesHostPort );
439+ }
440+
441+ private List <AsyncLogger > createLoggers (Configuration conf ,
411442 URI uri ,
412443 NamespaceInfo nsInfo ,
413444 AsyncLogger .Factory factory ,
414- String nameServiceId )
445+ String nameServiceId ,
446+ String [] skipNodesHostPort )
415447 throws IOException {
416448 List <AsyncLogger > ret = Lists .newArrayList ();
417449 List <InetSocketAddress > addrs = Util .getAddressesList (uri , conf );
418450 if (addrs .size () % 2 == 0 ) {
419451 LOG .warn ("Quorum journal URI '" + uri + "' has an even number " +
420452 "of Journal Nodes specified. This is not recommended!" );
421453 }
454+ setQuorumJournalCount (addrs .size ());
455+ HostSet skipSet = DFSUtil .getHostSet (skipNodesHostPort );
422456 String jid = parseJournalId (uri );
423457 for (InetSocketAddress addr : addrs ) {
458+ if (skipSet .match (addr )) {
459+ LOG .info ("The node {} is a maintenance node and will be skipped." , addr );
460+ continue ;
461+ }
424462 ret .add (factory .createLogger (conf , nsInfo , jid , nameServiceId , addr ));
425463 }
426464 return ret ;
@@ -667,6 +705,9 @@ AsyncLoggerSet getLoggerSetForTests() {
667705
668706 @ Override
669707 public void doPreUpgrade () throws IOException {
708+ if (isJNInMaintenanceMode ()) {
709+ throw new IOException ("doPreUpgrade() is not support while in jn maintenance mode" );
710+ }
670711 QuorumCall <AsyncLogger , Void > call = loggers .doPreUpgrade ();
671712 try {
672713 call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -684,6 +725,9 @@ public void doPreUpgrade() throws IOException {
684725
685726 @ Override
686727 public void doUpgrade (Storage storage ) throws IOException {
728+ if (isJNInMaintenanceMode ()) {
729+ throw new IOException ("doUpgrade() is not support while in jn maintenance mode" );
730+ }
687731 QuorumCall <AsyncLogger , Void > call = loggers .doUpgrade (storage );
688732 try {
689733 call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -701,6 +745,9 @@ public void doUpgrade(Storage storage) throws IOException {
701745
702746 @ Override
703747 public void doFinalize () throws IOException {
748+ if (isJNInMaintenanceMode ()) {
749+ throw new IOException ("doFinalize() is not support while in jn maintenance mode" );
750+ }
704751 QuorumCall <AsyncLogger , Void > call = loggers .doFinalize ();
705752 try {
706753 call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -719,6 +766,9 @@ public void doFinalize() throws IOException {
719766 @ Override
720767 public boolean canRollBack (StorageInfo storage , StorageInfo prevStorage ,
721768 int targetLayoutVersion ) throws IOException {
769+ if (isJNInMaintenanceMode ()) {
770+ throw new IOException ("canRollBack() is not support while in jn maintenance mode" );
771+ }
722772 QuorumCall <AsyncLogger , Boolean > call = loggers .canRollBack (storage ,
723773 prevStorage , targetLayoutVersion );
724774 try {
@@ -753,6 +803,9 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
753803
754804 @ Override
755805 public void doRollback () throws IOException {
806+ if (isJNInMaintenanceMode ()) {
807+ throw new IOException ("doRollback() is not support while in jn maintenance mode" );
808+ }
756809 QuorumCall <AsyncLogger , Void > call = loggers .doRollback ();
757810 try {
758811 call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -770,6 +823,9 @@ public void doRollback() throws IOException {
770823
771824 @ Override
772825 public void discardSegments (long startTxId ) throws IOException {
826+ if (isJNInMaintenanceMode ()) {
827+ throw new IOException ("discardSegments() is not support while in jn maintenance mode" );
828+ }
773829 QuorumCall <AsyncLogger , Void > call = loggers .discardSegments (startTxId );
774830 try {
775831 call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -789,6 +845,9 @@ public void discardSegments(long startTxId) throws IOException {
789845
790846 @ Override
791847 public long getJournalCTime () throws IOException {
848+ if (isJNInMaintenanceMode ()) {
849+ throw new IOException ("getJournalCTime() is not support while in jn maintenance mode" );
850+ }
792851 QuorumCall <AsyncLogger , Long > call = loggers .getJournalCTime ();
793852 try {
794853 call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -819,4 +878,12 @@ public long getJournalCTime() throws IOException {
819878
820879 throw new AssertionError ("Unreachable code." );
821880 }
881+
882+ public void setQuorumJournalCount (int quorumJournalCount ) {
883+ this .quorumJournalCount = quorumJournalCount ;
884+ }
885+
886+ private boolean isJNInMaintenanceMode () {
887+ return this .loggers .size () < quorumJournalCount ;
888+ }
822889}
0 commit comments