1717 */ 
1818package  org .apache .hadoop .hdfs .qjournal .client ;
1919
20+ import  static  org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT ;
21+ import  static  org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_KEY ;
22+ 
2023import  java .io .IOException ;
2124import  java .net .InetSocketAddress ;
2225import  java .net .URI ;
3134import  java .util .concurrent .TimeUnit ;
3235import  java .util .concurrent .TimeoutException ;
3336
37+ import  org .apache .hadoop .hdfs .server .blockmanagement .HostSet ;
3438import  org .apache .hadoop .util .Lists ;
3539import  org .slf4j .Logger ;
3640import  org .slf4j .LoggerFactory ;
6266import  org .apache .hadoop .classification .VisibleForTesting ;
6367import  org .apache .hadoop .thirdparty .com .google .common .base .Joiner ;
6468import  org .apache .hadoop .util .Preconditions ;
69+ 
6570import  org .apache .hadoop .thirdparty .protobuf .TextFormat ;
6671
6772/** 
@@ -108,6 +113,7 @@ public class QuorumJournalManager implements JournalManager {
108113  private  static  final  int  OUTPUT_BUFFER_CAPACITY_DEFAULT  = 512  * 1024 ;
109114  private  int  outputBufferCapacity ;
110115  private  final  URLConnectionFactory  connectionFactory ;
116+   private  int  quorumJournalCount ;
111117
112118  /** Limit logging about input stream selection to every 5 seconds max. */ 
113119  private  static  final  long  SELECT_INPUT_STREAM_LOG_INTERVAL_MS  = 5000 ;
@@ -144,7 +150,14 @@ public QuorumJournalManager(Configuration conf,
144150    this .uri  = uri ;
145151    this .nsInfo  = nsInfo ;
146152    this .nameServiceId  = nameServiceId ;
147-     this .loggers  = new  AsyncLoggerSet (createLoggers (loggerFactory ));
153+     this .loggers  = new  AsyncLoggerSet (createLoggers (loggerFactory ), this .quorumJournalCount );
154+ 
155+     // Check whether the number of jn maintenance lists is valid 
156+     int  quorumThreshold  = quorumJournalCount  / 2  + 1 ;
157+     Preconditions .checkArgument (
158+         this .loggers .size () >= quorumThreshold ,
159+         "The total journalnode minus %s the number of blacklists must be greater than or equal to" 
160+             + " %s!" , DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , quorumThreshold );
148161
149162    this .maxTxnsPerRpc  =
150163        conf .getInt (QJM_RPC_MAX_TXNS_KEY , QJM_RPC_MAX_TXNS_DEFAULT );
@@ -250,6 +263,9 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
250263
251264  @ Override 
252265  public  void  format (NamespaceInfo  nsInfo , boolean  force ) throws  IOException  {
266+     if  (isEnableJnMaintenance ()) {
267+       throw  new  IOException ("format() does not support enabling jn maintenance mode" );
268+     }
253269    QuorumCall <AsyncLogger , Void > call  = loggers .format (nsInfo , force );
254270    try  {
255271      call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -406,21 +422,39 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
406422            logToSync .getStartTxId (),
407423            logToSync .getEndTxId ()));
408424  }
409-   
410-   static  List <AsyncLogger > createLoggers (Configuration  conf ,
425+ 
426+   List <AsyncLogger > createLoggers (Configuration  conf ,
427+                                   URI  uri ,
428+                                   NamespaceInfo  nsInfo ,
429+                                   AsyncLogger .Factory  factory ,
430+                                   String  nameServiceId )
431+       throws  IOException  {
432+     String [] skipNodesHostPort  = conf .getTrimmedStrings (
433+         DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT );
434+     return  createLoggers (conf , uri , nsInfo , factory , nameServiceId , skipNodesHostPort );
435+   }
436+ 
437+   private  List <AsyncLogger > createLoggers (Configuration  conf ,
411438                                         URI  uri ,
412439                                         NamespaceInfo  nsInfo ,
413440                                         AsyncLogger .Factory  factory ,
414-                                          String  nameServiceId )
441+                                          String  nameServiceId ,
442+                                          String [] skipNodesHostPort )
415443      throws  IOException  {
416444    List <AsyncLogger > ret  = Lists .newArrayList ();
417445    List <InetSocketAddress > addrs  = Util .getAddressesList (uri , conf );
418446    if  (addrs .size () % 2  == 0 ) {
419447      LOG .warn ("Quorum journal URI '"  + uri  + "' has an even number "  +
420448          "of Journal Nodes specified. This is not recommended!" );
421449    }
450+     this .quorumJournalCount  = addrs .size ();
451+     HostSet  skipSet  = DFSUtil .getInetSocketAddress (skipNodesHostPort );
422452    String  jid  = parseJournalId (uri );
423453    for  (InetSocketAddress  addr  : addrs ) {
454+       if (skipSet .match (addr )) {
455+         LOG .info ("The node {} is a maintenance node and will skip initialization." , addr );
456+         continue ;
457+       }
424458      ret .add (factory .createLogger (conf , nsInfo , jid , nameServiceId , addr ));
425459    }
426460    return  ret ;
@@ -667,6 +701,9 @@ AsyncLoggerSet getLoggerSetForTests() {
667701
668702  @ Override 
669703  public  void  doPreUpgrade () throws  IOException  {
704+     if  (isEnableJnMaintenance ()) {
705+       throw  new  IOException ("doPreUpgrade() does not support enabling jn maintenance mode" );
706+     }
670707    QuorumCall <AsyncLogger , Void > call  = loggers .doPreUpgrade ();
671708    try  {
672709      call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -684,6 +721,9 @@ public void doPreUpgrade() throws IOException {
684721
685722  @ Override 
686723  public  void  doUpgrade (Storage  storage ) throws  IOException  {
724+     if  (isEnableJnMaintenance ()) {
725+       throw  new  IOException ("doUpgrade() does not support enabling jn maintenance mode" );
726+     }
687727    QuorumCall <AsyncLogger , Void > call  = loggers .doUpgrade (storage );
688728    try  {
689729      call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -701,6 +741,9 @@ public void doUpgrade(Storage storage) throws IOException {
701741
702742  @ Override 
703743  public  void  doFinalize () throws  IOException  {
744+     if  (isEnableJnMaintenance ()) {
745+       throw  new  IOException ("doFinalize() does not support enabling jn maintenance mode" );
746+     }
704747    QuorumCall <AsyncLogger , Void > call  = loggers .doFinalize ();
705748    try  {
706749      call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -719,6 +762,9 @@ public void doFinalize() throws IOException {
719762  @ Override 
720763  public  boolean  canRollBack (StorageInfo  storage , StorageInfo  prevStorage ,
721764      int  targetLayoutVersion ) throws  IOException  {
765+     if  (isEnableJnMaintenance ()) {
766+       throw  new  IOException ("canRollBack() does not support enabling jn maintenance mode" );
767+     }
722768    QuorumCall <AsyncLogger , Boolean > call  = loggers .canRollBack (storage ,
723769        prevStorage , targetLayoutVersion );
724770    try  {
@@ -753,6 +799,9 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
753799
754800  @ Override 
755801  public  void  doRollback () throws  IOException  {
802+     if  (isEnableJnMaintenance ()) {
803+       throw  new  IOException ("doRollback() does not support enabling jn maintenance mode" );
804+     }
756805    QuorumCall <AsyncLogger , Void > call  = loggers .doRollback ();
757806    try  {
758807      call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -770,6 +819,9 @@ public void doRollback() throws IOException {
770819
771820  @ Override 
772821  public  void  discardSegments (long  startTxId ) throws  IOException  {
822+     if  (isEnableJnMaintenance ()) {
823+       throw  new  IOException ("discardSegments() does not support enabling jn maintenance mode" );
824+     }
773825    QuorumCall <AsyncLogger , Void > call  = loggers .discardSegments (startTxId );
774826    try  {
775827      call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -789,6 +841,9 @@ public void discardSegments(long startTxId) throws IOException {
789841
790842  @ Override 
791843  public  long  getJournalCTime () throws  IOException  {
844+     if  (isEnableJnMaintenance ()) {
845+       throw  new  IOException ("getJournalCTime() does not support enabling jn maintenance mode" );
846+     }
792847    QuorumCall <AsyncLogger , Long > call  = loggers .getJournalCTime ();
793848    try  {
794849      call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -819,4 +874,8 @@ public long getJournalCTime() throws IOException {
819874
820875    throw  new  AssertionError ("Unreachable code." );
821876  }
877+ 
878+   private  boolean  isEnableJnMaintenance () {
879+     return  this .loggers .size () < quorumJournalCount ;
880+   }
822881}
0 commit comments