Skip to content

Commit a6e6552

Browse files
committed
HBASE-28128 Reject requests at RPC layer when RegionServer is aborting (#5447)
Signed-off-by: Nick Dimiduk <[email protected]> Reviewed-by: Duo Zhang <[email protected]>
1 parent f4f7c84 commit a6e6552

File tree

2 files changed

+284
-12
lines changed

2 files changed

+284
-12
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
4949
import org.apache.hadoop.hbase.nio.ByteBuff;
5050
import org.apache.hadoop.hbase.nio.SingleByteBuff;
51+
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
5152
import org.apache.hadoop.hbase.security.AccessDeniedException;
5253
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
5354
import org.apache.hadoop.hbase.security.SaslStatus;
@@ -617,6 +618,19 @@ protected void processRequest(ByteBuff buf) throws IOException, InterruptedExcep
617618
Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
618619
try (Scope ignored = span.makeCurrent()) {
619620
int id = header.getCallId();
621+
// HBASE-28128 - if server is aborting, don't bother trying to process. It will
622+
// fail at the handler layer, but worse might result in CallQueueTooBigException if the
623+
// queue is full but server is not properly processing requests. Better to throw an aborted
624+
// exception here so that the client can properly react.
625+
if (rpcServer.server != null && rpcServer.server.isAborted()) {
626+
RegionServerAbortedException serverIsAborted = new RegionServerAbortedException(
627+
"Server " + rpcServer.server.getServerName() + " aborting");
628+
this.rpcServer.metrics.exception(serverIsAborted);
629+
sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(),
630+
serverIsAborted);
631+
return;
632+
}
633+
620634
if (RpcServer.LOG.isTraceEnabled()) {
621635
RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
622636
+ " totalRequestSize: " + totalRequestSize + " bytes");
@@ -628,14 +642,11 @@ protected void processRequest(ByteBuff buf) throws IOException, InterruptedExcep
628642
(totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
629643
> this.rpcServer.maxQueueSizeInBytes
630644
) {
631-
final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
632-
totalRequestSize, null, 0, this.callCleanup);
633645
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
634-
callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
646+
sendErrorResponseForCall(id, totalRequestSize, span,
635647
"Call queue is full on " + this.rpcServer.server.getServerName()
636-
+ ", is hbase.ipc.server.max.callqueue.size too small?");
637-
TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
638-
callTooBig.sendResponseIfReady();
648+
+ ", is hbase.ipc.server.max.callqueue.size too small?",
649+
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
639650
return;
640651
}
641652
MethodDescriptor md = null;
@@ -690,12 +701,8 @@ protected void processRequest(ByteBuff buf) throws IOException, InterruptedExcep
690701
responseThrowable = thrown;
691702
}
692703

693-
ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
694-
totalRequestSize, null, 0, this.callCleanup);
695-
readParamsFailedCall.setResponse(null, null, responseThrowable,
696-
msg + "; " + responseThrowable.getMessage());
697-
TraceUtil.setError(span, responseThrowable);
698-
readParamsFailedCall.sendResponseIfReady();
704+
sendErrorResponseForCall(id, totalRequestSize, span,
705+
msg + "; " + responseThrowable.getMessage(), responseThrowable);
699706
return;
700707
}
701708

@@ -725,6 +732,15 @@ protected void processRequest(ByteBuff buf) throws IOException, InterruptedExcep
725732
}
726733
}
727734

735+
private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg,
736+
Throwable responseThrowable) throws IOException {
737+
ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null,
738+
totalRequestSize, null, 0, this.callCleanup);
739+
failedcall.setResponse(null, null, responseThrowable, msg);
740+
TraceUtil.setError(span, responseThrowable);
741+
failedcall.sendResponseIfReady();
742+
}
743+
728744
protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
729745
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
730746
ServerCall.setExceptionResponse(e, msg, headerBuilder);
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotNull;
22+
23+
import java.io.IOException;
24+
import java.util.Optional;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.hbase.HBaseClassTestRule;
28+
import org.apache.hadoop.hbase.HBaseTestingUtility;
29+
import org.apache.hadoop.hbase.HConstants;
30+
import org.apache.hadoop.hbase.HRegionLocation;
31+
import org.apache.hadoop.hbase.ServerName;
32+
import org.apache.hadoop.hbase.StartMiniClusterOption;
33+
import org.apache.hadoop.hbase.TableName;
34+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
35+
import org.apache.hadoop.hbase.client.Connection;
36+
import org.apache.hadoop.hbase.client.ConnectionFactory;
37+
import org.apache.hadoop.hbase.client.Put;
38+
import org.apache.hadoop.hbase.client.RegionInfo;
39+
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
40+
import org.apache.hadoop.hbase.client.Table;
41+
import org.apache.hadoop.hbase.client.TableDescriptor;
42+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
43+
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
44+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
45+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
46+
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
47+
import org.apache.hadoop.hbase.ipc.CallRunner;
48+
import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
49+
import org.apache.hadoop.hbase.ipc.PriorityFunction;
50+
import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
51+
import org.apache.hadoop.hbase.testclassification.MediumTests;
52+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
53+
import org.apache.hadoop.hbase.util.Bytes;
54+
import org.apache.hadoop.hbase.util.JVMClusterUtil;
55+
import org.apache.hadoop.hbase.util.Threads;
56+
import org.junit.AfterClass;
57+
import org.junit.BeforeClass;
58+
import org.junit.ClassRule;
59+
import org.junit.Test;
60+
import org.junit.experimental.categories.Category;
61+
import org.slf4j.Logger;
62+
import org.slf4j.LoggerFactory;
63+
64+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
65+
66+
@Category({ RegionServerTests.class, MediumTests.class })
67+
public class TestRegionServerRejectDuringAbort {
68+
69+
@ClassRule
70+
public static final HBaseClassTestRule CLASS_RULE =
71+
HBaseClassTestRule.forClass(TestRegionServerRejectDuringAbort.class);
72+
73+
private static final Logger LOG =
74+
LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class);
75+
76+
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
77+
78+
private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort");
79+
80+
private static byte[] CF = Bytes.toBytes("cf");
81+
82+
private static final int REGIONS_NUM = 5;
83+
84+
private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null);
85+
86+
private static volatile boolean shouldThrowTooBig = false;
87+
88+
@BeforeClass
89+
public static void setUp() throws Exception {
90+
// Will schedule a abort timeout task after SLEEP_TIME_WHEN_CLOSE_REGION ms
91+
UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
92+
UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
93+
CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class);
94+
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
95+
UTIL.startMiniCluster(option);
96+
TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
97+
.setCoprocessor(SleepWhenCloseCoprocessor.class.getName())
98+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build();
99+
UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM);
100+
}
101+
102+
public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl {
103+
104+
public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority,
105+
Configuration conf) {
106+
super(maxQueueLength, priority, conf);
107+
}
108+
109+
@Override
110+
public boolean offer(CallRunner callRunner) {
111+
if (
112+
shouldThrowTooBig
113+
&& callRunner.getRpcCall().getParam() instanceof ClientProtos.MutateRequest
114+
) {
115+
return false;
116+
}
117+
return super.offer(callRunner);
118+
}
119+
}
120+
121+
@AfterClass
122+
public static void tearDown() throws Exception {
123+
UTIL.shutdownMiniCluster();
124+
}
125+
126+
/**
127+
* Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short
128+
* circuits any other logic. This means we no longer even attempt to enqueue the request onto the
129+
* call queue. We verify this by using a special call queue which we can trigger to always return
130+
* CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not
131+
* see them.
132+
*/
133+
@Test
134+
public void testRejectRequestsOnAbort() throws Exception {
135+
// We don't want to disrupt the server carrying meta, because we plan to disrupt requests to
136+
// the server. Disrupting meta requests messes with the test.
137+
HRegionServer serverWithoutMeta = null;
138+
for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
139+
.getRegionServerThreads()) {
140+
HRegionServer regionServer = regionServerThread.getRegionServer();
141+
if (
142+
regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty()
143+
&& !regionServer.getRegions(TABLE_NAME).isEmpty()
144+
) {
145+
serverWithoutMeta = regionServer;
146+
break;
147+
}
148+
}
149+
150+
assertNotNull("couldn't find a server without meta, but with test table regions",
151+
serverWithoutMeta);
152+
153+
Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName()));
154+
writer.setDaemon(true);
155+
writer.start();
156+
157+
// Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException
158+
// and trigger our custom queue to reject any more requests. This would typically result in
159+
// CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing
160+
// of a request is working.
161+
serverWithoutMeta.abort("Abort RS for test");
162+
163+
UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
164+
assertEquals(THROWN_EXCEPTION.get().getCause().getClass(), RegionServerAbortedException.class);
165+
}
166+
167+
private Runnable getWriterThreadRunnable(ServerName loadServer) {
168+
return () -> {
169+
try {
170+
Configuration conf = UTIL.getConfiguration();
171+
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
172+
try (Connection conn = ConnectionFactory.createConnection(conf);
173+
Table table = conn.getTable(TABLE_NAME)) {
174+
// find the first region to exist on our test server, then submit requests to it
175+
for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) {
176+
if (regionLocation.getServerName().equals(loadServer)) {
177+
submitRequestsToRegion(table, regionLocation.getRegion());
178+
return;
179+
}
180+
}
181+
throw new RuntimeException("Failed to find any regions for loadServer " + loadServer);
182+
}
183+
} catch (Exception e) {
184+
LOG.warn("Failed to load data", e);
185+
synchronized (THROWN_EXCEPTION) {
186+
THROWN_EXCEPTION.set(e);
187+
THROWN_EXCEPTION.notifyAll();
188+
}
189+
}
190+
};
191+
}
192+
193+
private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException {
194+
// We will block closes of the regions with a CP, so no need to worry about the region getting
195+
// reassigned. Just use the same rowkey always.
196+
byte[] rowKey = getRowKeyWithin(regionInfo);
197+
198+
int i = 0;
199+
while (true) {
200+
try {
201+
i++;
202+
table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i)));
203+
} catch (IOException e) {
204+
// only catch RegionServerAbortedException once. After that, the next exception thrown
205+
// is our test case
206+
if (
207+
!shouldThrowTooBig && e instanceof RetriesExhaustedException
208+
&& e.getCause() instanceof RegionServerAbortedException
209+
) {
210+
shouldThrowTooBig = true;
211+
} else {
212+
throw e;
213+
}
214+
}
215+
216+
// small sleep to relieve pressure
217+
Threads.sleep(10);
218+
}
219+
}
220+
221+
private byte[] getRowKeyWithin(RegionInfo regionInfo) {
222+
byte[] rowKey;
223+
// region is start of table, find one after start key
224+
if (regionInfo.getStartKey().length == 0) {
225+
if (regionInfo.getEndKey().length == 0) {
226+
// doesn't matter, single region table
227+
return Bytes.toBytes(1);
228+
} else {
229+
// find a row just before endkey
230+
rowKey = Bytes.copy(regionInfo.getEndKey());
231+
rowKey[rowKey.length - 1]--;
232+
return rowKey;
233+
}
234+
} else {
235+
return regionInfo.getStartKey();
236+
}
237+
}
238+
239+
public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver {
240+
241+
public SleepWhenCloseCoprocessor() {
242+
}
243+
244+
@Override
245+
public Optional<RegionObserver> getRegionObserver() {
246+
return Optional.of(this);
247+
}
248+
249+
@Override
250+
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
251+
throws IOException {
252+
// Wait so that the region can't close until we get the information we need from our test
253+
UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)