|
25 | 25 | import org.elasticsearch.common.collect.Tuple; |
26 | 26 | import org.elasticsearch.common.io.Streams; |
27 | 27 | import org.elasticsearch.common.io.stream.BytesStreamOutput; |
| 28 | +import org.elasticsearch.common.lease.Releasable; |
28 | 29 | import org.elasticsearch.common.settings.Settings; |
29 | 30 | import org.elasticsearch.common.util.PageCacheRecycler; |
30 | 31 | import org.elasticsearch.common.util.concurrent.ThreadContext; |
|
35 | 36 | import java.util.Collections; |
36 | 37 | import java.util.List; |
37 | 38 | import java.util.Objects; |
| 39 | +import java.util.concurrent.atomic.AtomicBoolean; |
38 | 40 | import java.util.function.BiConsumer; |
39 | 41 |
|
40 | 42 | import static org.hamcrest.Matchers.instanceOf; |
@@ -172,6 +174,53 @@ public void testPipelineHandling() throws IOException { |
172 | 174 | } |
173 | 175 | } |
174 | 176 |
|
| 177 | + public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { |
| 178 | + final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE; |
| 179 | + BiConsumer<TcpChannel, InboundMessage> messageHandler = (c, m) -> {}; |
| 180 | + BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler = (c, e) -> {}; |
| 181 | + final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, recycler, messageHandler, errorHandler); |
| 182 | + |
| 183 | + try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { |
| 184 | + String actionName = "actionName"; |
| 185 | + final Version version = Version.CURRENT; |
| 186 | + final String value = randomAlphaOfLength(1000); |
| 187 | + final boolean isRequest = randomBoolean(); |
| 188 | + final long requestId = randomNonNegativeLong(); |
| 189 | + |
| 190 | + OutboundMessage message; |
| 191 | + if (isRequest) { |
| 192 | + message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), |
| 193 | + version, actionName, requestId, false, false); |
| 194 | + } else { |
| 195 | + message = new OutboundMessage.Response(threadContext, Collections.emptySet(), new TestResponse(value), |
| 196 | + version, requestId, false, false); |
| 197 | + } |
| 198 | + |
| 199 | + final BytesReference reference = message.serialize(streamOutput); |
| 200 | + final int fixedHeaderSize = TcpHeader.headerSize(Version.CURRENT); |
| 201 | + final int variableHeaderSize = reference.getInt(fixedHeaderSize - 4); |
| 202 | + final int totalHeaderSize = fixedHeaderSize + variableHeaderSize; |
| 203 | + final AtomicBoolean bodyReleased = new AtomicBoolean(false); |
| 204 | + for (int i = 0; i < totalHeaderSize - 1; ++i) { |
| 205 | + try (ReleasableBytesReference slice = ReleasableBytesReference.wrap(reference.slice(i, 1))) { |
| 206 | + pipeline.handleBytes(new FakeTcpChannel(), slice); |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + final Releasable releasable = () -> bodyReleased.set(true); |
| 211 | + final int from = totalHeaderSize - 1; |
| 212 | + final BytesReference partHeaderPartBody = reference.slice(from, reference.length() - from - 1); |
| 213 | + try (ReleasableBytesReference slice = new ReleasableBytesReference(partHeaderPartBody, releasable)) { |
| 214 | + pipeline.handleBytes(new FakeTcpChannel(), slice); |
| 215 | + } |
| 216 | + assertFalse(bodyReleased.get()); |
| 217 | + try (ReleasableBytesReference slice = new ReleasableBytesReference(reference.slice(reference.length() - 1, 1), releasable)) { |
| 218 | + pipeline.handleBytes(new FakeTcpChannel(), slice); |
| 219 | + } |
| 220 | + assertTrue(bodyReleased.get()); |
| 221 | + } |
| 222 | + } |
| 223 | + |
175 | 224 | private static class MessageData { |
176 | 225 |
|
177 | 226 | private final Version version; |
|
0 commit comments