Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class Get extends Query implements Row {
private TimeRange tr = TimeRange.allTime();
private boolean checkExistenceOnly = false;
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private long maxResultSize = -1;

/**
* Create a Get operation for the specified row.
Expand Down Expand Up @@ -281,6 +282,21 @@ public Get setFilter(Filter filter) {
return this;
}

/**
* Set the maximum result size. The default is -1; this means that no specific
* maximum result size will be set for this Get.
*
* If set to a value greater than zero, the server may respond with a Result where
* {@link Result#mayHaveMoreCellsInRow()} is true. The user is required to handle
* this case.
*
* @param maxResultSize The maximum result size in bytes
*/
public Get setMaxResultSize(long maxResultSize) {
this.maxResultSize = maxResultSize;
return this;
}

/* Accessors */

/**
Expand Down Expand Up @@ -400,6 +416,13 @@ public Map<String, Object> getFingerprint() {
return map;
}

/**
* @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
*/
public long getMaxResultSize() {
return maxResultSize;
}

/**
* Compile the details beyond the scope of getFingerprint (row, columns,
* timestamps, etc.) into a Map along with the fingerprinted information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ public static Get toGet(final ClientProtos.Get proto) throws IOException {
if (proto.hasLoadColumnFamiliesOnDemand()) {
get.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
}
if (proto.hasMaxResultSize()) {
get.setMaxResultSize(proto.getMaxResultSize());
}
return get;
}

Expand Down Expand Up @@ -1296,6 +1299,9 @@ public static ClientProtos.Get toGet(
if (loadColumnFamiliesOnDemand != null) {
builder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
}
if (get.getMaxResultSize() > 0) {
builder.setMaxResultSize(get.getMaxResultSize());
}
return builder.build();
}

Expand Down Expand Up @@ -1497,6 +1503,7 @@ public static ClientProtos.Result toResultNoData(final Result result) {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(size);
builder.setStale(result.isStale());
builder.setPartial(result.mayHaveMoreCellsInRow());
return builder.build();
}

Expand Down Expand Up @@ -1587,7 +1594,7 @@ public static Result toResult(final ClientProtos.Result proto, final CellScanner

return (cells == null || cells.isEmpty())
? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT)
: Result.create(cells, null, proto.getStale());
: Result.create(cells, null, proto.getStale(), proto.getPartial());
}


Expand Down
2 changes: 2 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/client/Client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ message Get {
optional Consistency consistency = 12 [default = STRONG];
repeated ColumnFamilyTimeRange cf_time_range = 13;
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */

optional uint64 max_result_size = 15;
}

message Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
Expand Down Expand Up @@ -3871,8 +3872,7 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
Result result;
if (returnResults) {
// convert duplicate increment/append to get
List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
result = Result.create(results);
result = region.get(toGet(mutation), false, nonceGroup, nonce);
} else {
result = Result.EMPTY_RESULT;
}
Expand Down Expand Up @@ -7524,9 +7524,7 @@ public static boolean rowIsInRange(RegionInfo info, final byte [] row, final int
@Override
public Result get(final Get get) throws IOException {
prepareGet(get);
List<Cell> results = get(get, true);
boolean stale = this.getRegionInfo().getReplicaId() != 0;
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
return get(get, true, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

void prepareGet(final Get get) throws IOException {
Expand All @@ -7545,17 +7543,37 @@ void prepareGet(final Get get) throws IOException {

@Override
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
return get(get, null, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
private Result get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException {
return TraceUtil.trace(() -> getInternal(get, withCoprocessor, nonceGroup, nonce),

ScannerContext scannerContext = get.getMaxResultSize() > 0
? ScannerContext.newBuilder()
.setSizeLimit(LimitScope.BETWEEN_CELLS, get.getMaxResultSize(), get.getMaxResultSize())
.build()
: null;

List<Cell> result = get(get, scannerContext, withCoprocessor, nonceGroup, nonce);
boolean stale = this.getRegionInfo().getReplicaId() != 0;

return Result.create(
result,
get.isCheckExistenceOnly() ? !result.isEmpty() : null,
stale,
scannerContext != null && scannerContext.mayHaveMoreCellsInRow());
}

private List<Cell> get(Get get, ScannerContext scannerContext, boolean withCoprocessor,
long nonceGroup, long nonce) throws IOException {
return TraceUtil.trace(
() -> getInternal(get, scannerContext, withCoprocessor, nonceGroup, nonce),
() -> createRegionSpan("Region.get"));
}

private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException {
private List<Cell> getInternal(Get get, ScannerContext scannerContext, boolean withCoprocessor,
long nonceGroup, long nonce) throws IOException {
List<Cell> results = new ArrayList<>();
long before = EnvironmentEdgeManager.currentTime();

Expand All @@ -7572,7 +7590,7 @@ private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup
}
try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
List<Cell> tmp = new ArrayList<>();
scanner.next(tmp);
scanner.next(tmp, scannerContext);
// Copy EC to heap, then close the scanner.
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
// See more details in HBASE-26036.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2695,10 +2695,17 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}

ScannerContext scannerContext = get.getMaxResultSize() > 0
? ScannerContext.newBuilder()
.setSizeLimit(LimitScope.BETWEEN_CELLS, get.getMaxResultSize(), get.getMaxResultSize())
.build()
: null;

RegionScannerImpl scanner = null;
try {
scanner = region.getScanner(scan);
scanner.next(results);
scanner.next(results, scannerContext);
} finally {
if (scanner != null) {
if (closeCallBack == null) {
Expand All @@ -2723,7 +2730,8 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
}
region.metricsUpdateForGet(results, before);

return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale,
scannerContext != null && scannerContext.mayHaveMoreCellsInRow());
}

private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ public synchronized boolean next(List<Cell> outResults, ScannerContext scannerCo
}
region.startRegionOperation(Operation.SCAN);
try {
if (scannerContext == null) {
scannerContext = defaultScannerContext;
}
return nextRaw(outResults, scannerContext);
} finally {
region.closeRegionOperation(Operation.SCAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
Expand Down Expand Up @@ -134,6 +137,46 @@ public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testGetPartialResults() throws Exception {
byte[] row = ROWS[0];

Result result;
int cf = 0;
int qf = 0;
int total = 0;

do {
// this will ensure we always return only 1 result
Get get = new Get(row)
.setMaxResultSize(1);

// we want to page through the entire row, this will ensure we always get the next
if (total > 0) {
get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
new ColumnRangeFilter(QUALIFIERS[qf], true, null, false),
new FamilyFilter(CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(FAMILIES[cf]))));
}

// all values are the same, but there should be a value
result = TABLE.get(get);
assertTrue(String.format("Value for family %s (# %s) and qualifier %s (# %s)",
Bytes.toStringBinary(FAMILIES[cf]), cf, Bytes.toStringBinary(QUALIFIERS[qf]), qf),
Bytes.equals(VALUE, result.getValue(FAMILIES[cf], QUALIFIERS[qf])));

total++;
if (++qf >= NUM_QUALIFIERS) {
cf++;
qf = 0;
}
} while (result.mayHaveMoreCellsInRow());

// ensure we iterated all cells in row
assertEquals(NUM_COLS, total);
assertEquals(NUM_FAMILIES, cf);
assertEquals(0, qf);
}

/**
* Ensure that the expected key values appear in a result returned from a scanner that is
* combining partial results into complete results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7879,4 +7879,89 @@ public void run() {
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
}

@Test
public void testOversizedGetsReturnPartialResult() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2)
.setMaxResultSize(1); // 0 doesn't count as a limit, according to HBase

Result r = region.get(get);

assertTrue("Expected partial result, but result was not marked as partial", r.mayHaveMoreCellsInRow());
}

@Test
public void testGetsWithoutResultSizeLimitAreNotPartial() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2);

Result r = region.get(get);

assertFalse("Expected full result, but it was marked as partial", r.mayHaveMoreCellsInRow());
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
assertTrue(Bytes.equals(value2, r.getValue(fam1, qual2)));
}

@Test
public void testGetsWithinResultSizeLimitAreNotPartial() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2)
.setMaxResultSize(Long.MAX_VALUE);

Result r = region.get(get);

assertFalse("Expected full result, but it was marked as partial", r.mayHaveMoreCellsInRow());
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
assertTrue(Bytes.equals(value2, r.getValue(fam1, qual2)));
}

@Test
public void testGetsWithResultSizeLimitReturnPartialResults() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2)
.setMaxResultSize(10);

Result r = region.get(get);

assertTrue("Expected partial result, but it was marked as complete", r.mayHaveMoreCellsInRow());
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
assertEquals("Got more results than expected", 1, r.size());
}

}