ClientSideRegionScanner.java
@@ -48,6 +48,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
private HRegion region;
RegionScanner scanner;
List<Cell> values;
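// Tracks whether the underlying RegionScanner may still have more rows to return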
boolean hasMore = true;

public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
@@ -90,12 +91,13 @@ public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,

@Override
public Result next() throws IOException {
values.clear();
scanner.nextRaw(values);
if (values.isEmpty()) {
// we are done
return null;
}
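// Keep polling the underlying scanner until it either returns some cells or reports that
// it has no more rows; a single nextRaw call may legitimately return an empty batch when
// the scan bails out early (for example, when every row it touched was filtered out)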
do {
if (!hasMore) {
return null;
}
values.clear();
this.hasMore = scanner.nextRaw(values);
} while (values.isEmpty());

Result result = Result.create(values);
if (this.scanMetrics != null) {
TestClientSideRegionScanner.java
@@ -17,22 +17,34 @@
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -47,6 +59,8 @@ public class TestClientSideRegionScanner {
HBaseClassTestRule.forClass(TestClientSideRegionScanner.class);

private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final TableName TABLE_NAME = TableName.valueOf("test");
private static final byte[] FAM_NAME = Bytes.toBytes("f");

private Configuration conf;
private Path rootDir;
@@ -113,4 +127,78 @@ public void testNoBlockCache() throws IOException {
BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
assertNull(blockCache);
}

@Test
public void testContinuesToScanIfHasMore() throws IOException {
// This test sets up RegionScannerImpl so that it bails out of the scan after a single
// iteration:
// 1. Configure preadMaxBytes to something small to trigger scannerContext#returnImmediately
// 2. Configure a filter that filters out some rows, in this case rows whose integer row
// keys are < 5
// 3. Have the filter's hasFilterRow return true so RegionScannerImpl sets the limitScope
// to a scope with a depth of 0, causing the scan to bail after the first iteration

Configuration copyConf = new Configuration(conf);
copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1);
Scan scan = new Scan();
scan.setFilter(new FiltersRowsLessThan5());
scan.setLimit(1);

try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAME)) {
TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME);
RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0);

for (int i = 0; i < 10; ++i) {
table.put(createPut(i));
}

// Flush contents to disk so we can scan the fs
TEST_UTIL.getAdmin().flush(TABLE_NAME);

ClientSideRegionScanner clientSideRegionScanner =
new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
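// Wrap the region scanner in a Mockito spy so the test can count nextRaw invocations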
RegionScanner scannerSpy = spy(clientSideRegionScanner.scanner);
clientSideRegionScanner.scanner = scannerSpy;
Result result = clientSideRegionScanner.next();
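// This single next() call drives six nextRaw invocations: rows 0-4 are filtered out and
// yield empty batches, and the sixth call returns row 5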

verify(scannerSpy, times(6)).nextRaw(anyList());
assertNotNull(result);
assertEquals(5, Bytes.toInt(result.getRow()));
assertTrue(clientSideRegionScanner.hasMore);

for (int i = 6; i < 10; ++i) {
result = clientSideRegionScanner.next();
verify(scannerSpy, times(i + 1)).nextRaw(anyList());
assertNotNull(result);
assertEquals(i, Bytes.toInt(result.getRow()));
}

result = clientSideRegionScanner.next();
assertNull(result);
assertFalse(clientSideRegionScanner.hasMore);
}
}

private static Put createPut(int rowAsInt) {
byte[] row = Bytes.toBytes(rowAsInt);
Put put = new Put(row);
put.addColumn(FAM_NAME, row, row);
return put;
}

private static class FiltersRowsLessThan5 extends FilterBase {
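// Skips every row whose integer row key is less than 5 (filterRowKey returning true
// means the row is filtered out)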

@Override
public boolean filterRowKey(Cell cell) {
byte[] rowKey = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength() + cell.getRowOffset());
int intValue = Bytes.toInt(rowKey);
return intValue < 5;
}

@Override
public boolean hasFilterRow() {
return true;
}
}
}