2626import java .util .HashMap ;
2727import java .util .List ;
2828import java .util .Map ;
29+ import java .util .Set ;
30+ import java .util .concurrent .TimeUnit ;
2931import org .agrona .collections .Hashing ;
3032import org .agrona .collections .Int2IntCounterMap ;
3133import org .apache .hadoop .hbase .HDFSBlocksDistribution ;
3234import org .apache .hadoop .hbase .ServerName ;
3335import org .apache .hadoop .hbase .client .RegionInfo ;
3436import org .apache .hadoop .hbase .client .RegionReplicaUtil ;
3537import org .apache .hadoop .hbase .master .RackManager ;
38+ import org .apache .hadoop .hbase .master .RegionPlan ;
3639import org .apache .hadoop .hbase .net .Address ;
3740import org .apache .hadoop .hbase .util .Pair ;
3841import org .apache .yetus .audience .InterfaceAudience ;
3942import org .slf4j .Logger ;
4043import org .slf4j .LoggerFactory ;
4144
45+ import org .apache .hbase .thirdparty .com .google .common .base .Supplier ;
46+ import org .apache .hbase .thirdparty .com .google .common .base .Suppliers ;
47+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableList ;
48+
4249/**
4350 * An efficient array based implementation similar to ClusterState for keeping the status of the
4451 * cluster in terms of region assignment and distribution. LoadBalancers, such as
@@ -122,6 +129,14 @@ class BalancerClusterState {
122129 // Maps regionName -> oldServerName -> cache ratio of the region on the old server
123130 Map <String , Pair <ServerName , Float >> regionCacheRatioOnOldServerMap ;
124131
132+ private Supplier <List <Integer >> shuffledServerIndicesSupplier =
133+ Suppliers .memoizeWithExpiration (() -> {
134+ Collection <Integer > serverIndices = serversToIndex .values ();
135+ List <Integer > shuffledServerIndices = new ArrayList <>(serverIndices );
136+ Collections .shuffle (shuffledServerIndices );
137+ return shuffledServerIndices ;
138+ }, 5 , TimeUnit .SECONDS );
139+
125140 static class DefaultRackManager extends RackManager {
126141 @ Override
127142 public String getRack (ServerName server ) {
@@ -705,7 +720,41 @@ enum LocalityType {
705720 RACK
706721 }
707722
708- public void doAction (BalanceAction action ) {
723+ public List <RegionPlan > convertActionToPlans (BalanceAction action ) {
724+ switch (action .getType ()) {
725+ case NULL :
726+ break ;
727+ case ASSIGN_REGION :
728+ // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
729+ assert action instanceof AssignRegionAction : action .getClass ();
730+ AssignRegionAction ar = (AssignRegionAction ) action ;
731+ return ImmutableList .of (regionMoved (ar .getRegion (), -1 , ar .getServer ()));
732+ case MOVE_REGION :
733+ assert action instanceof MoveRegionAction : action .getClass ();
734+ MoveRegionAction mra = (MoveRegionAction ) action ;
735+ return ImmutableList
736+ .of (regionMoved (mra .getRegion (), mra .getFromServer (), mra .getToServer ()));
737+ case SWAP_REGIONS :
738+ assert action instanceof SwapRegionsAction : action .getClass ();
739+ SwapRegionsAction a = (SwapRegionsAction ) action ;
740+ return ImmutableList .of (regionMoved (a .getFromRegion (), a .getFromServer (), a .getToServer ()),
741+ regionMoved (a .getToRegion (), a .getToServer (), a .getFromServer ()));
742+ case MOVE_BATCH :
743+ assert action instanceof MoveBatchAction : action .getClass ();
744+ MoveBatchAction mba = (MoveBatchAction ) action ;
745+ List <RegionPlan > mbRegionPlans = new ArrayList <>();
746+ for (MoveRegionAction moveRegionAction : mba .getMoveActions ()) {
747+ mbRegionPlans .add (regionMoved (moveRegionAction .getRegion (),
748+ moveRegionAction .getFromServer (), moveRegionAction .getToServer ()));
749+ }
750+ return mbRegionPlans ;
751+ default :
752+ throw new RuntimeException ("Unknown action:" + action .getType ());
753+ }
754+ return Collections .emptyList ();
755+ }
756+
757+ public List <RegionPlan > doAction (BalanceAction action ) {
709758 switch (action .getType ()) {
710759 case NULL :
711760 break ;
@@ -715,30 +764,47 @@ public void doAction(BalanceAction action) {
715764 AssignRegionAction ar = (AssignRegionAction ) action ;
716765 regionsPerServer [ar .getServer ()] =
717766 addRegion (regionsPerServer [ar .getServer ()], ar .getRegion ());
718- regionMoved (ar .getRegion (), -1 , ar .getServer ());
719- break ;
767+ return ImmutableList .of (regionMoved (ar .getRegion (), -1 , ar .getServer ()));
720768 case MOVE_REGION :
721769 assert action instanceof MoveRegionAction : action .getClass ();
722770 MoveRegionAction mra = (MoveRegionAction ) action ;
723771 regionsPerServer [mra .getFromServer ()] =
724772 removeRegion (regionsPerServer [mra .getFromServer ()], mra .getRegion ());
725773 regionsPerServer [mra .getToServer ()] =
726774 addRegion (regionsPerServer [mra .getToServer ()], mra .getRegion ());
727- regionMoved ( mra . getRegion (), mra . getFromServer (), mra . getToServer ());
728- break ;
775+ return ImmutableList
776+ . of ( regionMoved ( mra . getRegion (), mra . getFromServer (), mra . getToServer ())) ;
729777 case SWAP_REGIONS :
730778 assert action instanceof SwapRegionsAction : action .getClass ();
731779 SwapRegionsAction a = (SwapRegionsAction ) action ;
732780 regionsPerServer [a .getFromServer ()] =
733781 replaceRegion (regionsPerServer [a .getFromServer ()], a .getFromRegion (), a .getToRegion ());
734782 regionsPerServer [a .getToServer ()] =
735783 replaceRegion (regionsPerServer [a .getToServer ()], a .getToRegion (), a .getFromRegion ());
736- regionMoved (a .getFromRegion (), a .getFromServer (), a .getToServer ());
737- regionMoved (a .getToRegion (), a .getToServer (), a .getFromServer ());
738- break ;
784+ return ImmutableList .of (regionMoved (a .getFromRegion (), a .getFromServer (), a .getToServer ()),
785+ regionMoved (a .getToRegion (), a .getToServer (), a .getFromServer ()));
786+ case MOVE_BATCH :
787+ assert action instanceof MoveBatchAction : action .getClass ();
788+ MoveBatchAction mba = (MoveBatchAction ) action ;
789+ List <RegionPlan > mbRegionPlans = new ArrayList <>();
790+ for (int serverIndex : mba .getServerToRegionsToRemove ().keySet ()) {
791+ Set <Integer > regionsToRemove = mba .getServerToRegionsToRemove ().get (serverIndex );
792+ regionsPerServer [serverIndex ] =
793+ removeRegions (regionsPerServer [serverIndex ], regionsToRemove );
794+ }
795+ for (int serverIndex : mba .getServerToRegionsToAdd ().keySet ()) {
796+ Set <Integer > regionsToAdd = mba .getServerToRegionsToAdd ().get (serverIndex );
797+ regionsPerServer [serverIndex ] = addRegions (regionsPerServer [serverIndex ], regionsToAdd );
798+ }
799+ for (MoveRegionAction moveRegionAction : mba .getMoveActions ()) {
800+ mbRegionPlans .add (regionMoved (moveRegionAction .getRegion (),
801+ moveRegionAction .getFromServer (), moveRegionAction .getToServer ()));
802+ }
803+ return mbRegionPlans ;
739804 default :
740- throw new RuntimeException ("Uknown action:" + action .getType ());
805+ throw new RuntimeException ("Unknown action:" + action .getType ());
741806 }
807+ return Collections .emptyList ();
742808 }
743809
744810 /**
@@ -822,7 +888,7 @@ void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
822888 doAction (new AssignRegionAction (region , server ));
823889 }
824890
825- void regionMoved (int region , int oldServer , int newServer ) {
891+ RegionPlan regionMoved (int region , int oldServer , int newServer ) {
826892 regionIndexToServerIndex [region ] = newServer ;
827893 if (initialRegionIndexToServerIndex [region ] == newServer ) {
828894 numMovedRegions --; // region moved back to original location
@@ -853,6 +919,11 @@ void regionMoved(int region, int oldServer, int newServer) {
853919 updateForLocation (serverIndexToRackIndex , regionsPerRack , colocatedReplicaCountsPerRack ,
854920 oldServer , newServer , primary , region );
855921 }
922+
923+ // old server name can be null
924+ ServerName oldServerName = oldServer == -1 ? null : servers [oldServer ];
925+
926+ return new RegionPlan (regions [region ], oldServerName , servers [newServer ]);
856927 }
857928
858929 /**
@@ -899,6 +970,48 @@ int[] addRegion(int[] regions, int regionIndex) {
899970 return newRegions ;
900971 }
901972
973+ int [] removeRegions (int [] regions , Set <Integer > regionIndicesToRemove ) {
974+ // Calculate the size of the new regions array
975+ int newSize = regions .length - regionIndicesToRemove .size ();
976+ if (newSize < 0 ) {
977+ throw new IllegalStateException (
978+ "Region indices mismatch: more regions to remove than in the regions array" );
979+ }
980+
981+ int [] newRegions = new int [newSize ];
982+ int newIndex = 0 ;
983+
984+ // Copy only the regions not in the removal set
985+ for (int region : regions ) {
986+ if (!regionIndicesToRemove .contains (region )) {
987+ newRegions [newIndex ++] = region ;
988+ }
989+ }
990+
991+ // If the newIndex is smaller than newSize, some regions were missing from the input array
992+ if (newIndex != newSize ) {
993+ throw new IllegalStateException ("Region indices mismatch: some regions in the removal "
994+ + "set were not found in the regions array" );
995+ }
996+
997+ return newRegions ;
998+ }
999+
1000+ int [] addRegions (int [] regions , Set <Integer > regionIndicesToAdd ) {
1001+ int [] newRegions = new int [regions .length + regionIndicesToAdd .size ()];
1002+
1003+ // Copy the existing regions to the new array
1004+ System .arraycopy (regions , 0 , newRegions , 0 , regions .length );
1005+
1006+ // Add the new regions at the end of the array
1007+ int newIndex = regions .length ;
1008+ for (int regionIndex : regionIndicesToAdd ) {
1009+ newRegions [newIndex ++] = regionIndex ;
1010+ }
1011+
1012+ return newRegions ;
1013+ }
1014+
9021015 int [] addRegionSorted (int [] regions , int regionIndex ) {
9031016 int [] newRegions = new int [regions .length + 1 ];
9041017 int i = 0 ;
@@ -998,6 +1111,10 @@ void setNumMovedRegions(int numMovedRegions) {
9981111 this .numMovedRegions = numMovedRegions ;
9991112 }
10001113
1114+ List <Integer > getShuffledServerIndices () {
1115+ return shuffledServerIndicesSupplier .get ();
1116+ }
1117+
10011118 @ Override
10021119 public String toString () {
10031120 StringBuilder desc = new StringBuilder ("Cluster={servers=[" );
0 commit comments