Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@ public FileVisitResult visitFileFailed(final Path file, final IOException exc) t
return unremoved;
}

// TODO: replace with constants class if needed (cf. org.apache.lucene.util.Constants)
Copy link
Contributor Author

@original-brownbear original-brownbear Jun 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #42883, the failure described there actually comes up easily in tests here when testing concurrent access to the snapshot repository.

private static final boolean LINUX = System.getProperty("os.name").startsWith("Linux");
private static final boolean MAC_OS_X = System.getProperty("os.name").startsWith("Mac OS X");

/**
* Ensure that any writes to the given file is written to the storage device that contains it. The {@code isDir} parameter specifies
* whether or not the path to sync is a directory. This is needed because we open for read and ignore an {@link IOException} since not
Expand All @@ -267,10 +263,6 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx
file.force(true);
} catch (final IOException ioe) {
if (isDir) {
assert (LINUX || MAC_OS_X) == false :
"on Linux and MacOSX fsyncing a directory should not throw IOException, "+
"we just don't want to rely on that in production (undocumented); got: " + ioe;
// ignore exception if it is a directory
return;
}
// throw original exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,7 @@ public InputStream readBlob(String name) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,15 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);

try {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
} catch (URISyntaxException|StorageException e) {
throw new IOException("Can not write blob " + blobName, e);
}
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}

@Override
public void deleteBlob(String blobName) throws IOException {
logger.trace("deleteBlob({})", blobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -104,13 +103,12 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize);
}

public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
Expand All @@ -50,10 +49,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.security.InvalidKeyException;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -253,25 +250,15 @@ public Set<String> children(String account, String container, BlobPath path) thr
return Set.copyOf(blobsBuilder);
}

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
throws URISyntaxException, StorageException {
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
try {
final AccessCondition accessCondition =
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
}
throw se;
}
final AccessCondition accessCondition = AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketPermission;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.util.HashMap;
Expand Down Expand Up @@ -99,11 +98,8 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
}

@Override
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists) throws StorageException, FileAlreadyExistsException {
if (failIfAlreadyExists && blobs.containsKey(blobName)) {
throw new FileAlreadyExistsException(blobName);
}
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
throws StorageException {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
blobs.put(blobName, outputStream);
Streams.copy(inputStream, outputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,8 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Map;
import java.util.ArrayList;
Expand All @@ -59,7 +58,6 @@
import java.util.stream.Collectors;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

class GoogleCloudStorageBlobStore implements BlobStore {

Expand Down Expand Up @@ -208,16 +206,15 @@ public void close() throws IOException {

/**
* Writes a blob in the specific bucket
* @param inputStream content of the blob to be written
* @param inputStream content of the blob to be written
* @param blobSize expected size of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
writeBlobResumable(blobInfo, inputStream);
} else {
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
writeBlobMultipart(blobInfo, inputStream, blobSize);
}
}

Expand All @@ -227,38 +224,26 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
try {
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
new Storage.BlobWriteOption[0];
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}
@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}

@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
}));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
throw se;
}
}));
}

/**
Expand All @@ -269,25 +254,13 @@ public int write(ByteBuffer src) throws IOException {
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param blobSize the size
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize)
throws IOException {
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
new Storage.BlobTargetOption[0];
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, baos.toByteArray(), targetOptions));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
SocketAccess.doPrivilegedVoidIOException(() -> client().create(blobInfo, baos.toByteArray()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -94,27 +93,7 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
try (FSDataOutputStream stream = fileContext.create(blob, flags, CreateOpts.bufferSize(bufferSize))) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
}
return null;
});
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
final String tempBlob = FsBlobContainer.tempBlobName(blobName);
store.execute((Operation<Void>) fileContext -> {
final Path tempBlobPath = new Path(path, tempBlob);
Expand All @@ -127,11 +106,7 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
}
}
final Path blob = new Path(path, blobName);
try {
fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
}
fileContext.rename(tempBlobPath, blob, Options.Rename.OVERWRITE);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testReadOnly() throws Exception {
assertTrue(util.exists(hdfsPath));

byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
writeBlob(container, "foo", new BytesArray(data));
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
assertTrue(container.blobExists("foo"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,8 @@ public InputStream readBlob(String blobName) throws IOException {
}
}

/**
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= blobStore.bufferSizeInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
Expand All @@ -119,11 +116,6 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
});
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}

@Override
public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ public void testDeleteBlob() {
assumeFalse("not implemented because of S3's weak consistency model", true);
}

@Override
public void testVerifyOverwriteFails() {
assumeFalse("not implemented because of S3's weak consistency model", true);
}

public void testExecuteSingleUploadBlobSizeTooLarge() {
final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(6, 10));
final S3BlobStore blobStore = mock(S3BlobStore.class);
Expand Down
Loading