@@ -8,6 +8,7 @@

package org.elasticsearch.common.blobstore.url;

import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -20,6 +21,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
@@ -121,6 +123,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
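For orientation: the hunks above add a streaming writeBlob overload that hands the caller an OutputStream through a CheckedConsumer; the URL-based container rejects it, as it does its other write operations. Below is a minimal sketch of how a caller might drive the new overload against a container that does support streaming writes; the blob name, flag values, and helper class here are illustrative, not taken from this change.

import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.IOException;

public class StreamingWriteSketch {
    // Writes a blob by streaming chunks through the new consumer-based overload.
    static void writeChunks(BlobContainer container, byte[][] chunks) throws IOException {
        // failIfAlreadyExists = true, atomic = false; the repository implementation
        // decides how to buffer and upload whatever is written to the OutputStream.
        container.writeBlob("example-blob", true, false, out -> {
            for (byte[] chunk : chunks) {
                out.write(chunk);
            }
        });
    }
}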
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -22,6 +23,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.util.Iterator;
import java.util.Map;
@@ -99,6 +101,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
}

@Override
public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
}

@Override
public DeleteResult delete() throws IOException {
return blobStore.deleteBlobDirectory(keyPath);
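This container simply delegates the new overload to the blob store, resolving the blob name with buildKey; only that helper's call sites are visible in this diff. A hedged guess at what it does, assuming the usual convention of prefixing names with the container's key path (the body below is hypothetical, not part of this change):

// Hypothetical helper, not shown in this diff: resolves a blob name against the
// container's path prefix before handing it to the blob store.
private String buildKey(String blobName) {
    assert blobName != null;
    return keyPath + blobName; // assumes keyPath already ends with the path separator
}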
@@ -33,6 +33,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -46,13 +48,15 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
@@ -83,6 +87,8 @@ public class AzureBlobStore implements BlobStore {

private final AzureStorageService service;

private final BigArrays bigArrays;

private final String clientName;
private final String container;
private final LocationMode locationMode;
@@ -91,10 +97,11 @@ public class AzureBlobStore implements BlobStore {
private final Stats stats = new Stats();
private final BiConsumer<String, URL> statsConsumer;

public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) {
public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service;
this.bigArrays = bigArrays;
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
@@ -384,6 +391,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists);
}

public void writeBlob(String blobName,
boolean failIfAlreadyExists,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container)
.getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<String>(bigArrays, getUploadBlockSize()) {

@Override
protected void flushBuffer() {
if (buffer.size() == 0) {
return;
}
final String blockId = makeMultipartBlockId();
SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock(
blockId,
Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())),
buffer.size()
).block());
finishPart(blockId);
}

@Override
protected void onCompletion() {
if (flushedBytes == 0L) {
writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
} else {
flushBuffer();
SocketAccess.doPrivilegedVoidException(
() -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block());
}
}

@Override
protected void onFailure() {
// Nothing to do here, already uploaded blocks will be GCed by Azure after a week.
// see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks
}
}) {
writer.accept(out);
out.markSuccess();
}
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
assert inputStream.markSupported()
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
@@ -440,13 +490,11 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";

final List<String> blockIds = new ArrayList<>(nbParts);
final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
for (int i = 0; i < nbParts; i++) {
final long length = i < nbParts - 1 ? partSize : lastPartSize;
Flux<ByteBuffer> byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);

final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
final String blockId = makeMultipartBlockId();
blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
blockIds.add(blockId);
}
@@ -455,6 +503,13 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
});
}

private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();

private String makeMultipartBlockId() {
return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
}

/**
* Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
* memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
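The refactored makeMultipartBlockId re-encodes a URL-safe base64 UUID as standard base64 without padding, presumably because UUIDs.base64UUID() uses the URL-safe alphabet while Azure block ids must be plain base64. A self-contained sketch of the same idea, using java.util.UUID as a stand-in for UUIDs.base64UUID() (the stand-in and class name are illustrative):

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class BlockIdSketch {
    public static void main(String[] args) {
        // Stand-in for UUIDs.base64UUID(): a random UUID encoded as URL-safe base64 without padding.
        UUID uuid = UUID.randomUUID();
        byte[] raw = ByteBuffer.allocate(16)
            .putLong(uuid.getMostSignificantBits())
            .putLong(uuid.getLeastSignificantBits())
            .array();
        String urlSafeId = Base64.getUrlEncoder().withoutPadding().encodeToString(raw);

        // Re-encode with the standard alphabet: '-' and '_' become '+' and '/',
        // giving a block id in the plain base64 form that the stage/commit requests expect.
        String blockId = Base64.getEncoder().withoutPadding()
            .encodeToString(Base64.getUrlDecoder().decode(urlSafeId));
        System.out.println(urlSafeId + " -> " + blockId);
    }
}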
@@ -119,7 +119,7 @@ protected BlobStore getBlobStore() {

@Override
protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays);

logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
@@ -15,6 +15,7 @@
import fixture.azure.AzureHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,6 +59,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -172,7 +174,7 @@ int getMaxReadRetries(String clientName) {
.put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.build());

return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service));
return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE));
}

public void testReadNonexistentBlobThrowsNoSuchFileException() {
@@ -391,6 +393,82 @@ public void testWriteLargeBlob() throws Exception {
assertThat(blocks.isEmpty(), is(true));
}

public void testWriteLargeBlobStreaming() throws Exception {
final int maxRetries = randomIntBetween(2, 5);

final int blobSize = (int) ByteSizeUnit.MB.toBytes(10);
final byte[] data = randomBytes(blobSize);

final int nbErrors = 2; // we want all requests to fail at least once
final AtomicInteger counterUploads = new AtomicInteger(0);
final AtomicLong bytesReceived = new AtomicLong(0L);
final CountDown countDownComplete = new CountDown(nbErrors);

final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
httpServer.createContext("/account/container/write_large_blob_streaming", exchange -> {

if ("PUT".equals(exchange.getRequestMethod())) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params);

final String blockId = params.get("blockid");
assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId);

if (Strings.hasText(blockId) && (counterUploads.incrementAndGet() % 2 == 0)) {
final BytesReference blockData = Streams.readFully(exchange.getRequestBody());
blocks.put(blockId, blockData);
bytesReceived.addAndGet(blockData.length());
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
exchange.close();
return;
}

final String complete = params.get("comp");
if ("blocklist".equals(complete) && (countDownComplete.countDown())) {
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
final List<String> blockUids = Arrays.stream(blockList.split("<Latest>"))
.filter(line -> line.contains("</Latest>"))
.map(line -> line.substring(0, line.indexOf("</Latest>")))
.collect(Collectors.toList());

final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (String blockUid : blockUids) {
BytesReference block = blocks.remove(blockUid);
assert block != null;
block.writeTo(blob);
}
assertArrayEquals(data, blob.toByteArray());
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
exchange.close();
return;
}
}

if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody());
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
}
exchange.close();
});

final BlobContainer blobContainer = createBlobContainer(maxRetries);
blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
int outstanding = data.length;
while (outstanding > 0) {
if (randomBoolean()) {
int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding));
out.write(data, data.length - outstanding, toWrite);
outstanding -= toWrite;
} else {
out.write(data[data.length - outstanding]);
outstanding--;
}
}
});
assertEquals(blobSize, bytesReceived.get());
}

public void testRetryUntilFail() throws Exception {
final int maxRetries = randomIntBetween(2, 5);
final AtomicInteger requestsReceived = new AtomicInteger(0);
@@ -15,9 +15,11 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.CheckedConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;

@@ -76,6 +78,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
}

@Override
public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);