|
19 | 19 |
|
20 | 20 | package org.elasticsearch.discovery.ec2; |
21 | 21 |
|
22 | | -import java.io.Closeable; |
23 | | -import java.io.IOException; |
24 | 22 | import java.util.Random; |
| 23 | +import java.util.concurrent.atomic.AtomicReference; |
25 | 24 |
|
26 | | -import com.amazonaws.AmazonClientException; |
27 | | -import com.amazonaws.AmazonWebServiceRequest; |
28 | 25 | import com.amazonaws.ClientConfiguration; |
29 | 26 | import com.amazonaws.auth.AWSCredentialsProvider; |
30 | 27 | import com.amazonaws.auth.BasicAWSCredentials; |
|
35 | 32 | import com.amazonaws.services.ec2.AmazonEC2; |
36 | 33 | import com.amazonaws.services.ec2.AmazonEC2Client; |
37 | 34 | import org.apache.logging.log4j.Logger; |
| 35 | +import org.elasticsearch.ElasticsearchException; |
38 | 36 | import org.elasticsearch.common.Randomness; |
| 37 | +import org.elasticsearch.common.Strings; |
39 | 38 | import org.elasticsearch.common.component.AbstractComponent; |
40 | | -import org.elasticsearch.common.settings.SecureString; |
41 | 39 | import org.elasticsearch.common.settings.Settings; |
| 40 | +import org.elasticsearch.common.util.LazyInitializable; |
42 | 41 |
|
43 | | -class AwsEc2ServiceImpl extends AbstractComponent implements AwsEc2Service, Closeable { |
| 42 | +class AwsEc2ServiceImpl extends AbstractComponent implements AwsEc2Service { |
44 | 43 |
|
45 | 44 | public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/"; |
46 | 45 |
|
47 | | - private AmazonEC2Client client; |
| 46 | + private final AtomicReference<LazyInitializable<AmazonEc2Reference, ElasticsearchException>> lazyClientReference = |
| 47 | + new AtomicReference<>(); |
48 | 48 |
|
49 | 49 | AwsEc2ServiceImpl(Settings settings) { |
50 | 50 | super(settings); |
51 | 51 | } |
52 | 52 |
|
53 | | - @Override |
54 | | - public synchronized AmazonEC2 client() { |
55 | | - if (client != null) { |
56 | | - return client; |
57 | | - } |
58 | | - |
59 | | - this.client = new AmazonEC2Client(buildCredentials(logger, settings), buildConfiguration(logger, settings)); |
60 | | - String endpoint = findEndpoint(logger, settings); |
61 | | - if (endpoint != null) { |
62 | | - client.setEndpoint(endpoint); |
| 53 | + private AmazonEC2 buildClient(Ec2ClientSettings clientSettings) { |
| 54 | + final AWSCredentialsProvider credentials = buildCredentials(logger, clientSettings); |
| 55 | + final ClientConfiguration configuration = buildConfiguration(logger, clientSettings); |
| 56 | + final AmazonEC2 client = buildClient(credentials, configuration); |
| 57 | + if (Strings.hasText(clientSettings.endpoint)) { |
| 58 | + logger.debug("using explicit ec2 endpoint [{}]", clientSettings.endpoint); |
| 59 | + client.setEndpoint(clientSettings.endpoint); |
63 | 60 | } |
64 | | - |
65 | | - return this.client; |
| 61 | + return client; |
66 | 62 | } |
67 | 63 |
|
68 | | - protected static AWSCredentialsProvider buildCredentials(Logger logger, Settings settings) { |
69 | | - AWSCredentialsProvider credentials; |
70 | | - |
71 | | - try (SecureString key = ACCESS_KEY_SETTING.get(settings); |
72 | | - SecureString secret = SECRET_KEY_SETTING.get(settings)) { |
73 | | - if (key.length() == 0 && secret.length() == 0) { |
74 | | - logger.debug("Using either environment variables, system properties or instance profile credentials"); |
75 | | - credentials = new DefaultAWSCredentialsProviderChain(); |
76 | | - } else { |
77 | | - logger.debug("Using basic key/secret credentials"); |
78 | | - credentials = new StaticCredentialsProvider(new BasicAWSCredentials(key.toString(), secret.toString())); |
79 | | - } |
80 | | - } |
81 | | - |
82 | | - return credentials; |
| 64 | + // proxy for testing |
| 65 | + AmazonEC2 buildClient(AWSCredentialsProvider credentials, ClientConfiguration configuration) { |
| 66 | + final AmazonEC2 client = new AmazonEC2Client(credentials, configuration); |
| 67 | + return client; |
83 | 68 | } |
84 | 69 |
|
85 | | - protected static ClientConfiguration buildConfiguration(Logger logger, Settings settings) { |
86 | | - ClientConfiguration clientConfiguration = new ClientConfiguration(); |
| 70 | + // pkg private for tests |
| 71 | + static ClientConfiguration buildConfiguration(Logger logger, Ec2ClientSettings clientSettings) { |
| 72 | + final ClientConfiguration clientConfiguration = new ClientConfiguration(); |
87 | 73 | // the response metadata cache is only there for diagnostics purposes, |
88 | 74 | // but can force objects from every response to the old generation. |
89 | 75 | clientConfiguration.setResponseMetadataCacheSize(0); |
90 | | - clientConfiguration.setProtocol(PROTOCOL_SETTING.get(settings)); |
91 | | - |
92 | | - if (PROXY_HOST_SETTING.exists(settings)) { |
93 | | - String proxyHost = PROXY_HOST_SETTING.get(settings); |
94 | | - Integer proxyPort = PROXY_PORT_SETTING.get(settings); |
95 | | - try (SecureString proxyUsername = PROXY_USERNAME_SETTING.get(settings); |
96 | | - SecureString proxyPassword = PROXY_PASSWORD_SETTING.get(settings)) { |
97 | | - |
98 | | - clientConfiguration |
99 | | - .withProxyHost(proxyHost) |
100 | | - .withProxyPort(proxyPort) |
101 | | - .withProxyUsername(proxyUsername.toString()) |
102 | | - .withProxyPassword(proxyPassword.toString()); |
103 | | - } |
| 76 | + clientConfiguration.setProtocol(clientSettings.protocol); |
| 77 | + if (Strings.hasText(clientSettings.proxyHost)) { |
| 78 | + // TODO: remove this leniency, these settings should exist together and be validated |
| 79 | + clientConfiguration.setProxyHost(clientSettings.proxyHost); |
| 80 | + clientConfiguration.setProxyPort(clientSettings.proxyPort); |
| 81 | + clientConfiguration.setProxyUsername(clientSettings.proxyUsername); |
| 82 | + clientConfiguration.setProxyPassword(clientSettings.proxyPassword); |
104 | 83 | } |
105 | | - |
106 | 84 | // Increase the number of retries in case of 5xx API responses |
107 | 85 | final Random rand = Randomness.get(); |
108 | | - RetryPolicy retryPolicy = new RetryPolicy( |
| 86 | + final RetryPolicy retryPolicy = new RetryPolicy( |
109 | 87 | RetryPolicy.RetryCondition.NO_RETRY_CONDITION, |
110 | | - new RetryPolicy.BackoffStrategy() { |
111 | | - @Override |
112 | | - public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest, |
113 | | - AmazonClientException exception, |
114 | | - int retriesAttempted) { |
115 | | - // with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000) |
116 | | - logger.warn("EC2 API request failed, retry again. Reason was:", exception); |
117 | | - return 1000L * (long) (10d * Math.pow(2, retriesAttempted / 2.0d) * (1.0d + rand.nextDouble())); |
118 | | - } |
| 88 | + (originalRequest, exception, retriesAttempted) -> { |
| 89 | + // with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000) |
| 90 | + logger.warn("EC2 API request failed, retry again. Reason was:", exception); |
| 91 | + return 1000L * (long) (10d * Math.pow(2, retriesAttempted / 2.0d) * (1.0d + rand.nextDouble())); |
119 | 92 | }, |
120 | 93 | 10, |
121 | 94 | false); |
122 | 95 | clientConfiguration.setRetryPolicy(retryPolicy); |
123 | | - clientConfiguration.setSocketTimeout((int) READ_TIMEOUT_SETTING.get(settings).millis()); |
124 | | - |
| 96 | + clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis); |
125 | 97 | return clientConfiguration; |
126 | 98 | } |
127 | 99 |
|
128 | | - protected static String findEndpoint(Logger logger, Settings settings) { |
129 | | - String endpoint = null; |
130 | | - if (ENDPOINT_SETTING.exists(settings)) { |
131 | | - endpoint = ENDPOINT_SETTING.get(settings); |
132 | | - logger.debug("using explicit ec2 endpoint [{}]", endpoint); |
| 100 | + // pkg private for tests |
| 101 | + static AWSCredentialsProvider buildCredentials(Logger logger, Ec2ClientSettings clientSettings) { |
| 102 | + final BasicAWSCredentials credentials = clientSettings.credentials; |
| 103 | + if (credentials == null) { |
| 104 | + logger.debug("Using either environment variables, system properties or instance profile credentials"); |
| 105 | + return new DefaultAWSCredentialsProviderChain(); |
| 106 | + } else { |
| 107 | + logger.debug("Using basic key/secret credentials"); |
| 108 | + return new StaticCredentialsProvider(credentials); |
133 | 109 | } |
134 | | - return endpoint; |
135 | 110 | } |
136 | 111 |
|
137 | 112 | @Override |
138 | | - public void close() throws IOException { |
139 | | - if (client != null) { |
140 | | - client.shutdown(); |
| 113 | + public AmazonEc2Reference client() { |
| 114 | + final LazyInitializable<AmazonEc2Reference, ElasticsearchException> clientReference = this.lazyClientReference.get(); |
| 115 | + if (clientReference == null) { |
| 116 | + throw new IllegalStateException("Missing ec2 client configs"); |
141 | 117 | } |
| 118 | + return clientReference.getOrCompute(); |
| 119 | + } |
142 | 120 |
|
143 | | - // Ensure that IdleConnectionReaper is shutdown |
| 121 | + /** |
| 122 | + * Refreshes the settings for the AmazonEC2 client. The new client will be build |
| 123 | + * using these new settings. The old client is usable until released. On release it |
| 124 | + * will be destroyed instead of being returned to the cache. |
| 125 | + */ |
| 126 | + @Override |
| 127 | + public void refreshAndClearCache(Ec2ClientSettings clientSettings) { |
| 128 | + final LazyInitializable<AmazonEc2Reference, ElasticsearchException> newClient = new LazyInitializable<>( |
| 129 | + () -> new AmazonEc2Reference(buildClient(clientSettings)), clientReference -> clientReference.incRef(), |
| 130 | + clientReference -> clientReference.decRef()); |
| 131 | + final LazyInitializable<AmazonEc2Reference, ElasticsearchException> oldClient = this.lazyClientReference.getAndSet(newClient); |
| 132 | + if (oldClient != null) { |
| 133 | + oldClient.reset(); |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + @Override |
| 138 | + public void close() { |
| 139 | + final LazyInitializable<AmazonEc2Reference, ElasticsearchException> clientReference = this.lazyClientReference.getAndSet(null); |
| 140 | + if (clientReference != null) { |
| 141 | + clientReference.reset(); |
| 142 | + } |
| 143 | + // shutdown IdleConnectionReaper background thread |
| 144 | + // it will be restarted on new client usage |
144 | 145 | IdleConnectionReaper.shutdown(); |
145 | 146 | } |
| 147 | + |
146 | 148 | } |
0 commit comments