Skip to content

Commit f879268

Browse files
authored
Add Azure support for ranged read blob operations (#54358)
This commit adds support for ranged read blob operations to the repository azure plugin. It adds the necessary plumbing down to the Azure SDK with additionnal unit tests and also adds a QA test for searchable snapshots. Relates #50999
1 parent a5c7bec commit f879268

File tree

7 files changed

+307
-42
lines changed

7 files changed

+307
-42
lines changed

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.repositories.azure;
2121

22+
import com.microsoft.azure.storage.Constants;
2223
import com.microsoft.azure.storage.LocationMode;
2324
import com.microsoft.azure.storage.StorageException;
2425
import org.apache.logging.log4j.LogManager;
@@ -68,10 +69,8 @@ private boolean blobExists(String blobName) {
6869
return false;
6970
}
7071

71-
@Override
72-
public InputStream readBlob(String blobName) throws IOException {
73-
logger.trace("readBlob({})", blobName);
74-
72+
private InputStream openInputStream(String blobName, long position, @Nullable Long length) throws IOException {
73+
logger.trace("readBlob({}) from position [{}] with length [{}]", blobName, position, length != null ? length : "unlimited");
7574
if (blobStore.getLocationMode() == LocationMode.SECONDARY_ONLY && !blobExists(blobName)) {
7675
// On Azure, if the location path is a secondary location, and the blob does not
7776
// exist, instead of returning immediately from the getInputStream call below
@@ -81,9 +80,8 @@ public InputStream readBlob(String blobName) throws IOException {
8180
// stream to it.
8281
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
8382
}
84-
8583
try {
86-
return blobStore.getInputStream(buildKey(blobName));
84+
return blobStore.getInputStream(buildKey(blobName), position, length);
8785
} catch (StorageException e) {
8886
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
8987
throw new NoSuchFileException(e.getMessage());
@@ -94,6 +92,21 @@ public InputStream readBlob(String blobName) throws IOException {
9492
}
9593
}
9694

95+
@Override
96+
public InputStream readBlob(String blobName) throws IOException {
97+
return openInputStream(blobName, 0L, null);
98+
}
99+
100+
@Override
101+
public InputStream readBlob(String blobName, long position, long length) throws IOException {
102+
return openInputStream(blobName, position, length);
103+
}
104+
105+
@Override
106+
public long readBlobPreferredLength() {
107+
return Constants.DEFAULT_MINIMUM_READ_SIZE_IN_BYTES;
108+
}
109+
97110
@Override
98111
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
99112
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.microsoft.azure.storage.LocationMode;
2323
import com.microsoft.azure.storage.StorageException;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
25+
import org.elasticsearch.common.Nullable;
2526
import org.elasticsearch.common.blobstore.BlobContainer;
2627
import org.elasticsearch.common.blobstore.BlobMetaData;
2728
import org.elasticsearch.common.blobstore.BlobPath;
@@ -100,8 +101,8 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor)
100101
return service.deleteBlobDirectory(clientName, container, path, executor);
101102
}
102103

103-
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
104-
return service.getInputStream(clientName, container, blob);
104+
public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, StorageException {
105+
return service.getInputStream(clientName, container, blob, position, length);
105106
}
106107

107108
public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.logging.log4j.Logger;
4343
import org.apache.logging.log4j.message.ParameterizedMessage;
4444
import org.elasticsearch.action.support.PlainActionFuture;
45+
import org.elasticsearch.common.Nullable;
4546
import org.elasticsearch.common.blobstore.BlobMetaData;
4647
import org.elasticsearch.common.blobstore.BlobPath;
4748
import org.elasticsearch.common.blobstore.DeleteResult;
@@ -256,13 +257,13 @@ public void onAfter() {
256257
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
257258
}
258259

259-
public InputStream getInputStream(String account, String container, String blob)
260-
throws URISyntaxException, StorageException, IOException {
260+
public InputStream getInputStream(String account, String container, String blob, long position, @Nullable Long length)
261+
throws URISyntaxException, StorageException {
261262
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
262263
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
263264
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
264265
final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
265-
blockBlobReference.openInputStream(null, null, client.v2().get()));
266+
blockBlobReference.openInputStream(position, length, null, null, client.v2().get()));
266267
return giveSocketPermissionsToStream(is);
267268
}
268269

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

Lines changed: 142 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import com.sun.net.httpserver.HttpExchange;
2626
import com.sun.net.httpserver.HttpServer;
2727
import fixture.azure.AzureHttpHandler;
28+
import org.apache.http.HttpStatus;
2829
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2930
import org.elasticsearch.common.Strings;
3031
import org.elasticsearch.common.SuppressForbidden;
3132
import org.elasticsearch.common.blobstore.BlobContainer;
3233
import org.elasticsearch.common.blobstore.BlobPath;
3334
import org.elasticsearch.common.bytes.BytesReference;
35+
import org.elasticsearch.common.collect.Tuple;
3436
import org.elasticsearch.common.io.Streams;
3537
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
3638
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -63,6 +65,7 @@
6365
import java.util.Locale;
6466
import java.util.Map;
6567
import java.util.Objects;
68+
import java.util.Optional;
6669
import java.util.concurrent.ConcurrentHashMap;
6770
import java.util.concurrent.TimeUnit;
6871
import java.util.concurrent.atomic.AtomicBoolean;
@@ -81,15 +84,19 @@
8184
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
8285
import static org.hamcrest.Matchers.containsString;
8386
import static org.hamcrest.Matchers.equalTo;
87+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
8488
import static org.hamcrest.Matchers.is;
8589
import static org.hamcrest.Matchers.lessThan;
90+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
8691

8792
/**
8893
* This class tests how a {@link AzureBlobContainer} and its underlying SDK client are retrying requests when reading or writing blobs.
8994
*/
9095
@SuppressForbidden(reason = "use a http server")
9196
public class AzureBlobContainerRetriesTests extends ESTestCase {
9297

98+
private static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1L;
99+
93100
private HttpServer httpServer;
94101
private ThreadPool threadPool;
95102

@@ -128,7 +135,7 @@ private BlobContainer createBlobContainer(final int maxRetries) {
128135
final AzureStorageService service = new AzureStorageService(clientSettings.build()) {
129136
@Override
130137
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
131-
return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries());
138+
return new RetryExponentialRetry(1, 10, 100, azureStorageSettings.getMaxRetries());
132139
}
133140

134141
@Override
@@ -150,7 +157,16 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
150157

151158
public void testReadNonexistentBlobThrowsNoSuchFileException() {
152159
final BlobContainer blobContainer = createBlobContainer(between(1, 5));
153-
final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob"));
160+
final Exception exception = expectThrows(NoSuchFileException.class,
161+
() -> {
162+
if (randomBoolean()) {
163+
blobContainer.readBlob("read_nonexistent_blob");
164+
} else {
165+
final long position = randomLongBetween(0, MAX_RANGE_VAL - 1L);
166+
final long length = randomLongBetween(1, MAX_RANGE_VAL - position);
167+
blobContainer.readBlob("read_nonexistent_blob", position, length);
168+
}
169+
});
154170
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("not found"));
155171
}
156172

@@ -160,34 +176,35 @@ public void testReadBlobWithRetries() throws Exception {
160176
final CountDown countDownGet = new CountDown(maxRetries);
161177
final byte[] bytes = randomBlobContent();
162178
httpServer.createContext("/container/read_blob_max_retries", exchange -> {
163-
Streams.readFully(exchange.getRequestBody());
164-
if ("HEAD".equals(exchange.getRequestMethod())) {
165-
if (countDownHead.countDown()) {
166-
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
167-
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
168-
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
169-
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
170-
exchange.close();
171-
return;
179+
try {
180+
Streams.readFully(exchange.getRequestBody());
181+
if ("HEAD".equals(exchange.getRequestMethod())) {
182+
if (countDownHead.countDown()) {
183+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
184+
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
185+
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
186+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
187+
return;
188+
}
189+
} else if ("GET".equals(exchange.getRequestMethod())) {
190+
if (countDownGet.countDown()) {
191+
final int rangeStart = getRangeStart(exchange);
192+
assertThat(rangeStart, lessThan(bytes.length));
193+
final int length = bytes.length - rangeStart;
194+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
195+
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
196+
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
197+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
198+
exchange.getResponseBody().write(bytes, rangeStart, length);
199+
return;
200+
}
172201
}
173-
} else if ("GET".equals(exchange.getRequestMethod())) {
174-
if (countDownGet.countDown()) {
175-
final int rangeStart = getRangeStart(exchange);
176-
assertThat(rangeStart, lessThan(bytes.length));
177-
final int length = bytes.length - rangeStart;
178-
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
179-
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
180-
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
181-
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
182-
exchange.getResponseBody().write(bytes, rangeStart, length);
183-
exchange.close();
184-
return;
202+
if (randomBoolean()) {
203+
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
185204
}
205+
} finally {
206+
exchange.close();
186207
}
187-
if (randomBoolean()) {
188-
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
189-
}
190-
exchange.close();
191208
});
192209

193210
final BlobContainer blobContainer = createBlobContainer(maxRetries);
@@ -198,6 +215,58 @@ public void testReadBlobWithRetries() throws Exception {
198215
}
199216
}
200217

218+
public void testReadRangeBlobWithRetries() throws Exception {
219+
final int maxRetries = randomIntBetween(1, 5);
220+
final CountDown countDownHead = new CountDown(maxRetries);
221+
final CountDown countDownGet = new CountDown(maxRetries);
222+
final byte[] bytes = randomBlobContent();
223+
httpServer.createContext("/container/read_range_blob_max_retries", exchange -> {
224+
try {
225+
Streams.readFully(exchange.getRequestBody());
226+
if ("HEAD".equals(exchange.getRequestMethod())) {
227+
if (countDownHead.countDown()) {
228+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
229+
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
230+
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
231+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
232+
return;
233+
}
234+
} else if ("GET".equals(exchange.getRequestMethod())) {
235+
if (countDownGet.countDown()) {
236+
final int rangeStart = getRangeStart(exchange);
237+
assertThat(rangeStart, lessThan(bytes.length));
238+
final Optional<Integer> rangeEnd = getRangeEnd(exchange);
239+
assertThat(rangeEnd.isPresent(), is(true));
240+
assertThat(rangeEnd.get(), greaterThanOrEqualTo(rangeStart));
241+
final int length = (rangeEnd.get() - rangeStart) + 1;
242+
assertThat(length, lessThanOrEqualTo(bytes.length - rangeStart));
243+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
244+
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
245+
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
246+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
247+
exchange.getResponseBody().write(bytes, rangeStart, length);
248+
return;
249+
}
250+
}
251+
if (randomBoolean()) {
252+
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
253+
}
254+
} finally {
255+
exchange.close();
256+
}
257+
});
258+
259+
final BlobContainer blobContainer = createBlobContainer(maxRetries);
260+
final int position = randomIntBetween(0, bytes.length - 1);
261+
final int length = randomIntBetween(1, bytes.length - position);
262+
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
263+
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
264+
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
265+
assertThat(countDownHead.isCountedDown(), is(true));
266+
assertThat(countDownGet.isCountedDown(), is(true));
267+
}
268+
}
269+
201270
public void testWriteBlobWithRetries() throws Exception {
202271
final int maxRetries = randomIntBetween(1, 5);
203272
final CountDown countDown = new CountDown(maxRetries);
@@ -339,14 +408,56 @@ private static byte[] randomBlobContent() {
339408
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
340409
}
341410

342-
private static int getRangeStart(final HttpExchange exchange) {
411+
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");
412+
413+
private static Tuple<Long, Long> getRanges(HttpExchange exchange) {
343414
final String rangeHeader = exchange.getRequestHeaders().getFirst("X-ms-range");
344415
if (rangeHeader == null) {
345-
return 0;
416+
return Tuple.tuple(0L, MAX_RANGE_VAL);
346417
}
347418

348-
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(rangeHeader);
419+
final Matcher matcher = RANGE_PATTERN.matcher(rangeHeader);
349420
assertTrue(rangeHeader + " matches expected pattern", matcher.matches());
350-
return Math.toIntExact(Long.parseLong(matcher.group(1)));
421+
final long rangeStart = Long.parseLong(matcher.group(1));
422+
final long rangeEnd = Long.parseLong(matcher.group(2));
423+
assertThat(rangeStart, lessThanOrEqualTo(rangeEnd));
424+
return Tuple.tuple(rangeStart, rangeEnd);
425+
}
426+
427+
private static int getRangeStart(HttpExchange exchange) {
428+
return Math.toIntExact(getRanges(exchange).v1());
429+
}
430+
431+
private static Optional<Integer> getRangeEnd(HttpExchange exchange) {
432+
final long rangeEnd = getRanges(exchange).v2();
433+
if (rangeEnd == MAX_RANGE_VAL) {
434+
return Optional.empty();
435+
}
436+
return Optional.of(Math.toIntExact(rangeEnd));
437+
}
438+
439+
private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
440+
final int rangeStart = getRangeStart(exchange);
441+
assertThat(rangeStart, lessThan(bytes.length));
442+
final Optional<Integer> rangeEnd = getRangeEnd(exchange);
443+
final int length;
444+
if (rangeEnd.isPresent()) {
445+
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
446+
final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1);
447+
length = effectiveRangeEnd - rangeStart;
448+
} else {
449+
length = bytes.length - rangeStart - 1;
450+
}
451+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
452+
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
453+
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
454+
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
455+
final int bytesToSend = randomIntBetween(0, length - 1);
456+
if (bytesToSend > 0) {
457+
exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
458+
}
459+
if (randomBoolean()) {
460+
exchange.getResponseBody().flush();
461+
}
351462
}
352463
}

test/fixtures/azure-fixture/docker-compose.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,12 @@ services:
88
- ./testfixtures_shared/shared:/fixture/shared
99
ports:
1010
- "8091"
11+
12+
azure-fixture-other:
13+
build:
14+
context: .
15+
dockerfile: Dockerfile
16+
volumes:
17+
- ./testfixtures_shared/shared:/fixture/shared
18+
ports:
19+
- "8091"

0 commit comments

Comments
 (0)