From d7e79a85bca61e080d7b5987cfb6fe0117255206 Mon Sep 17 00:00:00 2001
From: Yechao Chen
Date: Fri, 7 Aug 2020 12:56:49 +0800
Subject: [PATCH 1/3] HBASE-24831 Avoid invoke Counter using reflection in SnapshotInputFormat

---
 .../mapreduce/TableRecordReaderImpl.java      | 54 +++++--------
 .../mapreduce/TableSnapshotInputFormat.java   |  5 +-
 2 files changed, 14 insertions(+), 45 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index 21589fd0e0d0..65aee32426a4 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -30,7 +29,6 @@
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.StringUtils;
@@ -61,7 +59,6 @@ public class TableRecordReaderImpl {
   private ImmutableBytesWritable key = null;
   private Result value = null;
   private TaskAttemptContext context = null;
-  private Method getCounter = null;
   private long numRestarts = 0;
   private long numStale = 0;
   private long timestamp;
@@ -97,25 +94,6 @@ public void restart(byte[] firstRow) throws IOException {
     }
   }
 
-  /**
-   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
-   * Check if getCounter(String, String) method is available.
-   * @return The getCounter method or null if not available.
-   */
-  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
-    throws IOException {
-    Method m = null;
-    try {
-      m = context.getClass().getMethod("getCounter",
-        new Class [] {String.class, String.class});
-    } catch (SecurityException e) {
-      throw new IOException("Failed test for getCounter", e);
-    } catch (NoSuchMethodException e) {
-      // Ignore
-    }
-    return m;
-  }
-
   /**
    * Sets the HBase table.
    * @param htable The table to scan.
@@ -144,7 +122,6 @@ public void initialize(InputSplit inputsplit,
     InterruptedException {
     if (context != null) {
       this.context = context;
-      getCounter = retrieveGetCounterWithStringsParams(context);
     }
     restart(scan.getStartRow());
   }
@@ -282,35 +259,30 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
    * If hbase runs on old version of mapreduce, it won't be able to get
    * access to counters and TableRecorderReader can't update counter values.
    */
-  private void updateCounters() throws IOException {
+  private void updateCounters() {
     ScanMetrics scanMetrics = scanner.getScanMetrics();
     if (scanMetrics == null) {
       return;
     }
 
-    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
+    updateCounters(scanMetrics, numRestarts, context, numStale);
   }
 
   protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
-    Method getCounter, TaskAttemptContext context, long numStale) {
+    TaskAttemptContext context, long numStale) {
     // we can get access to counters only if hbase uses new mapreduce APIs
-    if (getCounter == null) {
+    if (context == null) {
       return;
     }
-
-    try {
-      for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
-        Counter ct = (Counter)getCounter.invoke(context,
-          HBASE_COUNTER_GROUP_NAME, entry.getKey());
-
-        ct.increment(entry.getValue());
-      }
-      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
-        "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
-      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
-        "NUM_SCAN_RESULTS_STALE")).increment(numStale);
-    } catch (Exception e) {
-      LOG.debug("can't update counter." + StringUtils.stringifyException(e));
+    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
+      context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey()).increment(entry.getValue());
+    }
+    if (numScannerRestarts != 0L) {
+      context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS")
+        .increment(numScannerRestarts);
+    }
+    if (numStale != 0L) {
+      context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE").increment(numStale);
     }
   }
 
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 2fbbb5180372..3ca6c0323688 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -21,7 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
@@ -138,13 +137,11 @@ static class TableSnapshotRegionRecordReader extends
     private TableSnapshotInputFormatImpl.RecordReader delegate =
       new TableSnapshotInputFormatImpl.RecordReader();
     private TaskAttemptContext context;
-    private Method getCounter;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
       this.context = context;
-      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
       delegate.initialize(
         ((TableSnapshotRegionSplit) split).delegate,
         context.getConfiguration());
@@ -156,7 +153,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
       if (result) {
         ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
         if (scanMetrics != null && context != null) {
-          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
+          TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
         }
       }
       return result;
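
The first patch drops the reflective lookup of getCounter(String, String). As the removed Javadoc notes, that probe existed to detect at runtime whether the context offered the new mapreduce counter API; on any Hadoop 2+ runtime TaskAttemptContext declares getCounter(String, String) directly, so the probe always succeeds and only adds reflection overhead on the hot nextKeyValue() path. The rewrite also skips NUM_SCANNER_RESTARTS and NUM_SCAN_RESULTS_STALE when their deltas are zero. A minimal, self-contained sketch of the direct-call pattern the patch adopts (the class and the group-name constant here are illustrative, not taken from the patch):

    import java.util.Map;

    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Illustrative sketch only: shows the direct-call pattern adopted by the patch.
    final class DirectCounterUpdate {
      // Hypothetical group name; the real code uses HBASE_COUNTER_GROUP_NAME.
      private static final String GROUP = "HBase Counters";

      static void updateCounters(TaskAttemptContext context, ScanMetrics scanMetrics) {
        if (context == null || scanMetrics == null) {
          return; // no task context or no collected metrics, nothing to publish
        }
        // TaskAttemptContext.getCounter(String, String) is part of the new
        // mapreduce API, so no reflective Method handle is needed.
        for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
          context.getCounter(GROUP, entry.getKey()).increment(entry.getValue());
        }
      }
    }
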
From 2246fac9a3540858a2ce90aa9ccd07862da94f3a Mon Sep 17 00:00:00 2001
From: Yechao Chen
Date: Tue, 11 Aug 2020 09:23:22 +0800
Subject: [PATCH 2/3] HBASE-24831 Avoid invoke Counter using reflection in SnapshotInputFormat

---
 .../hadoop/hbase/mapreduce/TableRecordReaderImpl.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index 65aee32426a4..6227f98a5982 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -189,8 +189,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
         rowcount ++;
         if (rowcount >= logPerRowCount) {
           long now = System.currentTimeMillis();
-          LOG.info("Mapper took " + (now-timestamp)
-            + "ms to process " + rowcount + " rows");
+          LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
           timestamp = now;
           rowcount = 0;
         }
@@ -242,8 +241,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
       updateCounters();
       if (logScannerActivity) {
         long now = System.currentTimeMillis();
-        LOG.info("Mapper took " + (now-timestamp)
-          + "ms to process " + rowcount + " rows");
+        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
         LOG.info(ioe.toString(), ioe);
         String lastRow = lastSuccessfulRow == null ?
           "null" : Bytes.toStringBinary(lastSuccessfulRow);
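
The second patch is a small follow-up cleanup: the two mapper-progress log statements move from string concatenation to SLF4J parameterized logging, so the message string is only assembled when INFO is actually enabled. A self-contained sketch of the difference (the class and method names here are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative comparison of concatenated vs. parameterized logging.
    final class MapperProgressLog {
      private static final Logger LOG = LoggerFactory.getLogger(MapperProgressLog.class);

      static void logProgress(long timestamp, long rowcount) {
        long now = System.currentTimeMillis();
        // Before: the argument string is built even when INFO is disabled.
        // LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");

        // After: SLF4J substitutes the {} placeholders only if the level check passes.
        LOG.info("Mapper took {}ms to process {} rows", now - timestamp, rowcount);
      }
    }
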
"NUM_SCAN_RESULTS_STALE"); + if (counter != null) { + counter.increment(numStale); + } + } } /**