Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,15 @@ public void run() {
// Cancelled
return;
}
} catch (OperationTimeoutExceededException e) {
// The operation has timed out before executing the actual callable. This may be due to
// slow/hotspotted
// meta or the operation timeout set too low for the number of requests. Circumventing the
// usual failure flow
// ensure the meta cache is not cleared and will not result in a doomed feedback loop in
// which the meta continues to be hotspotted
failAll(multiAction, server, numAttempt, e);
return;
} catch (IOException e) {
// The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server.
Expand Down Expand Up @@ -676,6 +685,25 @@ Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwabl
return canRetry;
}

/**
* Fail all the actions from this multiaction after an OperationTimeoutExceededException
* @param actions the actions still to do from the initial list
* @param server the destination
* @param numAttempt the number of attempts so far
* @param throwable the throwable that caused the failure
*/
private void failAll(MultiAction actions, ServerName server, int numAttempt,
Throwable throwable) {
int failed = 0;
for (Map.Entry<byte[], List<Action>> e : actions.actions.entrySet()) {
for (Action action : e.getValue()) {
setError(action.getOriginalIndex(), action.getAction(), throwable, server);
++failed;
}
}
logNoResubmit(server, numAttempt, actions.size(), throwable, failed, 0);
}

/**
* Resubmit all the actions from this multiaction after a failure.
* @param rsActions the actions still to do from the initial list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -64,7 +63,8 @@ public T call(int operationTimeout) throws IOException {
int remainingTime = tracker.getRemainingTime(operationTimeout);
if (remainingTime <= 1) {
// "1" is a special return value in RetryingTimeTracker, see its implementation.
throw new DoNotRetryIOException("Operation rpcTimeout");
throw new OperationTimeoutExceededException(
"Timeout exceeded before call began. Slow meta may be the cause or the operation timeout is too short.");
}
return super.call(Math.min(rpcTimeout, remainingTime));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Thrown when a batch operation exceeds the operation timeout
*/

@InterfaceAudience.Public

public class OperationTimeoutExceededException extends DoNotRetryIOException {

public OperationTimeoutExceededException() {
super();
}

public OperationTimeoutExceededException(String msg) {

super(msg);

}

public OperationTimeoutExceededException(String msg, Throwable t) {
super(msg, t);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
Expand Down Expand Up @@ -78,7 +76,8 @@ public class TestClientOperationTimeout {
private static int DELAY_GET;
private static int DELAY_SCAN;
private static int DELAY_MUTATE;
private static int DELAY_BATCH_MUTATE;
private static int DELAY_BATCH;
private static int DELAY_META_SCAN;

private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
private static final byte[] FAMILY = Bytes.toBytes("family");
Expand Down Expand Up @@ -112,7 +111,8 @@ public void setUp() throws Exception {
DELAY_GET = 0;
DELAY_SCAN = 0;
DELAY_MUTATE = 0;
DELAY_BATCH_MUTATE = 0;
DELAY_BATCH = 0;
DELAY_META_SCAN = 0;
}

@AfterClass
Expand Down Expand Up @@ -157,12 +157,12 @@ public void testPutTimeout() {
}

/**
* Tests that a batch mutate on a table throws {@link SocketTimeoutException} when the operation
* takes longer than 'hbase.client.operation.timeout'.
* Tests that a batch mutate and batch get on a table throws {@link SocketTimeoutException} when
* the operation takes longer than 'hbase.client.operation.timeout'.
*/
@Test
public void testMultiPutsTimeout() {
DELAY_BATCH_MUTATE = 600;
public void testMultiTimeout() {
DELAY_BATCH = 600;
Put put1 = new Put(ROW);
put1.addColumn(FAMILY, QUALIFIER, VALUE);
Put put2 = new Put(ROW);
Expand All @@ -176,6 +176,72 @@ public void testMultiPutsTimeout() {
} catch (Exception e) {
Assert.assertTrue(e instanceof SocketTimeoutException);
}

Get get1 = new Get(ROW);
get1.addColumn(FAMILY, QUALIFIER);
Get get2 = new Get(ROW);
get2.addColumn(FAMILY, QUALIFIER);

List<Get> gets = new ArrayList<>();
gets.add(get1);
gets.add(get2);
try {
TABLE.batch(gets, new Object[2]);
Assert.fail("should not reach here");
} catch (Exception e) {
Assert.assertTrue(e instanceof SocketTimeoutException);
}
}

/**
* Tests that a batch get on a table throws
* {@link org.apache.hadoop.hbase.client.OperationTimeoutExceededException} when the region lookup
* takes longer than the 'hbase.client.operation.timeout'
*/
@Test
public void testMultiGetMetaTimeout() throws IOException {

Configuration conf = new Configuration(UTIL.getConfiguration());

// the operation timeout must be lower than the delay from a meta scan to etch region locations
// of the get requests. Simply increasing the meta scan timeout to greater than the
// HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD will result in SocketTimeoutException on the scans thus
// avoiding the simulation of load on meta. See: HBASE-27487
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 400);
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
try (Connection specialConnection = ConnectionFactory.createConnection(conf);
Table specialTable = specialConnection.getTable(TABLE_NAME)) {

MetricsConnection metrics =
((ConnectionImplementation) specialConnection).getConnectionMetrics();
long metaCacheNumClearServerPreFailure = metrics.metaCacheNumClearServer.getCount();

DELAY_META_SCAN = 400;
List<Get> gets = new ArrayList<>();
// we need to ensure the region look-ups eat up more time than the operation timeout without
// exceeding the scan timeout.
for (int i = 0; i < 100; i++) {
gets.add(new Get(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER));
}
try {
specialTable.get(gets);
Assert.fail("should not reach here");
} catch (Exception e) {
RetriesExhaustedWithDetailsException expected = (RetriesExhaustedWithDetailsException) e;
Assert.assertEquals(100, expected.getNumExceptions());

// verify we do not clear the cache in this situation otherwise we will create pathological
// feedback loop with multigets See: HBASE-27487
long metaCacheNumClearServerPostFailure = metrics.metaCacheNumClearServer.getCount();
Assert.assertEquals(metaCacheNumClearServerPreFailure, metaCacheNumClearServerPostFailure);

for (Throwable cause : expected.getCauses()) {
Assert.assertTrue(cause instanceof OperationTimeoutExceededException);
}

}
}

}

/**
Expand Down Expand Up @@ -240,7 +306,12 @@ public ClientProtos.MutateResponse mutate(RpcController rpcc,
public ClientProtos.ScanResponse scan(RpcController controller,
ClientProtos.ScanRequest request) throws ServiceException {
try {
Thread.sleep(DELAY_SCAN);
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
if (regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
Thread.sleep(DELAY_META_SCAN);
} else {
Thread.sleep(DELAY_SCAN);
}
} catch (InterruptedException e) {
LOG.error("Sleep interrupted during scan operation", e);
}
Expand All @@ -251,7 +322,7 @@ public ClientProtos.ScanResponse scan(RpcController controller,
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
throws ServiceException {
try {
Thread.sleep(DELAY_BATCH_MUTATE);
Thread.sleep(DELAY_BATCH);
} catch (InterruptedException e) {
LOG.error("Sleep interrupted during multi operation", e);
}
Expand Down