2222import java .util .List ;
2323import java .util .function .Function ;
2424import java .util .function .Supplier ;
25+ import org .apache .hadoop .hbase .HConstants ;
2526import org .apache .hadoop .hbase .ServerName ;
2627import org .apache .hadoop .hbase .TableExistsException ;
2728import org .apache .hadoop .hbase .TableName ;
@@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
9596 (n , k ) -> n .compareKey ((String ) k );
9697 private final static AvlKeyComparator <MetaQueue > META_QUEUE_KEY_COMPARATOR =
9798 (n , k ) -> n .compareKey ((TableName ) k );
99+ private final static AvlKeyComparator <GlobalQueue > GLOBAL_QUEUE_KEY_COMPARATOR =
100+ (n , k ) -> n .compareKey ((String ) k );
98101
99102 private final FairQueue <ServerName > serverRunQueue = new FairQueue <>();
100103 private final FairQueue <TableName > tableRunQueue = new FairQueue <>();
101104 private final FairQueue <String > peerRunQueue = new FairQueue <>();
102105 private final FairQueue <TableName > metaRunQueue = new FairQueue <>();
106+ private final FairQueue <String > globalRunQueue = new FairQueue <>();
103107
104108 private final ServerQueue [] serverBuckets = new ServerQueue [128 ];
105109 private TableQueue tableMap = null ;
106110 private PeerQueue peerMap = null ;
107111 private MetaQueue metaMap = null ;
112+ private GlobalQueue globalMap = null ;
108113
109114 private final SchemaLocking locking ;
110115
@@ -128,6 +133,8 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
128133 doAdd (serverRunQueue , getServerQueue (spi .getServerName (), spi ), proc , addFront );
129134 } else if (isPeerProcedure (proc )) {
130135 doAdd (peerRunQueue , getPeerQueue (getPeerId (proc )), proc , addFront );
136+ } else if (isGlobalProcedure (proc )) {
137+ doAdd (globalRunQueue , getGlobalQueue (getGlobalId (proc )), proc , addFront );
131138 } else {
132139 // TODO: at the moment we only have Table and Server procedures
133140 // if you are implementing a non-table/non-server procedure, you have two options: create
@@ -163,14 +170,19 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
163170
164171 @ Override
165172 protected boolean queueHasRunnables () {
166- return metaRunQueue .hasRunnables () || tableRunQueue .hasRunnables ()
167- || serverRunQueue .hasRunnables () || peerRunQueue .hasRunnables ();
173+ return globalRunQueue .hasRunnables () || metaRunQueue .hasRunnables ()
174+ || tableRunQueue .hasRunnables () || serverRunQueue .hasRunnables ()
175+ || peerRunQueue .hasRunnables ();
168176 }
169177
170178 @ Override
171179 protected Procedure dequeue () {
172- // meta procedure is always the first priority
173- Procedure <?> pollResult = doPoll (metaRunQueue );
180+ // pull global first
181+ Procedure <?> pollResult = doPoll (globalRunQueue );
182+ // then meta procedure
183+ if (pollResult == null ) {
184+ pollResult = doPoll (metaRunQueue );
185+ }
174186 // For now, let server handling have precedence over table handling; presumption is that it
175187 // is more important handling crashed servers than it is running the
176188 // enabling/disabling tables, etc.
@@ -268,6 +280,14 @@ private void clearQueue() {
268280 clear (peerMap , peerRunQueue , PEER_QUEUE_KEY_COMPARATOR );
269281 peerMap = null ;
270282
283+ // Remove Meta
284+ clear (metaMap , metaRunQueue , META_QUEUE_KEY_COMPARATOR );
285+ metaMap = null ;
286+
287+ // Remove Global
288+ clear (globalMap , globalRunQueue , GLOBAL_QUEUE_KEY_COMPARATOR );
289+ globalMap = null ;
290+
271291 assert size () == 0 : "expected queue size to be 0, got " + size ();
272292 }
273293
@@ -300,6 +320,7 @@ protected int queueSize() {
300320 count += queueSize (tableMap );
301321 count += queueSize (peerMap );
302322 count += queueSize (metaMap );
323+ count += queueSize (globalMap );
303324 return count ;
304325 }
305326
@@ -502,6 +523,51 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
502523 return proc instanceof MetaProcedureInterface ;
503524 }
504525
526+ // ============================================================================
527+ // Global Queue Lookup Helpers
528+ // ============================================================================
529+ private GlobalQueue getGlobalQueue (String globalId ) {
530+ GlobalQueue node = AvlTree .get (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
531+ if (node != null ) {
532+ return node ;
533+ }
534+ node = new GlobalQueue (globalId , locking .getGlobalLock (globalId ));
535+ globalMap = AvlTree .insert (globalMap , node );
536+ return node ;
537+ }
538+
539+ private void removeGlobalQueue (String globalId ) {
540+ globalMap = AvlTree .remove (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
541+ locking .removeGlobalLock (globalId );
542+ }
543+
544+ private void tryCleanupGlobalQueue (String globalId , Procedure <?> procedure ) {
545+ schedLock ();
546+ try {
547+ GlobalQueue queue = AvlTree .get (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
548+ if (queue == null ) {
549+ return ;
550+ }
551+
552+ final LockAndQueue lock = locking .getGlobalLock (globalId );
553+ if (queue .isEmpty () && lock .tryExclusiveLock (procedure )) {
554+ removeFromRunQueue (globalRunQueue , queue ,
555+ () -> "clean up global queue after " + procedure + " completed" );
556+ removeGlobalQueue (globalId );
557+ }
558+ } finally {
559+ schedUnlock ();
560+ }
561+ }
562+
563+ private static boolean isGlobalProcedure (Procedure <?> proc ) {
564+ return proc instanceof GlobalProcedureInterface ;
565+ }
566+
567+ private static String getGlobalId (Procedure <?> proc ) {
568+ return ((GlobalProcedureInterface ) proc ).getGlobalId ();
569+ }
570+
505571 // ============================================================================
506572 // Table Locking Helpers
507573 // ============================================================================
@@ -1006,6 +1072,51 @@ public void wakeMetaExclusiveLock(Procedure<?> procedure) {
10061072 }
10071073 }
10081074
1075+ // ============================================================================
1076+ // Global Locking Helpers
1077+ // ============================================================================
1078+ /**
1079+ * Try to acquire the share lock on global.
1080+ * @see #wakeGlobalExclusiveLock(Procedure, String)
1081+ * @param procedure the procedure trying to acquire the lock
1082+ * @return true if the procedure has to wait for global to be available
1083+ */
1084+ public boolean waitGlobalExclusiveLock (Procedure <?> procedure , String globalId ) {
1085+ schedLock ();
1086+ try {
1087+ final LockAndQueue lock = locking .getGlobalLock (globalId );
1088+ if (lock .tryExclusiveLock (procedure )) {
1089+ removeFromRunQueue (globalRunQueue , getGlobalQueue (globalId ),
1090+ () -> procedure + " held shared lock" );
1091+ return false ;
1092+ }
1093+ waitProcedure (lock , procedure );
1094+ logLockedResource (LockedResourceType .GLOBAL , HConstants .EMPTY_STRING );
1095+ return true ;
1096+ } finally {
1097+ schedUnlock ();
1098+ }
1099+ }
1100+
1101+ /**
1102+ * Wake the procedures waiting for global.
1103+ * @see #waitGlobalExclusiveLock(Procedure, String)
1104+ * @param procedure the procedure releasing the lock
1105+ */
1106+ public void wakeGlobalExclusiveLock (Procedure <?> procedure , String globalId ) {
1107+ schedLock ();
1108+ try {
1109+ final LockAndQueue lock = locking .getGlobalLock (globalId );
1110+ lock .releaseExclusiveLock (procedure );
1111+ addToRunQueue (globalRunQueue , getGlobalQueue (globalId ),
1112+ () -> procedure + " released shared lock" );
1113+ int waitingCount = wakeWaitingProcedures (lock );
1114+ wakePollIfNeeded (waitingCount );
1115+ } finally {
1116+ schedUnlock ();
1117+ }
1118+ }
1119+
10091120 /**
10101121 * For debugging. Expensive.
10111122 */
0 commit comments