@@ -46,6 +46,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
4646 final NettyServerRpcConnection connection ;
4747
4848 private boolean requestTooBig ;
49+ private boolean requestTooBigSent ;
4950 private String requestTooBigMessage ;
5051
5152 public NettyRpcFrameDecoder (int maxFrameLength , NettyServerRpcConnection connection ) {
@@ -55,8 +56,12 @@ public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connect
5556
5657 @ Override
5758 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
59+ if (requestTooBigSent ) {
60+ in .skipBytes (in .readableBytes ());
61+ return ;
62+ }
5863 if (requestTooBig ) {
59- handleTooBigRequest (in );
64+ handleTooBigRequest (ctx , in );
6065 return ;
6166 }
6267
@@ -80,7 +85,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
8085 NettyRpcServer .LOG .warn (requestTooBigMessage );
8186
8287 if (connection .connectionHeaderRead ) {
83- handleTooBigRequest (in );
88+ handleTooBigRequest (ctx , in );
8489 return ;
8590 }
8691 ctx .channel ().close ();
@@ -98,7 +103,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
98103 out .add (in .readRetainedSlice (frameLengthInt ));
99104 }
100105
101- private void handleTooBigRequest (ByteBuf in ) throws IOException {
106+ private void handleTooBigRequest (ChannelHandlerContext ctx , ByteBuf in ) throws IOException {
102107 in .skipBytes (FRAME_LENGTH_FIELD_LENGTH );
103108 in .markReaderIndex ();
104109 int preIndex = in .readerIndex ();
@@ -143,6 +148,10 @@ private void handleTooBigRequest(ByteBuf in) throws IOException {
143148 // instead of calling reqTooBig.sendResponseIfReady()
144149 reqTooBig .param = null ;
145150 connection .channel .writeAndFlush (reqTooBig ).addListener (ChannelFutureListener .CLOSE );
151+ in .skipBytes (in .readableBytes ());
152+ requestTooBigSent = true ;
153+ // disable auto read as we do not care newer data from this channel any more
154+ ctx .channel ().config ().setAutoRead (false );
146155 }
147156
148157 private RPCProtos .RequestHeader getHeader (ByteBuf in , int headerSize ) throws IOException {
0 commit comments