diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index b34ef863d565..32776dde3e65 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -448,30 +448,22 @@ void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
     boolean isReplica = false;
     List<Action> unknownReplicaActions = null;
+    List<Action> locateRegionFailedActions = null;
     for (Action action : currentActions) {
       if (isOperationTimeoutExceeded()) {
-        String message = numAttempt == 1
-          ? "Operation timeout exceeded during resolution of region locations, "
-            + "prior to executing any actions."
-          : "Operation timeout exceeded during re-resolution of region locations on retry "
-            + (numAttempt - 1) + ".";
-
-        message += " Meta may be slow or operation timeout too short for batch size or retries.";
-        OperationTimeoutExceededException exception =
-          new OperationTimeoutExceededException(message);
-
-        // Clear any actions we already resolved, because none will have been executed yet.
-        // We are going to fail all passed actions because there's no way we can execute any
-        // if operation timeout is exceeded.
         actionsByServer.clear();
-        for (Action actionToFail : currentActions) {
-          manageLocationError(actionToFail, exception);
-        }
+        failIncompleteActionsWithOpTimeout(currentActions, locateRegionFailedActions, numAttempt);
         return;
       }

       RegionLocations locs = findAllLocationsOrFail(action, true);
-      if (locs == null) continue;
+      if (locs == null) {
+        if (locateRegionFailedActions == null) {
+          locateRegionFailedActions = new ArrayList<>(1);
+        }
+        locateRegionFailedActions.add(action);
+        continue;
+      }
       boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
       if (isReplica && !isReplicaAction) {
         // This is the property of the current implementation, not a requirement.
@@ -488,6 +480,10 @@ void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
         } else {
           // TODO: relies on primary location always being fetched
           manageLocationError(action, null);
+          if (locateRegionFailedActions == null) {
+            locateRegionFailedActions = new ArrayList<>(1);
+          }
+          locateRegionFailedActions.add(action);
         }
       } else {
         byte[] regionName = loc.getRegionInfo().getRegionName();
@@ -561,6 +557,39 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache)
     return loc;
   }

+  /**
+   * Fails all actions that were being grouped during a groupAndSendMultiAction when the operation
+   * timeout was exceeded and there is no time remaining to continue grouping or sending any of
+   * the actions. Actions that already failed to completion during grouping due to a location
+   * error are skipped; they already have an error set and have had the action counter
+   * decremented.
+   * @param actions                   actions being processed by the groupAndSend when the
+   *                                  operation timeout occurred
+   * @param locateRegionFailedActions actions already failed to completion due to a location error
+   * @param numAttempt                the number of attempts so far
+   */
+  private void failIncompleteActionsWithOpTimeout(List<Action> actions,
+    List<Action> locateRegionFailedActions, int numAttempt) {
+    String message = numAttempt == 1
+      ? "Operation timeout exceeded during resolution of region locations, "
+        + "prior to executing any actions."
+      : "Operation timeout exceeded during re-resolution of region locations on retry "
+        + (numAttempt - 1) + ".";
+    message += " Meta may be slow or operation timeout too short for batch size or retries.";
+    OperationTimeoutExceededException exception = new OperationTimeoutExceededException(message);
+
+    for (Action actionToFail : actions) {
+      // Action equality is implemented as row equality, so we check action index equality
+      // since we don't want two different actions for the same row to be considered equal here.
+      boolean actionAlreadyFailed =
+        locateRegionFailedActions != null && locateRegionFailedActions.stream().anyMatch(
+          failedAction -> failedAction.getOriginalIndex() == actionToFail.getOriginalIndex()
+            && failedAction.getReplicaId() == actionToFail.getReplicaId());
+      if (!actionAlreadyFailed) {
+        manageLocationError(actionToFail, exception);
+      }
+    }
+  }
+
   /**
    * Send a multi action structure to the servers, after a delay depending on the attempt number.
    * Asynchronous.
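Reviewer note: the client-visible effect of this change is that every action in a timed-out batch
carries a cause, including actions that were never sent. Below is a minimal sketch (not part of
the patch) of how a caller observes this through RetriesExhaustedWithDetailsException; the table
name and configuration are placeholders.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchTimeoutInspection {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "some_table" is a placeholder; any existing table works for illustration.
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("some_table"))) {
      List<Get> gets = Arrays.asList(new Get(Bytes.toBytes("r1")), new Get(Bytes.toBytes("r2")));
      try {
        table.batch(gets, new Object[gets.size()]);
      } catch (RetriesExhaustedWithDetailsException e) {
        // With this patch, every incomplete action has a cause set (either its original
        // location error or OperationTimeoutExceededException), so the per-action view
        // below is complete and the in-progress action counter reaches zero.
        for (int i = 0; i < e.getNumExceptions(); i++) {
          System.out.println(Bytes.toString(e.getRow(i).getRow()) + " -> " + e.getCause(i));
        }
      }
    }
  }
}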
+ : "Operation timeout exceeded during re-resolution of region locations on retry " + + (numAttempt - 1) + "."; + message += " Meta may be slow or operation timeout too short for batch size or retries."; + OperationTimeoutExceededException exception = new OperationTimeoutExceededException(message); + + for (Action actionToFail : actions) { + // Action equality is implemented as row equality so we check action index equality + // since we don't want two different actions for the same row to be considered equal here + boolean actionAlreadyFailed = + locateRegionFailedActions != null && locateRegionFailedActions.stream().anyMatch( + failedAction -> failedAction.getOriginalIndex() == actionToFail.getOriginalIndex() + && failedAction.getReplicaId() == actionToFail.getReplicaId()); + if (!actionAlreadyFailed) { + manageLocationError(actionToFail, exception); + } + } + } + /** * Send a multi action structure to the servers, after a delay depending on the attempt number. * Asynchronous. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java index 114886a587a5..a975fdb3c9da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationTimeout.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -321,6 +322,70 @@ public void testMultiGetRetryTimeout() { } } + /** + * Test that for a batch operation where region location resolution fails for the first action in + * the batch and consumes the entire operation timeout, that the location error is preserved for + * the first action and that the rest of the batch is failed fast with + * OperationTimeoutExceededException , this also (indirectly) tests that the action counter is + * decremented properly for all actions, see last catch block + */ + @Test + public void testMultiOperationTimoutWithLocationError() throws IOException, InterruptedException { + // Need meta delay > meta scan timeout > operation timeout (with no retries) so that the + // meta scan for resolving region location for the first action times out after the operation + // timeout has been exceeded leaving no time to attempt region location resolution for any + // other actions remaining in the batch + int operationTimeout = 100; + int metaScanTimeout = 150; + DELAY_META_SCAN = 200; + + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, operationTimeout); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); + conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + + try (Connection specialConnection = ConnectionFactory.createConnection(conf); + Table specialTable = specialConnection.getTable(TABLE_NAME)) { + + // Region location resolution for first action should fail due to meta scan timeout and cause + // the batch to exceed the operation timeout, second and third action should then be failed + // fast with OperationTimeoutExceededException + Get firstAction = new Get(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER); + Get secondAction = firstAction; + Get thirdAction = new 
+      List<Get> gets = Arrays.asList(firstAction, secondAction, thirdAction);
+      try {
+        specialTable.batch(gets, new Object[3]);
+        Assert.fail("Should not reach here");
+      } catch (RetriesExhaustedWithDetailsException exception) {
+        byte[] firstExceptionRow = exception.getRow(0).getRow();
+        Assert.assertEquals(firstAction.getRow(), firstExceptionRow);
+
+        // The CallTimeoutException comes from the scan timeout against the meta table in
+        // locateRegionInMeta.
+        Throwable firstActionCause = exception.getCause(0);
+        Assert.assertTrue(firstActionCause instanceof RetriesExhaustedException);
+        Assert.assertTrue(firstActionCause.getCause() instanceof CallTimeoutException);
+
+        byte[] secondExceptionRow = exception.getRow(1).getRow();
+        Assert.assertEquals(secondAction.getRow(), secondExceptionRow);
+
+        Throwable secondActionCause = exception.getCause(1);
+        Assert.assertTrue(secondActionCause instanceof OperationTimeoutExceededException);
+
+        byte[] thirdExceptionRow = exception.getRow(2).getRow();
+        Assert.assertEquals(thirdAction.getRow(), thirdExceptionRow);
+
+        Throwable thirdActionCause = exception.getCause(2);
+        Assert.assertTrue(thirdActionCause instanceof OperationTimeoutExceededException);
+      }
+    } catch (SocketTimeoutException ste) {
+      if (ste.getMessage().contains("time out before the actionsInProgress changed to zero")) {
+        Assert.fail("Not all actions had action counter decremented: " + ste);
+      }
+      throw ste;
+    }
+  }
+
   /**
    * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
    * longer than 'hbase.client.scanner.timeout.period'.
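Closing note on the deduplication check: it matches on getOriginalIndex() and getReplicaId()
rather than using List.contains because, per the patch's own comment, Action equality is
implemented as row equality, so firstAction and secondAction in the test above (two batch entries
for the same row) would otherwise be conflated. A standalone sketch of the predicate follows;
`alreadyFailed` is a hypothetical helper name, not part of the patch.

// Hypothetical helper restating the already-failed check from
// failIncompleteActionsWithOpTimeout; illustration only.
private static boolean alreadyFailed(List<Action> locateRegionFailedActions, Action candidate) {
  if (locateRegionFailedActions == null) {
    return false;
  }
  // Action.equals() compares rows, so two Gets for the same row would be "equal" even
  // though they are distinct batch entries. The original batch index plus the replica id
  // identifies the exact entry instead.
  return locateRegionFailedActions.stream()
    .anyMatch(failed -> failed.getOriginalIndex() == candidate.getOriginalIndex()
      && failed.getReplicaId() == candidate.getReplicaId());
}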