Skip to content

Commit f72fa49

Browse files
authored
Fix S3HttpHandler chunked-encoding handling (#72378)
The `S3HttpHandler` reads the contents of the uploaded blob, but if the upload used chunked encoding then the reader would skip one or more `\r\n` sequences if they appeared at the start of a chunk. This commit reworks the reader to be stricter about its interpretation of chunks, and removes some indirection via streams since we can work pretty much entirely on the underlying `BytesReference` instead. Closes #72358
1 parent 01aad86 commit f72fa49

File tree

2 files changed

+77
-83
lines changed

2 files changed

+77
-83
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,14 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
121121
final Settings.Builder builder = Settings.builder()
122122
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
123123
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
124-
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
125-
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
126124
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
127125
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
128126
.put(super.nodeSettings(nodeOrdinal, otherSettings))
129127
.setSecureSettings(secureSettings);
130128

129+
if (randomBoolean()) {
130+
builder.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), randomBoolean());
131+
}
131132
if (signerOverride != null) {
132133
builder.put(S3ClientSettings.SIGNER_OVERRIDE.getConcreteSettingForNamespace("test").getKey(), signerOverride);
133134
}

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 74 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,33 @@
1010
import com.sun.net.httpserver.Headers;
1111
import com.sun.net.httpserver.HttpExchange;
1212
import com.sun.net.httpserver.HttpHandler;
13+
import org.apache.lucene.util.BytesRef;
14+
import org.apache.lucene.util.BytesRefIterator;
1315
import org.elasticsearch.common.Nullable;
1416
import org.elasticsearch.common.SuppressForbidden;
1517
import org.elasticsearch.common.UUIDs;
1618
import org.elasticsearch.common.bytes.BytesArray;
1719
import org.elasticsearch.common.bytes.BytesReference;
20+
import org.elasticsearch.common.bytes.CompositeBytesReference;
1821
import org.elasticsearch.common.collect.Tuple;
1922
import org.elasticsearch.common.hash.MessageDigests;
2023
import org.elasticsearch.common.io.Streams;
2124
import org.elasticsearch.common.regex.Regex;
2225
import org.elasticsearch.rest.RestStatus;
2326
import org.elasticsearch.rest.RestUtils;
2427

25-
import java.io.BufferedInputStream;
2628
import java.io.ByteArrayOutputStream;
2729
import java.io.IOException;
2830
import java.io.InputStream;
2931
import java.io.InputStreamReader;
32+
import java.io.PrintStream;
3033
import java.nio.charset.StandardCharsets;
3134
import java.security.MessageDigest;
35+
import java.util.ArrayList;
3236
import java.util.HashMap;
3337
import java.util.HashSet;
3438
import java.util.Iterator;
39+
import java.util.List;
3540
import java.util.Locale;
3641
import java.util.Map;
3742
import java.util.Objects;
@@ -272,98 +277,86 @@ private static String multipartKey(final String uploadId, int partNumber) {
272277
return uploadId + "\n" + partNumber;
273278
}
274279

275-
private static CheckedInputStream createCheckedInputStream(final InputStream inputStream, final MessageDigest digest) {
276-
return new CheckedInputStream(inputStream, new Checksum() {
277-
@Override
278-
public void update(int b) {
279-
digest.update((byte) b);
280-
}
280+
private static final Pattern chunkSignaturePattern = Pattern.compile("^([0-9a-z]+);chunk-signature=([^\\r\\n]*)$");
281281

282-
@Override
283-
public void update(byte[] b, int off, int len) {
284-
digest.update(b, off, len);
285-
}
282+
private static Tuple<String, BytesReference> parseRequestBody(final HttpExchange exchange) throws IOException {
283+
try {
284+
final BytesReference bytesReference;
286285

287-
@Override
288-
public long getValue() {
289-
throw new UnsupportedOperationException();
290-
}
286+
final String headerDecodedContentLength = exchange.getRequestHeaders().getFirst("x-amz-decoded-content-length");
287+
if (headerDecodedContentLength == null) {
288+
bytesReference = Streams.readFully(exchange.getRequestBody());
289+
} else {
290+
BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
291+
int chunkIndex = 0;
292+
final List<BytesReference> chunks = new ArrayList<>();
291293

292-
@Override
293-
public void reset() {
294-
digest.reset();
295-
}
296-
});
297-
}
294+
while (true) {
295+
chunkIndex += 1;
298296

299-
private static final Pattern chunkSignaturePattern = Pattern.compile("^([0-9a-z]+);chunk-signature=([^\\r\\n]*)$");
297+
final int headerLength = requestBody.indexOf((byte) '\n', 0) + 1; // includes terminating \r\n
298+
if (headerLength == 0) {
299+
throw new IllegalStateException("header of chunk [" + chunkIndex + "] was not terminated");
300+
}
301+
if (headerLength > 150) {
302+
throw new IllegalStateException(
303+
"header of chunk [" + chunkIndex + "] was too long at [" + headerLength + "] bytes");
304+
}
305+
if (headerLength < 3) {
306+
throw new IllegalStateException(
307+
"header of chunk [" + chunkIndex + "] was too short at [" + headerLength + "] bytes");
308+
}
309+
if (requestBody.get(headerLength - 1) != '\n' || requestBody.get(headerLength - 2) != '\r') {
310+
throw new IllegalStateException("header of chunk [" + chunkIndex + "] not terminated with [\\r\\n]");
311+
}
300312

301-
private static Tuple<String, BytesReference> parseRequestBody(final HttpExchange exchange) throws IOException {
302-
final BytesReference bytesReference;
313+
final String header = requestBody.slice(0, headerLength - 2).utf8ToString();
314+
final Matcher matcher = chunkSignaturePattern.matcher(header);
315+
if (matcher.find() == false) {
316+
throw new IllegalStateException(
317+
"header of chunk [" + chunkIndex + "] did not match expected pattern: [" + header + "]");
318+
}
319+
final int chunkSize = Integer.parseUnsignedInt(matcher.group(1), 16);
303320

304-
final String headerDecodedContentLength = exchange.getRequestHeaders().getFirst("x-amz-decoded-content-length");
305-
if (headerDecodedContentLength == null) {
306-
bytesReference = Streams.readFully(exchange.getRequestBody());
307-
} else {
308-
BytesReference cc = Streams.readFully(exchange.getRequestBody());
309-
310-
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
311-
try (BufferedInputStream in = new BufferedInputStream(cc.streamInput())) {
312-
int chunkSize = 0;
313-
int read;
314-
while ((read = in.read()) != -1) {
315-
boolean markAndContinue = false;
316-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
317-
do { // search next consecutive {carriage return, new line} chars and stop
318-
if ((char) read == '\r') {
319-
int next = in.read();
320-
if (next != -1) {
321-
if (next == '\n') {
322-
break;
323-
}
324-
out.write(read);
325-
out.write(next);
326-
continue;
327-
}
328-
}
329-
out.write(read);
330-
} while ((read = in.read()) != -1);
331-
332-
final String line = new String(out.toByteArray(), UTF_8);
333-
if (line.length() == 0 || line.equals("\r\n")) {
334-
markAndContinue = true;
335-
} else {
336-
Matcher matcher = chunkSignaturePattern.matcher(line);
337-
if (matcher.find()) {
338-
markAndContinue = true;
339-
chunkSize = Integer.parseUnsignedInt(matcher.group(1), 16);
340-
}
341-
}
342-
if (markAndContinue) {
343-
in.mark(Integer.MAX_VALUE);
344-
continue;
345-
}
321+
if (requestBody.get(headerLength + chunkSize) != '\r' || requestBody.get(headerLength + chunkSize + 1) != '\n') {
322+
throw new IllegalStateException("chunk [" + chunkIndex + "] not terminated with [\\r\\n]");
346323
}
347-
if (chunkSize > 0) {
348-
in.reset();
349-
final byte[] buffer = new byte[chunkSize];
350-
in.read(buffer, 0, buffer.length);
351-
blob.write(buffer);
352-
blob.flush();
353-
chunkSize = 0;
324+
325+
if (chunkSize != 0) {
326+
chunks.add(requestBody.slice(headerLength, chunkSize));
354327
}
328+
329+
final int toSkip = headerLength + chunkSize + 2;
330+
requestBody = requestBody.slice(toSkip, requestBody.length() - toSkip);
331+
332+
if (chunkSize == 0) {
333+
break;
334+
}
335+
}
336+
337+
bytesReference = CompositeBytesReference.of(chunks.toArray(new BytesReference[0]));
338+
339+
if (bytesReference.length() != Integer.parseInt(headerDecodedContentLength)) {
340+
throw new IllegalStateException("Something went wrong when parsing the chunked request " +
341+
"[bytes read=" + bytesReference.length() + ", expected=" + headerDecodedContentLength + "]");
355342
}
356343
}
357-
if (blob.size() != Integer.parseInt(headerDecodedContentLength)) {
358-
throw new IllegalStateException("Something went wrong when parsing the chunked request " +
359-
"[bytes read=" + blob.size() + ", expected=" + headerDecodedContentLength + "]");
344+
345+
final MessageDigest digest = MessageDigests.md5();
346+
BytesRef ref;
347+
final BytesRefIterator iterator = bytesReference.iterator();
348+
while ((ref = iterator.next()) != null) {
349+
digest.update(ref.bytes, ref.offset, ref.length);
350+
}
351+
return Tuple.tuple(MessageDigests.toHexString(digest.digest()), bytesReference);
352+
} catch (Exception e) {
353+
exchange.sendResponseHeaders(500, 0);
354+
try (PrintStream printStream = new PrintStream(exchange.getResponseBody())) {
355+
printStream.println(e.toString());
356+
e.printStackTrace(printStream);
360357
}
361-
bytesReference = new BytesArray(blob.toByteArray());
358+
throw new AssertionError("parseRequestBody failed", e);
362359
}
363-
364-
final MessageDigest digest = MessageDigests.md5();
365-
Streams.readFully(createCheckedInputStream(bytesReference.streamInput(), digest));
366-
return Tuple.tuple(MessageDigests.toHexString(digest.digest()), bytesReference);
367360
}
368361

369362
public static void sendError(final HttpExchange exchange,

0 commit comments

Comments
 (0)