2222import  java .util .List ;
2323import  java .util .function .Function ;
2424import  java .util .function .Supplier ;
25+ import  org .apache .hadoop .hbase .HConstants ;
2526import  org .apache .hadoop .hbase .ServerName ;
2627import  org .apache .hadoop .hbase .TableExistsException ;
2728import  org .apache .hadoop .hbase .TableName ;
@@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
9596    (n , k ) -> n .compareKey ((String ) k );
9697  private  final  static  AvlKeyComparator <MetaQueue > META_QUEUE_KEY_COMPARATOR  =
9798    (n , k ) -> n .compareKey ((TableName ) k );
99+   private  final  static  AvlKeyComparator <GlobalQueue > GLOBAL_QUEUE_KEY_COMPARATOR  =
100+     (n , k ) -> n .compareKey ((String ) k );
98101
99102  private  final  FairQueue <ServerName > serverRunQueue  = new  FairQueue <>();
100103  private  final  FairQueue <TableName > tableRunQueue  = new  FairQueue <>();
101104  private  final  FairQueue <String > peerRunQueue  = new  FairQueue <>();
102105  private  final  FairQueue <TableName > metaRunQueue  = new  FairQueue <>();
106+   private  final  FairQueue <String > globalRunQueue  = new  FairQueue <>();
103107
104108  private  final  ServerQueue [] serverBuckets  = new  ServerQueue [128 ];
105109  private  TableQueue  tableMap  = null ;
106110  private  PeerQueue  peerMap  = null ;
107111  private  MetaQueue  metaMap  = null ;
112+   private  GlobalQueue  globalMap  = null ;
108113
109114  private  final  SchemaLocking  locking ;
110115
@@ -128,6 +133,8 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
128133      doAdd (serverRunQueue , getServerQueue (spi .getServerName (), spi ), proc , addFront );
129134    } else  if  (isPeerProcedure (proc )) {
130135      doAdd (peerRunQueue , getPeerQueue (getPeerId (proc )), proc , addFront );
136+     } else  if  (isGlobalProcedure (proc )) {
137+       doAdd (globalRunQueue , getGlobalQueue (getGlobalId (proc )), proc , addFront );
131138    } else  {
132139      // TODO: at the moment we only have Table and Server procedures 
133140      // if you are implementing a non-table/non-server procedure, you have two options: create 
@@ -163,14 +170,19 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
163170
164171  @ Override 
165172  protected  boolean  queueHasRunnables () {
166-     return  metaRunQueue .hasRunnables () || tableRunQueue .hasRunnables ()
167-       || serverRunQueue .hasRunnables () || peerRunQueue .hasRunnables ();
173+     return  globalRunQueue .hasRunnables () || metaRunQueue .hasRunnables ()
174+       || tableRunQueue .hasRunnables () || serverRunQueue .hasRunnables ()
175+       || peerRunQueue .hasRunnables ();
168176  }
169177
170178  @ Override 
171179  protected  Procedure  dequeue () {
172-     // meta procedure is always the first priority 
173-     Procedure <?> pollResult  = doPoll (metaRunQueue );
180+     // pull global first 
181+     Procedure <?> pollResult  = doPoll (globalRunQueue );
182+     // then meta procedure 
183+     if  (pollResult  == null ) {
184+       pollResult  = doPoll (metaRunQueue );
185+     }
174186    // For now, let server handling have precedence over table handling; presumption is that it 
175187    // is more important handling crashed servers than it is running the 
176188    // enabling/disabling tables, etc. 
@@ -268,6 +280,14 @@ private void clearQueue() {
268280    clear (peerMap , peerRunQueue , PEER_QUEUE_KEY_COMPARATOR );
269281    peerMap  = null ;
270282
283+     // Remove Meta 
284+     clear (metaMap , metaRunQueue , META_QUEUE_KEY_COMPARATOR );
285+     metaMap  = null ;
286+ 
287+     // Remove Global 
288+     clear (globalMap , globalRunQueue , GLOBAL_QUEUE_KEY_COMPARATOR );
289+     globalMap  = null ;
290+ 
271291    assert  size () == 0  : "expected queue size to be 0, got "  + size ();
272292  }
273293
@@ -300,6 +320,7 @@ protected int queueSize() {
300320    count  += queueSize (tableMap );
301321    count  += queueSize (peerMap );
302322    count  += queueSize (metaMap );
323+     count  += queueSize (globalMap );
303324    return  count ;
304325  }
305326
@@ -502,6 +523,51 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
502523    return  proc  instanceof  MetaProcedureInterface ;
503524  }
504525
526+   // ============================================================================ 
527+   // Global Queue Lookup Helpers 
528+   // ============================================================================ 
529+   private  GlobalQueue  getGlobalQueue (String  globalId ) {
530+     GlobalQueue  node  = AvlTree .get (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
531+     if  (node  != null ) {
532+       return  node ;
533+     }
534+     node  = new  GlobalQueue (globalId , locking .getGlobalLock (globalId ));
535+     globalMap  = AvlTree .insert (globalMap , node );
536+     return  node ;
537+   }
538+ 
539+   private  void  removeGlobalQueue (String  globalId ) {
540+     globalMap  = AvlTree .remove (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
541+     locking .removeGlobalLock (globalId );
542+   }
543+ 
544+   private  void  tryCleanupGlobalQueue (String  globalId , Procedure <?> procedure ) {
545+     schedLock ();
546+     try  {
547+       GlobalQueue  queue  = AvlTree .get (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
548+       if  (queue  == null ) {
549+         return ;
550+       }
551+ 
552+       final  LockAndQueue  lock  = locking .getGlobalLock (globalId );
553+       if  (queue .isEmpty () && lock .tryExclusiveLock (procedure )) {
554+         removeFromRunQueue (globalRunQueue , queue ,
555+           () -> "clean up global queue after "  + procedure  + " completed" );
556+         removeGlobalQueue (globalId );
557+       }
558+     } finally  {
559+       schedUnlock ();
560+     }
561+   }
562+ 
563+   private  static  boolean  isGlobalProcedure (Procedure <?> proc ) {
564+     return  proc  instanceof  GlobalProcedureInterface ;
565+   }
566+ 
567+   private  static  String  getGlobalId (Procedure <?> proc ) {
568+     return  ((GlobalProcedureInterface ) proc ).getGlobalId ();
569+   }
570+ 
505571  // ============================================================================ 
506572  // Table Locking Helpers 
507573  // ============================================================================ 
@@ -1006,6 +1072,51 @@ public void wakeMetaExclusiveLock(Procedure<?> procedure) {
10061072    }
10071073  }
10081074
1075+   // ============================================================================ 
1076+   // Global Locking Helpers 
1077+   // ============================================================================ 
1078+   /** 
1079+    * Try to acquire the share lock on global. 
1080+    * @see #wakeGlobalExclusiveLock(Procedure, String) 
1081+    * @param procedure the procedure trying to acquire the lock 
1082+    * @return true if the procedure has to wait for global to be available 
1083+    */ 
1084+   public  boolean  waitGlobalExclusiveLock (Procedure <?> procedure , String  globalId ) {
1085+     schedLock ();
1086+     try  {
1087+       final  LockAndQueue  lock  = locking .getGlobalLock (globalId );
1088+       if  (lock .tryExclusiveLock (procedure )) {
1089+         removeFromRunQueue (globalRunQueue , getGlobalQueue (globalId ),
1090+           () -> procedure  + " held shared lock" );
1091+         return  false ;
1092+       }
1093+       waitProcedure (lock , procedure );
1094+       logLockedResource (LockedResourceType .GLOBAL , HConstants .EMPTY_STRING );
1095+       return  true ;
1096+     } finally  {
1097+       schedUnlock ();
1098+     }
1099+   }
1100+ 
1101+   /** 
1102+    * Wake the procedures waiting for global. 
1103+    * @see #waitGlobalExclusiveLock(Procedure, String) 
1104+    * @param procedure the procedure releasing the lock 
1105+    */ 
1106+   public  void  wakeGlobalExclusiveLock (Procedure <?> procedure , String  globalId ) {
1107+     schedLock ();
1108+     try  {
1109+       final  LockAndQueue  lock  = locking .getGlobalLock (globalId );
1110+       lock .releaseExclusiveLock (procedure );
1111+       addToRunQueue (globalRunQueue , getGlobalQueue (globalId ),
1112+         () -> procedure  + " released shared lock" );
1113+       int  waitingCount  = wakeWaitingProcedures (lock );
1114+       wakePollIfNeeded (waitingCount );
1115+     } finally  {
1116+       schedUnlock ();
1117+     }
1118+   }
1119+ 
10091120  /** 
10101121   * For debugging. Expensive. 
10111122   */ 
0 commit comments