2828import com .amazonaws .internal .StaticCredentialsProvider ;
2929import com .amazonaws .services .s3 .AmazonS3 ;
3030import com .amazonaws .services .s3 .AmazonS3Client ;
31+
3132import org .apache .logging .log4j .Logger ;
32- import org .elasticsearch .ElasticsearchException ;
3333import org .elasticsearch .common .Strings ;
34- import org .elasticsearch .common .component .AbstractLifecycleComponent ;
35- import org .elasticsearch .common .logging .DeprecationLogger ;
36- import org .elasticsearch .common .settings .SecureString ;
37- import org .elasticsearch .common .settings .Setting ;
34+ import org .elasticsearch .common .collect .MapBuilder ;
35+ import org .elasticsearch .common .component .AbstractComponent ;
3836import org .elasticsearch .common .settings .Settings ;
39-
40- import java .util .HashMap ;
4137import java .util .Map ;
42- import java .util .function .Function ;
43-
44-
45- class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {
38+ import static java .util .Collections .emptyMap ;
4639
47- // pkg private for tests
48- static final Setting <String > CLIENT_NAME = new Setting <>("client" , "default" , Function .identity ());
4940
50- private final Map < String , S3ClientSettings > clientsSettings ;
41+ class InternalAwsS3Service extends AbstractComponent implements AwsS3Service {
5142
52- private final Map <String , AmazonS3Client > clientsCache = new HashMap <>();
43+ private volatile Map <String , AmazonS3Reference > clientsCache = emptyMap ();
44+ private volatile Map <String , S3ClientSettings > clientsSettings = emptyMap ();
5345
54- InternalAwsS3Service (Settings settings , Map < String , S3ClientSettings > clientsSettings ) {
46+ InternalAwsS3Service (Settings settings ) {
5547 super (settings );
56- this .clientsSettings = clientsSettings ;
5748 }
5849
50+ /**
51+ * Reloads the settings for the AmazonS3 client. New clients will be build using
52+ * these. Old clients are usable until released. On release they will be
53+ * destroyed contrary to being returned to the cache.
54+ */
5955 @ Override
60- public synchronized AmazonS3 client (Settings repositorySettings ) {
61- String clientName = CLIENT_NAME .get (repositorySettings );
62- AmazonS3Client client = clientsCache .get (clientName );
63- if (client != null ) {
64- return client ;
65- }
56+ public synchronized Map <String , S3ClientSettings > updateClientsSettings (Map <String , S3ClientSettings > clientsSettings ) {
57+ // shutdown all unused clients
58+ // others will shutdown on their respective release
59+ releaseCachedClients ();
60+ final Map <String , S3ClientSettings > prevSettings = this .clientsSettings ;
61+ this .clientsSettings = MapBuilder .newMapBuilder (clientsSettings ).immutableMap ();
62+ assert this .clientsSettings .containsKey ("default" ) : "always at least have 'default'" ;
63+ // clients are built lazily by {@link client(String)}
64+ return prevSettings ;
65+ }
6666
67- S3ClientSettings clientSettings = clientsSettings .get (clientName );
68- if (clientSettings == null ) {
69- throw new IllegalArgumentException ("Unknown s3 client name [" + clientName + "]. Existing client configs: " +
70- Strings .collectionToDelimitedString (clientsSettings .keySet (), "," ));
67+ /**
68+ * Attempts to retrieve a client by name from the cache. If the client does not
69+ * exist it will be created.
70+ */
71+ @ Override
72+ public AmazonS3Reference client (String clientName ) {
73+ AmazonS3Reference clientReference = clientsCache .get (clientName );
74+ if ((clientReference != null ) && clientReference .tryIncRef ()) {
75+ return clientReference ;
7176 }
77+ synchronized (this ) {
78+ clientReference = clientsCache .get (clientName );
79+ if ((clientReference != null ) && clientReference .tryIncRef ()) {
80+ return clientReference ;
81+ }
82+ final S3ClientSettings clientSettings = clientsSettings .get (clientName );
83+ if (clientSettings == null ) {
84+ throw new IllegalArgumentException ("Unknown s3 client name [" + clientName + "]. Existing client configs: "
85+ + Strings .collectionToDelimitedString (clientsSettings .keySet (), "," ));
86+ }
87+ logger .debug ("creating S3 client with client_name [{}], endpoint [{}]" , clientName , clientSettings .endpoint );
88+ clientReference = new AmazonS3Reference (buildClient (clientSettings ));
89+ clientReference .incRef ();
90+ clientsCache = MapBuilder .newMapBuilder (clientsCache ).put (clientName , clientReference ).immutableMap ();
91+ return clientReference ;
92+ }
93+ }
7294
73- logger .debug ("creating S3 client with client_name [{}], endpoint [{}]" , clientName , clientSettings .endpoint );
74-
75- AWSCredentialsProvider credentials = buildCredentials (logger , deprecationLogger , clientSettings , repositorySettings );
76- ClientConfiguration configuration = buildConfiguration (clientSettings );
77-
78- client = new AmazonS3Client (credentials , configuration );
79-
95+ private AmazonS3 buildClient (S3ClientSettings clientSettings ) {
96+ final AWSCredentialsProvider credentials = buildCredentials (logger , clientSettings );
97+ final ClientConfiguration configuration = buildConfiguration (clientSettings );
98+ final AmazonS3 client = buildClient (credentials , configuration );
8099 if (Strings .hasText (clientSettings .endpoint )) {
81100 client .setEndpoint (clientSettings .endpoint );
82101 }
83-
84- clientsCache .put (clientName , client );
85102 return client ;
86103 }
87104
105+ // proxy for testing
106+ AmazonS3 buildClient (AWSCredentialsProvider credentials , ClientConfiguration configuration ) {
107+ return new AmazonS3Client (credentials , configuration );
108+ }
109+
88110 // pkg private for tests
89111 static ClientConfiguration buildConfiguration (S3ClientSettings clientSettings ) {
90- ClientConfiguration clientConfiguration = new ClientConfiguration ();
112+ final ClientConfiguration clientConfiguration = new ClientConfiguration ();
91113 // the response metadata cache is only there for diagnostics purposes,
92114 // but can force objects from every response to the old generation.
93115 clientConfiguration .setResponseMetadataCacheSize (0 );
@@ -109,27 +131,8 @@ static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) {
109131 }
110132
111133 // pkg private for tests
112- static AWSCredentialsProvider buildCredentials (Logger logger , DeprecationLogger deprecationLogger ,
113- S3ClientSettings clientSettings , Settings repositorySettings ) {
114-
115-
116- BasicAWSCredentials credentials = clientSettings .credentials ;
117- if (S3Repository .ACCESS_KEY_SETTING .exists (repositorySettings )) {
118- if (S3Repository .SECRET_KEY_SETTING .exists (repositorySettings ) == false ) {
119- throw new IllegalArgumentException ("Repository setting [" + S3Repository .ACCESS_KEY_SETTING .getKey () +
120- " must be accompanied by setting [" + S3Repository .SECRET_KEY_SETTING .getKey () + "]" );
121- }
122- try (SecureString key = S3Repository .ACCESS_KEY_SETTING .get (repositorySettings );
123- SecureString secret = S3Repository .SECRET_KEY_SETTING .get (repositorySettings )) {
124- credentials = new BasicAWSCredentials (key .toString (), secret .toString ());
125- }
126- // backcompat for reading keys out of repository settings
127- deprecationLogger .deprecated ("Using s3 access/secret key from repository settings. Instead " +
128- "store these in named clients and the elasticsearch keystore for secure settings." );
129- } else if (S3Repository .SECRET_KEY_SETTING .exists (repositorySettings )) {
130- throw new IllegalArgumentException ("Repository setting [" + S3Repository .SECRET_KEY_SETTING .getKey () +
131- " must be accompanied by setting [" + S3Repository .ACCESS_KEY_SETTING .getKey () + "]" );
132- }
134+ static AWSCredentialsProvider buildCredentials (Logger logger , S3ClientSettings clientSettings ) {
135+ final BasicAWSCredentials credentials = clientSettings .credentials ;
133136 if (credentials == null ) {
134137 logger .debug ("Using instance profile credentials" );
135138 return new PrivilegedInstanceProfileCredentialsProvider ();
@@ -140,20 +143,15 @@ static AWSCredentialsProvider buildCredentials(Logger logger, DeprecationLogger
140143 }
141144
142145 @ Override
143- protected void doStart () throws ElasticsearchException {
144- }
145-
146- @ Override
147- protected void doStop () throws ElasticsearchException {
148- }
149-
150- @ Override
151- protected void doClose () throws ElasticsearchException {
152- for (AmazonS3Client client : clientsCache .values ()) {
153- client .shutdown ();
146+ public synchronized void releaseCachedClients () {
147+ // the clients will shutdown when they will not be used anymore
148+ for (final AmazonS3Reference clientReference : clientsCache .values ()) {
149+ clientReference .decRef ();
154150 }
155-
156- // Ensure that IdleConnectionReaper is shutdown
151+ // clear previously cached clients, they will be build lazily
152+ clientsCache = emptyMap ();
153+ // shutdown IdleConnectionReaper background thread
154+ // it will be restarted on new client usage
157155 IdleConnectionReaper .shutdown ();
158156 }
159157
@@ -174,4 +172,5 @@ public void refresh() {
174172 SocketAccess .doPrivilegedVoid (credentials ::refresh );
175173 }
176174 }
175+
177176}
0 commit comments