@@ -168,7 +168,12 @@ protected static class Cluster {
168168 int [] initialRegionIndexToServerIndex ; //regionIndex -> serverIndex (initial cluster state)
169169 int [] regionIndexToTableIndex ; //regionIndex -> tableIndex
170170 int [][] numRegionsPerServerPerTable ; //serverIndex -> tableIndex -> # regions
171- int [] numMaxRegionsPerTable ; //tableIndex -> max number of regions in a single RS
171+ int [] numRegionsPerTable ; // tableIndex -> region count
172+ double [] meanRegionsPerTable ; // mean region count per table
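173+ double regionSkewByTable ; // sum over all RSs and tables of |regions on RS - mean regions per RS|
174+ double minRegionSkewByTable ; // sum over tables of the minimum achievable per-table skew
175+ double maxRegionSkewByTable ; // sum over tables of the maximum possible per-table skew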
176+
172177 int [] regionIndexToPrimaryIndex ; //regionIndex -> regionIndex of the primary
173178 boolean hasRegionReplicas = false ; //whether there is regions with replicas
174179
@@ -376,28 +381,37 @@ protected Cluster(
376381
377382 numTables = tables .size ();
378383 numRegionsPerServerPerTable = new int [numServers ][numTables ];
384+ numRegionsPerTable = new int [numTables ];
379385
380386 for (int i = 0 ; i < numServers ; i ++) {
381387 for (int j = 0 ; j < numTables ; j ++) {
382388 numRegionsPerServerPerTable [i ][j ] = 0 ;
383389 }
384390 }
385391
392+ for (int i = 0 ; i < numTables ; i ++) {
393+ numRegionsPerTable [i ] = 0 ;
394+ }
395+
386396 for (int i =0 ; i < regionIndexToServerIndex .length ; i ++) {
387397 if (regionIndexToServerIndex [i ] >= 0 ) {
388398 numRegionsPerServerPerTable [regionIndexToServerIndex [i ]][regionIndexToTableIndex [i ]]++;
399+ numRegionsPerTable [regionIndexToTableIndex [i ]]++;
389400 }
390401 }
391402
392- numMaxRegionsPerTable = new int [numTables ];
393- for (int [] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable ) {
394- for (tableIndex = 0 ; tableIndex < aNumRegionsPerServerPerTable .length ; tableIndex ++) {
395- if (aNumRegionsPerServerPerTable [tableIndex ] > numMaxRegionsPerTable [tableIndex ]) {
396- numMaxRegionsPerTable [tableIndex ] = aNumRegionsPerServerPerTable [tableIndex ];
397- }
398- }
403+ // Avoid repeated computation for planning
404+ meanRegionsPerTable = new double [numTables ];
405+ maxRegionSkewByTable = 0 ;
406+ minRegionSkewByTable = 0 ;
407+ for (int i = 0 ; i < numTables ; i ++) {
408+ meanRegionsPerTable [i ] = Double .valueOf (numRegionsPerTable [i ]) / numServers ;
409+ minRegionSkewByTable += Cluster .getMinSkew (numRegionsPerTable [i ], numServers );
410+ maxRegionSkewByTable += Cluster .getMaxSkew (numRegionsPerTable [i ], numServers );
399411 }
400412
413+ computeRegionSkewPerTable ();
414+
401415 for (int i = 0 ; i < regions .length ; i ++) {
402416 RegionInfo info = regions [i ];
403417 if (RegionReplicaUtil .isDefaultReplica (info )) {
@@ -522,6 +536,51 @@ public boolean serverHasTooFewRegions(int server) {
522536 return numRegions < minLoad ;
523537 }
524538
539+ /**
540+ * Return the min skew of distribution
541+ */
542+ public static double getMinSkew (double total , double numServers ) {
543+ double mean = total / numServers ;
544+ // It's possible that there aren't enough regions to go around
545+ double min ;
546+ if (numServers > total ) {
547+ min = ((numServers - total ) * mean + (1 - mean ) * total ) ;
548+ } else {
549+ // Some will have 1 more than everything else.
550+ int numHigh = (int ) (total - (Math .floor (mean ) * numServers ));
551+ int numLow = (int ) (numServers - numHigh );
552+ min = numHigh * (Math .ceil (mean ) - mean ) + numLow * (mean - Math .floor (mean ));
553+ }
554+ return min ;
555+ }
556+
557+ /**
558+ * Return the max deviation of distribution
559+ * Compute max as if all region servers had 0 and one had the sum of all costs. This must be
560+ * a zero sum cost for this to make sense.
561+ */
562+ public static double getMaxSkew (double total , double numServers ) {
563+ double mean = total / numServers ;
564+ return (total - mean ) + (numServers - 1 ) * mean ;
565+ }
566+
567+ /**
568+ * Scale the value between 0 and 1.
569+ *
570+ * @param min Min value
571+ * @param max The Max value
572+ * @param value The value to be scaled.
573+ * @return The scaled value.
574+ */
575+ public static double scale (double min , double max , double value ) {
576+ if (max <= min || value <= min ) {
577+ return 0 ;
578+ }
579+ if ((max - min ) == 0 ) return 0 ;
580+
581+ return Math .max (0d , Math .min (1d , (value - min ) / (max - min )));
582+ }
583+
525584 /**
526585 * Retrieves and lazily initializes a field storing the locality of
527586 * every region/server combination
@@ -579,6 +638,21 @@ public int getRegionSizeMB(int region) {
579638 return regionLoads [region ].getLast ().getStorefileSizeMB ();
580639 }
581640
641+ /**
642+ * Recompute the region skew during init or plan of moves.
643+ */
644+ private void computeRegionSkewPerTable () {
645+ // reinitialize for recomputation
646+ regionSkewByTable = 0 ;
647+
648+ for (int [] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable ) {
649+ for (int tableIndex = 0 ; tableIndex < aNumRegionsPerServerPerTable .length ; tableIndex ++) {
650+ regionSkewByTable += Math .abs (aNumRegionsPerServerPerTable [tableIndex ]
651+ - meanRegionsPerTable [tableIndex ]);
652+ }
653+ }
654+ }
655+
582656 /**
583657 * Computes and caches the locality for each region/rack combinations,
584658 * as well as storing a mapping of region -> server and region -> rack such that server
@@ -834,22 +908,20 @@ void regionMoved(int region, int oldServer, int newServer) {
834908 int tableIndex = regionIndexToTableIndex [region ];
835909 if (oldServer >= 0 ) {
836910 numRegionsPerServerPerTable [oldServer ][tableIndex ]--;
911+ // update regionSkewPerTable for the move from old server
912+ regionSkewByTable +=
913+ Math .abs (numRegionsPerServerPerTable [oldServer ][tableIndex ]
914+ - meanRegionsPerTable [tableIndex ])
915+ - Math .abs (numRegionsPerServerPerTable [oldServer ][tableIndex ] + 1
916+ - meanRegionsPerTable [tableIndex ]);
837917 }
838918 numRegionsPerServerPerTable [newServer ][tableIndex ]++;
839-
840- //check whether this caused maxRegionsPerTable in the new Server to be updated
841- if (numRegionsPerServerPerTable [newServer ][tableIndex ] > numMaxRegionsPerTable [tableIndex ]) {
842- numMaxRegionsPerTable [tableIndex ] = numRegionsPerServerPerTable [newServer ][tableIndex ];
843- } else if (oldServer >= 0 && (numRegionsPerServerPerTable [oldServer ][tableIndex ] + 1 )
844- == numMaxRegionsPerTable [tableIndex ]) {
845- //recompute maxRegionsPerTable since the previous value was coming from the old server
846- numMaxRegionsPerTable [tableIndex ] = 0 ;
847- for (int [] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable ) {
848- if (aNumRegionsPerServerPerTable [tableIndex ] > numMaxRegionsPerTable [tableIndex ]) {
849- numMaxRegionsPerTable [tableIndex ] = aNumRegionsPerServerPerTable [tableIndex ];
850- }
851- }
852- }
919+ // update regionSkewPerTable for the move to new server
920+ regionSkewByTable +=
921+ Math .abs (numRegionsPerServerPerTable [newServer ][tableIndex ]
922+ - meanRegionsPerTable [tableIndex ])
923+ - Math .abs (numRegionsPerServerPerTable [newServer ][tableIndex ] - 1
924+ - meanRegionsPerTable [tableIndex ]);
853925
854926 // update for servers
855927 int primary = regionIndexToPrimaryIndex [region ];
@@ -1019,7 +1091,7 @@ public String toString() {
10191091 .append (Arrays .toString (serverIndicesSortedByRegionCount ))
10201092 .append (", regionsPerServer=" ).append (Arrays .deepToString (regionsPerServer ));
10211093
1022- desc .append (", numMaxRegionsPerTable =" ).append (Arrays . toString ( numMaxRegionsPerTable ) )
1094+ desc .append (", regionSkewByTable =" ).append (regionSkewByTable )
10231095 .append (", numRegions=" ).append (numRegions ).append (", numServers=" ).append (numServers )
10241096 .append (", numTables=" ).append (numTables ).append (", numMovedRegions=" )
10251097 .append (numMovedRegions ).append ('}' );
0 commit comments