@@ -75,7 +75,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
7575
7676  private  static  final  Logger  LOG  = LoggerFactory .getLogger (FavoredStochasticBalancer .class );
7777
78-   protected  FavoredNodesManager  fnm ;
78+   private  FavoredNodesManager  fnm ;
7979
8080  @ Override 
8181  public  void  setFavoredNodesManager (FavoredNodesManager  fnm ) {
@@ -85,8 +85,8 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) {
8585  @ Override 
8686  protected  List <CandidateGenerator > createCandidateGenerators () {
8787    List <CandidateGenerator > fnPickers  = new  ArrayList <>(2 );
88-     fnPickers .add (new  FavoredNodeLoadPicker (fnm ));
89-     fnPickers .add (new  FavoredNodeLocalityPicker (fnm ));
88+     fnPickers .add (new  FavoredNodeLoadPicker ());
89+     fnPickers .add (new  FavoredNodeLocalityPicker ());
9090    return  fnPickers ;
9191  }
9292
@@ -516,6 +516,147 @@ public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo []
516516    updateFavoredNodesForRegion (merged , fnm .getFavoredNodes (mergeParents [0 ]));
517517  }
518518
519+   /** 
520+    * Pick favored nodes with the highest locality for a region with lowest locality. 
521+    */ 
522+   private  class  FavoredNodeLocalityPicker  extends  CandidateGenerator  {
523+ 
524+     @ Override 
525+     protected  BalanceAction  generate (BalancerClusterState  cluster ) {
526+ 
527+       int  thisServer  = pickRandomServer (cluster );
528+       int  thisRegion ;
529+       if  (thisServer  == -1 ) {
530+         LOG .trace ("Could not pick lowest local region server" );
531+         return  BalanceAction .NULL_ACTION ;
532+       } else  {
533+         // Pick lowest local region on this server 
534+         thisRegion  = pickLowestLocalRegionOnServer (cluster , thisServer );
535+       }
536+       if  (thisRegion  == -1 ) {
537+         if  (cluster .regionsPerServer [thisServer ].length  > 0 ) {
538+           LOG .trace ("Could not pick lowest local region even when region server held " 
539+             + cluster .regionsPerServer [thisServer ].length  + " regions" );
540+         }
541+         return  BalanceAction .NULL_ACTION ;
542+       }
543+ 
544+       RegionInfo  hri  = cluster .regions [thisRegion ];
545+       List <ServerName > favoredNodes  = fnm .getFavoredNodes (hri );
546+       int  otherServer ;
547+       if  (favoredNodes  == null ) {
548+         if  (!FavoredNodesManager .isFavoredNodeApplicable (hri )) {
549+           otherServer  = pickOtherRandomServer (cluster , thisServer );
550+         } else  {
551+           // No FN, ignore 
552+           LOG .trace ("Ignoring, no favored nodes for region: "  + hri );
553+           return  BalanceAction .NULL_ACTION ;
554+         }
555+       } else  {
556+         // Pick other favored node with the highest locality 
557+         otherServer  = getDifferentFavoredNode (cluster , favoredNodes , thisServer );
558+       }
559+       return  getAction (thisServer , thisRegion , otherServer , -1 );
560+     }
561+ 
562+     private  int  getDifferentFavoredNode (BalancerClusterState  cluster , List <ServerName > favoredNodes ,
563+         int  currentServer ) {
564+       List <Integer > fnIndex  = new  ArrayList <>();
565+       for  (ServerName  sn  : favoredNodes ) {
566+         if  (cluster .serversToIndex .containsKey (sn .getAddress ())) {
567+           fnIndex .add (cluster .serversToIndex .get (sn .getAddress ()));
568+         }
569+       }
570+       float  locality  = 0 ;
571+       int  highestLocalRSIndex  = -1 ;
572+       for  (Integer  index  : fnIndex ) {
573+         if  (index  != currentServer ) {
574+           float  temp  = cluster .localityPerServer [index ];
575+           if  (temp  >= locality ) {
576+             locality  = temp ;
577+             highestLocalRSIndex  = index ;
578+           }
579+         }
580+       }
581+       return  highestLocalRSIndex ;
582+     }
583+ 
584+     private  int  pickLowestLocalRegionOnServer (BalancerClusterState  cluster , int  server ) {
585+       return  cluster .getLowestLocalityRegionOnServer (server );
586+     }
587+   }
588+ 
589+   /* 
590+    * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the 
591+    * most loaded server. 
592+    */ 
593+   class  FavoredNodeLoadPicker  extends  CandidateGenerator  {
594+ 
595+     @ Override 
596+     BalanceAction  generate (BalancerClusterState  cluster ) {
597+       cluster .sortServersByRegionCount ();
598+       int  thisServer  = pickMostLoadedServer (cluster );
599+       int  thisRegion  = pickRandomRegion (cluster , thisServer , 0 );
600+       RegionInfo  hri  = cluster .regions [thisRegion ];
601+       int  otherServer ;
602+       List <ServerName > favoredNodes  = fnm .getFavoredNodes (hri );
603+       if  (favoredNodes  == null ) {
604+         if  (!FavoredNodesManager .isFavoredNodeApplicable (hri )) {
605+           otherServer  = pickLeastLoadedServer (cluster , thisServer );
606+         } else  {
607+           return  BalanceAction .NULL_ACTION ;
608+         }
609+       } else  {
610+         otherServer  = pickLeastLoadedFNServer (cluster , favoredNodes , thisServer );
611+       }
612+       return  getAction (thisServer , thisRegion , otherServer , -1 );
613+     }
614+ 
615+     private  int  pickLeastLoadedServer (final  BalancerClusterState  cluster , int  thisServer ) {
616+       Integer [] servers  = cluster .serverIndicesSortedByRegionCount ;
617+       int  index ;
618+       for  (index  = 0 ; index  < servers .length  ; index ++) {
619+         if  ((servers [index ] != null ) && servers [index ] != thisServer ) {
620+           break ;
621+         }
622+       }
623+       return  servers [index ];
624+     }
625+ 
626+     private  int  pickLeastLoadedFNServer (final  BalancerClusterState  cluster ,
627+       List <ServerName > favoredNodes , int  currentServerIndex ) {
628+       List <Integer > fnIndex  = new  ArrayList <>();
629+       for  (ServerName  sn  : favoredNodes ) {
630+         if  (cluster .serversToIndex .containsKey (sn .getAddress ())) {
631+           fnIndex .add (cluster .serversToIndex .get (sn .getAddress ()));
632+         }
633+       }
634+       int  leastLoadedFN  = -1 ;
635+       int  load  = Integer .MAX_VALUE ;
636+       for  (Integer  index  : fnIndex ) {
637+         if  (index  != currentServerIndex ) {
638+           int  temp  = cluster .getNumRegions (index );
639+           if  (temp  < load ) {
640+             load  = temp ;
641+             leastLoadedFN  = index ;
642+           }
643+         }
644+       }
645+       return  leastLoadedFN ;
646+     }
647+ 
648+     private  int  pickMostLoadedServer (final  BalancerClusterState  cluster ) {
649+       Integer [] servers  = cluster .serverIndicesSortedByRegionCount ;
650+       int  index ;
651+       for  (index  = servers .length  - 1 ; index  > 0  ; index --) {
652+         if  (servers [index ] != null ) {
653+           break ;
654+         }
655+       }
656+       return  servers [index ];
657+     }
658+   }
659+ 
519660  /** 
520661   * For all regions correctly assigned to favored nodes, we just use the stochastic balancer 
521662   * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care. 
0 commit comments