@@ -219,10 +219,22 @@ protected static class RunResult implements Comparable<RunResult> {
219219    public  RunResult (long  duration , Histogram  hist ) {
220220      this .duration  = duration ;
221221      this .hist  = hist ;
222+       numbOfReplyOverThreshold  = 0 ;
223+       numOfReplyFromReplica  = 0 ;
224+     }
225+ 
226+     public  RunResult (long  duration , long  numbOfReplyOverThreshold , long  numOfReplyFromReplica ,
227+       Histogram  hist ) {
228+       this .duration  = duration ;
229+       this .hist  = hist ;
230+       this .numbOfReplyOverThreshold  = numbOfReplyOverThreshold ;
231+       this .numOfReplyFromReplica  = numOfReplyFromReplica ;
222232    }
223233
224234    public  final  long  duration ;
225235    public  final  Histogram  hist ;
236+     public  final  long  numbOfReplyOverThreshold ;
237+     public  final  long  numOfReplyFromReplica ;
226238
227239    @ Override 
228240    public  String  toString () {
@@ -492,6 +504,10 @@ public void setStatus(final String msg) throws IOException {
492504          });
493505          LOG .info ("Finished "  + Thread .currentThread ().getName () + " in "  + run .duration  +
494506            "ms over "  + threadOpts .perClientRunRows  + " rows" );
507+           if  (opts .latencyThreshold  > 0 ) {
508+             LOG .info ("Number of replies over latency threshold "  + opts .latencyThreshold  +
509+               "(ms) is "  + run .numbOfReplyOverThreshold );
510+           }
495511          return  run ;
496512        }
497513      });
@@ -512,10 +528,12 @@ public void setStatus(final String msg) throws IOException {
512528    long  total  = 0 ;
513529    float  avgLatency  = 0  ;
514530    float  avgTPS  = 0 ;
531+     long  replicaWins  = 0 ;
515532    for  (RunResult  result  : results ) {
516533      total  += result .duration ;
517534      avgLatency  += result .hist .getSnapshot ().getMean ();
518535      avgTPS  += opts .perClientRunRows  * 1.0f  / result .duration ;
536+       replicaWins  += result .numOfReplyFromReplica ;
519537    }
520538    avgTPS  *= 1000 ; // ms to second 
521539    avgLatency  = avgLatency  / results .length ;
@@ -525,12 +543,15 @@ public void setStatus(final String msg) throws IOException {
525543      + "\t Avg: "  + (total  / results .length ) + "ms" );
526544    LOG .info ("[ Avg latency (us)]\t "  + Math .round (avgLatency ));
527545    LOG .info ("[ Avg TPS/QPS]\t "  + Math .round (avgTPS ) + "\t  row per second" );
546+     if  (opts .replicas  > 1 ) {
547+       LOG .info ("[results from replica regions] "  + replicaWins );
548+     }
549+ 
528550    for  (int  i  = 0 ; i  < opts .connCount ; i ++) {
529551      cons [i ].close ();
530552      asyncCons [i ].close ();
531553    }
532554
533- 
534555    return  results ;
535556  }
536557
@@ -706,6 +727,7 @@ static class TestOptions {
706727    int  columns  = 1 ;
707728    int  families  = 1 ;
708729    int  caching  = 30 ;
730+     int  latencyThreshold  = 0 ; // in millsecond 
709731    boolean  addColumns  = true ;
710732    MemoryCompactionPolicy  inMemoryCompaction  =
711733        MemoryCompactionPolicy .valueOf (
@@ -741,6 +763,7 @@ public TestOptions(TestOptions that) {
741763      this .useTags  = that .useTags ;
742764      this .noOfTags  = that .noOfTags ;
743765      this .reportLatency  = that .reportLatency ;
766+       this .latencyThreshold  = that .latencyThreshold ;
744767      this .multiGet  = that .multiGet ;
745768      this .multiPut  = that .multiPut ;
746769      this .inMemoryCF  = that .inMemoryCF ;
@@ -1130,6 +1153,7 @@ private static long nextRandomSeed() {
11301153
11311154    private  String  testName ;
11321155    private  Histogram  latencyHistogram ;
1156+     private  Histogram  replicaLatencyHistogram ;
11331157    private  Histogram  valueSizeHistogram ;
11341158    private  Histogram  rpcCallsHistogram ;
11351159    private  Histogram  remoteRpcCallsHistogram ;
@@ -1138,6 +1162,8 @@ private static long nextRandomSeed() {
11381162    private  Histogram  bytesInResultsHistogram ;
11391163    private  Histogram  bytesInRemoteResultsHistogram ;
11401164    private  RandomDistribution .Zipf  zipf ;
1165+     private  long  numOfReplyOverLatencyThreshold  = 0 ;
1166+     private  long  numOfReplyFromReplica  = 0 ;
11411167
11421168    /** 
11431169     * Note that all subclasses of this class must provide a public constructor 
@@ -1175,13 +1201,28 @@ int getValueLength(final Random r) {
11751201    }
11761202
11771203    void  updateValueSize (final  Result  [] rs ) throws  IOException  {
1178-       if  (rs  == null  || !isRandomValueSize ()) return ;
1179-       for  (Result  r : rs ) updateValueSize (r );
1204+       updateValueSize (rs , 0 );
1205+     }
1206+ 
1207+     void  updateValueSize (final  Result  [] rs , final  long  latency ) throws  IOException  {
1208+       if  (rs  == null  || (latency  == 0 )) return ;
1209+       for  (Result  r : rs ) updateValueSize (r , latency );
11801210    }
11811211
11821212    void  updateValueSize (final  Result  r ) throws  IOException  {
1183-       if  (r  == null  || !isRandomValueSize ()) return ;
1213+       updateValueSize (r , 0 );
1214+     }
1215+ 
1216+     void  updateValueSize (final  Result  r , final  long  latency ) throws  IOException  {
1217+       if  (r  == null  || (latency  == 0 )) return ;
11841218      int  size  = 0 ;
1219+       // update replicaHistogram 
1220+       if  (r .isStale ()) {
1221+         replicaLatencyHistogram .update (latency  / 1000 );
1222+         numOfReplyFromReplica  ++;
1223+       }
1224+       if  (!isRandomValueSize ()) return ;
1225+ 
11851226      for  (CellScanner  scanner  = r .cellScanner (); scanner .advance ();) {
11861227        size  += scanner .current ().getValueLength ();
11871228      }
@@ -1245,6 +1286,10 @@ public Histogram getLatencyHistogram() {
12451286    void  testSetup () throws  IOException  {
12461287      // test metrics 
12471288      latencyHistogram  = YammerHistogramUtils .newHistogram (new  UniformReservoir (1024  * 500 ));
1289+       // If it is a replica test, set up histogram for replica. 
1290+       if  (opts .replicas  > 1 ) {
1291+         replicaLatencyHistogram  = YammerHistogramUtils .newHistogram (new  UniformReservoir (1024  * 500 ));
1292+       }
12481293      valueSizeHistogram  = YammerHistogramUtils .newHistogram (new  UniformReservoir (1024  * 500 ));
12491294      // scan metrics 
12501295      rpcCallsHistogram  = YammerHistogramUtils .newHistogram (new  UniformReservoir (1024  * 500 ));
@@ -1268,6 +1313,10 @@ void testTakedown() throws IOException {
12681313        status .setStatus ("Test : "  + testName  + ", Thread : "  + Thread .currentThread ().getName ());
12691314        status .setStatus ("Latency (us) : "  + YammerHistogramUtils .getHistogramReport (
12701315            latencyHistogram ));
1316+         if  (opts .replicas  > 1 ) {
1317+           status .setStatus ("Latency (us) from Replica Regions: "  +
1318+             YammerHistogramUtils .getHistogramReport (replicaLatencyHistogram ));
1319+         }
12711320        status .setStatus ("Num measures (latency) : "  + latencyHistogram .getCount ());
12721321        status .setStatus (YammerHistogramUtils .getPrettyHistogramReport (latencyHistogram ));
12731322        if  (valueSizeHistogram .getCount () > 0 ) {
@@ -1349,15 +1398,19 @@ void testTimed() throws IOException, InterruptedException {
13491398          long  startTime  = System .nanoTime ();
13501399          boolean  requestSent  = false ;
13511400          try  (TraceScope  scope  = TraceUtil .createTrace ("test row" );){
1352-             requestSent  = testRow (i );
1401+             requestSent  = testRow (i ,  startTime );
13531402          }
13541403          if  ( (i  - startRow ) > opts .measureAfter ) {
13551404            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 
13561405            // first 9 times and sends the actual get request in the 10th iteration. 
13571406            // We should only set latency when actual request is sent because otherwise 
13581407            // it turns out to be 0. 
13591408            if  (requestSent ) {
1360-               latencyHistogram .update ((System .nanoTime () - startTime ) / 1000 );
1409+               long  latency  = (System .nanoTime () - startTime ) / 1000 ;
1410+               latencyHistogram .update (latency );
1411+               if  ((opts .latencyThreshold  > 0 ) && (latency  / 1000  >= opts .latencyThreshold )) {
1412+                 numOfReplyOverLatencyThreshold  ++;
1413+               }
13611414            }
13621415            if  (status  != null  && i  > 0  && (i  % getReportingPeriod ()) == 0 ) {
13631416              status .setStatus (generateStatus (startRow , i , lastRow ));
@@ -1389,7 +1442,7 @@ public String getShortValueSizeReport() {
13891442     *         False if not, multiGet and multiPut e.g., the rows are sent 
13901443     *         to server only if enough gets/puts are gathered. 
13911444     */ 
1392-     abstract  boolean  testRow (final  int  i ) throws  IOException , InterruptedException ;
1445+     abstract  boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException , InterruptedException ;
13931446  }
13941447
13951448  static  abstract  class  Test  extends  TestBase  {
@@ -1460,7 +1513,7 @@ static class AsyncRandomReadTest extends AsyncTableTest {
14601513    }
14611514
14621515    @ Override 
1463-     boolean  testRow (final  int  i ) throws  IOException , InterruptedException  {
1516+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException , InterruptedException  {
14641517      if  (opts .randomSleep  > 0 ) {
14651518        Thread .sleep (rd .nextInt (opts .randomSleep ));
14661519      }
@@ -1569,7 +1622,7 @@ void testTakedown() throws IOException {
15691622    }
15701623
15711624    @ Override 
1572-     boolean  testRow (final  int  i ) throws  IOException  {
1625+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
15731626      if  (this .testScanner  == null ) {
15741627        Scan  scan  =
15751628            new  Scan ().withStartRow (format (opts .startRow )).setCaching (opts .caching )
@@ -1603,7 +1656,7 @@ static class AsyncSequentialReadTest extends AsyncTableTest {
16031656    }
16041657
16051658    @ Override 
1606-     boolean  testRow (final  int  i ) throws  IOException , InterruptedException  {
1659+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException , InterruptedException  {
16071660      Get  get  = new  Get (format (i ));
16081661      for  (int  family  = 0 ; family  < opts .families ; family ++) {
16091662        byte [] familyName  = Bytes .toBytes (FAMILY_NAME_BASE  + family );
@@ -1645,7 +1698,7 @@ protected byte[] generateRow(final int i) {
16451698
16461699    @ Override 
16471700    @ SuppressWarnings ("ReturnValueIgnored" )
1648-     boolean  testRow (final  int  i ) throws  IOException , InterruptedException  {
1701+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException , InterruptedException  {
16491702      byte [] row  = generateRow (i );
16501703      Put  put  = new  Put (row );
16511704      for  (int  family  = 0 ; family  < opts .families ; family ++) {
@@ -1720,7 +1773,7 @@ static class RandomSeekScanTest extends TableTest {
17201773    }
17211774
17221775    @ Override 
1723-     boolean  testRow (final  int  i ) throws  IOException  {
1776+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
17241777      Scan  scan  = new  Scan ().withStartRow (getRandomRow (this .rand , opts .totalRows ))
17251778          .setCaching (opts .caching ).setCacheBlocks (opts .cacheBlocks )
17261779          .setAsyncPrefetch (opts .asyncPrefetch ).setReadType (opts .scanReadType )
@@ -1768,7 +1821,7 @@ static abstract class RandomScanWithRangeTest extends TableTest {
17681821    }
17691822
17701823    @ Override 
1771-     boolean  testRow (final  int  i ) throws  IOException  {
1824+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
17721825      Pair <byte [], byte []> startAndStopRow  = getStartAndStopRow ();
17731826      Scan  scan  = new  Scan ().withStartRow (startAndStopRow .getFirst ())
17741827          .withStopRow (startAndStopRow .getSecond ()).setCaching (opts .caching )
@@ -1871,6 +1924,7 @@ static class RandomReadTest extends TableTest {
18711924    private  final  Consistency  consistency ;
18721925    private  ArrayList <Get > gets ;
18731926    private  Random  rd  = new  Random ();
1927+     private  long  numOfReplyFromReplica  = 0 ;
18741928
18751929    RandomReadTest (Connection  con , TestOptions  options , Status  status ) {
18761930      super (con , options , status );
@@ -1882,7 +1936,7 @@ static class RandomReadTest extends TableTest {
18821936    }
18831937
18841938    @ Override 
1885-     boolean  testRow (final  int  i ) throws  IOException , InterruptedException  {
1939+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException , InterruptedException  {
18861940      if  (opts .randomSleep  > 0 ) {
18871941        Thread .sleep (rd .nextInt (opts .randomSleep ));
18881942      }
@@ -1907,13 +1961,24 @@ boolean testRow(final int i) throws IOException, InterruptedException {
19071961        this .gets .add (get );
19081962        if  (this .gets .size () == opts .multiGet ) {
19091963          Result  [] rs  = this .table .get (this .gets );
1910-           updateValueSize (rs );
1964+           if  (opts .replicas  > 1 ) {
1965+             long  latency  = System .nanoTime () - startTime ;
1966+             updateValueSize (rs , latency );
1967+           } else  {
1968+             updateValueSize (rs );
1969+           }
19111970          this .gets .clear ();
19121971        } else  {
19131972          return  false ;
19141973        }
19151974      } else  {
1916-         updateValueSize (this .table .get (get ));
1975+         if  (opts .replicas  > 1 ) {
1976+           Result  r  = this .table .get (get );
1977+           long  latency  = System .nanoTime () - startTime ;
1978+           updateValueSize (r , latency );
1979+         } else  {
1980+           updateValueSize (this .table .get (get ));
1981+         }
19171982      }
19181983      return  true ;
19191984    }
@@ -1964,7 +2029,7 @@ void testTakedown() throws IOException {
19642029
19652030
19662031    @ Override 
1967-     boolean  testRow (final  int  i ) throws  IOException  {
2032+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
19682033      if  (this .testScanner  == null ) {
19692034        Scan  scan  = new  Scan ().withStartRow (format (opts .startRow )).setCaching (opts .caching )
19702035            .setCacheBlocks (opts .cacheBlocks ).setAsyncPrefetch (opts .asyncPrefetch )
@@ -2027,7 +2092,7 @@ static class IncrementTest extends CASTableTest {
20272092    }
20282093
20292094    @ Override 
2030-     boolean  testRow (final  int  i ) throws  IOException  {
2095+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
20312096      Increment  increment  = new  Increment (format (i ));
20322097      // unlike checkAndXXX tests, which make most sense to do on a single value, 
20332098      // if multiple families are specified for an increment test we assume it is 
@@ -2047,7 +2112,7 @@ static class AppendTest extends CASTableTest {
20472112    }
20482113
20492114    @ Override 
2050-     boolean  testRow (final  int  i ) throws  IOException  {
2115+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
20512116      byte  [] bytes  = format (i );
20522117      Append  append  = new  Append (bytes );
20532118      // unlike checkAndXXX tests, which make most sense to do on a single value, 
@@ -2068,7 +2133,7 @@ static class CheckAndMutateTest extends CASTableTest {
20682133    }
20692134
20702135    @ Override 
2071-     boolean  testRow (final  int  i ) throws  IOException  {
2136+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
20722137      final  byte  [] bytes  = format (i );
20732138      // checkAndXXX tests operate on only a single value 
20742139      // Put a known value so when we go to check it, it is there. 
@@ -2089,7 +2154,7 @@ static class CheckAndPutTest extends CASTableTest {
20892154    }
20902155
20912156    @ Override 
2092-     boolean  testRow (final  int  i ) throws  IOException  {
2157+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
20932158      final  byte  [] bytes  = format (i );
20942159      // checkAndXXX tests operate on only a single value 
20952160      // Put a known value so when we go to check it, it is there. 
@@ -2108,7 +2173,7 @@ static class CheckAndDeleteTest extends CASTableTest {
21082173    }
21092174
21102175    @ Override 
2111-     boolean  testRow (final  int  i ) throws  IOException  {
2176+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
21122177      final  byte  [] bytes  = format (i );
21132178      // checkAndXXX tests operate on only a single value 
21142179      // Put a known value so when we go to check it, it is there. 
@@ -2129,7 +2194,7 @@ static class SequentialReadTest extends TableTest {
21292194    }
21302195
21312196    @ Override 
2132-     boolean  testRow (final  int  i ) throws  IOException  {
2197+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
21332198      Get  get  = new  Get (format (i ));
21342199      for  (int  family  = 0 ; family  < opts .families ; family ++) {
21352200        byte [] familyName  = Bytes .toBytes (FAMILY_NAME_BASE  + family );
@@ -2167,7 +2232,7 @@ protected byte[] generateRow(final int i) {
21672232    }
21682233
21692234    @ Override 
2170-     boolean  testRow (final  int  i ) throws  IOException  {
2235+     boolean  testRow (final  int  i ,  final   long   startTime ) throws  IOException  {
21712236      byte [] row  = generateRow (i );
21722237      Put  put  = new  Put (row );
21732238      for  (int  family  = 0 ; family  < opts .families ; family ++) {
@@ -2224,7 +2289,7 @@ static class FilteredScanTest extends TableTest {
22242289    }
22252290
22262291    @ Override 
2227-     boolean  testRow (int  i ) throws  IOException  {
2292+     boolean  testRow (int  i ,  final   long   startTime ) throws  IOException  {
22282293      byte [] value  = generateData (this .rand , getValueLength (this .rand ));
22292294      Scan  scan  = constructScan (value );
22302295      ResultScanner  scanner  = null ;
@@ -2368,7 +2433,8 @@ static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration
23682433      " ("  + calculateMbps ((int )(opts .perClientRunRows  * opts .sampleRate ), totalElapsedTime ,
23692434          getAverageValueLength (opts ), opts .families , opts .columns ) + ")" );
23702435
2371-     return  new  RunResult (totalElapsedTime , t .getLatencyHistogram ());
2436+     return  new  RunResult (totalElapsedTime , t .numOfReplyOverLatencyThreshold ,
2437+       t .numOfReplyFromReplica , t .getLatencyHistogram ());
23722438  }
23732439
23742440  private  static  int  getAverageValueLength (final  TestOptions  opts ) {
@@ -2434,6 +2500,8 @@ protected static void printUsage(final String shortName, final String message) {
24342500    System .err .println (" traceRate       Enable HTrace spans. Initiate tracing every N rows. "  +
24352501      "Default: 0" );
24362502    System .err .println (" latency         Set to report operation latencies. Default: False" );
2503+     System .err .println (" latencyThreshold  Set to report number of operations with latency "  +
2504+       "over lantencyThreshold, unit in millisecond, default 0" );
24372505    System .err .println (" measureAfter    Start to measure the latency once 'measureAfter'"  +
24382506        " rows have been treated. Default: 0" );
24392507    System .err .println (" valueSize       Pass value size to use: Default: " 
@@ -2631,6 +2699,12 @@ static TestOptions parseOpts(Queue<String> args) {
26312699        continue ;
26322700      }
26332701
2702+       final  String  latencyThreshold  = "--latencyThreshold=" ;
2703+       if  (cmd .startsWith (latencyThreshold )) {
2704+         opts .latencyThreshold  = Integer .parseInt (cmd .substring (latencyThreshold .length ()));
2705+         continue ;
2706+       }
2707+ 
26342708      final  String  latency  = "--latency" ;
26352709      if  (cmd .startsWith (latency )) {
26362710        opts .reportLatency  = true ;
0 commit comments