Skip to content

Commit 0e1d607

Browse files
jinggouvirajjasani
authored andcommitted
HBASE-27902 Utility to invoke coproc on multiple servers using AsyncAdmin (#5266)
Signed-off-by: Duo Zhang <[email protected]> Signed-off-by: Viraj Jasani <[email protected]>
1 parent 8f3cb6e commit 0e1d607

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.util.Collections;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.function.Function;
25+
import org.apache.hadoop.hbase.ServerName;
26+
import org.apache.hadoop.hbase.util.FutureUtils;
27+
import org.apache.yetus.audience.InterfaceAudience;
28+
29+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
30+
31+
/**
32+
* Additional Asynchronous Admin capabilities for clients.
33+
*/
34+
@InterfaceAudience.Public
35+
public final class AsyncAdminClientUtils {
36+
37+
private AsyncAdminClientUtils() {
38+
}
39+
40+
/**
41+
* Execute the given coprocessor call on all region servers.
42+
* <p>
43+
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
44+
* one line lambda expression, like:
45+
*
46+
* <pre>
47+
* channel -&gt; xxxService.newStub(channel)
48+
* </pre>
49+
*
50+
* @param asyncAdmin the asynchronous administrative API for HBase.
51+
* @param stubMaker a delegation to the actual {@code newStub} call.
52+
* @param callable a delegation to the actual protobuf rpc call. See the comment of
53+
* {@link ServiceCaller} for more details.
54+
* @param <S> the type of the asynchronous stub
55+
* @param <R> the type of the return value
56+
* @return Map of each region server to its result of the protobuf rpc call, wrapped by a
57+
* {@link CompletableFuture}.
58+
* @see ServiceCaller
59+
*/
60+
public static <S, R> CompletableFuture<Map<ServerName, Object>>
61+
coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker,
62+
ServiceCaller<S, R> callable) {
63+
CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
64+
FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> {
65+
if (error != null) {
66+
future.completeExceptionally(error);
67+
return;
68+
}
69+
Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
70+
for (ServerName regionServer : regionServers) {
71+
FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer),
72+
(server, err) -> {
73+
if (err != null) {
74+
resultMap.put(regionServer, err);
75+
} else {
76+
resultMap.put(regionServer, server);
77+
}
78+
if (resultMap.size() == regionServers.size()) {
79+
future.complete(Collections.unmodifiableMap(resultMap));
80+
}
81+
});
82+
}
83+
});
84+
return future;
85+
}
86+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.coprocessor;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import java.io.FileNotFoundException;
24+
import java.util.Collections;
25+
import java.util.Map;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
import org.apache.hadoop.hbase.HBaseClassTestRule;
29+
import org.apache.hadoop.hbase.HConstants;
30+
import org.apache.hadoop.hbase.ServerName;
31+
import org.apache.hadoop.hbase.client.AsyncAdminClientUtils;
32+
import org.apache.hadoop.hbase.client.ConnectionFactory;
33+
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
34+
import org.apache.hadoop.hbase.client.ServiceCaller;
35+
import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
36+
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
37+
import org.apache.hadoop.hbase.testclassification.ClientTests;
38+
import org.apache.hadoop.hbase.testclassification.MediumTests;
39+
import org.junit.AfterClass;
40+
import org.junit.BeforeClass;
41+
import org.junit.ClassRule;
42+
import org.junit.Test;
43+
import org.junit.experimental.categories.Category;
44+
import org.junit.runner.RunWith;
45+
import org.junit.runners.Parameterized;
46+
47+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
48+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
49+
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
50+
51+
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
52+
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
53+
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
54+
55+
@RunWith(Parameterized.class)
56+
@Category({ ClientTests.class, MediumTests.class })
57+
public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdminBase {
58+
@ClassRule
59+
public static final HBaseClassTestRule CLASS_RULE =
60+
HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class);
61+
62+
private static final String THROW_CLASS_NAME = "java.io.FileNotFoundException";
63+
private static final String DUMMY_VALUE = "val";
64+
private static final int NUM_SLAVES = 5;
65+
private static final int NUM_SUCCESS_REGION_SERVERS = 3;
66+
67+
@BeforeClass
68+
public static void setUpBeforeClass() throws Exception {
69+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
70+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
71+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
72+
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
73+
ProtobufCoprocessorService.class.getName());
74+
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
75+
DummyRegionServerEndpoint.class.getName());
76+
TEST_UTIL.startMiniCluster(NUM_SLAVES);
77+
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
78+
}
79+
80+
@AfterClass
81+
public static void tearDownAfterClass() throws Exception {
82+
TEST_UTIL.shutdownMiniCluster();
83+
}
84+
85+
@Test
86+
public void testRegionServersCoprocessorService()
87+
throws ExecutionException, InterruptedException {
88+
DummyRequest request = DummyRequest.getDefaultInstance();
89+
Map<ServerName,
90+
Object> resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin,
91+
DummyService::newStub, (ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller,
92+
rpcCallback) -> stub.dummyCall(controller, request, rpcCallback))
93+
.get();
94+
95+
resultMap.forEach((k, v) -> {
96+
assertTrue(v instanceof DummyResponse);
97+
DummyResponse resp = (DummyResponse) v;
98+
assertEquals(DUMMY_VALUE, resp.getValue());
99+
});
100+
}
101+
102+
@Test
103+
public void testRegionServerCoprocessorsServiceAllFail()
104+
throws ExecutionException, InterruptedException {
105+
DummyRequest request = DummyRequest.getDefaultInstance();
106+
Map<ServerName,
107+
Object> resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin,
108+
DummyService::newStub, (ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller,
109+
rpcCallback) -> stub.dummyThrow(controller, request, rpcCallback))
110+
.get();
111+
112+
resultMap.forEach((k, v) -> {
113+
assertTrue(v instanceof RetriesExhaustedException);
114+
Throwable e = (Throwable) v;
115+
assertTrue(e.getMessage().contains(THROW_CLASS_NAME));
116+
});
117+
}
118+
119+
@Test
120+
public void testRegionServerCoprocessorsServicePartialFail()
121+
throws ExecutionException, InterruptedException {
122+
DummyRequest request = DummyRequest.getDefaultInstance();
123+
AtomicInteger callCount = new AtomicInteger();
124+
Map<ServerName, Object> resultMap =
125+
AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, DummyService::newStub,
126+
(ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller, rpcCallback) -> {
127+
callCount.addAndGet(1);
128+
if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) {
129+
stub.dummyCall(controller, request, rpcCallback);
130+
} else {
131+
stub.dummyThrow(controller, request, rpcCallback);
132+
}
133+
}).get();
134+
135+
AtomicInteger successCallCount = new AtomicInteger();
136+
resultMap.forEach((k, v) -> {
137+
if (v instanceof DummyResponse) {
138+
successCallCount.addAndGet(1);
139+
DummyResponse resp = (DummyResponse) v;
140+
assertEquals(DUMMY_VALUE, resp.getValue());
141+
} else {
142+
assertTrue(v instanceof RetriesExhaustedException);
143+
Throwable e = (Throwable) v;
144+
assertTrue(e.getMessage().contains(THROW_CLASS_NAME));
145+
}
146+
});
147+
assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get());
148+
}
149+
150+
public static class DummyRegionServerEndpoint extends DummyService
151+
implements RegionServerCoprocessor {
152+
@Override
153+
public Iterable<Service> getServices() {
154+
return Collections.singleton(this);
155+
}
156+
157+
@Override
158+
public void dummyCall(RpcController controller, DummyRequest request,
159+
RpcCallback<DummyResponse> callback) {
160+
callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
161+
}
162+
163+
@Override
164+
public void dummyThrow(RpcController controller, DummyRequest request,
165+
RpcCallback<DummyResponse> done) {
166+
CoprocessorRpcUtils.setControllerException(controller,
167+
new FileNotFoundException("/file.txt"));
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)