Merged
org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.mapreduce;

 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -61,7 +60,6 @@ public class TableRecordReaderImpl {
   private ImmutableBytesWritable key = null;
   private Result value = null;
   private TaskAttemptContext context = null;
-  private Method getCounter = null;
   private long numRestarts = 0;
   private long numStale = 0;
   private long timestamp;
@@ -97,25 +95,6 @@ public void restart(byte[] firstRow) throws IOException {
     }
   }

-  /**
-   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
-   * Check if getCounter(String, String) method is available.
-   * @return The getCounter method or null if not available.
-   */
-  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
-    throws IOException {
-    Method m = null;
-    try {
-      m = context.getClass().getMethod("getCounter",
-        new Class [] {String.class, String.class});
-    } catch (SecurityException e) {
-      throw new IOException("Failed test for getCounter", e);
-    } catch (NoSuchMethodException e) {
-      // Ignore
-    }
-    return m;
-  }
-
   /**
    * Sets the HBase table.
    * @param htable The table to scan.
@@ -144,7 +123,6 @@ public void initialize(InputSplit inputsplit,
     InterruptedException {
     if (context != null) {
       this.context = context;
-      getCounter = retrieveGetCounterWithStringsParams(context);
     }
     restart(scan.getStartRow());
   }
@@ -212,8 +190,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
       rowcount ++;
       if (rowcount >= logPerRowCount) {
         long now = System.currentTimeMillis();
-        LOG.info("Mapper took " + (now-timestamp)
-          + "ms to process " + rowcount + " rows");
+        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
         timestamp = now;
         rowcount = 0;
       }
@@ -265,8 +242,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
       updateCounters();
       if (logScannerActivity) {
         long now = System.currentTimeMillis();
-        LOG.info("Mapper took " + (now-timestamp)
-          + "ms to process " + rowcount + " rows");
+        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
         LOG.info(ioe.toString(), ioe);
         String lastRow = lastSuccessfulRow == null ?
           "null" : Bytes.toStringBinary(lastSuccessfulRow);
@@ -282,36 +258,40 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
    * If hbase runs on old version of mapreduce, it won't be able to get
    * access to counters and TableRecorderReader can't update counter values.
    */
-  private void updateCounters() throws IOException {
+  private void updateCounters() {
     ScanMetrics scanMetrics = scanner.getScanMetrics();
     if (scanMetrics == null) {
       return;
     }

-    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
+    updateCounters(scanMetrics, numRestarts, context, numStale);
   }

   protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
-    Method getCounter, TaskAttemptContext context, long numStale) {
+    TaskAttemptContext context, long numStale) {
     // we can get access to counters only if hbase uses new mapreduce APIs
-    if (getCounter == null) {
+    if (context == null) {
       return;
     }

-    try {
-      for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
-        Counter ct = (Counter)getCounter.invoke(context,
-          HBASE_COUNTER_GROUP_NAME, entry.getKey());
-
-        ct.increment(entry.getValue());
+    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
+      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
+      if (counter != null) {
+        counter.increment(entry.getValue());
+      }
     }
-      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
-        "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
-      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
-        "NUM_SCAN_RESULTS_STALE")).increment(numStale);
-    } catch (Exception e) {
-      LOG.debug("can't update counter." + StringUtils.stringifyException(e));
-    }
+    if (numScannerRestarts != 0L) {
+      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
+      if (counter != null) {
+        counter.increment(numScannerRestarts);
+      }
+    }
+    if (numStale != 0L) {
+      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
+      if (counter != null) {
+        counter.increment(numStale);
+      }
+    }
   }

   /**
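The substance of the change is in the updateCounters rewrite above: the reflection probe for getCounter(String, String) and the cached Method handle are gone, and the reader now calls TaskAttemptContext.getCounter(String, String) directly, which the new-style org.apache.hadoop.mapreduce API exposes. Below is a minimal standalone sketch of the resulting pattern, not the HBase source: the class and method names are illustrative, and the group-name literal is an assumption standing in for HBASE_COUNTER_GROUP_NAME.

import java.util.Map;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public final class CounterUpdateSketch {

  // Stand-in for TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME; this literal
  // is an assumption, not taken from the diff.
  private static final String SCAN_COUNTER_GROUP = "HBase Counters";

  /** Adds each metric value to the task counter of the same name. */
  static void bumpAll(TaskAttemptContext context, Map<String, Long> metrics) {
    if (context == null) {
      return; // no task context, e.g. when the reader runs outside a task
    }
    for (Map.Entry<String, Long> entry : metrics.entrySet()) {
      Counter counter = context.getCounter(SCAN_COUNTER_GROUP, entry.getKey());
      if (counter != null) {
        counter.increment(entry.getValue());
      }
    }
  }
}

Keeping the null check on the returned Counter mirrors the patch; real TaskAttemptContext implementations are generally expected to hand back a live counter, so the guard is defensive.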
org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java

@@ -21,7 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
@@ -138,13 +137,11 @@ static class TableSnapshotRegionRecordReader extends
     private TableSnapshotInputFormatImpl.RecordReader delegate =
       new TableSnapshotInputFormatImpl.RecordReader();
     private TaskAttemptContext context;
-    private Method getCounter;

     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
       InterruptedException {
       this.context = context;
-      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
       delegate.initialize(
         ((TableSnapshotRegionSplit) split).delegate,
         context.getConfiguration());
@@ -156,7 +153,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
       if (result) {
         ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
         if (scanMetrics != null && context != null) {
-          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
+          TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
         }
       }
       return result;
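For context, a hypothetical driver-side fragment showing where these counters surface: once a scan-backed job completes, the values pushed through updateCounters are readable via Job#getCounters(). The counter names NUM_SCANNER_RESTARTS and NUM_SCAN_RESULTS_STALE come from the diff; the group literal is the same assumption as in the earlier sketch.

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public final class ScanCounterReport {

  /** Prints the two scan counters incremented by updateCounters above. */
  static void report(Job completedJob) throws Exception {
    Counters counters = completedJob.getCounters();
    // Group literal assumed; in real code reference HBASE_COUNTER_GROUP_NAME.
    Counter restarts = counters.findCounter("HBase Counters", "NUM_SCANNER_RESTARTS");
    Counter stale = counters.findCounter("HBase Counters", "NUM_SCAN_RESULTS_STALE");
    System.out.println("scanner restarts: " + restarts.getValue());
    System.out.println("stale scan results: " + stale.getValue());
  }
}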