Skip to content

Commit 9b9c9fc

Browse files
committed
HBASE-28595: client side fix for partial results when exception from scan RPC is lost
1 parent 860ac60 commit 9b9c9fc

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ enum MoreResults {
9797
// indicate if it is a remote server call
9898
protected boolean isRegionServerRemote = true;
9999
private long nextCallSeq = 0;
100+
private long lastCallSeq = -1;
100101
protected final RpcControllerFactory rpcControllerFactory;
101102

102103
/**
@@ -193,9 +194,11 @@ private ScanResponse next() throws IOException {
193194
this.scanMetrics != null, renew, scan.getLimit());
194195
try {
195196
ScanResponse response = getStub().scan(getRpcController(), request);
197+
lastCallSeq = nextCallSeq;
196198
nextCallSeq++;
197199
return response;
198200
} catch (Exception e) {
201+
lastCallSeq = nextCallSeq;
199202
IOException ioe = ProtobufUtil.handleRemoteException(e);
200203
if (logScannerActivity) {
201204
LOG.info(
@@ -257,9 +260,11 @@ protected Result[] rpcCall() throws Exception {
257260
return null;
258261
}
259262
ScanResponse response;
263+
boolean isRetry = false;
260264
if (this.scannerId == -1L) {
261265
response = openScanner();
262266
} else {
267+
if (lastCallSeq == nextCallSeq) isRetry = true;
263268
response = next();
264269
}
265270
long timestamp = EnvironmentEdgeManager.currentTime();
@@ -269,14 +274,26 @@ protected Result[] rpcCall() throws Exception {
269274
cursor = ProtobufUtil.toCursor(response.getCursor());
270275
}
271276
Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
277+
int rows = rrs == null ? 0 : rrs.length;
272278
if (logScannerActivity) {
273279
long now = EnvironmentEdgeManager.currentTime();
274280
if (now - timestamp > logCutOffLatency) {
275-
int rows = rrs == null ? 0 : rrs.length;
276281
LOG.info(
277282
"Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
278283
}
279284
}
285+
286+
if (isRetry && !isHeartBeat && rows == 0) {
287+
// It is possible that the RPC before retry resulted in a DoNotRetryIOException
288+
// but due to connection issues the client didn't get it and thrown a retriable
289+
// connection exception. In this case the scanner is already closed on the
290+
// server side and will return empty results on the retry attempt. This
291+
// should lead to resetting the scanner instead of considering it finished.
292+
LOG.info("Unexpected empty result on retried scan RPC. Resetting scanner {}", this.scannerId);
293+
throw new ScannerResetException(
294+
"Resetting the scanner -- unexpected empty result on retried scan RPC.");
295+
}
296+
280297
updateServerSideMetrics(scanMetrics, response);
281298
// moreResults is only used for the case where a filter exhausts all elements
282299
if (response.hasMoreResults()) {

0 commit comments

Comments
 (0)