From 29764c7c5a60498bd6288e6ab959333933f7ee64 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 10 Sep 2021 16:24:13 -0400 Subject: [PATCH 1/2] HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job HBase 2 moved over Scans to use PREAD by default instead of STREAM like HBase 1. In the context of a MapReduce job, we can generally expect that clients using the InputFormat (batch job) would be reading most of the data for a job. Cater to them, but still give users who want PREAD the ability to do so. --- .../TableSnapshotInputFormatImpl.java | 15 +++++++++ .../TestTableSnapshotInputFormat.java | 31 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 22c19be7ce2a..3438afc3aa6f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -128,6 +129,12 @@ public class TableSnapshotInputFormatImpl { public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true; + /** + * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot, default STREAM. 
+ */ + public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE = "hbase.TableSnapshotinputFormat.scanner.readtype"; + public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM; + /** * Implementation class for InputSplit logic common between mapred and mapreduce. */ @@ -382,6 +389,14 @@ public static Scan extractScanFromConf(Configuration conf) throws IOException { } else { throw new IllegalArgumentException("Unable to create scan"); } + + if (scan.getReadType() == ReadType.DEFAULT) { + LOG.info("Provided Scan has DEFAULT ReadType, updating STREAM for Snapshot-based InputFormat"); + // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case. + scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, + SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT)); + } + return scan; } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index 34e6b274dab7..6bb8cbf3e224 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -41,6 +43,7 @@ import 
org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TestTableSnapshotScanner; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; @@ -407,6 +410,34 @@ public void testNoDuplicateResultsWhenSplitting() throws Exception { } } + @Test + public void testScannerReadTypeConfiguration() throws IOException { + Configuration conf = new Configuration(false); + // Explicitly set ReadTypes should persist + for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) { + Scan scanWithReadType = new Scan(); + scanWithReadType.setReadType(readType); + assertEquals(scanWithReadType.getReadType(), serializeAndReturn(conf, scanWithReadType).getReadType()); + } + // We should only see the DEFAULT ReadType getting updated to STREAM. + Scan scanWithoutReadType = new Scan(); + assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType()); + assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType()); + + // We should still be able to force a certain ReadType when DEFAULT is given. + conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD); + assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType()); + assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType()); + } + + /** + * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat does. 
+ */ + private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException { + conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s)); + return TableSnapshotInputFormatImpl.extractScanFromConf(conf); + } + private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits, byte[] startRow, byte[] stopRow) throws IOException, InterruptedException { From a808a2df2386b520fe6d39a27e431e9089385532 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 13 Sep 2021 16:37:31 -0400 Subject: [PATCH 2/2] Whitespace, checkstyle, and capitalization on config key --- .../hbase/mapreduce/TableSnapshotInputFormatImpl.java | 11 +++++++---- .../hbase/mapreduce/TestTableSnapshotInputFormat.java | 8 +++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 3438afc3aa6f..c467a3cbb9a1 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -130,10 +130,12 @@ public class TableSnapshotInputFormatImpl { public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true; /** - * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot, default STREAM. + * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot, + * default STREAM. 
*/ - public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE = "hbase.TableSnapshotinputFormat.scanner.readtype"; - public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM; + public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE = + "hbase.TableSnapshotInputFormat.scanner.readtype"; + public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM; /** * Implementation class for InputSplit logic common between mapred and mapreduce. @@ -391,7 +393,8 @@ public static Scan extractScanFromConf(Configuration conf) throws IOException { } if (scan.getReadType() == ReadType.DEFAULT) { - LOG.info("Provided Scan has DEFAULT ReadType, updating STREAM for Snapshot-based InputFormat"); + LOG.info("Provided Scan has DEFAULT ReadType," + + " updating STREAM for Snapshot-based InputFormat"); // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case. scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT)); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index 6bb8cbf3e224..f4e9f7d09d22 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TestTableSnapshotScanner; -import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import 
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; @@ -417,7 +417,8 @@ public void testScannerReadTypeConfiguration() throws IOException { for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) { Scan scanWithReadType = new Scan(); scanWithReadType.setReadType(readType); - assertEquals(scanWithReadType.getReadType(), serializeAndReturn(conf, scanWithReadType).getReadType()); + assertEquals(scanWithReadType.getReadType(), + serializeAndReturn(conf, scanWithReadType).getReadType()); } // We should only see the DEFAULT ReadType getting updated to STREAM. Scan scanWithoutReadType = new Scan(); @@ -431,7 +432,8 @@ public void testScannerReadTypeConfiguration() throws IOException { } /** - * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat does. + * Serializes and deserializes the given scan in the same manner that + * TableSnapshotInputFormat does. */ private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException { conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));