Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.util.Map;

class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,24 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;

private final Storage storage;
private final String bucket;
private final String bucketName;
private final String clientName;
private final GoogleCloudStorageService storageService;

GoogleCloudStorageBlobStore(Settings settings, String bucket, Storage storage) {
GoogleCloudStorageBlobStore(Settings settings, String bucketName, String clientName, GoogleCloudStorageService storageService) {
super(settings);
this.bucket = bucket;
this.storage = storage;
if (doesBucketExist(bucket) == false) {
throw new BlobStoreException("Bucket [" + bucket + "] does not exist");
this.bucketName = bucketName;
this.clientName = clientName;
this.storageService = storageService;
if (doesBucketExist(bucketName) == false) {
throw new BlobStoreException("Bucket [" + bucketName + "] does not exist");
}
}

private Storage client() throws IOException {
return storageService.client(clientName);
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new GoogleCloudStorageBlobContainer(path, this);
Expand All @@ -92,46 +98,44 @@ public void close() {
}

/**
* Return true if the given bucket exists
* Return true iff the given bucket exists
*
* @param bucketName name of the bucket
* @return true if the bucket exists, false otherwise
* @return true iff the bucket exists
*/
boolean doesBucketExist(String bucketName) {
try {
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> storage.get(bucketName));
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
return bucket != null;
} catch (final Exception e) {
throw new BlobStoreException("Unable to check if bucket [" + bucketName + "] exists", e);
}
}

/**
* List blobs in the bucket under the specified path. The path root is removed.
* List blobs in the specific bucket under the specified path. The path root is removed.
*
* @param path
* base path of the blobs to list
* @param path base path of the blobs to list
* @return a map of blob names and their metadata
*/
Map<String, BlobMetaData> listBlobs(String path) throws IOException {
return listBlobsByPrefix(path, "");
}

/**
* List all blobs in the bucket which have a prefix
* List all blobs in the specific bucket with names prefixed
*
* @param path
* base path of the blobs to list. This path is removed from the
* names of the blobs returned.
* @param prefix
* prefix of the blobs to list.
* @param prefix prefix of the blobs to list.
* @return a map of blob names and their metadata.
*/
Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws IOException {
final String pathPrefix = buildKey(path, prefix);
final MapBuilder<String, BlobMetaData> mapBuilder = MapBuilder.newMapBuilder();
SocketAccess.doPrivilegedVoidIOException(() -> {
storage.get(bucket).list(BlobListOption.prefix(pathPrefix)).iterateAll().forEach(blob -> {
client().get(bucketName).list(BlobListOption.prefix(pathPrefix)).iterateAll().forEach(blob -> {
assert blob.getName().startsWith(path);
final String suffixName = blob.getName().substring(path.length());
mapBuilder.put(suffixName, new PlainBlobMetaData(suffixName, blob.getSize()));
Expand All @@ -141,26 +145,26 @@ Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws I
}

/**
* Returns true if the blob exists in the bucket
* Returns true if the blob exists in the specific bucket
*
* @param blobName name of the blob
* @return true if the blob exists, false otherwise
* @return true iff the blob exists
*/
boolean blobExists(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucket, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> storage.get(blobId));
final BlobId blobId = BlobId.of(bucketName, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId));
return blob != null;
}

/**
* Returns an {@link java.io.InputStream} for a given blob
* Returns an {@link java.io.InputStream} for the given blob name
*
* @param blobName name of the blob
* @return an InputStream
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucket, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> storage.get(blobId));
final BlobId blobId = BlobId.of(bucketName, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId));
if (blob == null) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exit");
}
Expand All @@ -185,13 +189,13 @@ public void close() throws IOException {
}

/**
* Writes a blob in the bucket.
* Writes a blob in the specific bucket
*
* @param inputStream content of the blob to be written
* @param blobSize expected size of the blob to be written
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucket, blobName).build();
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
writeBlobResumable(blobInfo, inputStream);
} else {
Expand All @@ -209,8 +213,8 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
Expand All @@ -228,7 +232,7 @@ public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
}));
} catch (StorageException se) {
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
Expand All @@ -250,45 +254,43 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
SocketAccess.doPrivilegedVoidIOException(
() -> {
try {
storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
});
try {
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist()));
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
}

/**
* Deletes a blob in the bucket
* Deletes the blob from the specific bucket
*
* @param blobName name of the blob
*/
void deleteBlob(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucket, blobName);
final boolean deleted = SocketAccess.doPrivilegedIOException(() -> storage.delete(blobId));
final BlobId blobId = BlobId.of(bucketName, blobName);
final boolean deleted = SocketAccess.doPrivilegedIOException(() -> client().delete(blobId));
if (deleted == false) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
}

/**
* Deletes multiple blobs in the bucket that have a given prefix
* Deletes multiple blobs from the specific bucket all of which have prefixed names
*
* @param prefix prefix of the buckets to delete
* @param prefix prefix of the blobs to delete
*/
void deleteBlobsByPrefix(String prefix) throws IOException {
deleteBlobs(listBlobsByPrefix("", prefix).keySet());
}

/**
* Deletes multiple blobs in the given bucket (uses a batch request to perform this)
* Deletes multiple blobs from the specific bucket using a batch request
*
* @param blobNames names of the bucket to delete
* @param blobNames names of the blobs to delete
*/
void deleteBlobs(Collection<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
Expand All @@ -299,13 +301,13 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
deleteBlob(blobNames.iterator().next());
return;
}
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blobName -> BlobId.of(bucket, blobName)).collect(Collectors.toList());
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> storage.delete(blobIdsToDelete));
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete));
assert blobIdsToDelete.size() == deletedStatuses.size();
boolean failed = false;
for (int i = 0; i < blobIdsToDelete.size(); i++) {
if (deletedStatuses.get(i) == false) {
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucket);
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName);
failed = true;
}
}
Expand All @@ -315,26 +317,27 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
}

/**
* Moves a blob within the same bucket
* Moves a blob within the specific bucket
*
* @param sourceBlobName name of the blob to move
* @param targetBlobName new name of the blob in the same bucket
*/
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);
final BlobId targetBlobId = BlobId.of(bucket, targetBlobName);
final BlobId sourceBlobId = BlobId.of(bucketName, sourceBlobName);
final BlobId targetBlobId = BlobId.of(bucketName, targetBlobName);
final CopyRequest request = CopyRequest.newBuilder()
.setSource(sourceBlobId)
.setTarget(targetBlobId)
.build();
SocketAccess.doPrivilegedVoidIOException(() -> {
// There's no atomic "move" in GCS so we need to copy and delete
// There's no atomic "move" in GCS so we need to copy and delete
final Storage storage = client();
final boolean deleted = SocketAccess.doPrivilegedIOException(() -> {
storage.copy(request).getResult();
final boolean deleted = storage.delete(sourceBlobId);
if (deleted == false) {
throw new IOException("Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]");
}
return storage.delete(sourceBlobId);
});
if (deleted == false) {
throw new IOException("Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]");
}
}

private static String buildKey(String keyPath, String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,34 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReInitializablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin {
public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin, ReInitializablePlugin {

private final Map<String, GoogleCloudStorageClientSettings> clientsSettings;
// package-private for tests
final GoogleCloudStorageService storageService;

public GoogleCloudStoragePlugin(final Settings settings) {
clientsSettings = GoogleCloudStorageClientSettings.load(settings);
}

protected Map<String, GoogleCloudStorageClientSettings> getClientsSettings() {
return clientsSettings;
this.storageService = createStorageService(settings);
// eagerly load client settings so that secure settings are readable (not closed)
reinit(settings);
}

// overridable for tests
protected GoogleCloudStorageService createStorageService(Environment environment) {
return new GoogleCloudStorageService(environment, clientsSettings);
protected GoogleCloudStorageService createStorageService(Settings settings) {
return new GoogleCloudStorageService(settings);
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, createStorageService(env)));
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService));
}

@Override
Expand All @@ -66,4 +65,16 @@ public List<Setting<?>> getSettings() {
GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING,
GoogleCloudStorageClientSettings.TOKEN_URI_SETTING);
}

@Override
public boolean reinit(Settings settings) {
// Secure settings should be readable inside this method. Duplicate client
// settings in a format (`GoogleCloudStorageClientSettings`) that does not
// require for the `SecureSettings` to be open. Pass that around (the
// `GoogleCloudStorageClientSettings` instance) instead of the `Settings`
// instance.
final Map<String, GoogleCloudStorageClientSettings> clientsSettings = GoogleCloudStorageClientSettings.load(settings);
this.storageService.updateClientsSettings(clientsSettings);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
import static org.elasticsearch.common.settings.Setting.simpleString;

import com.google.cloud.storage.Storage;

class GoogleCloudStorageRepository extends BlobStoreRepository {

// package private for testing
Expand Down Expand Up @@ -86,8 +84,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {

logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);

Storage client = SocketAccess.doPrivilegedIOException(() -> storageService.createClient(clientName));
this.blobStore = new GoogleCloudStorageBlobStore(settings, bucket, client);
this.blobStore = new GoogleCloudStorageBlobStore(settings, bucket, clientName, storageService);
}


Expand Down
Loading