diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 22c19be7ce2a..c467a3cbb9a1 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -128,6 +129,14 @@ public class TableSnapshotInputFormatImpl {
 
   public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;
 
+  /**
+   * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot,
+   * default STREAM.
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
+    "hbase.TableSnapshotInputFormat.scanner.readtype";
+  public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
+
   /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
@@ -382,6 +391,15 @@ public static Scan extractScanFromConf(Configuration conf) throws IOException {
     } else {
       throw new IllegalArgumentException("Unable to create scan");
     }
+
+    if (scan.getReadType() == ReadType.DEFAULT) {
+      LOG.info("Provided Scan has DEFAULT ReadType,"
+        + " updating STREAM for Snapshot-based InputFormat");
+      // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
+      scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
+        SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
+    }
+
     return scan;
   }
 
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 34e6b274dab7..f4e9f7d09d22 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -22,6 +22,8 @@
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +41,7 @@
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -407,6 +410,36 @@ public void testNoDuplicateResultsWhenSplitting() throws Exception {
     }
   }
 
+  @Test
+  public void testScannerReadTypeConfiguration() throws IOException {
+    Configuration conf = new Configuration(false);
+    // Explicitly set ReadTypes should persist
+    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
+      Scan scanWithReadType = new Scan();
+      scanWithReadType.setReadType(readType);
+      assertEquals(scanWithReadType.getReadType(),
+        serializeAndReturn(conf, scanWithReadType).getReadType());
+    }
+    // We should only see the DEFAULT ReadType getting updated to STREAM.
+    Scan scanWithoutReadType = new Scan();
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+
+    // We should still be able to force a certain ReadType when DEFAULT is given.
+    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+  }
+
+  /**
+   * Serializes and deserializes the given scan in the same manner that
+   * TableSnapshotInputFormat does.
+   */
+  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
+    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
+    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
+  }
+
   private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
       byte[] startRow, byte[] stopRow)
       throws IOException, InterruptedException {
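
Usage note (not part of the patch): a minimal sketch of how the new hbase.TableSnapshotInputFormat.scanner.readtype key interacts with extractScanFromConf, mirroring the unit test above. The example class name is made up for illustration; it only uses constants and methods that appear in this diff.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;

public class ReadTypeOverrideSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration(false);
    // Override the STREAM default: Scans left at DEFAULT will be rewritten to PREAD instead.
    conf.setEnum(TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
        ReadType.PREAD);

    // Serialize the Scan into the job configuration, as TableMapReduceUtil does for snapshot jobs.
    Scan scan = new Scan(); // ReadType stays DEFAULT here
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

    // extractScanFromConf applies the configured ReadType only because the Scan was DEFAULT;
    // a Scan with PREAD or STREAM set explicitly would be left untouched.
    Scan extracted = TableSnapshotInputFormatImpl.extractScanFromConf(conf);
    System.out.println(extracted.getReadType()); // PREAD
  }
}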