Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ enum MoreResults {
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
private long lastCallSeq = -1;
protected final RpcControllerFactory rpcControllerFactory;

/**
Expand Down Expand Up @@ -193,9 +194,11 @@ private ScanResponse next() throws IOException {
this.scanMetrics != null, renew, scan.getLimit());
try {
ScanResponse response = getStub().scan(getRpcController(), request);
lastCallSeq = nextCallSeq;
nextCallSeq++;
return response;
} catch (Exception e) {
lastCallSeq = nextCallSeq;
IOException ioe = ProtobufUtil.handleRemoteException(e);
if (logScannerActivity) {
LOG.info(
Expand Down Expand Up @@ -257,9 +260,11 @@ protected Result[] rpcCall() throws Exception {
return null;
}
ScanResponse response;
boolean isRetry = false;
if (this.scannerId == -1L) {
response = openScanner();
} else {
if (lastCallSeq == nextCallSeq) isRetry = true;
response = next();
}
long timestamp = EnvironmentEdgeManager.currentTime();
Expand All @@ -269,14 +274,26 @@ protected Result[] rpcCall() throws Exception {
cursor = ProtobufUtil.toCursor(response.getCursor());
}
Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
int rows = rrs == null ? 0 : rrs.length;
if (logScannerActivity) {
long now = EnvironmentEdgeManager.currentTime();
if (now - timestamp > logCutOffLatency) {
int rows = rrs == null ? 0 : rrs.length;
LOG.info(
"Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
}
}

if (isRetry && !isHeartBeat && rows == 0) {
// It is possible that the RPC before retry resulted in a DoNotRetryIOException
// but due to connection issues the client didn't get it and thrown a retriable
// connection exception. In this case the scanner is already closed on the
// server side and will return empty results on the retry attempt. This
// should lead to resetting the scanner instead of considering it finished.
LOG.info("Unexpected empty result on retried scan RPC. Resetting scanner {}", this.scannerId);
throw new ScannerResetException(
"Resetting the scanner -- unexpected empty result on retried scan RPC.");
}

updateServerSideMetrics(scanMetrics, response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults()) {
Expand Down