1818package org .apache .hadoop .hbase .mapreduce ;
1919
2020import java .io .IOException ;
21- import java .lang .reflect .Method ;
2221import java .util .Map ;
2322import org .apache .hadoop .conf .Configuration ;
2423import org .apache .hadoop .hbase .DoNotRetryIOException ;
@@ -60,7 +59,6 @@ public class TableRecordReaderImpl {
6059 private ImmutableBytesWritable key = null ;
6160 private Result value = null ;
6261 private TaskAttemptContext context = null ;
63- private Method getCounter = null ;
6462 private long numRestarts = 0 ;
6563 private long numStale = 0 ;
6664 private long timestamp ;
@@ -96,25 +94,6 @@ public void restart(byte[] firstRow) throws IOException {
9694 }
9795 }
9896
99- /**
100- * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
101- * Check if getCounter(String, String) method is available.
102- * @return The getCounter method or null if not available.
103- */
104- protected static Method retrieveGetCounterWithStringsParams (TaskAttemptContext context )
105- throws IOException {
106- Method m = null ;
107- try {
108- m = context .getClass ().getMethod ("getCounter" ,
109- new Class [] {String .class , String .class });
110- } catch (SecurityException e ) {
111- throw new IOException ("Failed test for getCounter" , e );
112- } catch (NoSuchMethodException e ) {
113- // Ignore
114- }
115- return m ;
116- }
117-
11897 /**
11998 * Sets the HBase table.
12099 *
@@ -145,7 +124,6 @@ public void initialize(InputSplit inputsplit,
145124 InterruptedException {
146125 if (context != null ) {
147126 this .context = context ;
148- getCounter = retrieveGetCounterWithStringsParams (context );
149127 }
150128 restart (scan .getStartRow ());
151129 }
@@ -213,8 +191,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
213191 rowcount ++;
214192 if (rowcount >= logPerRowCount ) {
215193 long now = System .currentTimeMillis ();
216- LOG .info ("Mapper took " + (now -timestamp )
217- + "ms to process " + rowcount + " rows" );
194+ LOG .info ("Mapper took {}ms to process {} rows" , (now - timestamp ), rowcount );
218195 timestamp = now ;
219196 rowcount = 0 ;
220197 }
@@ -266,8 +243,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
266243 updateCounters ();
267244 if (logScannerActivity ) {
268245 long now = System .currentTimeMillis ();
269- LOG .info ("Mapper took " + (now -timestamp )
270- + "ms to process " + rowcount + " rows" );
246+ LOG .info ("Mapper took {}ms to process {} rows" , (now - timestamp ), rowcount );
271247 LOG .info (ioe .toString (), ioe );
272248 String lastRow = lastSuccessfulRow == null ?
273249 "null" : Bytes .toStringBinary (lastSuccessfulRow );
@@ -283,36 +259,40 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
283259 * If hbase runs on old version of mapreduce, it won't be able to get
284260 * access to counters and TableRecorderReader can't update counter values.
285261 */
286- private void updateCounters () throws IOException {
262+ private void updateCounters () {
287263 ScanMetrics scanMetrics = scanner .getScanMetrics ();
288264 if (scanMetrics == null ) {
289265 return ;
290266 }
291267
292- updateCounters (scanMetrics , numRestarts , getCounter , context , numStale );
268+ updateCounters (scanMetrics , numRestarts , context , numStale );
293269 }
294270
295271 protected static void updateCounters (ScanMetrics scanMetrics , long numScannerRestarts ,
296- Method getCounter , TaskAttemptContext context , long numStale ) {
272+ TaskAttemptContext context , long numStale ) {
297273 // we can get access to counters only if hbase uses new mapreduce APIs
298- if (getCounter == null ) {
274+ if (context == null ) {
299275 return ;
300276 }
301277
302- try {
303- for (Map .Entry <String , Long > entry :scanMetrics .getMetricsMap ().entrySet ()) {
304- Counter ct = (Counter )getCounter .invoke (context ,
305- HBASE_COUNTER_GROUP_NAME , entry .getKey ());
306-
307- ct .increment (entry .getValue ());
278+ for (Map .Entry <String , Long > entry : scanMetrics .getMetricsMap ().entrySet ()) {
279+ Counter counter = context .getCounter (HBASE_COUNTER_GROUP_NAME , entry .getKey ());
280+ if (counter != null ) {
281+ counter .increment (entry .getValue ());
282+ }
283+ }
284+ if (numScannerRestarts != 0L ) {
285+ Counter counter = context .getCounter (HBASE_COUNTER_GROUP_NAME , "NUM_SCANNER_RESTARTS" );
286+ if (counter != null ) {
287+ counter .increment (numScannerRestarts );
288+ }
289+ }
290+ if (numStale != 0L ) {
291+ Counter counter = context .getCounter (HBASE_COUNTER_GROUP_NAME , "NUM_SCAN_RESULTS_STALE" );
292+ if (counter != null ) {
293+ counter .increment (numStale );
294+ }
308295 }
309- ((Counter ) getCounter .invoke (context , HBASE_COUNTER_GROUP_NAME ,
310- "NUM_SCANNER_RESTARTS" )).increment (numScannerRestarts );
311- ((Counter ) getCounter .invoke (context , HBASE_COUNTER_GROUP_NAME ,
312- "NUM_SCAN_RESULTS_STALE" )).increment (numStale );
313- } catch (Exception e ) {
314- LOG .debug ("can't update counter." + StringUtils .stringifyException (e ));
315- }
316296 }
317297
318298 /**
0 commit comments