Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
Expand All @@ -50,6 +51,7 @@
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -70,11 +72,13 @@ class GoogleCloudStorageBlobStore implements BlobStore {

private final String bucketName;
private final String clientName;
private final String encryptionKeyName;
private final GoogleCloudStorageService storageService;

GoogleCloudStorageBlobStore(String bucketName, String clientName, GoogleCloudStorageService storageService) {
GoogleCloudStorageBlobStore(String bucketName, String clientName, String encryptionKeyName, GoogleCloudStorageService storageService) {
this.bucketName = bucketName;
this.clientName = clientName;
this.encryptionKeyName = encryptionKeyName;
this.storageService = storageService;
if (doesBucketExist(bucketName) == false) {
throw new BlobStoreException("Bucket [" + bucketName + "] does not exist");
Expand Down Expand Up @@ -218,9 +222,15 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
try {
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
new Storage.BlobWriteOption[0];
final List<Storage.BlobWriteOption> writeOptionsBuilder = new ArrayList<>(2);
if (Strings.hasText(this.encryptionKeyName)) {
writeOptionsBuilder.add(Storage.BlobWriteOption.kmsKeyName(this.encryptionKeyName));
}
if (failIfAlreadyExists) {
writeOptionsBuilder.add(Storage.BlobWriteOption.doesNotExist());
}
final Storage.BlobWriteOption[] writeOptions = writeOptionsBuilder
.toArray(new Storage.BlobWriteOption[writeOptionsBuilder.size()]);
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
Expand Down Expand Up @@ -264,9 +274,15 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
new Storage.BlobTargetOption[0];
final List<Storage.BlobTargetOption> targetOptionsBuilder = new ArrayList<>(2);
if (Strings.hasText(this.encryptionKeyName)) {
targetOptionsBuilder.add(Storage.BlobTargetOption.kmsKeyName(this.encryptionKeyName));
}
if (failIfAlreadyExists) {
targetOptionsBuilder.add(Storage.BlobTargetOption.doesNotExist());
}
final Storage.BlobTargetOption[] targetOptions = targetOptionsBuilder
.toArray(new Storage.BlobTargetOption[targetOptionsBuilder.size()]);
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, baos.toByteArray(), targetOptions));
} catch (final StorageException se) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
static final Setting<ByteSizeValue> CHUNK_SIZE =
byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic);
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
// KMS_KEY_NAME in Google's lingo
static final Setting<String> ENCRYPTION_KEY_NAME = simpleString("encryption_key_name", Property.NodeScope, Property.Dynamic);

private final Settings settings;
private final GoogleCloudStorageService storageService;
Expand All @@ -66,6 +68,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
private final ByteSizeValue chunkSize;
private final String bucket;
private final String clientName;
private final String encryptionKeyName;

GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry,
Expand All @@ -89,12 +92,14 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
this.bucket = getSetting(BUCKET, metadata);
this.clientName = CLIENT_NAME.get(metadata.settings());
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);
this.encryptionKeyName = ENCRYPTION_KEY_NAME.get(metadata.settings());
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}], encryption_key_name [{}]", bucket, basePath,
chunkSize, compress, encryptionKeyName);
}

@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, storageService);
return new GoogleCloudStorageBlobStore(bucket, clientName, encryptionKeyName, storageService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContai
protected BlobStore newBlobStore() {
final String bucketName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final String encryptionKeyName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
try {
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
} catch (final Exception e) {
throw new RuntimeException(e);
}
return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
return new GoogleCloudStorageBlobStore(bucketName, clientName, encryptionKeyName, storageService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ public class GoogleCloudStorageBlobStoreTests extends ESBlobStoreTestCase {
protected BlobStore newBlobStore() {
final String bucketName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final String encryptionKeyName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
try {
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
} catch (final Exception e) {
throw new RuntimeException(e);
}
return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
return new GoogleCloudStorageBlobStore(bucketName, clientName, encryptionKeyName, storageService);
}
}