From 5aa0968e06b6ed513cecdb20a200e41fdad9fbac Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 28 Jun 2021 18:15:28 +0200 Subject: [PATCH 1/2] Introduce ChunkedBlobOutputStream (#74620) Extracted the chunked output stream logic from #74313 and added tests for it to make it easier to review. --- .../blobstore/ChunkedBlobOutputStream.java | 152 ++++++++++++++++++ .../ChunkedBlobOutputStreamTests.java | 149 +++++++++++++++++ 2 files changed, 301 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStream.java create mode 100644 server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStream.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStream.java new file mode 100644 index 0000000000000..bf3c99019443d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStream.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.repositories.blobstore; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Releasables; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Base class for doing chunked writes to a blob store. Some blob stores require either up-front knowledge of the size of the blob that + * will be written or writing it in chunks that are then joined into the final blob at the end of the write. This class provides a basis + * on which to implement an output stream that encapsulates such a chunked write. + * + * @param <T> type of chunk identifier + */ +public abstract class ChunkedBlobOutputStream<T> extends OutputStream { + + /** + * List of identifiers of already written chunks. + */ + protected final List<T> parts = new ArrayList<>(); + + /** + * Size of the write buffer above which it must be flushed to storage. + */ + private final long maxBytesToBuffer; + + /** + * Big arrays to be able to allocate buffers from pooled bytes. + */ + private final BigArrays bigArrays; + + /** + * Current write buffer. + */ + protected ReleasableBytesStreamOutput buffer; + + /** + * Set to true once no more calls to {@link #write} are expected and the blob has been received by {@link #write} in full so that + * {@link #close()} knows whether to clean up existing chunks or finish a chunked write. + */ + protected boolean successful = false; + + /** + * Is set to {@code true} once this stream has been closed. + */ + private boolean closed = false; + + /** + * Number of bytes flushed to blob storage so far.
+ */ + protected long flushedBytes = 0L; + + protected ChunkedBlobOutputStream(BigArrays bigArrays, long maxBytesToBuffer) { + this.bigArrays = bigArrays; + if (maxBytesToBuffer <= 0) { + throw new IllegalArgumentException("maximum buffer size must be positive"); + } + this.maxBytesToBuffer = maxBytesToBuffer; + buffer = new ReleasableBytesStreamOutput(bigArrays); + } + + @Override + public final void write(int b) throws IOException { + buffer.write(b); + maybeFlushBuffer(); + } + + @Override + public final void write(byte[] b, int off, int len) throws IOException { + buffer.write(b, off, len); + maybeFlushBuffer(); + } + + @Override + public final void close() throws IOException { + if (closed) { + assert false : "this output stream should only be closed once"; + throw new AlreadyClosedException("already closed"); + } + closed = true; + try { + if (successful) { + onCompletion(); + } else { + onFailure(); + } + } finally { + Releasables.close(buffer); + } + } + + /** + * Mark all blob bytes as properly received by {@link #write}, indicating that {@link #close} may finalize the blob. + */ + public final void markSuccess() { + this.successful = true; + } + + /** + * Finish writing the current buffer contents to storage and track them by the given {@code partId}. Depending on whether all contents + * have already been written either prepare the write buffer for additional writes or release the buffer. + * + * @param partId part identifier to track for use when closing + */ + protected final void finishPart(T partId) { + flushedBytes += buffer.size(); + parts.add(partId); + buffer.close(); + // only need a new buffer if we're not done yet + if (successful) { + buffer = null; + } else { + buffer = new ReleasableBytesStreamOutput(bigArrays); + } + } + + /** + * Write the contents of {@link #buffer} to storage. Implementations should call {@link #finishPart} at the end to track the the chunk + * of data just written and ready {@link #buffer} for the next write. + */ + protected abstract void flushBuffer() throws IOException; + + /** + * Invoked once all write chunks/parts are ready to be combined into the final blob. Implementations must invoke the necessary logic + * for combining the uploaded chunks into the final blob in this method. + */ + protected abstract void onCompletion() throws IOException; + + /** + * Invoked in case writing all chunks of data to storage failed. Implementations should run any cleanup required for the already + * written data in this method. + */ + protected abstract void onFailure(); + + private void maybeFlushBuffer() throws IOException { + if (buffer.size() >= maxBytesToBuffer) { + flushBuffer(); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java new file mode 100644 index 0000000000000..8dc2b8d21b2d9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.zip.CRC32; +import java.util.zip.CheckedOutputStream; + +public class ChunkedBlobOutputStreamTests extends ESTestCase { + + private BigArrays bigArrays; + + @Override + public void setUp() throws Exception { + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + } + + public void testSuccessfulChunkedWrite() throws IOException { + final long chunkSize = randomLongBetween(10, 1024); + final CRC32 checksumIn = new CRC32(); + final CRC32 checksumOut = new CRC32(); + final CheckedOutputStream out = new CheckedOutputStream(OutputStream.nullOutputStream(), checksumOut); + final AtomicLong writtenBytesCounter = new AtomicLong(0L); + final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize); + long written = 0; + try (ChunkedBlobOutputStream stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) { + + private final AtomicInteger partIdSupplier = new AtomicInteger(); + + @Override + protected void flushBuffer() throws IOException { + final BytesReference bytes = buffer.bytes(); + bytes.writeTo(out); + writtenBytesCounter.addAndGet(bytes.length()); + finishPart(partIdSupplier.incrementAndGet()); + } + + @Override + protected void onCompletion() throws IOException { + if (buffer.size() > 0) { + flushBuffer(); + } + out.flush(); + for (int i = 0; i < partIdSupplier.get(); i++) { + assertEquals((long) i + 1, (long) parts.get(i)); + } + } + + @Override + protected void onFailure() { + fail("not supposed to fail"); + } + }) { + final byte[] buffer = new byte[randomInt(Math.toIntExact(2 * chunkSize)) + 1]; + while (written < bytesToWrite) { + if (randomBoolean()) { + random().nextBytes(buffer); + final int offset = randomInt(buffer.length - 2) + 1; + final int length = Math.toIntExact(Math.min(bytesToWrite - written, buffer.length - offset)); + stream.write(buffer, offset, length); + checksumIn.update(buffer, offset, length); + written += length; + } else { + int oneByte = randomByte(); + stream.write(oneByte); + checksumIn.update(oneByte); + written++; + } + } + stream.markSuccess(); + } + assertEquals(bytesToWrite, written); + assertEquals(bytesToWrite, writtenBytesCounter.get()); + assertEquals(checksumIn.getValue(), checksumOut.getValue()); + } + + public void testExceptionDuringChunkedWrite() throws IOException { + final long chunkSize = randomLongBetween(10, 1024); + final AtomicLong writtenBytesCounter = new AtomicLong(0L); + final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize); + long written = 0; + final AtomicBoolean onFailureCalled = new AtomicBoolean(false); + try (ChunkedBlobOutputStream stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) { + + private final AtomicInteger partIdSupplier = new AtomicInteger(); + + @Override + 
protected void flushBuffer() { + writtenBytesCounter.addAndGet(buffer.size()); + finishPart(partIdSupplier.incrementAndGet()); + } + + @Override + protected void onCompletion() { + fail("supposed to fail"); + } + + @Override + protected void onFailure() { + for (int i = 0; i < partIdSupplier.get(); i++) { + assertEquals((long) i + 1, (long) parts.get(i)); + } + assertTrue(onFailureCalled.compareAndSet(false, true)); + } + }) { + final byte[] buffer = new byte[randomInt(Math.toIntExact(2 * chunkSize)) + 1]; + while (written < bytesToWrite) { + if (rarely()) { + break; + } else if (randomBoolean()) { + random().nextBytes(buffer); + final int offset = randomInt(buffer.length - 2) + 1; + final int length = Math.toIntExact(Math.min(bytesToWrite - written, buffer.length - offset)); + stream.write(buffer, offset, length); + written += length; + } else { + int oneByte = randomByte(); + stream.write(oneByte); + written++; + } + } + } + assertTrue(onFailureCalled.get()); + } +} From f5d0ae20d9748091cfc12d4ec7213e1a658baa90 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 29 Jun 2021 11:29:55 +0200 Subject: [PATCH 2/2] Save Memory on Large Repository Metadata Blob Writes (#74313) This PR adds a new API for doing streaming serialization writes to a repository, enabling repository metadata of arbitrary size to be written with bounded memory use. The existing write APIs require knowledge of the eventual blob size beforehand. This forced us to materialize the serialized blob in memory before writing, costing a lot of memory in the case of e.g. a very large `RepositoryData` (and limiting us to a `2G` max blob size). With this PR the requirement to fully materialize the serialized metadata goes away and the memory overhead becomes completely bounded by the outbound buffer size of the repository implementation. As we move to larger repositories this makes master node stability a lot more predictable since writing out `RepositoryData` no longer takes as much memory (the same applies to shard-level metadata), enables aggregating multiple metadata blobs into a single larger blob without massive overhead, and removes the 2G size limit on `RepositoryData`.
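For illustration, a minimal caller-side sketch of the new streaming overload that this change adds to `BlobContainer` (the example class, method name, blob name and line-based payload are assumptions made for the sketch, not part of the patch):

    import org.elasticsearch.common.blobstore.BlobContainer;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class StreamingWriteExample {

        // Streams an arbitrarily large payload to the repository without materializing it in memory first.
        // The writer consumer receives a plain OutputStream; the repository implementation buffers up to its
        // configured chunk/buffer size and uploads parts (e.g. S3 multipart parts or Azure blocks) as bytes
        // are written.
        static void writeLargeMetadata(BlobContainer container, Iterable<String> lines) throws IOException {
            container.writeBlob("example-metadata-blob", true /* failIfAlreadyExists */, true /* atomic */, out -> {
                for (String line : lines) {
                    out.write(line.getBytes(StandardCharsets.UTF_8));
                    out.write('\n');
                }
            });
        }
    }

Once the bytes written to the stream exceed the implementation's buffer size they are flushed to the blob store as individual parts, so memory use stays bounded regardless of the final blob size.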
--- .../blobstore/url/URLBlobContainer.java | 10 ++ .../azure/AzureBlobContainer.java | 10 ++ .../repositories/azure/AzureBlobStore.java | 63 +++++++- .../repositories/azure/AzureRepository.java | 2 +- .../azure/AzureBlobContainerRetriesTests.java | 80 +++++++++- .../gcs/GoogleCloudStorageBlobContainer.java | 10 ++ .../gcs/GoogleCloudStorageBlobStore.java | 95 +++++++++--- ...CloudStorageBlobContainerRetriesTests.java | 8 +- .../repositories/hdfs/HdfsBlobContainer.java | 34 +++++ .../repositories/s3/S3BlobContainer.java | 139 ++++++++++++++---- .../repositories/s3/S3BlobStore.java | 10 +- .../repositories/s3/S3Repository.java | 2 +- .../s3/S3BlobContainerRetriesTests.java | 103 ++++++++++++- .../common/blobstore/BlobContainer.java | 14 ++ .../common/blobstore/fs/FsBlobContainer.java | 57 +++++++ .../support/FilterBlobContainer.java | 8 + .../blobstore/BlobStoreRepository.java | 68 ++++----- .../blobstore/ChecksumBlobStoreFormat.java | 65 ++++---- .../ChunkedBlobOutputStreamTests.java | 8 +- .../snapshots/BlobStoreFormatTests.java | 12 +- .../SnapshotInfoBlobSerializationTests.java | 25 +--- .../gcs/GoogleCloudStorageHttpHandler.java | 9 -- .../AbstractSnapshotIntegTestCase.java | 3 +- .../snapshots/mockstore/MockRepository.java | 61 ++++++-- .../encrypted/EncryptedRepository.java | 20 +++ .../cache/common/TestUtils.java | 12 ++ .../testkit/RepositoryAnalysisFailureIT.java | 19 +++ .../testkit/RepositoryAnalysisSuccessIT.java | 19 +++ 28 files changed, 772 insertions(+), 194 deletions(-) diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 0d8e6806a43e3..db0b92a894053 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -8,6 +8,7 @@ package org.elasticsearch.common.blobstore.url; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; @@ -20,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URL; import java.nio.file.NoSuchFileException; import java.security.AccessController; @@ -121,6 +123,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b throw new UnsupportedOperationException("URL repository doesn't support this operation"); } + @Override + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + throw new UnsupportedOperationException("URL repository doesn't support this operation"); + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { throw new UnsupportedOperationException("URL repository doesn't support this operation"); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 48fbe10dd862d..1fc1d6b8778d9 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ 
b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.util.Throwables; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; @@ -22,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.NoSuchFileException; import java.util.Iterator; import java.util.Map; @@ -99,6 +101,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists); } + @Override + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer); + } + @Override public DeleteResult delete() throws IOException { return blobStore.deleteBlobDirectory(keyPath); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index e95c62fe5a92a..bf7c06af31e54 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,6 +33,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; @@ -46,6 +48,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.repositories.azure.AzureRepository.Repository; +import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -53,6 +56,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; @@ -83,6 +87,8 @@ public class AzureBlobStore implements BlobStore { private final AzureStorageService service; + private final BigArrays bigArrays; + private final String clientName; private final String container; private final LocationMode locationMode; @@ -91,10 +97,11 @@ public class AzureBlobStore implements BlobStore { private final Stats stats = new Stats(); private final BiConsumer statsConsumer; - public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) { + public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) { this.container = Repository.CONTAINER_SETTING.get(metadata.settings()); this.clientName = Repository.CLIENT_NAME.get(metadata.settings()); this.service = service; + this.bigArrays = bigArrays; // locationMode is set per repository, not per client this.locationMode = 
Repository.LOCATION_MODE_SETTING.get(metadata.settings()); this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings()); @@ -384,6 +391,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists); } + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + CheckedConsumer writer) throws IOException { + final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container) + .getBlobAsyncClient(blobName).getBlockBlobAsyncClient(); + try (ChunkedBlobOutputStream out = new ChunkedBlobOutputStream(bigArrays, getUploadBlockSize()) { + + @Override + protected void flushBuffer() { + if (buffer.size() == 0) { + return; + } + final String blockId = makeMultipartBlockId(); + SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock( + blockId, + Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())), + buffer.size() + ).block()); + finishPart(blockId); + } + + @Override + protected void onCompletion() { + if (flushedBytes == 0L) { + writeBlob(blobName, buffer.bytes(), failIfAlreadyExists); + } else { + flushBuffer(); + SocketAccess.doPrivilegedVoidException( + () -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block()); + } + } + + @Override + protected void onFailure() { + // Nothing to do here, already uploaded blocks will be GCed by Azure after a week. + // see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks + } + }) { + writer.accept(out); + out.markSuccess(); + } + } + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { assert inputStream.markSupported() : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken"; @@ -440,13 +490,11 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes"; final List blockIds = new ArrayList<>(nbParts); - final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding(); - final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder(); for (int i = 0; i < nbParts; i++) { final long length = i < nbParts - 1 ? partSize : lastPartSize; Flux byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE); - final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID())); + final String blockId = makeMultipartBlockId(); blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block(); blockIds.add(blockId); } @@ -455,6 +503,13 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo }); } + private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding(); + private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder(); + + private String makeMultipartBlockId() { + return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID())); + } + /** * Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding * memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size. 
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 0b92f14db9c71..36a37c1b97ed4 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -119,7 +119,7 @@ protected BlobStore getBlobStore() { @Override protected AzureBlobStore createBlobStore() { - final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService); + final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays); logger.debug(() -> new ParameterizedMessage( "using container [{}], chunk_size [{}], compress [{}], base_path [{}]", diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 29dde60d590e7..0909fc7a5e237 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -15,6 +15,7 @@ import fixture.azure.AzureHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; @@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -172,7 +174,7 @@ int getMaxReadRetries(String clientName) { .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB)) .build()); - return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service)); + return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE)); } public void testReadNonexistentBlobThrowsNoSuchFileException() { @@ -391,6 +393,82 @@ public void testWriteLargeBlob() throws Exception { assertThat(blocks.isEmpty(), is(true)); } + public void testWriteLargeBlobStreaming() throws Exception { + final int maxRetries = randomIntBetween(2, 5); + + final int blobSize = (int) ByteSizeUnit.MB.toBytes(10); + final byte[] data = randomBytes(blobSize); + + final int nbErrors = 2; // we want all requests to fail at least once + final AtomicInteger counterUploads = new AtomicInteger(0); + final AtomicLong bytesReceived = new AtomicLong(0L); + final CountDown countDownComplete = new CountDown(nbErrors); + + final Map blocks = new ConcurrentHashMap<>(); + httpServer.createContext("/account/container/write_large_blob_streaming", exchange -> { + + if ("PUT".equals(exchange.getRequestMethod())) { + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params); + + final String blockId = params.get("blockid"); + assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId); + + if 
(Strings.hasText(blockId) && (counterUploads.incrementAndGet() % 2 == 0)) { + final BytesReference blockData = Streams.readFully(exchange.getRequestBody()); + blocks.put(blockId, blockData); + bytesReceived.addAndGet(blockData.length()); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + exchange.close(); + return; + } + + final String complete = params.get("comp"); + if ("blocklist".equals(complete) && (countDownComplete.countDown())) { + final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8)); + final List blockUids = Arrays.stream(blockList.split("")) + .filter(line -> line.contains("")) + .map(line -> line.substring(0, line.indexOf(""))) + .collect(Collectors.toList()); + + final ByteArrayOutputStream blob = new ByteArrayOutputStream(); + for (String blockUid : blockUids) { + BytesReference block = blocks.remove(blockUid); + assert block != null; + block.writeTo(blob); + } + assertArrayEquals(data, blob.toByteArray()); + exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false"); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + exchange.close(); + return; + } + } + + if (randomBoolean()) { + Streams.readFully(exchange.getRequestBody()); + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + } + exchange.close(); + }); + + final BlobContainer blobContainer = createBlobContainer(maxRetries); + blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> { + int outstanding = data.length; + while (outstanding > 0) { + if (randomBoolean()) { + int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding)); + out.write(data, data.length - outstanding, toWrite); + outstanding -= toWrite; + } else { + out.write(data[data.length - outstanding]); + outstanding--; + } + } + }); + assertEquals(blobSize, bytesReceived.get()); + } + public void testRetryUntilFail() throws Exception { final int maxRetries = randomIntBetween(2, 5); final AtomicInteger requestsReceived = new AtomicInteger(0); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 54f6427e0746b..064a10cf3b825 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -15,9 +15,11 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.CheckedConsumer; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Iterator; import java.util.Map; @@ -76,6 +78,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists); } + @Override + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer); + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { 
writeBlob(blobName, bytes, failIfAlreadyExists); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 2284ae83c0c85..84fdbe4e4b889 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; @@ -37,8 +38,10 @@ import org.elasticsearch.common.unit.ByteSizeValue; import java.io.ByteArrayInputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; @@ -265,6 +268,52 @@ long getLargeBlobThresholdInBytes() { {Storage.BlobWriteOption.doesNotExist(), Storage.BlobWriteOption.md5Match()}; private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = {Storage.BlobWriteOption.md5Match()}; + void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer writer) throws IOException { + final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build(); + final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5; + + StorageException storageException = null; + + for (int retry = 0; retry < 3; ++retry) { + try { + final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions)); + try (OutputStream out = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) { + @Override + public void write(byte[] b, int off, int len) throws IOException { + int written = 0; + while (written < len) { + // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK + // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE + final int toWrite = Math.min(len - written, 60 * 256 * 1024); + out.write(b, off + written, toWrite); + written += toWrite; + } + } + }) { + writer.accept(out); + SocketAccess.doPrivilegedVoidIOException(writeChannel::close); + } + stats.trackPutOperation(); + return; + } catch (final StorageException se) { + final int errorCode = se.getCode(); + if (errorCode == HTTP_GONE) { + logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se); + storageException = ExceptionsHelper.useOrSuppress(storageException, se); + continue; + } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) { + throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); + } + if (storageException != null) { + se.addSuppressed(storageException); + } + throw se; + } + } + assert storageException != null; + throw storageException; + } + /** * Uploads a blob using the "resumable upload" method (multiple requests, which * can be independently retried in case of failure, see @@ -297,24 +346,9 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream 
inputStream, long * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy * is in the stacktrace and is not granted the permissions needed to close and write the channel. */ - org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { - - @SuppressForbidden(reason = "channel is based on a socket") - @Override - public int write(final ByteBuffer src) throws IOException { - return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); - } - - @Override - public boolean isOpen() { - return writeChannel.isOpen(); - } - - @Override - public void close() throws IOException { - SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - } - }), buffer); + org.elasticsearch.core.internal.io.Streams.copy( + inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer); + SocketAccess.doPrivilegedVoidIOException(writeChannel::close); // We don't track this operation on the http layer as // we do with the GET/LIST operations since this operations // can trigger multiple underlying http requests but only one @@ -478,4 +512,29 @@ private static String buildKey(String keyPath, String s) { public Map stats() { return stats.toMap(); } + + private static final class WritableBlobChannel implements WritableByteChannel { + + private final WriteChannel channel; + + WritableBlobChannel(WriteChannel writeChannel) { + this.channel = writeChannel; + } + + @SuppressForbidden(reason = "channel is based on a socket") + @Override + public int write(final ByteBuffer src) throws IOException { + return SocketAccess.doPrivilegedIOException(() -> channel.write(src)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() { + // we manually close the channel later to have control over whether or not we want to finalize a blob + } + } } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index a94630cd5b86a..a3a0b17138f4c 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -357,8 +357,12 @@ public void testWriteLargeBlob() throws IOException { final TimeValue readTimeout = allowReadTimeout.get() ? 
TimeValue.timeValueSeconds(3) : null; final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null); - try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) { - blobContainer.writeBlob("write_large_blob", stream, data.length, false); + if (randomBoolean()) { + try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) { + blobContainer.writeBlob("write_large_blob", stream, data.length, false); + } + } else { + blobContainer.writeBlob("write_large_blob", false, randomBoolean(), out -> out.write(data)); } assertThat(countInits.get(), equalTo(0)); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 52e951e258c86..b60b0a969429d 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -15,6 +15,7 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; @@ -31,6 +32,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.Collections; @@ -156,6 +158,38 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea }); } + @Override + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + Path blob = new Path(path, blobName); + if (atomic) { + final Path tempBlobPath = new Path(path, FsBlobContainer.tempBlobName(blobName)); + store.execute((Operation) fileContext -> { + try (FSDataOutputStream stream = fileContext.create(tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK))) { + writer.accept(stream); + fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE); + } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { + throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage()); + } + return null; + }); + } else { + // we pass CREATE, which means it fails if a blob already exists. + final EnumSet flags = failIfAlreadyExists ? 
EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) + : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK); + store.execute((Operation) fileContext -> { + try (FSDataOutputStream stream = fileContext.create(blob, flags)) { + writer.accept(stream); + } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { + throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage()); + } + return null; + }); + } + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = FsBlobContainer.tempBlobName(blobName); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 05e0cb7aff730..3fe9496c18502 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; @@ -41,10 +42,12 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -129,6 +132,106 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b }); } + @Override + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + try (AmazonS3Reference clientReference = blobStore.clientReference(); + ChunkedBlobOutputStream out = new ChunkedBlobOutputStream( + blobStore.bigArrays(), blobStore.bufferSizeInBytes()) { + + private final SetOnce uploadId = new SetOnce<>(); + + @Override + protected void flushBuffer() throws IOException { + flushBuffer(false); + } + + private void flushBuffer(boolean lastPart) throws IOException { + if (buffer.size() == 0) { + return; + } + if (flushedBytes == 0L) { + assert lastPart == false : "use single part upload if there's only a single part"; + uploadId.set(SocketAccess.doPrivileged(() -> + clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId())); + if (Strings.isEmpty(uploadId.get())) { + throw new IOException("Failed to initialize multipart upload " + blobName); + } + } + assert lastPart == false || successful : "must only write last part if successful"; + final UploadPartRequest uploadRequest = createPartUploadRequest( + buffer.bytes().streamInput(), uploadId.get(), parts.size() + 1, blobName, buffer.size(), lastPart); + final UploadPartResult uploadResponse = + SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest)); + finishPart(uploadResponse.getPartETag()); + } + + @Override + protected void onCompletion() throws IOException { + if (flushedBytes == 0L) { + writeBlob(blobName, buffer.bytes(), failIfAlreadyExists); + 
} else { + flushBuffer(true); + final CompleteMultipartUploadRequest complRequest = + new CompleteMultipartUploadRequest(blobStore.bucket(), blobName, uploadId.get(), parts); + complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); + SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest)); + } + } + + @Override + protected void onFailure() { + if (Strings.hasText(uploadId.get())) { + abortMultiPartUpload(uploadId.get(), blobName); + } + } + }) { + writer.accept(out); + out.markSuccess(); + } + } + + private UploadPartRequest createPartUploadRequest(InputStream stream, + String uploadId, + int number, + String blobName, + long size, + boolean lastPart) { + final UploadPartRequest uploadRequest = new UploadPartRequest(); + uploadRequest.setBucketName(blobStore.bucket()); + uploadRequest.setKey(blobName); + uploadRequest.setUploadId(uploadId); + uploadRequest.setPartNumber(number); + uploadRequest.setInputStream(stream); + uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); + uploadRequest.setPartSize(size); + uploadRequest.setLastPart(lastPart); + return uploadRequest; + } + + private void abortMultiPartUpload(String uploadId, String blobName) { + final AbortMultipartUploadRequest abortRequest = + new AbortMultipartUploadRequest(blobStore.bucket(), blobName, uploadId); + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest)); + } + } + + private InitiateMultipartUploadRequest initiateMultiPartUpload(String blobName) { + final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(blobStore.bucket(), blobName); + initRequest.setStorageClass(blobStore.getStorageClass()); + initRequest.setCannedACL(blobStore.getCannedACL()); + initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); + if (blobStore.serverSideEncryption()) { + final ObjectMetadata md = new ObjectMetadata(); + md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + initRequest.setObjectMetadata(md); + } + return initRequest; + } + // package private for testing long getLargeBlobThresholdInBytes() { return blobStore.bufferSizeInBytes(); @@ -389,19 +492,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final SetOnce uploadId = new SetOnce<>(); final String bucketName = blobStore.bucket(); boolean success = false; - - final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, blobName); - initRequest.setStorageClass(blobStore.getStorageClass()); - initRequest.setCannedACL(blobStore.getCannedACL()); - initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); - if (blobStore.serverSideEncryption()) { - final ObjectMetadata md = new ObjectMetadata(); - md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); - initRequest.setObjectMetadata(md); - } try (AmazonS3Reference clientReference = blobStore.clientReference()) { - uploadId.set(SocketAccess.doPrivileged(() -> clientReference.client().initiateMultipartUpload(initRequest).getUploadId())); + uploadId.set(SocketAccess.doPrivileged(() -> + clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId())); if (Strings.isEmpty(uploadId.get())) { throw new IOException("Failed to initialize multipart upload " + blobName); } @@ -410,21 +504,9 @@ void executeMultipartUpload(final S3BlobStore 
blobStore, long bytesCount = 0; for (int i = 1; i <= nbParts; i++) { - final UploadPartRequest uploadRequest = new UploadPartRequest(); - uploadRequest.setBucketName(bucketName); - uploadRequest.setKey(blobName); - uploadRequest.setUploadId(uploadId.get()); - uploadRequest.setPartNumber(i); - uploadRequest.setInputStream(input); - uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); - - if (i < nbParts) { - uploadRequest.setPartSize(partSize); - uploadRequest.setLastPart(false); - } else { - uploadRequest.setPartSize(lastPartSize); - uploadRequest.setLastPart(true); - } + final boolean lastPart = i == nbParts; + final UploadPartRequest uploadRequest = + createPartUploadRequest(input, uploadId.get(), i, blobName, lastPart ? lastPartSize : partSize, lastPart); bytesCount += uploadRequest.getPartSize(); final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest)); @@ -446,10 +528,7 @@ void executeMultipartUpload(final S3BlobStore blobStore, throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e); } finally { if ((success == false) && Strings.hasLength(uploadId.get())) { - final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get()); - try (AmazonS3Reference clientReference = blobStore.clientReference()) { - SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest)); - } + abortMultiPartUpload(uploadId.get(), blobName); } } } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index a3279c1ef4976..90a86c4910ba5 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; import java.io.IOException; import java.util.HashMap; @@ -35,6 +36,8 @@ class S3BlobStore implements BlobStore { private final S3Service service; + private final BigArrays bigArrays; + private final String bucket; private final ByteSizeValue bufferSize; @@ -56,8 +59,9 @@ class S3BlobStore implements BlobStore { S3BlobStore(S3Service service, String bucket, boolean serverSideEncryption, ByteSizeValue bufferSize, String cannedACL, String storageClass, - RepositoryMetadata repositoryMetadata) { + RepositoryMetadata repositoryMetadata, BigArrays bigArrays) { this.service = service; + this.bigArrays = bigArrays; this.bucket = bucket; this.serverSideEncryption = serverSideEncryption; this.bufferSize = bufferSize; @@ -136,6 +140,10 @@ public String bucket() { return bucket; } + public BigArrays bigArrays() { + return bigArrays; + } + public boolean serverSideEncryption() { return serverSideEncryption; } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index b8c7a84182187..d8c4e49de1ea3 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java 
@@ -336,7 +336,7 @@ private void logCooldownInfo() { @Override protected S3BlobStore createBlobStore() { - return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetadata); + return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, metadata, bigArrays); } // only use for testing diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 5505cd592a8e6..fb26d06a8a3ce 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -12,6 +12,7 @@ import com.amazonaws.util.Base16; import org.apache.http.HttpStatus; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; @@ -42,6 +43,7 @@ import java.util.Locale; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING; import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING; @@ -126,7 +128,7 @@ protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries, bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize, S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY), S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY), - repositoryMetadata)) { + repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE)) { @Override public InputStream readBlob(String blobName) throws IOException { return new AssertingInputStream(super.readBlob(blobName), blobName); @@ -296,6 +298,105 @@ public void testWriteLargeBlob() throws Exception { assertThat(countDownComplete.isCountedDown(), is(true)); } + public void testWriteLargeBlobStreaming() throws Exception { + final boolean useTimeout = rarely(); + final TimeValue readTimeout = useTimeout ? 
TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; + final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); + final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize); + + final int parts = randomIntBetween(1, 5); + final long lastPartSize = randomLongBetween(10, 512); + final long blobSize = (parts * bufferSize.getBytes()) + lastPartSize; + + final int nbErrors = 2; // we want all requests to fail at least once + final CountDown countDownInitiate = new CountDown(nbErrors); + final AtomicInteger counterUploads = new AtomicInteger(0); + final AtomicLong bytesReceived = new AtomicLong(0L); + final CountDown countDownComplete = new CountDown(nbErrors); + + httpServer.createContext("/bucket/write_large_blob_streaming", exchange -> { + final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length")); + + if ("POST".equals(exchange.getRequestMethod()) + && exchange.getRequestURI().getQuery().equals("uploads")) { + // initiate multipart upload request + if (countDownInitiate.countDown()) { + byte[] response = ("\n" + + "\n" + + " bucket\n" + + " write_large_blob_streaming\n" + + " TEST\n" + + "").getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + } + } else if ("PUT".equals(exchange.getRequestMethod()) + && exchange.getRequestURI().getQuery().contains("uploadId=TEST") + && exchange.getRequestURI().getQuery().contains("partNumber=")) { + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); + + if (counterUploads.incrementAndGet() % 2 == 0) { + bytesReceived.addAndGet(bytes.length()); + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } + + } else if ("POST".equals(exchange.getRequestMethod()) + && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = ("\n" + + "\n" + + " bucket\n" + + " write_large_blob_streaming\n" + + "").getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + } + } + + // sends an error back or let the request time out + if (useTimeout == false) { + if (randomBoolean() && contentLength > 0) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]); + } else { + Streams.readFully(exchange.getRequestBody()); + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } + exchange.close(); + } + }); + + blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> { + final byte[] buffer = new byte[16 * 1024]; + long outstanding = blobSize; + while (outstanding > 0) { + if (randomBoolean()) { + int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, buffer.length), outstanding)); + out.write(buffer, 
0, toWrite); + outstanding -= toWrite; + } else { + out.write(0); + outstanding--; + } + } + }); + + assertEquals(blobSize, bytesReceived.get()); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it is closed */ diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 266a1536274b2..90da41f654528 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -9,9 +9,11 @@ package org.elasticsearch.common.blobstore; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.CheckedConsumer; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.Iterator; @@ -116,6 +118,18 @@ default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlre writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); } + /** + * Write a blob by providing a consumer that will write its contents to an output stream. This method allows serializing a blob's + * contents directly to the blob store without having to materialize the serialized version in full before writing. + * + * @param blobName the name of the blob to write + * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists + * @param atomic whether the write should be atomic in case the implementation supports it + * @param writer consumer for an output stream that will write the blob contents to the stream + */ + void writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic, + CheckedConsumer writer) throws IOException; + /** * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. 
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 37a665e9bcc6e..ec051b21ed8f4 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -18,9 +18,11 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.internal.io.IOUtils; import java.io.FileNotFoundException; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -243,6 +245,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea IOUtils.fsync(path, true); } + @Override + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + if (atomic) { + final String tempBlob = tempBlobName(blobName); + try { + writeToPath(tempBlob, true, writer); + moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists); + } catch (IOException ex) { + try { + deleteBlobsIgnoringIfNotExists(Iterators.single(tempBlob)); + } catch (IOException e) { + ex.addSuppressed(e); + } + throw ex; + } + } else { + writeToPath(blobName, failIfAlreadyExists, writer); + } + IOUtils.fsync(path, true); + } + + private void writeToPath(String blobName, boolean failIfAlreadyExists, CheckedConsumer writer) + throws IOException { + final Path file = path.resolve(blobName); + try { + try (OutputStream out = new BlobOutputStream(file)) { + writer.accept(out); + } + } catch (FileAlreadyExistsException faee) { + if (failIfAlreadyExists) { + throw faee; + } + deleteBlobsIgnoringIfNotExists(Iterators.single(blobName)); + try (OutputStream out = new BlobOutputStream(file)) { + writer.accept(out); + } + } + IOUtils.fsync(file, false); + } + @Override public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = tempBlobName(blobName); @@ -306,4 +351,16 @@ public static String tempBlobName(final String blobName) { public static boolean isTempBlobName(final String blobName) { return blobName.startsWith(TEMP_FILE_PREFIX); } + + private static class BlobOutputStream extends FilterOutputStream { + + BlobOutputStream(Path file) throws IOException { + super(Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java index cb287531c8f7b..d2746fcb8d1f8 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java @@ -13,9 +13,11 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.CheckedConsumer; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Iterator; import java.util.Map; import 
java.util.Objects; @@ -61,6 +63,12 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } + @Override + public void writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic, + CheckedConsumer writer) throws IOException { + delegate.writeBlob(blobName, failIfAlreadyExists, atomic, writer); + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 63cdece736485..dd4e8031afc60 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -53,11 +53,9 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.metrics.CounterMetric; @@ -74,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; @@ -119,6 +118,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collection; @@ -558,15 +558,13 @@ public void cloneShardSnapshot( sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), shardContainer, target.getUUID(), - compress, - bigArrays + compress ); INDEX_SHARD_SNAPSHOTS_FORMAT.write( existingSnapshots.withClone(source.getName(), target.getName()), shardContainer, newGen, - compress, - bigArrays + compress ); return new ShardSnapshotResult( newGen, @@ -768,17 +766,11 @@ public RepositoryStats stats() { public void initializeSnapshot(SnapshotId snapshotId, List indices, Metadata clusterMetadata) { try { // Write Global Metadata - GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress, bigArrays); + GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress); // write the index metadata for each index in the snapshot for (IndexId index : indices) { - INDEX_METADATA_FORMAT.write( - clusterMetadata.index(index.getName()), - indexContainer(index), - snapshotId.getUUID(), - compress, - bigArrays - ); + INDEX_METADATA_FORMAT.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), compress); } } catch (IOException ex) { throw new SnapshotCreationException(metadata.name(), snapshotId, 
ex); @@ -1421,7 +1413,7 @@ public void finalizeSnapshot( executor.execute( ActionRunnable.run( allMetaListener, - () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress, bigArrays) + () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress) ) ); @@ -1435,7 +1427,7 @@ public void finalizeSnapshot( if (metaUUID == null) { // We don't yet have this version of the metadata so we write it metaUUID = UUIDs.base64UUID(); - INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress, bigArrays); + INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); indexMetaIdentifiers.put(identifiers, metaUUID); } indexMetas.put(index, identifiers); @@ -1444,8 +1436,7 @@ public void finalizeSnapshot( clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), - compress, - bigArrays + compress ); } })); @@ -1453,7 +1444,7 @@ public void finalizeSnapshot( executor.execute( ActionRunnable.run( allMetaListener, - () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress, bigArrays) + () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress) ) ); }, onUpdateFailure); @@ -2279,13 +2270,11 @@ public void onFailure(Exception e) { } final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); - try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) { + writeAtomic(blobContainer(), indexBlob, out -> { try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(Streams.noCloseStream(out))) { newRepositoryData.snapshotsToXContent(xContentBuilder, version); } - final BytesReference serializedRepoData = out.bytes(); - writeAtomic(blobContainer(), indexBlob, serializedRepoData, true); - } + }, true); maybeWriteIndexLatest(newGen); // Step 3: Update CS to reflect new repository generation. @@ -2378,7 +2367,7 @@ private void maybeWriteIndexLatest(long newGen) { if (supportURLRepo) { logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen); try { - writeAtomic(blobContainer(), INDEX_LATEST_BLOB, new BytesArray(Numbers.longToBytes(newGen)), false); + writeAtomic(blobContainer(), INDEX_LATEST_BLOB, out -> out.write(Numbers.longToBytes(newGen)), false); } catch (Exception e) { logger.warn( () -> new ParameterizedMessage( @@ -2559,10 +2548,14 @@ private long latestGeneration(Collection rootBlobs) { return latest; } - private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) - throws IOException { + private void writeAtomic( + BlobContainer container, + final String blobName, + CheckedConsumer writer, + boolean failIfAlreadyExists + ) throws IOException { logger.trace(() -> new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path())); - container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists); + container.writeBlob(blobName, failIfAlreadyExists, true, writer); } @Override @@ -2713,13 +2706,7 @@ public void snapshotShard(SnapshotShardContext context) { // reference a generation that has not had all its files fully upload. 
indexGeneration = UUIDs.randomBase64UUID(); try { - INDEX_SHARD_SNAPSHOTS_FORMAT.write( - updatedBlobStoreIndexShardSnapshots, - shardContainer, - indexGeneration, - compress, - bigArrays - ); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress); } catch (IOException e) { throw new IndexShardSnapshotFailedException( shardId, @@ -2796,7 +2783,7 @@ public void snapshotShard(SnapshotShardContext context) { ); try { final String snapshotUUID = snapshotId.getUUID(); - INDEX_SHARD_SNAPSHOT_FORMAT.write(blobStoreIndexShardSnapshot, shardContainer, snapshotUUID, compress, bigArrays); + INDEX_SHARD_SNAPSHOT_FORMAT.write(blobStoreIndexShardSnapshot, shardContainer, snapshotUUID, compress); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); } @@ -3169,7 +3156,7 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); if (indexGeneration < 0L) { writtenGeneration = UUIDs.randomBase64UUID(); - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress, bigArrays); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress); } else { writtenGeneration = String.valueOf(indexGeneration); writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots); @@ -3209,12 +3196,11 @@ private void writeShardIndexBlobAtomic( () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path()) ); final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration)); - INDEX_SHARD_SNAPSHOTS_FORMAT.serialize( - updatedSnapshots, + writeAtomic( + shardContainer, blobName, - compress, - bigArrays, - bytes -> writeAtomic(shardContainer, blobName, bytes, true) + out -> INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress, out), + true ); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 00f136b6413fa..0425213c9a650 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -19,12 +19,9 @@ import org.elasticsearch.common.Numbers; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -32,7 +29,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.gateway.CorruptStateException; import java.io.FilterInputStream; @@ -272,47 
+268,38 @@ private int getAvailable() throws IOException { * @param name blob name * @param compress whether to use compression */ - public void write(T obj, BlobContainer blobContainer, String name, boolean compress, BigArrays bigArrays) throws IOException { + public void write(T obj, BlobContainer blobContainer, String name, boolean compress) throws IOException { final String blobName = blobName(name); - serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes, false)); + blobContainer.writeBlob(blobName, false, false, out -> serialize(obj, blobName, compress, out)); } - public void serialize( - final T obj, - final String blobName, - final boolean compress, - BigArrays bigArrays, - CheckedConsumer consumer - ) throws IOException { - try (ReleasableBytesStreamOutput outputStream = new ReleasableBytesStreamOutput(bigArrays)) { - try ( - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", - blobName, - org.elasticsearch.common.io.Streams.noCloseStream(outputStream), - BUFFER_SIZE + public void serialize(final T obj, final String blobName, final boolean compress, OutputStream outputStream) throws IOException { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "ChecksumBlobStoreFormat.serialize(blob=\"" + blobName + "\")", + blobName, + org.elasticsearch.common.io.Streams.noCloseStream(outputStream), + BUFFER_SIZE + ) + ) { + CodecUtil.writeHeader(indexOutput, codec, VERSION); + try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { + @Override + public void close() { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index input. + } + }; + XContentBuilder builder = XContentFactory.contentBuilder( + XContentType.SMILE, + compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream ) ) { - CodecUtil.writeHeader(indexOutput, codec, VERSION); - try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { - @Override - public void close() { - // this is important since some of the XContentBuilders write bytes on close. - // in order to write the footer we need to prevent closing the actual index input. - } - }; - XContentBuilder builder = XContentFactory.contentBuilder( - XContentType.SMILE, - compress ? 
CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream - ) - ) { - builder.startObject(); - obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS); - builder.endObject(); - } - CodecUtil.writeFooter(indexOutput); + builder.startObject(); + obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS); + builder.endObject(); } - consumer.accept(outputStream.bytes()); + CodecUtil.writeFooter(indexOutput); } } } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java index 8dc2b8d21b2d9..6e354a25f4e11 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/ChunkedBlobOutputStreamTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.repositories.blobstore; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; @@ -17,7 +18,6 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -43,11 +43,11 @@ public void testSuccessfulChunkedWrite() throws IOException { final long chunkSize = randomLongBetween(10, 1024); final CRC32 checksumIn = new CRC32(); final CRC32 checksumOut = new CRC32(); - final CheckedOutputStream out = new CheckedOutputStream(OutputStream.nullOutputStream(), checksumOut); + final CheckedOutputStream out = new CheckedOutputStream(Streams.NULL_OUTPUT_STREAM, checksumOut); final AtomicLong writtenBytesCounter = new AtomicLong(0L); final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize); long written = 0; - try (ChunkedBlobOutputStream stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) { + try (ChunkedBlobOutputStream stream = new ChunkedBlobOutputStream(bigArrays, chunkSize) { private final AtomicInteger partIdSupplier = new AtomicInteger(); @@ -104,7 +104,7 @@ public void testExceptionDuringChunkedWrite() throws IOException { final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize); long written = 0; final AtomicBoolean onFailureCalled = new AtomicBoolean(false); - try (ChunkedBlobOutputStream stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) { + try (ChunkedBlobOutputStream stream = new ChunkedBlobOutputStream(bigArrays, chunkSize) { private final AtomicInteger partIdSupplier = new AtomicInteger(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 57aba7691e8c0..2ff9d68ce8657 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentFragment; import 
org.elasticsearch.common.xcontent.XContentBuilder; @@ -87,9 +86,9 @@ public void testBlobStoreOperations() throws IOException { // Write blobs in different formats final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3); final String normalText = "checksum smile: " + randomText; - checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); + checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false); final String compressedText = "checksum smile compressed: " + randomText; - checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE); + checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true); // Assert that all checksum blobs can be read assertEquals(normalText, checksumSMILE.read("repo", blobContainer, "check-smile", xContentRegistry()).getText()); @@ -109,8 +108,8 @@ public void testCompressionIsApplied() throws IOException { (repo, parser) -> BlobObj.fromXContent(parser) ); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormat.write(blobObj, blobContainer, "blob-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false, MockBigArrays.NON_RECYCLING_INSTANCE); + checksumFormat.write(blobObj, blobContainer, "blob-comp", true); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -121,12 +120,13 @@ public void testBlobCorruption() throws IOException { BlobContainer blobContainer = blobStore.blobContainer(BlobPath.EMPTY); String testString = randomAlphaOfLength(randomInt(10000)); BlobObj blobObj = new BlobObj(testString); + ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>( BLOB_CODEC, "%s", (repo, parser) -> BlobObj.fromXContent(parser) ); - checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE); + checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean()); assertEquals(checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java index 6d1073e14dfb7..f8a7c1a1ac59b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java @@ -9,9 +9,7 @@ package org.elasticsearch.snapshots; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.test.AbstractWireTestCase; @@ -32,22 +30,13 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) throws IOException @Override protected SnapshotInfo copyInstance(SnapshotInfo instance, Version version) throws 
IOException { - final PlainActionFuture future = new PlainActionFuture<>(); - BlobStoreRepository.SNAPSHOT_FORMAT.serialize( - instance, - "test", - randomBoolean(), - BigArrays.NON_RECYCLING_INSTANCE, - bytes -> ActionListener.completeWith( - future, - () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize( - instance.repository(), - NamedXContentRegistry.EMPTY, - bytes.streamInput() - ) - ) + final BytesStreamOutput out = new BytesStreamOutput(); + BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), out); + return BlobStoreRepository.SNAPSHOT_FORMAT.deserialize( + instance.repository(), + NamedXContentRegistry.EMPTY, + out.bytes().streamInput() ); - return future.actionGet(); } } diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index e5ed67391347b..d0486b5b0102f 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -20,11 +20,8 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; @@ -33,7 +30,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URLDecoder; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -211,11 +207,6 @@ public void handle(final HttpExchange exchange) throws IOException { blobs.put(blobName, BytesArray.EMPTY); byte[] response = requestBody.utf8ToString().getBytes(UTF_8); - if (Paths.get(blobName).getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX) == false) { - final Map parsedBody = XContentHelper.convertToMap(requestBody, false, XContentType.JSON).v2(); - assert parsedBody.get("md5Hash") != null - : "file [" + blobName + "] is not a data blob but did not come with a md5 checksum"; - } exchange.getResponseHeaders().add("Content-Type", "application/json"); exchange.getResponseHeaders() .add( diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index f90e66ae863aa..9e3b5c5657610 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -383,7 +382,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version PlainActionFuture.get(f -> blobStoreRepository.threadPool().generic().execute(ActionRunnable.run(f, () -> 
BlobStoreRepository.SNAPSHOT_FORMAT.write(downgradedSnapshotInfo, blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath()), snapshotInfo.snapshotId().getUUID(), - randomBoolean(), internalCluster().getCurrentMasterNodeInstance(BigArrays.class))))); + randomBoolean())))); return oldVersionSnapshot; } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index f7941f911d6e9..0fbb39a66fdce 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -40,6 +41,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.nio.file.Path; import java.security.MessageDigest; @@ -488,11 +490,7 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - maybeIOExceptionOrBlock(blobName); - if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) - && path().equals(basePath()) == false) { - blockExecutionAndMaybeWait(blobName); - } + beforeWrite(blobName); super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); if (RandomizedContext.current().getRandom().nextBoolean()) { // for network based repositories, the blob may have been written but we may still @@ -502,19 +500,35 @@ && path().equals(basePath()) == false) { } @Override - public void writeBlobAtomic(final String blobName, final BytesReference bytes, - final boolean failIfAlreadyExists) throws IOException { - final Random random = RandomizedContext.current().getRandom(); - if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { - throw new IOException("Random IOException"); + public void writeBlob(String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer) throws IOException { + if (atomic) { + beforeAtomicWrite(blobName); + } else { + beforeWrite(blobName); } - if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) { - if (blockAndFailOnWriteIndexFile) { - blockExecutionAndFail(blobName); - } else if (blockOnWriteIndexFile) { - blockExecutionAndMaybeWait(blobName); - } + super.writeBlob(blobName, failIfAlreadyExists, atomic, writer); + if (RandomizedContext.current().getRandom().nextBoolean()) { + // for network based repositories, the blob may have been written but we may still + // get an error with the client connection, so an IOException here simulates this + maybeIOExceptionOrBlock(blobName); + } + } + + private void beforeWrite(String blobName) throws IOException { + maybeIOExceptionOrBlock(blobName); + if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) + && path().equals(basePath()) == false) { + blockExecutionAndMaybeWait(blobName); } + } + + @Override + public void writeBlobAtomic(final 
String blobName, final BytesReference bytes, + final boolean failIfAlreadyExists) throws IOException { + final Random random = beforeAtomicWrite(blobName); if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); @@ -529,6 +543,21 @@ public void writeBlobAtomic(final String blobName, final BytesReference bytes, super.writeBlobAtomic(blobName, bytes, failIfAlreadyExists); } } + + private Random beforeAtomicWrite(String blobName) throws IOException { + final Random random = RandomizedContext.current().getRandom(); + if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { + throw new IOException("Random IOException"); + } + if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) { + if (blockAndFailOnWriteIndexFile) { + blockExecutionAndFail(blobName); + } else if (blockOnWriteIndexFile) { + blockExecutionAndMaybeWait(blobName); + } + } + return random; + } } } } diff --git a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java index d4ed2830a4f5c..eb1411bc1a2ed 100644 --- a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java +++ b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.Tuple; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -49,6 +50,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.security.GeneralSecurityException; @@ -628,6 +630,24 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea } } + @Override + public void writeBlob( + String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer + ) throws IOException { + // TODO: this is just a stop-gap solution for until we have an encrypted output stream wrapper + try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) { + writer.accept(out); + if (atomic) { + writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists); + } else { + writeBlob(blobName, out.bytes(), failIfAlreadyExists); + } + } + } + private ChainingInputStream encryptedInput(InputStream inputStream, SingleUseKey singleUseNonceAndDEK, BytesReference dekIdBytes) throws IOException { return ChainingInputStream.chain( diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java index ae2a146655964..f90d82832059f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java +++ 
b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.PathUtilsForTesting; import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService; @@ -33,6 +34,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.channels.FileChannel; import java.nio.file.FileSystem; import java.nio.file.OpenOption; @@ -271,6 +273,16 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b throw unsupportedException(); } + @Override + public void writeBlob( + String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer + ) { + throw unsupportedException(); + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) { throw unsupportedException(); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java index ef9d629dc6be5..592c32bd4b24d 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java @@ -17,12 +17,14 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.List; import org.elasticsearch.core.Nullable; import org.elasticsearch.env.Environment; @@ -44,6 +46,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.FileAlreadyExistsException; import java.util.Arrays; import java.util.Collection; @@ -462,6 +465,22 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); } + @Override + public void writeBlob( + String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer + ) throws IOException { + final BytesStreamOutput out = new BytesStreamOutput(); + writer.accept(out); + if (atomic) { + writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists); + } else { + writeBlob(blobName, out.bytes(), failIfAlreadyExists); + } + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean 
failIfAlreadyExists) throws IOException { final StreamInput inputStream; diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java index a1dd39e485a4c..188eb4772e908 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java @@ -16,11 +16,13 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.List; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -42,6 +44,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -336,6 +339,22 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); } + @Override + public void writeBlob( + String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer + ) throws IOException { + final BytesStreamOutput out = new BytesStreamOutput(); + writer.accept(out); + if (atomic) { + writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists); + } else { + writeBlob(blobName, out.bytes(), failIfAlreadyExists); + } + } + @Override public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { writeBlobAtomic(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
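Reviewer note (not part of the patch): several containers in this patch (EncryptedRepository and the repository-analysis test containers above) implement the new method by buffering the consumer's output and delegating to the existing byte-based methods. A minimal sketch of that fallback pattern, with illustrative names only, is below; it trades the memory savings of true streaming for a simple bridge onto the current BytesReference-based API, which the FsBlobContainer change above avoids by streaming directly to the file.

    import java.io.IOException;
    import java.io.OutputStream;
    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.core.CheckedConsumer;

    // Inside a BlobContainer implementation that already provides the byte-based
    // writeBlob/writeBlobAtomic methods:
    @Override
    public void writeBlob(
        String blobName,
        boolean failIfAlreadyExists,
        boolean atomic,
        CheckedConsumer<OutputStream, IOException> writer
    ) throws IOException {
        // Buffer the full blob in memory, then reuse the existing BytesReference-based paths.
        final BytesStreamOutput out = new BytesStreamOutput();
        writer.accept(out);
        if (atomic) {
            writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
        } else {
            writeBlob(blobName, out.bytes(), failIfAlreadyExists);
        }
    }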