Skip to content

Commit 272805c

Browse files
committed
HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (#4676)
Signed-off-by: Balazs Meszaros <[email protected]> (cherry picked from commit fb529e2) Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
1 parent bcec871 commit 272805c

File tree

2 files changed

+50
-35
lines changed

2 files changed

+50
-35
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
@InterfaceAudience.Private
3535
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
3636

37+
static final String NAME = "BufferCall";
38+
3739
private enum BufferCallAction {
3840
FLUSH,
3941
FAIL
@@ -78,6 +80,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
7880
}
7981
}
8082

83+
@Override
84+
public void flush(ChannelHandlerContext ctx) throws Exception {
85+
// do not flush anything out
86+
}
87+
8188
@Override
8289
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
8390
if (evt instanceof BufferCallEvent) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
5252
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
5353
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
54-
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
54+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
5555
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
5656
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
5757
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -156,14 +156,14 @@ public void cleanupConnection() {
156156

157157
private void established(Channel ch) throws IOException {
158158
assert eventLoop.inEventLoop();
159-
ChannelPipeline p = ch.pipeline();
160-
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
161-
p.addBefore(addBeforeHandler, null,
162-
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
163-
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
164-
p.addBefore(addBeforeHandler, null,
165-
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
166-
p.fireUserEventTriggered(BufferCallEvent.success());
159+
ch.pipeline()
160+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
161+
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
162+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
163+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
164+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
165+
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
166+
.fireUserEventTriggered(BufferCallEvent.success());
167167
}
168168

169169
private boolean reloginInProgress;
@@ -218,8 +218,8 @@ private void saslNegotiate(final Channel ch) {
218218
failInit(ch, e);
219219
return;
220220
}
221-
ch.pipeline().addFirst("SaslDecoder", new SaslChallengeDecoder()).addAfter("SaslDecoder",
222-
"SaslHandler", saslHandler);
221+
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
222+
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
223223
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
224224

225225
@Override
@@ -230,32 +230,33 @@ public void operationComplete(Future<Boolean> future) throws Exception {
230230
if (saslHandler.isNeedProcessConnectionHeader()) {
231231
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
232232
// create the handler to handle the connection header
233-
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
234-
connectionHeaderPromise, conf, connectionHeaderWithLength);
233+
NettyHBaseRpcConnectionHeaderHandler chHandler =
234+
new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
235+
connectionHeaderWithLength);
235236

236237
// add ReadTimeoutHandler to deal with server doesn't response connection header
237238
// because of the different configuration in client side and server side
238-
p.addFirst(
239-
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
240-
p.addLast(chHandler);
241-
NettyFutureUtils
242-
.consume(connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
243-
@Override
244-
public void operationComplete(Future<Boolean> future) throws Exception {
245-
if (future.isSuccess()) {
246-
ChannelPipeline p = ch.pipeline();
247-
p.remove(ReadTimeoutHandler.class);
248-
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
249-
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
250-
// sent it already
251-
established(ch);
252-
} else {
253-
final Throwable error = future.cause();
254-
scheduleRelogin(error);
255-
failInit(ch, toIOE(error));
256-
}
239+
final String readTimeoutHandlerName = "ReadTimeout";
240+
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
241+
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
242+
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
243+
NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
244+
@Override
245+
public void operationComplete(Future<Boolean> future) throws Exception {
246+
if (future.isSuccess()) {
247+
ChannelPipeline p = ch.pipeline();
248+
p.remove(readTimeoutHandlerName);
249+
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
250+
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
251+
// sent it already
252+
established(ch);
253+
} else {
254+
final Throwable error = future.cause();
255+
scheduleRelogin(error);
256+
failInit(ch, toIOE(error));
257257
}
258-
}));
258+
}
259+
});
259260
} else {
260261
// send the connection header to server
261262
NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
@@ -278,8 +279,15 @@ private void connect() throws UnknownHostException {
278279
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
279280
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
280281
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
281-
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
282-
.remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
282+
.handler(new ChannelInitializer<Channel>() {
283+
284+
@Override
285+
protected void initChannel(Channel ch) throws Exception {
286+
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
287+
new BufferCallBeforeInitHandler());
288+
}
289+
}).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
290+
.addListener(new ChannelFutureListener() {
283291

284292
@Override
285293
public void operationComplete(ChannelFuture future) throws Exception {

0 commit comments

Comments
 (0)