Skip to content

Commit 2192d45

Browse files
committed
HBASE-27189 NettyServerRpcConnection is not properly closed when the netty channel is closed
1 parent 2197b38 commit 2192d45

File tree

4 files changed

+116
-5
lines changed

4 files changed

+116
-5
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public NettyRpcFrameDecoder(int maxFrameLength) {
5050
this.maxFrameLength = maxFrameLength;
5151
}
5252

53-
private NettyServerRpcConnection connection;
53+
NettyServerRpcConnection connection;
5454

5555
void setConnection(NettyServerRpcConnection connection) {
5656
this.connection = connection;

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,7 @@ public class NettyRpcServer extends RpcServer {
8888

8989
private final CountDownLatch closed = new CountDownLatch(1);
9090
private final Channel serverChannel;
91-
private final ChannelGroup allChannels =
92-
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
91+
final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
9392
private final ByteBufAllocator channelAllocator;
9493

9594
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ class NettyServerRpcConnection extends ServerRpcConnection {
4949
NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
5050
super(rpcServer);
5151
this.channel = channel;
52+
// register close hook to release resources
53+
channel.closeFuture().addListener(f -> {
54+
disposeSasl();
55+
callCleanupIfNeeded();
56+
});
5257
InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
5358
this.addr = inetSocketAddress.getAddress();
5459
if (addr == null) {
@@ -101,9 +106,7 @@ void process(ByteBuff buf) throws IOException, InterruptedException {
101106

102107
@Override
103108
public synchronized void close() {
104-
disposeSasl();
105109
channel.close();
106-
callCleanupIfNeeded();
107110
}
108111

109112
@Override
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;

/**
 * Confirm that we truly close the {@code NettyServerRpcConnection} when the netty channel is
 * closed: the close-future listener registered by the connection must dispose the sasl server and
 * run call cleanup even when the channel is closed from the client side (HBASE-27189).
 */
@Category({ RPCTests.class, MediumTests.class })
public class TestNettyIPCCloseConnection {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestNettyIPCCloseConnection.class);

  private static Configuration CONF = HBaseConfiguration.create();

  // Client-side netty event loop; shut down gracefully in tearDown.
  private NioEventLoopGroup group;

  private NettyRpcServer server;

  private NettyRpcClient client;

  private TestProtobufRpcProto.BlockingInterface stub;

  @Before
  public void setUp() throws IOException {
    group = new NioEventLoopGroup();
    // Start a real netty RPC server on an ephemeral localhost port with a single-threaded
    // FIFO scheduler, then point a blocking stub at it.
    server = new NettyRpcServer(null, getClass().getSimpleName(),
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1), true);
    NettyRpcClientConfigHelper.setEventLoopConfig(CONF, group, NioSocketChannel.class);
    client = new NettyRpcClient(CONF);
    server.start();
    stub = TestProtobufRpcServiceImpl.newBlockingStub(client, server.getListenerAddress());
  }

  @After
  public void tearDown() throws Exception {
    // swallowIOException=true: client close failures must not mask test results.
    Closeables.close(client, true);
    server.stop();
    group.shutdownGracefully().sync();
  }

  @Test
  public void test() throws Exception {
    // Make one real call so the server registers exactly one connection channel.
    assertEquals("test",
      stub.echo(null, EchoRequestProto.newBuilder().setMessage("test").build()).getMessage());
    // Grab the single server-side channel for the connection we just opened.
    Channel channel = Iterators.getOnlyElement(server.allChannels.iterator());
    assertNotNull(channel);
    NettyRpcFrameDecoder decoder = channel.pipeline().get(NettyRpcFrameDecoder.class);
    // set a mock saslServer to verify that it will call the dispose method of this instance
    HBaseSaslRpcServer saslServer = mock(HBaseSaslRpcServer.class);
    decoder.connection.saslServer = saslServer;
    client.close();
    // the channel should have been closed
    channel.closeFuture().await(5, TimeUnit.SECONDS);
    // verify that we have called the dispose method and set saslServer to null
    Waiter.waitFor(CONF, 5000, () -> decoder.connection.saslServer == null);
    verify(saslServer, times(1)).dispose();
  }
}

0 commit comments

Comments
 (0)