Skip to content

Commit 6eae477

Browse files
committed
HBASE-28101 Addendum do not throw EOFException out directly (#5431)
Signed-off-by: Nihal Jain <[email protected]> (cherry picked from commit 4b76a95)
1 parent a9e4c7b commit 6eae477

File tree

1 file changed

+41
-24
lines changed

1 file changed

+41
-24
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
122122
}
123123
}
124124

125+
private void finishCall(ResponseHeader responseHeader, ByteBufInputStream in, Call call)
126+
throws IOException {
127+
Message value;
128+
if (call.responseDefaultType != null) {
129+
Message.Builder builder = call.responseDefaultType.newBuilderForType();
130+
if (!builder.mergeDelimitedFrom(in)) {
131+
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
132+
// before reading any bytes out, so here we need to manually finish create the EOFException
133+
// and finish the call
134+
call.setException(new EOFException("EOF while reading response with type: "
135+
+ call.responseDefaultType.getClass().getName()));
136+
return;
137+
}
138+
value = builder.build();
139+
} else {
140+
value = null;
141+
}
142+
CellScanner cellBlockScanner;
143+
if (responseHeader.hasCellBlockMeta()) {
144+
int size = responseHeader.getCellBlockMeta().getLength();
145+
// Maybe we could read directly from the ByteBuf.
146+
// The problem here is that we do not know when to release it.
147+
byte[] cellBlock = new byte[size];
148+
in.readFully(cellBlock);
149+
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
150+
} else {
151+
cellBlockScanner = null;
152+
}
153+
call.setResponse(value, cellBlockScanner);
154+
}
155+
125156
private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
126157
int totalSize = buf.readInt();
127158
ByteBufInputStream in = new ByteBufInputStream(buf);
@@ -162,31 +193,17 @@ private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOExcep
162193
call.setException(remoteExc);
163194
return;
164195
}
165-
Message value;
166-
if (call.responseDefaultType != null) {
167-
Message.Builder builder = call.responseDefaultType.newBuilderForType();
168-
if (!builder.mergeDelimitedFrom(in)) {
169-
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
170-
// before reading any bytes out, so here we need to manually throw the EOFException out
171-
throw new EOFException(
172-
"EOF while reading response with type: " + call.responseDefaultType.getClass().getName());
173-
}
174-
value = builder.build();
175-
} else {
176-
value = null;
177-
}
178-
CellScanner cellBlockScanner;
179-
if (responseHeader.hasCellBlockMeta()) {
180-
int size = responseHeader.getCellBlockMeta().getLength();
181-
// Maybe we could read directly from the ByteBuf.
182-
// The problem here is that we do not know when to release it.
183-
byte[] cellBlock = new byte[size];
184-
buf.readBytes(cellBlock);
185-
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
186-
} else {
187-
cellBlockScanner = null;
196+
try {
197+
finishCall(responseHeader, in, call);
198+
} catch (IOException e) {
199+
// As the call has been removed from id2Call map, if we hit an exception here, the
200+
// exceptionCaught method can not help us finish the call, so here we need to catch the
201+
// exception and finish it
202+
// And in netty, the decoding the frame based, when reaching here we have already read a full
203+
// frame, so hitting exception here does not mean the stream decoding is broken, thus we do
204+
// not need to throw the exception out and close the connection.
205+
call.setException(e);
188206
}
189-
call.setResponse(value, cellBlockScanner);
190207
}
191208

192209
@Override

0 commit comments

Comments
 (0)