Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/plugins/repository-azure.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ The Azure Repository plugin works with all Standard storage accounts
https://azure.microsoft.com/en-gb/documentation/articles/storage-premium-storage[Premium Locally Redundant Storage] (`Premium_LRS`) is **not supported** as it is only usable as VM disk storage, not as general storage.
===============================================

You can register a proxy per client using the following settings:

[source,yaml]
----
azure.client.default.proxy.host: proxy.host
azure.client.default.proxy.port: 8888
azure.client.default.proxy.type: http
----

Supported values for `proxy.type` are `direct` (default), `http` or `socks`.
When `proxy.type` is set to `http` or `socks`, `proxy.host` and `proxy.port` must be provided.


[[repository-azure-repository-settings]]
===== Repository settings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Proxy;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Map;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.StorageException;
Expand All @@ -29,6 +30,7 @@
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
Expand Down Expand Up @@ -131,12 +133,23 @@ CloudBlobClient getSelectedClient(String clientName, LocationMode mode) {
return client;
}

private OperationContext generateOperationContext(String clientName) {
OperationContext context = new OperationContext();
AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);

if (azureStorageSettings.getProxy() != null) {
context.setProxy(azureStorageSettings.getProxy());
}

return context;
}

@Override
public boolean doesContainerExist(String account, LocationMode mode, String container) {
try {
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
return SocketAccess.doPrivilegedException(blobContainer::exists);
return SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, generateOperationContext(account)));
} catch (Exception e) {
logger.error("can not access container [{}]", container);
}
Expand All @@ -148,7 +161,7 @@ public void removeContainer(String account, LocationMode mode, String container)
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
logger.trace("removing container [{}]", container);
SocketAccess.doPrivilegedException(blobContainer::deleteIfExists);
SocketAccess.doPrivilegedException(() -> blobContainer.deleteIfExists(null, null, generateOperationContext(account)));
}

@Override
Expand All @@ -157,7 +170,7 @@ public void createContainer(String account, LocationMode mode, String container)
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
logger.trace("creating container [{}]", container);
SocketAccess.doPrivilegedException(blobContainer::createIfNotExists);
SocketAccess.doPrivilegedException(() -> blobContainer.createIfNotExists(null, null, generateOperationContext(account)));
} catch (IllegalArgumentException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("fails creating container [{}]", container), e);
throw new RepositoryException(container, e.getMessage(), e);
Expand All @@ -174,7 +187,8 @@ public void deleteFiles(String account, LocationMode mode, String container, Str
SocketAccess.doPrivilegedVoidException(() -> {
if (blobContainer.exists()) {
// We list the blobs using a flat blob listing mode
for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
for (ListBlobItem blobItem : blobContainer.listBlobs(path, true, EnumSet.noneOf(BlobListingDetails.class), null,
generateOperationContext(account))) {
String blobName = blobNameFromUri(blobItem.getUri());
logger.trace("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri());
deleteBlob(account, mode, container, blobName);
Expand Down Expand Up @@ -208,9 +222,9 @@ public boolean blobExists(String account, LocationMode mode, String container, S
// Container name must be lower case.
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
if (SocketAccess.doPrivilegedException(blobContainer::exists)) {
if (SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, generateOperationContext(account)))) {
CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
return SocketAccess.doPrivilegedException(azureBlob::exists);
return SocketAccess.doPrivilegedException(() -> azureBlob.exists(null, null, generateOperationContext(account)));
}

return false;
Expand All @@ -223,10 +237,11 @@ public void deleteBlob(String account, LocationMode mode, String container, Stri
// Container name must be lower case.
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
if (SocketAccess.doPrivilegedException(blobContainer::exists)) {
if (SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, generateOperationContext(account)))) {
logger.trace("container [{}]: blob [{}] found. removing.", container, blob);
CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
SocketAccess.doPrivilegedVoidException(azureBlob::delete);
SocketAccess.doPrivilegedVoidException(() -> azureBlob.delete(DeleteSnapshotsOption.NONE, null, null,
generateOperationContext(account)));
}
}

Expand All @@ -235,15 +250,15 @@ public InputStream getInputStream(String account, LocationMode mode, String cont
logger.trace("reading container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
return SocketAccess.doPrivilegedException(blockBlobReference::openInputStream);
return SocketAccess.doPrivilegedException(() -> blockBlobReference.openInputStream(null, null, generateOperationContext(account)));
}

@Override
public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
logger.trace("writing container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
return SocketAccess.doPrivilegedException(blockBlobReference::openOutputStream);
return SocketAccess.doPrivilegedException(() -> blockBlobReference.openOutputStream(null, null, generateOperationContext(account)));
}

@Override
Expand All @@ -260,7 +275,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode
SocketAccess.doPrivilegedVoidException(() -> {
if (blobContainer.exists()) {
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
enumBlobListingDetails, null, null)) {
enumBlobListingDetails, null, generateOperationContext(account))) {
URI uri = blobItem.getUri();
logger.trace("blob url [{}]", uri);

Expand All @@ -284,11 +299,11 @@ public void moveBlob(String account, LocationMode mode, String container, String
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
CloudBlockBlob blobSource = blobContainer.getBlockBlobReference(sourceBlob);
if (SocketAccess.doPrivilegedException(blobSource::exists)) {
if (SocketAccess.doPrivilegedException(() -> blobSource.exists(null, null, generateOperationContext(account)))) {
CloudBlockBlob blobTarget = blobContainer.getBlockBlobReference(targetBlob);
SocketAccess.doPrivilegedVoidException(() -> {
blobTarget.startCopy(blobSource);
blobSource.delete();
blobTarget.startCopy(blobSource, null, null, null, generateOperationContext(account));
blobSource.delete(DeleteSnapshotsOption.NONE, null, null, generateOperationContext(account));
});
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,55 +20,90 @@
package org.elasticsearch.cloud.azure.storage;

import com.microsoft.azure.storage.RetryPolicy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.AffixSetting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public final class AzureStorageSettings {
// prefix for azure client settings
private static final String PREFIX = "azure.client.";

/**
* Azure account name
*/
public static final AffixSetting<SecureString> ACCOUNT_SETTING = Setting.affixKeySetting(PREFIX, "account",
key -> SecureSetting.secureString(key, null));
/** Azure account name */
public static final AffixSetting<SecureString> ACCOUNT_SETTING =
Setting.affixKeySetting(PREFIX, "account", key -> SecureSetting.secureString(key, null));

/**
* max_retries: Number of retries in case of Azure errors. Defaults to 3 (RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT).
*/
/** max_retries: Number of retries in case of Azure errors. Defaults to 3 (RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT). */
private static final Setting<Integer> MAX_RETRIES_SETTING =
Setting.affixKeySetting(PREFIX, "max_retries",
(key) -> Setting.intSetting(key, RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT, Setting.Property.NodeScope));

/**
* Azure key
*/
/** Azure key */
public static final AffixSetting<SecureString> KEY_SETTING = Setting.affixKeySetting(PREFIX, "key",
key -> SecureSetting.secureString(key, null));

public static final AffixSetting<TimeValue> TIMEOUT_SETTING = Setting.affixKeySetting(PREFIX, "timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueMinutes(-1), Property.NodeScope));

/** The type of the proxy to connect to azure through. Can be direct (no proxy, default), http or socks */
public static final AffixSetting<Proxy.Type> PROXY_TYPE_SETTING = Setting.affixKeySetting(PREFIX, "proxy.type",
(key) -> new Setting<>(key, "direct", s -> Proxy.Type.valueOf(s.toUpperCase(Locale.ROOT)), Property.NodeScope));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

direct is not one of the values in the docs added for proxy.type in this PR?


/** The host name of a proxy to connect to azure through. */
public static final Setting<String> PROXY_HOST_SETTING = Setting.affixKeySetting(PREFIX, "proxy.host",
(key) -> Setting.simpleString(key, Property.NodeScope));

/** The port of a proxy to connect to azure through. */
public static final Setting<Integer> PROXY_PORT_SETTING = Setting.affixKeySetting(PREFIX, "proxy.port",
(key) -> Setting.intSetting(key, 0, 0, 65535, Setting.Property.NodeScope));

private final String account;
private final String key;
private final TimeValue timeout;
private final int maxRetries;
private final Proxy proxy;


public AzureStorageSettings(String account, String key, TimeValue timeout, int maxRetries) {
public AzureStorageSettings(String account, String key, TimeValue timeout, int maxRetries, Proxy.Type proxyType, String proxyHost,
Integer proxyPort) {
this.account = account;
this.key = key;
this.timeout = timeout;
this.maxRetries = maxRetries;

// Register the proxy if we have any
// Validate proxy settings
if (proxyType.equals(Proxy.Type.DIRECT) && (proxyPort != 0 || Strings.hasText(proxyHost))) {
throw new SettingsException("Azure Proxy port or host have been set but proxy type is not defined.");
}
if (proxyType.equals(Proxy.Type.DIRECT) == false && (proxyPort == 0 || Strings.isEmpty(proxyHost))) {
throw new SettingsException("Azure Proxy type has been set but proxy host or port is not defined.");
}

if (proxyType.equals(Proxy.Type.DIRECT)) {
proxy = null;
} else {
try {
proxy = new Proxy(proxyType, new InetSocketAddress(InetAddress.getByName(proxyHost), proxyPort));
} catch (UnknownHostException e) {
throw new SettingsException("Azure proxy host is unknown.", e);
}
}
}

public String getKey() {
Expand All @@ -87,13 +122,18 @@ public int getMaxRetries() {
return maxRetries;
}

public Proxy getProxy() {
return proxy;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AzureStorageSettings{");
sb.append(", account='").append(account).append('\'');
sb.append(", key='").append(key).append('\'');
sb.append(", timeout=").append(timeout);
sb.append(", maxRetries=").append(maxRetries);
sb.append(", proxy=").append(proxy);
sb.append('}');
return sb.toString();
}
Expand Down Expand Up @@ -127,7 +167,10 @@ static AzureStorageSettings getClientSettings(Settings settings, String clientNa
SecureString key = getConfigValue(settings, clientName, KEY_SETTING)) {
return new AzureStorageSettings(account.toString(), key.toString(),
getValue(settings, clientName, TIMEOUT_SETTING),
getValue(settings, clientName, MAX_RETRIES_SETTING));
getValue(settings, clientName, MAX_RETRIES_SETTING),
getValue(settings, clientName, PROXY_TYPE_SETTING),
getValue(settings, clientName, PROXY_HOST_SETTING),
getValue(settings, clientName, PROXY_PORT_SETTING));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public List<Setting<?>> getSettings() {
return Arrays.asList(
AzureStorageSettings.ACCOUNT_SETTING,
AzureStorageSettings.KEY_SETTING,
AzureStorageSettings.TIMEOUT_SETTING
AzureStorageSettings.TIMEOUT_SETTING,
AzureStorageSettings.PROXY_TYPE_SETTING,
AzureStorageSettings.PROXY_HOST_SETTING,
AzureStorageSettings.PROXY_PORT_SETTING
);
}
}
Loading