1919
2020package org .elasticsearch .http .nio ;
2121
22+ import io .netty .buffer .ByteBuf ;
23+ import io .netty .buffer .Unpooled ;
2224import io .netty .handler .codec .http .DefaultFullHttpRequest ;
2325import io .netty .handler .codec .http .DefaultHttpHeaders ;
2426import io .netty .handler .codec .http .FullHttpRequest ;
2830import io .netty .handler .codec .http .cookie .Cookie ;
2931import io .netty .handler .codec .http .cookie .ServerCookieDecoder ;
3032import io .netty .handler .codec .http .cookie .ServerCookieEncoder ;
31- import org .elasticsearch .common .bytes .BytesArray ;
3233import org .elasticsearch .common .bytes .BytesReference ;
3334import org .elasticsearch .http .HttpRequest ;
3435import org .elasticsearch .rest .RestRequest ;
4041import java .util .List ;
4142import java .util .Map ;
4243import java .util .Set ;
44+ import java .util .concurrent .atomic .AtomicBoolean ;
4345import java .util .stream .Collectors ;
4446
4547public class NioHttpRequest implements HttpRequest {
@@ -48,16 +50,22 @@ public class NioHttpRequest implements HttpRequest {
4850 private final BytesReference content ;
4951 private final HttpHeadersMap headers ;
5052 private final int sequence ;
53+ private final AtomicBoolean released ;
54+ private final boolean pooled ;
5155
5256 NioHttpRequest (FullHttpRequest request , int sequence ) {
57+ this (request , new HttpHeadersMap (request .headers ()), sequence , new AtomicBoolean (false ), true ,
58+ ByteBufUtils .toBytesReference (request .content ()));
59+ }
60+
61+ private NioHttpRequest (FullHttpRequest request , HttpHeadersMap headers , int sequence , AtomicBoolean released , boolean pooled ,
62+ BytesReference content ) {
5363 this .request = request ;
54- headers = new HttpHeadersMap (request .headers ());
5564 this .sequence = sequence ;
56- if (request .content ().isReadable ()) {
57- this .content = ByteBufUtils .toBytesReference (request .content ());
58- } else {
59- this .content = BytesArray .EMPTY ;
60- }
65+ this .headers = headers ;
66+ this .content = content ;
67+ this .pooled = pooled ;
68+ this .released = released ;
6169 }
6270
6371 @ Override
@@ -105,17 +113,32 @@ public String uri() {
105113
106114 @ Override
107115 public BytesReference content () {
116+ assert released .get () == false ;
108117 return content ;
109118 }
110119
111120 @ Override
112121 public void release () {
113- // NioHttpRequest works from copied unpooled bytes no need to release anything
122+ if (pooled && released .compareAndSet (false , true )) {
123+ request .release ();
124+ }
114125 }
115126
116127 @ Override
117128 public HttpRequest releaseAndCopy () {
118- return this ;
129+ assert released .get () == false ;
130+ if (pooled == false ) {
131+ return this ;
132+ }
133+ try {
134+ final ByteBuf copiedContent = Unpooled .copiedBuffer (request .content ());
135+ return new NioHttpRequest (
136+ new DefaultFullHttpRequest (request .protocolVersion (), request .method (), request .uri (), copiedContent , request .headers (),
137+ request .trailingHeaders ()),
138+ headers , sequence , new AtomicBoolean (false ), false , ByteBufUtils .toBytesReference (copiedContent ));
139+ } finally {
140+ release ();
141+ }
119142 }
120143
121144 @ Override
@@ -156,7 +179,8 @@ public HttpRequest removeHeader(String header) {
156179 trailingHeaders .remove (header );
157180 FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest (request .protocolVersion (), request .method (), request .uri (),
158181 request .content (), headersWithoutContentTypeHeader , trailingHeaders );
159- return new NioHttpRequest (requestWithoutHeader , sequence );
182+ return new NioHttpRequest (requestWithoutHeader , new HttpHeadersMap (requestWithoutHeader .headers ()), sequence , released ,
183+ pooled , content );
160184 }
161185
162186 @ Override
0 commit comments