@@ -162,7 +162,12 @@ protected static class Cluster {
162162 int [] initialRegionIndexToServerIndex ; //regionIndex -> serverIndex (initial cluster state)
163163 int [] regionIndexToTableIndex ; //regionIndex -> tableIndex
164164 int [][] numRegionsPerServerPerTable ; //serverIndex -> tableIndex -> # regions
165- int [] numMaxRegionsPerTable ; //tableIndex -> max number of regions in a single RS
165+ int [] numRegionsPerTable ; // tableIndex -> region count
166+ double [] meanRegionsPerTable ; // mean region count per table
167+ double regionSkewByTable ; // skew on RS per by table
168+ double minRegionSkewByTable ; // min skew on RS per by table
169+ double maxRegionSkewByTable ; // max skew on RS per by table
170+
166171 int [] regionIndexToPrimaryIndex ; //regionIndex -> regionIndex of the primary
167172 boolean hasRegionReplicas = false ; //whether there is regions with replicas
168173
@@ -370,28 +375,37 @@ protected Cluster(
370375
371376 numTables = tables .size ();
372377 numRegionsPerServerPerTable = new int [numServers ][numTables ];
378+ numRegionsPerTable = new int [numTables ];
373379
374380 for (int i = 0 ; i < numServers ; i ++) {
375381 for (int j = 0 ; j < numTables ; j ++) {
376382 numRegionsPerServerPerTable [i ][j ] = 0 ;
377383 }
378384 }
379385
386+ for (int i = 0 ; i < numTables ; i ++) {
387+ numRegionsPerTable [i ] = 0 ;
388+ }
389+
380390 for (int i =0 ; i < regionIndexToServerIndex .length ; i ++) {
381391 if (regionIndexToServerIndex [i ] >= 0 ) {
382392 numRegionsPerServerPerTable [regionIndexToServerIndex [i ]][regionIndexToTableIndex [i ]]++;
393+ numRegionsPerTable [regionIndexToTableIndex [i ]]++;
383394 }
384395 }
385396
386- numMaxRegionsPerTable = new int [numTables ];
387- for (int [] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable ) {
388- for (tableIndex = 0 ; tableIndex < aNumRegionsPerServerPerTable .length ; tableIndex ++) {
389- if (aNumRegionsPerServerPerTable [tableIndex ] > numMaxRegionsPerTable [tableIndex ]) {
390- numMaxRegionsPerTable [tableIndex ] = aNumRegionsPerServerPerTable [tableIndex ];
391- }
392- }
397+ // Avoid repeated computation for planning
398+ meanRegionsPerTable = new double [numTables ];
399+ maxRegionSkewByTable = 0 ;
400+ minRegionSkewByTable = 0 ;
401+ for (int i = 0 ; i < numTables ; i ++) {
402+ meanRegionsPerTable [i ] = Double .valueOf (numRegionsPerTable [i ]) / numServers ;
403+ minRegionSkewByTable += Cluster .getMinSkew (numRegionsPerTable [i ], numServers );
404+ maxRegionSkewByTable += Cluster .getMaxSkew (numRegionsPerTable [i ], numServers );
393405 }
394406
407+ computeRegionSkewPerTable ();
408+
395409 for (int i = 0 ; i < regions .length ; i ++) {
396410 RegionInfo info = regions [i ];
397411 if (RegionReplicaUtil .isDefaultReplica (info )) {
@@ -516,6 +530,53 @@ public boolean serverHasTooFewRegions(int server) {
516530 return numRegions < minLoad ;
517531 }
518532
533+ /**
534+ * Return the min skew of distribution
535+ */
536+ public static double getMinSkew (double total , double numServers ) {
537+ double mean = total / numServers ;
538+ // It's possible that there aren't enough regions to go around
539+ double min ;
540+ if (numServers > total ) {
541+ min = ((numServers - total ) * mean + (1 - mean ) * total ) ;
542+ } else {
543+ // Some will have 1 more than everything else.
544+ int numHigh = (int ) (total - (Math .floor (mean ) * numServers ));
545+ int numLow = (int ) (numServers - numHigh );
546+ min = numHigh * (Math .ceil (mean ) - mean ) + numLow * (mean - Math .floor (mean ));
547+ }
548+ return min ;
549+ }
550+
551+ /**
552+ * Return the max deviation of distribution
553+ * Compute max as if all region servers had 0 and one had the sum of all costs. This must be
554+ * a zero sum cost for this to make sense.
555+ */
556+ public static double getMaxSkew (double total , double numServers ) {
557+ double mean = total / numServers ;
558+ return (total - mean ) + (numServers - 1 ) * mean ;
559+ }
560+
561+ /**
562+ * Scale the value between 0 and 1.
563+ *
564+ * @param min Min value
565+ * @param max The Max value
566+ * @param value The value to be scaled.
567+ * @return The scaled value.
568+ */
569+ public static double scale (double min , double max , double value ) {
570+ if (max <= min || value <= min ) {
571+ return 0 ;
572+ }
573+ if ((max - min ) == 0 ) {
574+ return 0 ;
575+ }
576+
577+ return Math .max (0d , Math .min (1d , (value - min ) / (max - min )));
578+ }
579+
519580 /**
520581 * Retrieves and lazily initializes a field storing the locality of
521582 * every region/server combination
@@ -573,6 +634,21 @@ public int getRegionSizeMB(int region) {
573634 return regionLoads [region ].getLast ().getStorefileSizeMB ();
574635 }
575636
637+ /**
638+ * Recompute the region skew during init or plan of moves.
639+ */
640+ private void computeRegionSkewPerTable () {
641+ // reinitialize for recomputation
642+ regionSkewByTable = 0 ;
643+
644+ for (int [] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable ) {
645+ for (int tableIndex = 0 ; tableIndex < aNumRegionsPerServerPerTable .length ; tableIndex ++) {
646+ regionSkewByTable += Math .abs (aNumRegionsPerServerPerTable [tableIndex ]
647+ - meanRegionsPerTable [tableIndex ]);
648+ }
649+ }
650+ }
651+
576652 /**
577653 * Computes and caches the locality for each region/rack combinations,
578654 * as well as storing a mapping of region -> server and region -> rack such that server
@@ -828,22 +904,20 @@ void regionMoved(int region, int oldServer, int newServer) {
828904 int tableIndex = regionIndexToTableIndex [region ];
829905 if (oldServer >= 0 ) {
830906 numRegionsPerServerPerTable [oldServer ][tableIndex ]--;
907+ // update regionSkewPerTable for the move from old server
908+ regionSkewByTable +=
909+ Math .abs (numRegionsPerServerPerTable [oldServer ][tableIndex ]
910+ - meanRegionsPerTable [tableIndex ])
911+ - Math .abs (numRegionsPerServerPerTable [oldServer ][tableIndex ] + 1
912+ - meanRegionsPerTable [tableIndex ]);
831913 }
832914 numRegionsPerServerPerTable [newServer ][tableIndex ]++;
833-
834- //check whether this caused maxRegionsPerTable in the new Server to be updated
835- if (numRegionsPerServerPerTable [newServer ][tableIndex ] > numMaxRegionsPerTable [tableIndex ]) {
836- numMaxRegionsPerTable [tableIndex ] = numRegionsPerServerPerTable [newServer ][tableIndex ];
837- } else if (oldServer >= 0 && (numRegionsPerServerPerTable [oldServer ][tableIndex ] + 1 )
838- == numMaxRegionsPerTable [tableIndex ]) {
839- //recompute maxRegionsPerTable since the previous value was coming from the old server
840- numMaxRegionsPerTable [tableIndex ] = 0 ;
841- for (int [] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable ) {
842- if (aNumRegionsPerServerPerTable [tableIndex ] > numMaxRegionsPerTable [tableIndex ]) {
843- numMaxRegionsPerTable [tableIndex ] = aNumRegionsPerServerPerTable [tableIndex ];
844- }
845- }
846- }
915+ // update regionSkewPerTable for the move to new server
916+ regionSkewByTable +=
917+ Math .abs (numRegionsPerServerPerTable [newServer ][tableIndex ]
918+ - meanRegionsPerTable [tableIndex ])
919+ - Math .abs (numRegionsPerServerPerTable [newServer ][tableIndex ] - 1
920+ - meanRegionsPerTable [tableIndex ]);
847921
848922 // update for servers
849923 int primary = regionIndexToPrimaryIndex [region ];
@@ -1013,7 +1087,7 @@ public String toString() {
10131087 .append (Arrays .toString (serverIndicesSortedByRegionCount ))
10141088 .append (", regionsPerServer=" ).append (Arrays .deepToString (regionsPerServer ));
10151089
1016- desc .append (", numMaxRegionsPerTable =" ).append (Arrays . toString ( numMaxRegionsPerTable ) )
1090+ desc .append (", regionSkewByTable =" ).append (regionSkewByTable )
10171091 .append (", numRegions=" ).append (numRegions ).append (", numServers=" ).append (numServers )
10181092 .append (", numTables=" ).append (numTables ).append (", numMovedRegions=" )
10191093 .append (numMovedRegions ).append ('}' );
0 commit comments