@@ -75,7 +75,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
7575
7676 private static final Logger LOG = LoggerFactory .getLogger (FavoredStochasticBalancer .class );
7777
78- private FavoredNodesManager fnm ;
78+ protected FavoredNodesManager fnm ;
7979
8080 @ Override
8181 public void setFavoredNodesManager (FavoredNodesManager fnm ) {
@@ -85,8 +85,8 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) {
8585 @ Override
8686 protected List <CandidateGenerator > createCandidateGenerators () {
8787 List <CandidateGenerator > fnPickers = new ArrayList <>(2 );
88- fnPickers .add (new FavoredNodeLoadPicker ());
89- fnPickers .add (new FavoredNodeLocalityPicker ());
88+ fnPickers .add (new FavoredNodeLoadPicker (fnm ));
89+ fnPickers .add (new FavoredNodeLocalityPicker (fnm ));
9090 return fnPickers ;
9191 }
9292
@@ -516,147 +516,6 @@ public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo []
516516 updateFavoredNodesForRegion (merged , fnm .getFavoredNodes (mergeParents [0 ]));
517517 }
518518
519- /**
520- * Pick favored nodes with the highest locality for a region with lowest locality.
521- */
522- private class FavoredNodeLocalityPicker extends CandidateGenerator {
523-
524- @ Override
525- protected BalanceAction generate (BalancerClusterState cluster ) {
526-
527- int thisServer = pickRandomServer (cluster );
528- int thisRegion ;
529- if (thisServer == -1 ) {
530- LOG .trace ("Could not pick lowest local region server" );
531- return BalanceAction .NULL_ACTION ;
532- } else {
533- // Pick lowest local region on this server
534- thisRegion = pickLowestLocalRegionOnServer (cluster , thisServer );
535- }
536- if (thisRegion == -1 ) {
537- if (cluster .regionsPerServer [thisServer ].length > 0 ) {
538- LOG .trace ("Could not pick lowest local region even when region server held "
539- + cluster .regionsPerServer [thisServer ].length + " regions" );
540- }
541- return BalanceAction .NULL_ACTION ;
542- }
543-
544- RegionInfo hri = cluster .regions [thisRegion ];
545- List <ServerName > favoredNodes = fnm .getFavoredNodes (hri );
546- int otherServer ;
547- if (favoredNodes == null ) {
548- if (!FavoredNodesManager .isFavoredNodeApplicable (hri )) {
549- otherServer = pickOtherRandomServer (cluster , thisServer );
550- } else {
551- // No FN, ignore
552- LOG .trace ("Ignoring, no favored nodes for region: " + hri );
553- return BalanceAction .NULL_ACTION ;
554- }
555- } else {
556- // Pick other favored node with the highest locality
557- otherServer = getDifferentFavoredNode (cluster , favoredNodes , thisServer );
558- }
559- return getAction (thisServer , thisRegion , otherServer , -1 );
560- }
561-
562- private int getDifferentFavoredNode (BalancerClusterState cluster , List <ServerName > favoredNodes ,
563- int currentServer ) {
564- List <Integer > fnIndex = new ArrayList <>();
565- for (ServerName sn : favoredNodes ) {
566- if (cluster .serversToIndex .containsKey (sn .getAddress ())) {
567- fnIndex .add (cluster .serversToIndex .get (sn .getAddress ()));
568- }
569- }
570- float locality = 0 ;
571- int highestLocalRSIndex = -1 ;
572- for (Integer index : fnIndex ) {
573- if (index != currentServer ) {
574- float temp = cluster .localityPerServer [index ];
575- if (temp >= locality ) {
576- locality = temp ;
577- highestLocalRSIndex = index ;
578- }
579- }
580- }
581- return highestLocalRSIndex ;
582- }
583-
584- private int pickLowestLocalRegionOnServer (BalancerClusterState cluster , int server ) {
585- return cluster .getLowestLocalityRegionOnServer (server );
586- }
587- }
588-
589- /*
590- * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the
591- * most loaded server.
592- */
593- class FavoredNodeLoadPicker extends CandidateGenerator {
594-
595- @ Override
596- BalanceAction generate (BalancerClusterState cluster ) {
597- cluster .sortServersByRegionCount ();
598- int thisServer = pickMostLoadedServer (cluster );
599- int thisRegion = pickRandomRegion (cluster , thisServer , 0 );
600- RegionInfo hri = cluster .regions [thisRegion ];
601- int otherServer ;
602- List <ServerName > favoredNodes = fnm .getFavoredNodes (hri );
603- if (favoredNodes == null ) {
604- if (!FavoredNodesManager .isFavoredNodeApplicable (hri )) {
605- otherServer = pickLeastLoadedServer (cluster , thisServer );
606- } else {
607- return BalanceAction .NULL_ACTION ;
608- }
609- } else {
610- otherServer = pickLeastLoadedFNServer (cluster , favoredNodes , thisServer );
611- }
612- return getAction (thisServer , thisRegion , otherServer , -1 );
613- }
614-
615- private int pickLeastLoadedServer (final BalancerClusterState cluster , int thisServer ) {
616- Integer [] servers = cluster .serverIndicesSortedByRegionCount ;
617- int index ;
618- for (index = 0 ; index < servers .length ; index ++) {
619- if ((servers [index ] != null ) && servers [index ] != thisServer ) {
620- break ;
621- }
622- }
623- return servers [index ];
624- }
625-
626- private int pickLeastLoadedFNServer (final BalancerClusterState cluster ,
627- List <ServerName > favoredNodes , int currentServerIndex ) {
628- List <Integer > fnIndex = new ArrayList <>();
629- for (ServerName sn : favoredNodes ) {
630- if (cluster .serversToIndex .containsKey (sn .getAddress ())) {
631- fnIndex .add (cluster .serversToIndex .get (sn .getAddress ()));
632- }
633- }
634- int leastLoadedFN = -1 ;
635- int load = Integer .MAX_VALUE ;
636- for (Integer index : fnIndex ) {
637- if (index != currentServerIndex ) {
638- int temp = cluster .getNumRegions (index );
639- if (temp < load ) {
640- load = temp ;
641- leastLoadedFN = index ;
642- }
643- }
644- }
645- return leastLoadedFN ;
646- }
647-
648- private int pickMostLoadedServer (final BalancerClusterState cluster ) {
649- Integer [] servers = cluster .serverIndicesSortedByRegionCount ;
650- int index ;
651- for (index = servers .length - 1 ; index > 0 ; index --) {
652- if (servers [index ] != null ) {
653- break ;
654- }
655- }
656- return servers [index ];
657- }
658- }
659-
660519 /**
661520 * For all regions correctly assigned to favored nodes, we just use the stochastic balancer
662521 * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
0 commit comments