Skip to content

Commit ed5d846

Browse files
committed
HBASE-26122: Implement an optional maximum size for Gets, after which a partial result is returned
1 parent 5e8a269 commit ed5d846

File tree

7 files changed

+201
-15
lines changed

7 files changed

+201
-15
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class Get extends Query implements Row {
7575
private TimeRange tr = TimeRange.allTime();
7676
private boolean checkExistenceOnly = false;
7777
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
78+
private long maxResultSize = -1;
7879

7980
/**
8081
* Create a Get operation for the specified row.
@@ -281,6 +282,21 @@ public Get setFilter(Filter filter) {
281282
return this;
282283
}
283284

285+
/**
286+
* Set the maximum result size. The default is -1; this means that no specific
287+
* maximum result size will be set for this Get.
288+
*
289+
* If set to a value greater than zero, the server may respond with a Result where
290+
* {@link Result#mayHaveMoreCellsInRow()} is true. The user is required to handle
291+
* this case.
292+
*
293+
* @param maxResultSize The maximum result size in bytes
294+
*/
295+
public Get setMaxResultSize(long maxResultSize) {
296+
this.maxResultSize = maxResultSize;
297+
return this;
298+
}
299+
284300
/* Accessors */
285301

286302
/**
@@ -400,6 +416,13 @@ public Map<String, Object> getFingerprint() {
400416
return map;
401417
}
402418

419+
/**
420+
* @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
421+
*/
422+
public long getMaxResultSize() {
423+
return maxResultSize;
424+
}
425+
403426
/**
404427
* Compile the details beyond the scope of getFingerprint (row, columns,
405428
* timestamps, etc.) into a Map along with the fingerprinted information.

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,9 @@ public static Get toGet(final ClientProtos.Get proto) throws IOException {
640640
if (proto.hasLoadColumnFamiliesOnDemand()) {
641641
get.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
642642
}
643+
if (proto.hasMaxResultSize()) {
644+
get.setMaxResultSize(proto.getMaxResultSize());
645+
}
643646
return get;
644647
}
645648

@@ -1296,6 +1299,9 @@ public static ClientProtos.Get toGet(
12961299
if (loadColumnFamiliesOnDemand != null) {
12971300
builder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
12981301
}
1302+
if (get.getMaxResultSize() > 0) {
1303+
builder.setMaxResultSize(get.getMaxResultSize());
1304+
}
12991305
return builder.build();
13001306
}
13011307

@@ -1497,6 +1503,7 @@ public static ClientProtos.Result toResultNoData(final Result result) {
14971503
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
14981504
builder.setAssociatedCellCount(size);
14991505
builder.setStale(result.isStale());
1506+
builder.setPartial(result.mayHaveMoreCellsInRow());
15001507
return builder.build();
15011508
}
15021509

@@ -1587,7 +1594,7 @@ public static Result toResult(final ClientProtos.Result proto, final CellScanner
15871594

15881595
return (cells == null || cells.isEmpty())
15891596
? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT)
1590-
: Result.create(cells, null, proto.getStale());
1597+
: Result.create(cells, null, proto.getStale(), proto.getPartial());
15911598
}
15921599

15931600

hbase-protocol-shaded/src/main/protobuf/client/Client.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ message Get {
9090
optional Consistency consistency = 12 [default = STRONG];
9191
repeated ColumnFamilyTimeRange cf_time_range = 13;
9292
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */
93+
94+
optional uint64 max_result_size = 15;
9395
}
9496

9597
message Result {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
143143
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
144144
import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
145+
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
145146
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
146147
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
147148
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -3871,8 +3872,7 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
38713872
Result result;
38723873
if (returnResults) {
38733874
// convert duplicate increment/append to get
3874-
List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
3875-
result = Result.create(results);
3875+
result = region.get(toGet(mutation), false, nonceGroup, nonce);
38763876
} else {
38773877
result = Result.EMPTY_RESULT;
38783878
}
@@ -7524,9 +7524,7 @@ public static boolean rowIsInRange(RegionInfo info, final byte [] row, final int
75247524
@Override
75257525
public Result get(final Get get) throws IOException {
75267526
prepareGet(get);
7527-
List<Cell> results = get(get, true);
7528-
boolean stale = this.getRegionInfo().getReplicaId() != 0;
7529-
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
7527+
return get(get, true, HConstants.NO_NONCE, HConstants.NO_NONCE);
75307528
}
75317529

75327530
void prepareGet(final Get get) throws IOException {
@@ -7545,17 +7543,37 @@ void prepareGet(final Get get) throws IOException {
75457543

75467544
@Override
75477545
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
7548-
return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
7546+
return get(get, null, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
75497547
}
75507548

7551-
private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
7549+
private Result get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
75527550
throws IOException {
7553-
return TraceUtil.trace(() -> getInternal(get, withCoprocessor, nonceGroup, nonce),
7551+
7552+
ScannerContext scannerContext = get.getMaxResultSize() > 0
7553+
? ScannerContext.newBuilder()
7554+
.setSizeLimit(LimitScope.BETWEEN_CELLS, get.getMaxResultSize(), get.getMaxResultSize())
7555+
.build()
7556+
: null;
7557+
7558+
List<Cell> result = get(get, scannerContext, withCoprocessor, nonceGroup, nonce);
7559+
boolean stale = this.getRegionInfo().getReplicaId() != 0;
7560+
7561+
return Result.create(
7562+
result,
7563+
get.isCheckExistenceOnly() ? !result.isEmpty() : null,
7564+
stale,
7565+
scannerContext != null && scannerContext.mayHaveMoreCellsInRow());
7566+
}
7567+
7568+
private List<Cell> get(Get get, ScannerContext scannerContext, boolean withCoprocessor,
7569+
long nonceGroup, long nonce) throws IOException {
7570+
return TraceUtil.trace(
7571+
() -> getInternal(get, scannerContext, withCoprocessor, nonceGroup, nonce),
75547572
() -> createRegionSpan("Region.get"));
75557573
}
75567574

7557-
private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
7558-
throws IOException {
7575+
private List<Cell> getInternal(Get get, ScannerContext scannerContext, boolean withCoprocessor,
7576+
long nonceGroup, long nonce) throws IOException {
75597577
List<Cell> results = new ArrayList<>();
75607578
long before = EnvironmentEdgeManager.currentTime();
75617579

@@ -7572,7 +7590,7 @@ private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup
75727590
}
75737591
try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
75747592
List<Cell> tmp = new ArrayList<>();
7575-
scanner.next(tmp);
7593+
scanner.next(tmp, scannerContext);
75767594
// Copy EC to heap, then close the scanner.
75777595
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
75787596
// See more details in HBASE-26036.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2695,10 +2695,17 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
26952695
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
26962696
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
26972697
}
2698+
2699+
ScannerContext scannerContext = get.getMaxResultSize() > 0
2700+
? ScannerContext.newBuilder()
2701+
.setSizeLimit(LimitScope.BETWEEN_CELLS, get.getMaxResultSize(), get.getMaxResultSize())
2702+
.build()
2703+
: null;
2704+
26982705
RegionScannerImpl scanner = null;
26992706
try {
27002707
scanner = region.getScanner(scan);
2701-
scanner.next(results);
2708+
scanner.next(results, scannerContext);
27022709
} finally {
27032710
if (scanner != null) {
27042711
if (closeCallBack == null) {
@@ -2723,7 +2730,8 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
27232730
}
27242731
region.metricsUpdateForGet(results, before);
27252732

2726-
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2733+
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale,
2734+
scannerContext != null && scannerContext.mayHaveMoreCellsInRow());
27272735
}
27282736

27292737
private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {

hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,23 @@
2424
import static org.junit.Assert.assertThrows;
2525
import static org.junit.Assert.assertTrue;
2626
import static org.junit.Assert.fail;
27-
2827
import java.io.IOException;
2928
import java.util.ArrayList;
3029
import java.util.List;
3130
import org.apache.hadoop.hbase.client.Delete;
31+
import org.apache.hadoop.hbase.client.Get;
3232
import org.apache.hadoop.hbase.client.Put;
3333
import org.apache.hadoop.hbase.client.RegionInfo;
3434
import org.apache.hadoop.hbase.client.Result;
3535
import org.apache.hadoop.hbase.client.ResultScanner;
3636
import org.apache.hadoop.hbase.client.Scan;
3737
import org.apache.hadoop.hbase.client.Table;
38+
import org.apache.hadoop.hbase.filter.BinaryComparator;
3839
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
3940
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
41+
import org.apache.hadoop.hbase.filter.FamilyFilter;
4042
import org.apache.hadoop.hbase.filter.Filter;
43+
import org.apache.hadoop.hbase.filter.FilterList;
4144
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
4245
import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
4346
import org.apache.hadoop.hbase.filter.RandomRowFilter;
@@ -134,6 +137,46 @@ public static void tearDownAfterClass() throws Exception {
134137
TEST_UTIL.shutdownMiniCluster();
135138
}
136139

140+
@Test
141+
public void testGetPartialResults() throws Exception {
142+
byte[] row = ROWS[0];
143+
144+
Result result;
145+
int cf = 0;
146+
int qf = 0;
147+
int total = 0;
148+
149+
do {
150+
// this will ensure we always return only 1 result
151+
Get get = new Get(row)
152+
.setMaxResultSize(1);
153+
154+
// we want to page through the entire row, this will ensure we always get the next
155+
if (total > 0) {
156+
get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
157+
new ColumnRangeFilter(QUALIFIERS[qf], true, null, false),
158+
new FamilyFilter(CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(FAMILIES[cf]))));
159+
}
160+
161+
// all values are the same, but there should be a value
162+
result = TABLE.get(get);
163+
assertTrue(String.format("Value for family %s (# %s) and qualifier %s (# %s)",
164+
Bytes.toStringBinary(FAMILIES[cf]), cf, Bytes.toStringBinary(QUALIFIERS[qf]), qf),
165+
Bytes.equals(VALUE, result.getValue(FAMILIES[cf], QUALIFIERS[qf])));
166+
167+
total++;
168+
if (++qf >= NUM_QUALIFIERS) {
169+
cf++;
170+
qf = 0;
171+
}
172+
} while (result.mayHaveMoreCellsInRow());
173+
174+
// ensure we iterated all cells in row
175+
assertEquals(NUM_COLS, total);
176+
assertEquals(NUM_FAMILIES, cf);
177+
assertEquals(0, qf);
178+
}
179+
137180
/**
138181
* Ensure that the expected key values appear in a result returned from a scanner that is
139182
* combining partial results into complete results

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7879,4 +7879,89 @@ public void run() {
78797879
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
78807880
}
78817881

7882+
@Test
7883+
public void testOversizedGetsReturnPartialResult() throws IOException {
7884+
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
7885+
7886+
Put p = new Put(row)
7887+
.addColumn(fam1, qual1, value1)
7888+
.addColumn(fam1, qual2, value2);
7889+
7890+
region.put(p);
7891+
7892+
Get get = new Get(row)
7893+
.addColumn(fam1, qual1)
7894+
.addColumn(fam1, qual2)
7895+
.setMaxResultSize(1); // 0 doesn't count as a limit, according to HBase
7896+
7897+
Result r = region.get(get);
7898+
7899+
assertTrue("Expected partial result, but result was not marked as partial", r.mayHaveMoreCellsInRow());
7900+
}
7901+
7902+
@Test
7903+
public void testGetsWithoutResultSizeLimitAreNotPartial() throws IOException {
7904+
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
7905+
7906+
Put p = new Put(row)
7907+
.addColumn(fam1, qual1, value1)
7908+
.addColumn(fam1, qual2, value2);
7909+
7910+
region.put(p);
7911+
7912+
Get get = new Get(row)
7913+
.addColumn(fam1, qual1)
7914+
.addColumn(fam1, qual2);
7915+
7916+
Result r = region.get(get);
7917+
7918+
assertFalse("Expected full result, but it was marked as partial", r.mayHaveMoreCellsInRow());
7919+
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
7920+
assertTrue(Bytes.equals(value2, r.getValue(fam1, qual2)));
7921+
}
7922+
7923+
@Test
7924+
public void testGetsWithinResultSizeLimitAreNotPartial() throws IOException {
7925+
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
7926+
7927+
Put p = new Put(row)
7928+
.addColumn(fam1, qual1, value1)
7929+
.addColumn(fam1, qual2, value2);
7930+
7931+
region.put(p);
7932+
7933+
Get get = new Get(row)
7934+
.addColumn(fam1, qual1)
7935+
.addColumn(fam1, qual2)
7936+
.setMaxResultSize(Long.MAX_VALUE);
7937+
7938+
Result r = region.get(get);
7939+
7940+
assertFalse("Expected full result, but it was marked as partial", r.mayHaveMoreCellsInRow());
7941+
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
7942+
assertTrue(Bytes.equals(value2, r.getValue(fam1, qual2)));
7943+
}
7944+
7945+
@Test
7946+
public void testGetsWithResultSizeLimitReturnPartialResults() throws IOException {
7947+
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
7948+
7949+
Put p = new Put(row)
7950+
.addColumn(fam1, qual1, value1)
7951+
.addColumn(fam1, qual2, value2);
7952+
7953+
region.put(p);
7954+
7955+
Get get = new Get(row)
7956+
.addColumn(fam1, qual1)
7957+
.addColumn(fam1, qual2)
7958+
.setMaxResultSize(10);
7959+
7960+
Result r = region.get(get);
7961+
7962+
assertTrue("Expected partial result, but it was marked as complete", r.mayHaveMoreCellsInRow());
7963+
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
7964+
assertEquals("Got more results than expected", 1, r.size());
7965+
}
7966+
78827967
}

0 commit comments

Comments
 (0)