Skip to content

Commit 4a48dbe

Browse files
committed
HBASE-28101 Addendum do not throw EOFException out directly (#5431)
Signed-off-by: Nihal Jain <[email protected]> (cherry picked from commit 4b76a95)
1 parent e33b173 commit 4a48dbe

File tree

1 file changed

+41
-24
lines changed

1 file changed

+41
-24
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
126126
}
127127
}
128128

129+
private void finishCall(ResponseHeader responseHeader, ByteBufInputStream in, Call call)
130+
throws IOException {
131+
Message value;
132+
if (call.responseDefaultType != null) {
133+
Message.Builder builder = call.responseDefaultType.newBuilderForType();
134+
if (!builder.mergeDelimitedFrom(in)) {
135+
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
136+
// before reading any bytes out, so here we need to manually finish create the EOFException
137+
// and finish the call
138+
call.setException(new EOFException("EOF while reading response with type: "
139+
+ call.responseDefaultType.getClass().getName()));
140+
return;
141+
}
142+
value = builder.build();
143+
} else {
144+
value = null;
145+
}
146+
CellScanner cellBlockScanner;
147+
if (responseHeader.hasCellBlockMeta()) {
148+
int size = responseHeader.getCellBlockMeta().getLength();
149+
// Maybe we could read directly from the ByteBuf.
150+
// The problem here is that we do not know when to release it.
151+
byte[] cellBlock = new byte[size];
152+
in.readFully(cellBlock);
153+
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
154+
} else {
155+
cellBlockScanner = null;
156+
}
157+
call.setResponse(value, cellBlockScanner);
158+
}
159+
129160
private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
130161
int totalSize = buf.readInt();
131162
ByteBufInputStream in = new ByteBufInputStream(buf);
@@ -166,31 +197,17 @@ private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOExcep
166197
call.setException(remoteExc);
167198
return;
168199
}
169-
Message value;
170-
if (call.responseDefaultType != null) {
171-
Message.Builder builder = call.responseDefaultType.newBuilderForType();
172-
if (!builder.mergeDelimitedFrom(in)) {
173-
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
174-
// before reading any bytes out, so here we need to manually throw the EOFException out
175-
throw new EOFException(
176-
"EOF while reading response with type: " + call.responseDefaultType.getClass().getName());
177-
}
178-
value = builder.build();
179-
} else {
180-
value = null;
181-
}
182-
CellScanner cellBlockScanner;
183-
if (responseHeader.hasCellBlockMeta()) {
184-
int size = responseHeader.getCellBlockMeta().getLength();
185-
// Maybe we could read directly from the ByteBuf.
186-
// The problem here is that we do not know when to release it.
187-
byte[] cellBlock = new byte[size];
188-
buf.readBytes(cellBlock);
189-
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
190-
} else {
191-
cellBlockScanner = null;
200+
try {
201+
finishCall(responseHeader, in, call);
202+
} catch (IOException e) {
203+
// As the call has been removed from id2Call map, if we hit an exception here, the
204+
// exceptionCaught method can not help us finish the call, so here we need to catch the
205+
// exception and finish it
206+
// And in netty, the decoding the frame based, when reaching here we have already read a full
207+
// frame, so hitting exception here does not mean the stream decoding is broken, thus we do
208+
// not need to throw the exception out and close the connection.
209+
call.setException(e);
192210
}
193-
call.setResponse(value, cellBlockScanner);
194211
}
195212

196213
@Override

0 commit comments

Comments
 (0)