
Commit 8947c1e

Save Memory on Large Repository Metadata Blob Writes (#74313)
This PR adds a new API for streaming serialization writes to a repository, enabling repository metadata of arbitrary size to be written with bounded memory. The existing write APIs require the eventual blob size to be known beforehand, which forced us to materialize the serialized blob in memory before writing. That costs a lot of memory for e.g. a very large `RepositoryData` and limits us to a `2G` maximum blob size. With this PR the requirement to fully materialize the serialized metadata goes away, and the memory overhead becomes bounded entirely by the outbound buffer size of the repository implementation. As we move to larger repositories, this makes master node stability much more predictable, since writing out `RepositoryData` no longer takes as much memory (the same applies to shard-level metadata). It also enables aggregating multiple metadata blobs into a single larger blob without massive overhead and removes the `2G` size limit on `RepositoryData`.
1 parent 36ae5ac commit 8947c1e
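
For orientation, here is a caller-side sketch of the new streaming API. The `writeBlob(String, boolean, boolean, CheckedConsumer<OutputStream, IOException>)` signature matches the diffs below; the wrapper class, blob name, and `serializedChunks` source are made up for illustration.

import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.IOException;

// Hypothetical caller: streams pre-serialized chunks of repository metadata
// straight to the repository. Memory use is bounded by the repository
// implementation's outbound buffer, not by the total blob size.
public final class StreamingMetadataWriter {

    private StreamingMetadataWriter() {}

    public static void writeLargeMetadata(BlobContainer container, Iterable<byte[]> serializedChunks) throws IOException {
        container.writeBlob("large-metadata-blob", // illustrative blob name
            true,    // failIfAlreadyExists
            true,    // atomic
            out -> { // CheckedConsumer<OutputStream, IOException>
                for (byte[] chunk : serializedChunks) {
                    out.write(chunk); // flushed to the store in bounded-size chunks as the buffer fills
                }
            });
    }
}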

File tree

27 files changed: +765, -182 lines


modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

Lines changed: 10 additions & 0 deletions
@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.blobstore.url;
 
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -20,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URL;
 import java.nio.file.NoSuchFileException;
 import java.security.AccessController;
@@ -121,6 +123,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        throw new UnsupportedOperationException("URL repository doesn't support this operation");
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 10 additions & 0 deletions
@@ -12,6 +12,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -22,6 +23,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
 import java.util.Map;
@@ -99,6 +101,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public DeleteResult delete() throws IOException {
         return blobStore.deleteBlobDirectory(keyPath);

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 59 additions & 4 deletions
@@ -33,6 +33,8 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -46,13 +48,15 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
+import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -82,6 +86,8 @@ public class AzureBlobStore implements BlobStore {
 
     private final AzureStorageService service;
 
+    private final BigArrays bigArrays;
+
     private final String clientName;
     private final String container;
     private final LocationMode locationMode;
@@ -90,10 +96,11 @@ public class AzureBlobStore implements BlobStore {
     private final Stats stats = new Stats();
     private final BiConsumer<String, URL> statsConsumer;
 
-    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) {
+    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) {
         this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
         this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
         this.service = service;
+        this.bigArrays = bigArrays;
         // locationMode is set per repository, not per client
         this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
         this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
@@ -383,6 +390,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
         executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists);
     }
 
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container)
+                .getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
+        try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<>(bigArrays, getUploadBlockSize()) {
+
+            @Override
+            protected void flushBuffer() {
+                if (buffer.size() == 0) {
+                    return;
+                }
+                final String blockId = makeMultipartBlockId();
+                SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock(
+                        blockId,
+                        Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())),
+                        buffer.size()
+                ).block());
+                finishPart(blockId);
+            }
+
+            @Override
+            protected void onCompletion() {
+                if (flushedBytes == 0L) {
+                    writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                } else {
+                    flushBuffer();
+                    SocketAccess.doPrivilegedVoidException(
+                            () -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block());
+                }
+            }
+
+            @Override
+            protected void onFailure() {
+                // Nothing to do here, already uploaded blocks will be GCed by Azure after a week.
+                // see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks
+            }
+        }) {
+            writer.accept(out);
+            out.markSuccess();
+        }
+    }
+
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
         assert inputStream.markSupported()
             : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
@@ -439,13 +489,11 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
         assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
 
         final List<String> blockIds = new ArrayList<>(nbParts);
-        final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
-        final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
        for (int i = 0; i < nbParts; i++) {
             final long length = i < nbParts - 1 ? partSize : lastPartSize;
             Flux<ByteBuffer> byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);
 
-            final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+            final String blockId = makeMultipartBlockId();
             blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
             blockIds.add(blockId);
         }
@@ -454,6 +502,13 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
         });
     }
 
+    private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
+    private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
+
+    private String makeMultipartBlockId() {
+        return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+    }
+
     /**
      * Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
      * memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
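
The `ChunkedBlobOutputStream` base class that the Azure implementation extends above is added elsewhere in this commit and is not shown in this excerpt. The following is a simplified sketch of the contract it appears to expose, inferred from the usage above; note that the real class buffers into recyclable `BigArrays` pages (hence the `bigArrays` constructor argument) rather than a plain `ByteArrayOutputStream`.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Sketch only: chunked output stream that buffers writes, hands each full chunk
// to a subclass via flushBuffer(), and finalizes the upload on close().
abstract class ChunkedBlobOutputStreamSketch<T> extends OutputStream {

    protected final List<T> parts = new ArrayList<>(); // ids of the chunks uploaded so far
    protected ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    protected long flushedBytes = 0L;                  // bytes already uploaded via flushBuffer()
    private final long chunkSize;
    private boolean success = false;

    protected ChunkedBlobOutputStreamSketch(long chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    public void write(int b) throws IOException {
        buffer.write(b);
        maybeFlush();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        buffer.write(b, off, len);
        maybeFlush();
    }

    // Upload the current buffer as one chunk and record its id via finishPart(id).
    protected abstract void flushBuffer() throws IOException;

    // Invoked from close() after markSuccess(): flush trailing bytes and commit the chunk list.
    protected abstract void onCompletion() throws IOException;

    // Invoked from close() when markSuccess() was never called: clean up a failed upload.
    protected abstract void onFailure();

    // Subclasses call this from flushBuffer() once a chunk upload succeeded.
    protected final void finishPart(T partId) {
        flushedBytes += buffer.size();
        parts.add(partId);
        buffer = new ByteArrayOutputStream(); // start a fresh buffer for the next chunk
    }

    // The writer callback calls this after writing everything without error.
    public final void markSuccess() {
        success = true;
    }

    private void maybeFlush() throws IOException {
        if (buffer.size() >= chunkSize) {
            flushBuffer();
        }
    }

    @Override
    public void close() throws IOException {
        if (success) {
            onCompletion();
        } else {
            onFailure();
        }
    }
}

This design keeps at most one chunk in memory at a time: whenever a full chunk has accumulated, it is staged to the store and the buffer is recycled, which is what bounds memory use independently of the total blob size.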

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

Lines changed: 1 addition & 1 deletion
@@ -124,7 +124,7 @@ protected BlobStore getBlobStore() {
 
     @Override
     protected AzureBlobStore createBlobStore() {
-        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
+        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays);
 
         logger.debug(() -> new ParameterizedMessage(
             "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

Lines changed: 79 additions & 1 deletion
@@ -15,6 +15,7 @@
 import fixture.azure.AzureHttpHandler;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,6 +59,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -172,7 +174,7 @@ int getMaxReadRetries(String clientName) {
             .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
             .build());
 
-        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service));
+        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE));
     }
 
     public void testReadNonexistentBlobThrowsNoSuchFileException() {
@@ -391,6 +393,82 @@ public void testWriteLargeBlob() throws Exception {
         assertThat(blocks.isEmpty(), is(true));
     }
 
+    public void testWriteLargeBlobStreaming() throws Exception {
+        final int maxRetries = randomIntBetween(2, 5);
+
+        final int blobSize = (int) ByteSizeUnit.MB.toBytes(10);
+        final byte[] data = randomBytes(blobSize);
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final AtomicInteger counterUploads = new AtomicInteger(0);
+        final AtomicLong bytesReceived = new AtomicLong(0L);
+        final CountDown countDownComplete = new CountDown(nbErrors);
+
+        final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
+        httpServer.createContext("/account/container/write_large_blob_streaming", exchange -> {
+
+            if ("PUT".equals(exchange.getRequestMethod())) {
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params);
+
+                final String blockId = params.get("blockid");
+                assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId);
+
+                if (Strings.hasText(blockId) && (counterUploads.incrementAndGet() % 2 == 0)) {
+                    final BytesReference blockData = Streams.readFully(exchange.getRequestBody());
+                    blocks.put(blockId, blockData);
+                    bytesReceived.addAndGet(blockData.length());
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+
+                final String complete = params.get("comp");
+                if ("blocklist".equals(complete) && (countDownComplete.countDown())) {
+                    final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
+                    final List<String> blockUids = Arrays.stream(blockList.split("<Latest>"))
+                        .filter(line -> line.contains("</Latest>"))
+                        .map(line -> line.substring(0, line.indexOf("</Latest>")))
+                        .collect(Collectors.toList());
+
+                    final ByteArrayOutputStream blob = new ByteArrayOutputStream();
+                    for (String blockUid : blockUids) {
+                        BytesReference block = blocks.remove(blockUid);
+                        assert block != null;
+                        block.writeTo(blob);
+                    }
+                    assertArrayEquals(data, blob.toByteArray());
+                    exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+            }
+
+            if (randomBoolean()) {
+                Streams.readFully(exchange.getRequestBody());
+                AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
+            }
+            exchange.close();
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries);
+        blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
+            int outstanding = data.length;
+            while (outstanding > 0) {
+                if (randomBoolean()) {
+                    int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding));
+                    out.write(data, data.length - outstanding, toWrite);
+                    outstanding -= toWrite;
+                } else {
+                    out.write(data[data.length - outstanding]);
+                    outstanding--;
+                }
+            }
+        });
+        assertEquals(blobSize, bytesReceived.get());
+    }
+
     public void testRetryUntilFail() throws Exception {
         final int maxRetries = randomIntBetween(2, 5);
         final AtomicInteger requestsReceived = new AtomicInteger(0);

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

Lines changed: 10 additions & 0 deletions
@@ -15,9 +15,11 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -76,6 +78,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         writeBlob(blobName, bytes, failIfAlreadyExists);
