From 878d7317205a073deac56455adfb5d349b23cab6 Mon Sep 17 00:00:00 2001 From: Armin Date: Sun, 17 Mar 2019 19:32:19 +0100 Subject: [PATCH 01/21] add threadpool to blobstore repository --- .../repository/url/URLRepositoryPlugin.java | 7 +++++-- .../repositories/url/URLRepository.java | 5 +++-- .../repositories/url/URLRepositoryTests.java | 19 ++++++++++++++++- .../repositories/azure/AzureRepository.java | 5 +++-- .../azure/AzureRepositoryPlugin.java | 7 +++++-- .../azure/AzureRepositorySettingsTests.java | 4 +++- .../gcs/GoogleCloudStoragePlugin.java | 7 +++++-- .../gcs/GoogleCloudStorageRepository.java | 5 +++-- .../repositories/hdfs/HdfsPlugin.java | 6 ++++-- .../repositories/hdfs/HdfsRepository.java | 5 +++-- .../repositories/s3/S3Repository.java | 5 +++-- .../repositories/s3/S3RepositoryPlugin.java | 11 ++++++---- .../s3/RepositoryCredentialsTests.java | 21 +++++++++++-------- .../s3/S3BlobStoreRepositoryTests.java | 8 ++++--- .../repositories/s3/S3RepositoryTests.java | 4 +++- .../plugins/RepositoryPlugin.java | 4 +++- .../repositories/RepositoriesModule.java | 4 ++-- .../blobstore/BlobStoreRepository.java | 7 +++++-- .../repositories/fs/FsRepository.java | 7 ++++--- .../repositories/RepositoriesModuleTests.java | 12 ++++++----- .../BlobStoreRepositoryRestoreTests.java | 2 +- .../blobstore/BlobStoreRepositoryTests.java | 8 ++++--- ...etadataLoadingDuringSnapshotRestoreIT.java | 11 ++++++---- .../snapshots/SnapshotResiliencyTests.java | 2 +- .../snapshots/mockstore/MockRepository.java | 10 +++++---- .../elasticsearch/xpack/core/XPackPlugin.java | 3 ++- .../snapshots/SourceOnlySnapshotIT.java | 4 +++- .../SourceOnlySnapshotShardTests.java | 2 +- .../core/LocalStateCompositeXPackPlugin.java | 7 ++++--- 29 files changed, 133 insertions(+), 69 deletions(-) diff --git a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java index a28413d213a97..6e88f0e0deb54 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.url.URLRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -44,7 +45,9 @@ public List> getSettings() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap(URLRepository.TYPE, metadata -> new URLRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap(URLRepository.TYPE, + metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool)); } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index 4728e1b0d9eb6..0ea2a1b72c574 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -82,8 +83,8 @@ public class URLRepository extends BlobStoreRepository { * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment.settings(), namedXContentRegistry, threadPool); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 2de4c132673db..001ca992d5390 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -26,10 +26,13 @@ import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -37,9 +40,23 @@ public class URLRepositoryTests extends ESTestCase { + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("URLRepositoryTests"); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 1L, TimeUnit.MINUTES); + } + private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList())) { + new NamedXContentRegistry(Collections.emptyList()), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index f1790347c736a..33ee9b64c2683 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -38,6 +38,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; import java.net.URISyntaxException; import java.util.List; @@ -84,8 +85,8 @@ public static final class Repository { private final boolean readonly; public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - AzureStorageService storageService) { - super(metadata, environment.settings(), namedXContentRegistry); + AzureStorageService storageService, ThreadPool threadPool) { + super(metadata, environment.settings(), namedXContentRegistry, threadPool); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index c6e8335bd5a6d..ab48cf1314ec5 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -28,6 +28,8 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -47,9 +49,10 @@ public AzureRepositoryPlugin(Settings settings) { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(AzureRepository.TYPE, - (metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService)); + (metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService, threadPool)); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 43891a8e9d57c..71f16b1413a01 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -42,7 +43,8 @@ private AzureRepository azureRepository(Settings settings) { .put(settings) .build(); final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), - TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class)); + TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), + mock(ThreadPool.class)); assertThat(azureRepository.getBlobStore(), is(nullValue())); return azureRepository; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 3186d2547a327..8e46b305a3350 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -27,6 +27,8 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -49,9 +51,10 @@ protected GoogleCloudStorageService createStorageService() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - (metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService)); + metadata -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService, threadPool)); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 9bcd6a8f6c527..804fafd5e855e 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -63,8 +64,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - GoogleCloudStorageService storageService) { - super(metadata, environment.settings(), namedXContentRegistry); + GoogleCloudStorageService storageService, ThreadPool threadPool) { + super(metadata, environment.settings(), namedXContentRegistry, threadPool); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index c0b3d805bcc8f..a6dc5fe7db140 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -36,6 +36,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; public final class HdfsPlugin extends Plugin implements RepositoryPlugin { @@ -110,7 +111,8 @@ private static Void eagerInit() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool)); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index ac0ed7d24cf5e..b614753d83883 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; @@ -67,8 +68,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); public HdfsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment.settings(), namedXContentRegistry, threadPool); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 522f15661bd64..771cea7296bd8 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -36,6 +36,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -165,8 +166,8 @@ class S3Repository extends BlobStoreRepository { S3Repository(final RepositoryMetaData metadata, final Settings settings, final NamedXContentRegistry namedXContentRegistry, - final S3Service service) { - super(metadata, settings, namedXContentRegistry); + final S3Service service, final ThreadPool threadPool) { + super(metadata, settings, namedXContentRegistry, threadPool); this.service = service; this.repositoryMetaData = metadata; diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index a2f9da5f846ef..bb044771e6085 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -30,6 +30,7 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; @@ -77,13 +78,15 @@ public S3RepositoryPlugin(final Settings settings) { // proxy method for testing protected S3Repository createRepository(final RepositoryMetaData metadata, final Settings settings, - final NamedXContentRegistry registry) { - return new S3Repository(metadata, settings, registry, service); + final NamedXContentRegistry registry, final ThreadPool threadPool) { + return new S3Repository(metadata, settings, registry, service, threadPool); } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry) { - return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry)); + public Map getRepositories(final Environment env, final NamedXContentRegistry registry, + final ThreadPool threadPool) { + return Collections.singletonMap(S3Repository.TYPE, + metadata -> createRepository(metadata, env.settings(), registry, threadPool)); } @Override diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index ca5893b57b2a4..89cc35ccf0cc3 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -30,12 +30,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; @SuppressForbidden(reason = "test fixture requires System.setProperty") public class RepositoryCredentialsTests extends ESTestCase { @@ -61,9 +63,9 @@ static final class ClientAndCredentials extends AmazonS3Wrapper { } static final class ProxyS3Service extends S3Service { - + private static final Logger logger = LogManager.getLogger(ProxyS3Service.class); - + @Override AmazonS3 buildClient(final S3ClientSettings clientSettings) { final AmazonS3 client = super.buildClient(clientSettings); @@ -77,8 +79,9 @@ AmazonS3 buildClient(final S3ClientSettings clientSettings) { } @Override - protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) { - return new S3Repository(metadata, settings, registry, service){ + protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry, + ThreadPool threadPool) { + return new S3Repository(metadata, settings, registry, service, threadPool){ @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads @@ -106,7 +109,7 @@ public void testRepositoryCredentialsOverrideSecureCredentials() throws IOExcept .put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key") .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class)); AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); @@ -129,7 +132,7 @@ public void testRepositoryCredentialsOnly() throws IOException { .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret") .build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class)); AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); @@ -144,8 +147,8 @@ public void testRepositoryCredentialsOnly() throws IOException { + " See the breaking changes documentation for the next major version."); } - private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) { - final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY); + private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin, ThreadPool threadPool) { + final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, threadPool); repository.start(); return repository; } @@ -168,7 +171,7 @@ public void testReinitSecureCredentials() throws IOException { } final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) { + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class))) { try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials .getCredentials(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 739452dc178c4..61c0328e516b7 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -114,14 +115,15 @@ public TestS3RepositoryPlugin(final Settings settings) { } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry) { + public Map getRepositories(final Environment env, final NamedXContentRegistry registry, + final ThreadPool threadPool) { return Collections.singletonMap(S3Repository.TYPE, - (metadata) -> new S3Repository(metadata, env.settings(), registry, new S3Service() { + metadata -> new S3Repository(metadata, env.settings(), registry, new S3Service() { @Override AmazonS3 buildClient(S3ClientSettings clientSettings) { return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass); } - })); + }, threadPool)); } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 36fa8b684bbb9..af04c420408ad 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Map; @@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class S3RepositoryTests extends ESTestCase { @@ -118,7 +120,7 @@ public void testDefaultBufferSize() { } private S3Repository createS3Repo(RepositoryMetaData metadata) { - return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) { + return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index 5c15040609863..9084ed7dfaca0 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; /** * An extension point for {@link Plugin} implementations to add custom snapshot repositories. @@ -39,7 +40,8 @@ public interface RepositoryPlugin { * The key of the returned {@link Map} is the type name of the repository and * the value is a factory to construct the {@link Repository} interface. */ - default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 90e3c94dfb3c5..7695ea946ea49 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -47,10 +47,10 @@ public class RepositoriesModule extends AbstractModule { public RepositoriesModule(Environment env, List repoPlugins, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) { Map factories = new HashMap<>(); - factories.put(FsRepository.TYPE, (metadata) -> new FsRepository(metadata, env, namedXContentRegistry)); + factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool)); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry); + Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool); for (Map.Entry entry : newRepoTypes.entrySet()) { if (factories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index e71114533f3d1..7c96e607af8e3 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -164,6 +164,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp protected final NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool; + private static final int BUFFER_SIZE = 4096; private static final String SNAPSHOT_PREFIX = "snap-"; @@ -236,10 +238,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * @param metadata The metadata for this repository including name and settings * @param settings Settings for the node this repository object is created on */ - protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, - NamedXContentRegistry namedXContentRegistry) { + protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { this.settings = settings; this.metadata = metadata; + this.threadPool = threadPool; this.namedXContentRegistry = namedXContentRegistry; this.compress = COMPRESS_SETTING.get(metadata.settings()); snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index e3e986c1eca9a..c2144ca1a9f0f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.function.Function; @@ -70,9 +71,9 @@ public class FsRepository extends BlobStoreRepository { /** * Constructs a shared file system repository. */ - public FsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), namedXContentRegistry); + public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + super(metadata, environment.settings(), namedXContentRegistry, threadPool); this.environment = environment; String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index 96a9670d16202..0fc9974f534f8 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -43,12 +43,14 @@ public class RepositoriesModuleTests extends ESTestCase { private RepositoryPlugin plugin1; private RepositoryPlugin plugin2; private Repository.Factory factory; + private ThreadPool threadPool; @Override public void setUp() throws Exception { super.setUp(); environment = mock(Environment.class); contentRegistry = mock(NamedXContentRegistry.class); + threadPool = mock(ThreadPool.class); plugin1 = mock(RepositoryPlugin.class); plugin2 = mock(RepositoryPlugin.class); factory = mock(Repository.Factory.class); @@ -58,8 +60,8 @@ public void setUp() throws Exception { } public void testCanRegisterTwoRepositoriesWithDifferentTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory)); // Would throw new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), @@ -67,8 +69,8 @@ public void testCanRegisterTwoRepositoriesWithDifferentTypes() { } public void testCannotRegisterTwoRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), @@ -89,7 +91,7 @@ public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { } public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 1b59f558db584..a904879321d58 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -180,7 +180,7 @@ public void testSnapshotWithConflictingName() throws IOException { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()) { + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 2cad99c48bc47..e2cb2fd071a11 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -66,12 +67,13 @@ protected Collection> getPlugins() { } // the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads - public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin { + public static class FsLikeRepoPlugin extends Plugin implements RepositoryPlugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(REPO_TYPE, - (metadata) -> new FsRepository(metadata, env, namedXContentRegistry) { + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we access blobStore on test/main threads diff --git a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 13b74df4e3d2b..040f12c956696 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -187,8 +188,8 @@ public static class CountingMockRepository extends MockRepository { public CountingMockRepository(final RepositoryMetaData metadata, final Environment environment, - final NamedXContentRegistry namedXContentRegistry) throws IOException { - super(metadata, environment, namedXContentRegistry); + final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment, namedXContentRegistry, threadPool); } @Override @@ -207,8 +208,10 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind /** A plugin that uses CountingMockRepository as implementation of the Repository **/ public static class CountingMockRepositoryPlugin extends MockRepository.Plugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("coutingmock", (metadata) -> new CountingMockRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("coutingmock", + metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool)); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 9c1d256b552bd..43d0e89f86196 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -809,7 +809,7 @@ public void onFailure(final Exception e) { repositoriesService = new RepositoriesService( settings, clusterService, transportService, Collections.singletonMap(FsRepository.TYPE, metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { + final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo in the test thread diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 8a49324757f27..9ce111e1d3011 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -42,6 +42,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -69,8 +70,9 @@ public static class Plugin extends org.elasticsearch.plugins.Plugin implements R @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool)); } @Override @@ -113,8 +115,8 @@ public long getFailureCount() { private volatile boolean blocked = false; public MockRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) throws IOException { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index bc861b3904f96..1a26e8aa88c17 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -358,7 +358,8 @@ default Optional getRequiredFeature() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java index 00b199eef4419..81be978d33103 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.io.IOException; @@ -72,7 +73,8 @@ protected Collection> getMockPlugins() { public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index ec1f002d05ba8..6a37e8265c096 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -328,7 +328,7 @@ private Environment createEnvironment() { private Repository createRepository() throws IOException { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool); } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 1dd07a5df81ff..909434ae99763 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -395,9 +395,10 @@ public List> getPersistentTasksExecutor(ClusterServic } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry)); - filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry))); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool)); + filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool))); return repositories; } From 18cd7856cacf18101d191dee8dd1e50d8faf32a0 Mon Sep 17 00:00:00 2001 From: Armin Date: Sun, 17 Mar 2019 19:59:40 +0100 Subject: [PATCH 02/21] async delete api --- .../repositories/FilterRepository.java | 5 +++-- .../elasticsearch/repositories/Repository.java | 3 ++- .../blobstore/BlobStoreRepository.java | 3 ++- .../snapshots/SnapshotsService.java | 17 +++++++++-------- .../repositories/RepositoriesServiceTests.java | 3 ++- .../index/shard/RestoreOnlyRepository.java | 3 ++- .../xpack/ccr/repository/CcrRepository.java | 2 +- 7 files changed, 21 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 4e8e9b6c7f569..afc38bda86c5b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -84,8 +85,8 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - in.deleteSnapshot(snapshotId, repositoryStateId); + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + in.deleteSnapshot(snapshotId, repositoryStateId, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1ca6f5e148510..78991d4174b80 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -141,7 +142,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * @param snapshotId snapshot id * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began */ - void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener); /** * Returns snapshot throttle time in nanoseconds diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 7c96e607af8e3..192ec588cdee9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -414,7 +415,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7ba53cb5d1e1c..d21f27c4f195c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -27,6 +27,7 @@ import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -1308,15 +1309,15 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(listener) { + @Override + protected void doRun() { Repository repository = repositoriesService.repository(snapshot.getRepository()); - repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); - logger.info("snapshot [{}] deleted", snapshot); - - removeSnapshotDeletionFromClusterState(snapshot, null, listener); - } catch (Exception ex) { - removeSnapshotDeletionFromClusterState(snapshot, ex, listener); + repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> { + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null, listener); + }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, listener) + )); } }); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index c02ab0d185610..ddec403102d31 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -149,7 +150,7 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 11bdfb7bcc741..4cc11e0f046b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -103,7 +104,7 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0b445a3eb01ef..5a0472339c192 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -260,7 +260,7 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } From 05ffbad7b1b5bb50f0badc62bb56086410bd96de Mon Sep 17 00:00:00 2001 From: Armin Date: Sun, 17 Mar 2019 21:38:46 +0100 Subject: [PATCH 03/21] still fails --- .../blobstore/BlobStoreRepository.java | 120 +++++++++++------- .../snapshots/SnapshotsService.java | 3 + .../repositories/RepositoriesModuleTests.java | 10 +- 3 files changed, 80 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 192ec588cdee9..37f6977c6089c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -110,6 +110,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -425,56 +427,88 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action try { snapshot = getSnapshotInfo(snapshotId); } catch (SnapshotMissingException ex) { - throw ex; + listener.onFailure(ex); + return; } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); } + // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots + final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); try { - // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots - final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); writeIndexGen(updatedRepositoryData, repositoryStateId); + } catch (IOException | ResourceNotFoundException ex) { + listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); + return; + } + deleteSnapshotBlobs(snapshot, snapshotId, repositoryData, updatedRepositoryData, listener); + } + private void deleteSnapshotBlobs(SnapshotInfo snapshot, SnapshotId snapshotId, RepositoryData repositoryData, + RepositoryData updatedRepositoryData, ActionListener listener) { + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final AtomicInteger outstanding = new AtomicInteger(2); + final ActionListener deleteListener = ActionListener.wrap(() -> { + if (outstanding.decrementAndGet() == 0) { + deleteIndices(snapshot, repositoryData, snapshotId, updatedRepositoryData, listener); + } + }); + executor.execute(() -> { // delete the snapshot file deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); + deleteListener.onResponse(null); + }); + executor.execute(() -> { // delete the global metadata file deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); + deleteListener.onResponse(null); + }); + } - // Now delete all indices - if (snapshot != null) { - final List indices = snapshot.indices(); - for (String index : indices) { - final IndexId indexId = repositoryData.resolveIndexId(index); + private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, + RepositoryData updatedRepositoryData, ActionListener listener) { + // Now delete all indices + if (snapshot != null) { + final List indices = snapshot.indices(); + for (String index : indices) { + final IndexId indexId = repositoryData.resolveIndexId(index); - IndexMetaData indexMetaData = null; - try { - indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (ElasticsearchParseException | IOException ex) { - logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); - } + IndexMetaData indexMetaData = null; + try { + indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); + } catch (ElasticsearchParseException | IOException ex) { + logger.warn(() -> + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + } - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); - if (indexMetaData != null) { - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { - try { - delete(snapshotId, indexId, new ShardId(indexMetaData.getIndex(), shardId)); - } catch (SnapshotException ex) { - final int finalShardId = shardId; - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); - } + if (indexMetaData != null) { + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + try { + final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); + new Context(snapshotId, indexId, sid, sid).delete(); + } catch (SnapshotException ex) { + final int finalShardId = shardId; + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", + snapshotId, index, finalShardId), ex); } } } } + } - // cleanup indices that are no longer part of the repository - final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); - indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); - final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); - for (final IndexId indexId : indicesToCleanUp) { + // cleanup indices that are no longer part of the repository + final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); + indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); + final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); + if (indicesToCleanUp.isEmpty()) { + listener.onResponse(null); + return; + } + final AtomicInteger outstanding = new AtomicInteger(indicesToCleanUp.size()); + for (final IndexId indexId : indicesToCleanUp) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { indicesBlobContainer.deleteBlob(indexId.getId()); } catch (DirectoryNotEmptyException dnee) { @@ -482,16 +516,17 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action // we'll ignore that and accept that cleanup didn't fully succeed. // since we are using UUIDs for path names, this won't be an issue for // snapshotting indices of the same name - logger.debug(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + + logger.error(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee); - } catch (IOException ioe) { + } catch (Exception e) { // a different IOException occurred while trying to delete - will just log the issue for now - logger.debug(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + - "but failed to clean up its index folder.", metadata.name(), indexId), ioe); + logger.error(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + + "but failed to clean up its index folder.", metadata.name(), indexId), e); } - } - } catch (IOException | ResourceNotFoundException ex) { - throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex); + if (outstanding.decrementAndGet() == 0) { + listener.onResponse(null); + } + }); } } @@ -920,17 +955,6 @@ public void verify(String seed, DiscoveryNode localNode) { } } - /** - * Delete shard snapshot - * - * @param snapshotId snapshot id - * @param shardId shard id - */ - private void delete(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { - Context context = new Context(snapshotId, indexId, shardId, shardId); - context.delete(); - } - @Override public String toString() { return "BlobStoreRepository[" + diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index d21f27c4f195c..149497cb0119a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1309,6 +1309,9 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { + if (listener == null) { + listener = ActionListener.wrap(() -> {}); + } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(listener) { @Override protected void doRun() { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index 0fc9974f534f8..767ecc38b18db 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -64,8 +64,8 @@ public void testCanRegisterTwoRepositoriesWithDifferentTypes() { when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory)); // Would throw - new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry); + new RepositoriesModule( + environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry); } public void testCannotRegisterTwoRepositoriesWithSameTypes() { @@ -74,7 +74,7 @@ public void testCannotRegisterTwoRepositoriesWithSameTypes() { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Repository type [type1] is already registered", ex.getMessage()); } @@ -85,7 +85,7 @@ public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); } @@ -96,7 +96,7 @@ public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); } From 8f5081eed46d331f2366e0f8a7e64cf4c1e2fdc5 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 18 Mar 2019 07:44:27 +0100 Subject: [PATCH 04/21] tests pass --- .../blobstore/BlobStoreRepository.java | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 37f6977c6089c..3dfd4fd9aad4c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -31,7 +31,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -419,29 +418,30 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); - } - - final RepositoryData repositoryData = getRepositoryData(); - SnapshotInfo snapshot = null; - try { - snapshot = getSnapshotInfo(snapshotId); - } catch (SnapshotMissingException ex) { - listener.onFailure(ex); - return; - } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { - logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); - } - - // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots - final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); - try { - writeIndexGen(updatedRepositoryData, repositoryStateId); - } catch (IOException | ResourceNotFoundException ex) { - listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); - return; + listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); + } else { + SnapshotInfo snapshot = null; + try { + snapshot = getSnapshotInfo(snapshotId); + } catch (SnapshotMissingException ex) { + listener.onFailure(ex); + return; + } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { + logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); + } + // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots + final RepositoryData repositoryData; + final RepositoryData updatedRepositoryData; + try { + repositoryData = getRepositoryData(); + updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); + writeIndexGen(updatedRepositoryData, repositoryStateId); + } catch (Exception ex) { + listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); + return; + } + deleteSnapshotBlobs(snapshot, snapshotId, repositoryData, updatedRepositoryData, listener); } - deleteSnapshotBlobs(snapshot, snapshotId, repositoryData, updatedRepositoryData, listener); } private void deleteSnapshotBlobs(SnapshotInfo snapshot, SnapshotId snapshotId, RepositoryData repositoryData, @@ -503,6 +503,7 @@ private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); if (indicesToCleanUp.isEmpty()) { + // We're done, no indices to clean up listener.onResponse(null); return; } From a1b3a8a3d340f9fa346536a3849a53defabb2ae9 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 18 Mar 2019 08:44:13 +0100 Subject: [PATCH 05/21] tests pass --- .../blobstore/BlobStoreRepository.java | 56 +++++++++++-------- .../snapshots/SnapshotsService.java | 3 - .../RepositoriesServiceTests.java | 2 +- .../index/shard/RestoreOnlyRepository.java | 1 + 4 files changed, 36 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 3dfd4fd9aad4c..2b8f20e116db8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -470,34 +470,46 @@ private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, // Now delete all indices if (snapshot != null) { final List indices = snapshot.indices(); + if (indices.isEmpty()) { + deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); + return; + } + final AtomicInteger outstandingIndices = new AtomicInteger(indices.size()); for (String index : indices) { - final IndexId indexId = repositoryData.resolveIndexId(index); - - IndexMetaData indexMetaData = null; - try { - indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (ElasticsearchParseException | IOException ex) { - logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); - } - - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); - - if (indexMetaData != null) { - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { - try { - final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); - new Context(snapshotId, indexId, sid, sid).delete(); - } catch (SnapshotException ex) { - final int finalShardId = shardId; - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + final IndexId indexId = repositoryData.resolveIndexId(index); + IndexMetaData indexMetaData = null; + try { + indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); + } catch (Exception ex) { + logger.warn(() -> + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + } + deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + if (indexMetaData != null) { + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + try { + final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); + new Context(snapshotId, indexId, sid, sid).delete(); + } catch (SnapshotException ex) { + final int finalShardId = shardId; + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", + snapshotId, index, finalShardId), ex); + } } } - } + if (outstandingIndices.decrementAndGet() == 0) { + deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); + } + }); } + } else { + deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); } + } + private void deleteUnreferencedIndices(RepositoryData repositoryData, RepositoryData updatedRepositoryData, + ActionListener listener) { // cleanup indices that are no longer part of the repository final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 149497cb0119a..d21f27c4f195c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1309,9 +1309,6 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { - if (listener == null) { - listener = ActionListener.wrap(() -> {}); - } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(listener) { @Override protected void doRun() { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index ddec403102d31..981004f48efe8 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -151,7 +151,7 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { - + listener.onResponse(null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 4cc11e0f046b4..bc60b4c194622 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -105,6 +105,7 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + listener.onResponse(null); } @Override From 305b5009acfb8c0a1a95424e5e7948598ded7d52 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 18 Mar 2019 08:46:18 +0100 Subject: [PATCH 06/21] shorter --- .../repositories/url/URLRepositoryTests.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 001ca992d5390..96a82ee0b9d24 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -26,37 +26,22 @@ import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; import java.util.Collections; -import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; +import static org.mockito.Mockito.mock; public class URLRepositoryTests extends ESTestCase { - private ThreadPool threadPool; - - @Override - public void setUp() throws Exception { - super.setUp(); - threadPool = new TestThreadPool("URLRepositoryTests"); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - ThreadPool.terminate(threadPool, 1L, TimeUnit.MINUTES); - } - private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList()), threadPool) { + new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads From d27e6c69e08b8909fadbb88df1464fe847049439 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 18 Mar 2019 08:49:13 +0100 Subject: [PATCH 07/21] add javadoc --- .../src/main/java/org/elasticsearch/repositories/Repository.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 78991d4174b80..20f7c42cb21dd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -141,6 +141,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * * @param snapshotId snapshot id * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began + * @param listener completion listener */ void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener); From 4f8d93d36d5b54909e48a4de9eaeadb23c0cea29 Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 22 Mar 2019 17:41:30 +0100 Subject: [PATCH 08/21] CR comments --- .../blobstore/BlobStoreRepository.java | 102 ++++++++++-------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c06696327c016..b6d567c5002e5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -33,10 +33,13 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -110,7 +113,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -444,28 +446,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action } } - private void deleteSnapshotBlobs(SnapshotInfo snapshot, SnapshotId snapshotId, RepositoryData repositoryData, + private void deleteSnapshotBlobs(@Nullable SnapshotInfo snapshot, SnapshotId snapshotId, RepositoryData repositoryData, RepositoryData updatedRepositoryData, ActionListener listener) { final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final AtomicInteger outstanding = new AtomicInteger(2); - final ActionListener deleteListener = ActionListener.wrap(() -> { - if (outstanding.decrementAndGet() == 0) { - deleteIndices(snapshot, repositoryData, snapshotId, updatedRepositoryData, listener); + final ActionListener deleteListener = new GroupedActionListener<>( + ActionListener.wrap(() -> deleteIndices(snapshot, repositoryData, snapshotId, updatedRepositoryData, listener)), 2); + executor.execute(new ActionRunnable(listener) { + @Override + protected void doRun() { + // delete the snapshot file + deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); + deleteListener.onResponse(null); } }); - executor.execute(() -> { - // delete the snapshot file - deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - deleteListener.onResponse(null); - }); - executor.execute(() -> { - // delete the global metadata file - deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - deleteListener.onResponse(null); + executor.execute(new ActionRunnable(listener) { + @Override + protected void doRun() { + // delete the global metadata file + deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); + deleteListener.onResponse(null); + } }); } - private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, + private void deleteIndices(@Nullable SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, RepositoryData updatedRepositoryData, ActionListener listener) { // Now delete all indices if (snapshot != null) { @@ -474,32 +478,44 @@ private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); return; } - final AtomicInteger outstandingIndices = new AtomicInteger(indices.size()); + final ActionListener groupedListener = new GroupedActionListener<>(new ActionListener>() { + @Override + public void onResponse(final Collection voids) { + deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, indices.size()); for (String index : indices) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - final IndexId indexId = repositoryData.resolveIndexId(index); - IndexMetaData indexMetaData = null; - try { - indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (Exception ex) { - logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); - } - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); - if (indexMetaData != null) { - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { - try { - final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); - new Context(snapshotId, indexId, sid, sid).delete(); - } catch (SnapshotException ex) { - final int finalShardId = shardId; - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { + + @Override + protected void doRun() { + final IndexId indexId = repositoryData.resolveIndexId(index); + IndexMetaData indexMetaData = null; + try { + indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); + } catch (Exception ex) { + logger.warn(() -> + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + } + deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + if (indexMetaData != null) { + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + try { + final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); + new Context(snapshotId, indexId, sid, sid).delete(); + } catch (SnapshotException ex) { + final int finalShardId = shardId; + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", + snapshotId, index, finalShardId), ex); + } } } - } - if (outstandingIndices.decrementAndGet() == 0) { - deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); + groupedListener.onResponse(null); } }); } @@ -519,7 +535,8 @@ private void deleteUnreferencedIndices(RepositoryData repositoryData, Repository listener.onResponse(null); return; } - final AtomicInteger outstanding = new AtomicInteger(indicesToCleanUp.size()); + final ActionListener groupedListener = + new GroupedActionListener<>(ActionListener.map(listener, v -> null), indicesToCleanUp.size()); for (final IndexId indexId : indicesToCleanUp) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { @@ -535,9 +552,8 @@ private void deleteUnreferencedIndices(RepositoryData repositoryData, Repository // a different IOException occurred while trying to delete - will just log the issue for now logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + "but failed to clean up its index folder.", metadata.name(), indexId), e); - } - if (outstanding.decrementAndGet() == 0) { - listener.onResponse(null); + } finally { + groupedListener.onResponse(null); } }); } From ae6fb85967c01858f9f6953bbacf76672799fabc Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 22 Mar 2019 18:50:19 +0100 Subject: [PATCH 09/21] add threadpool to internal repo call --- .../java/org/elasticsearch/plugins/RepositoryPlugin.java | 3 ++- .../elasticsearch/repositories/RepositoriesModule.java | 2 +- .../repositories/RepositoriesModuleTests.java | 9 ++++++--- .../src/main/java/org/elasticsearch/xpack/ccr/Ccr.java | 7 +++---- .../xpack/core/LocalStateCompositeXPackPlugin.java | 8 +++++--- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index 9084ed7dfaca0..ede5c5e3611f9 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -54,7 +54,8 @@ default Map getRepositories(Environment env, NamedXC * The key of the returned {@link Map} is the type name of the repository and * the value is a factory to construct the {@link Repository} interface. */ - default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 7695ea946ea49..5ea853b0b5501 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -60,7 +60,7 @@ public RepositoriesModule(Environment env, List repoPlugins, T Map internalFactories = new HashMap<>(); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool); for (Map.Entry entry : newRepoTypes.entrySet()) { if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index 767ecc38b18db..cd31ce121b245 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -80,8 +80,10 @@ public void testCannotRegisterTwoRepositoriesWithSameTypes() { } public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { - when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), @@ -92,7 +94,8 @@ public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 7fa7f37f4b71d..3eda554a84bd4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -132,7 +132,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce restoreSourceService = new SetOnce<>(); private final SetOnce ccrSettings = new SetOnce<>(); - private final SetOnce threadPool = new SetOnce<>(); private Client client; private final boolean transportClientMode; @@ -177,7 +176,6 @@ public Collection createComponents( CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); this.ccrSettings.set(ccrSettings); - this.threadPool.set(threadPool); CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings); this.restoreSourceService.set(restoreSourceService); return Arrays.asList( @@ -326,9 +324,10 @@ public List> getExecutorBuilders(Settings settings) { } @Override - public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { Repository.Factory repositoryFactory = - (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get()); + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 909434ae99763..fe56bfbb785bf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -403,10 +403,12 @@ public Map getRepositories(Environment env, NamedXCo } @Override - public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - HashMap internalRepositories = new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry)); + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + HashMap internalRepositories = + new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool)); filterPlugins(RepositoryPlugin.class).forEach(r -> - internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry))); + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool))); return internalRepositories; } From c24bb71810d001ed7f9751fc21733ffdcc050289 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 18:25:29 +0200 Subject: [PATCH 10/21] use generic pool for snapshot restore --- .../snapshots/restore/TransportRestoreSnapshotAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index d8dcc5eb8f846..f83f655bc5a23 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -50,7 +50,7 @@ public TransportRestoreSnapshotAction(TransportService transportService, Cluster @Override protected String executor() { - return ThreadPool.Names.SNAPSHOT; + return ThreadPool.Names.GENERIC; } @Override From 585bcbbb775936b0591eedd605db2dd415a1dbd0 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 18:42:00 +0200 Subject: [PATCH 11/21] CR: add a little doc --- .../repositories/blobstore/BlobStoreRepository.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b6d567c5002e5..fb8ae62acd9cb 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -239,8 +239,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp /** * Constructs new BlobStoreRepository - * @param metadata The metadata for this repository including name and settings - * @param settings Settings for the node this repository object is created on + * @param metadata The metadata for this repository including name and settings + * @param settings Settings for the node this repository object is created on + * @param threadPool Threadpool to run long running repository manipulations on asynchronously */ protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { From 2895f19b4784a44c8a1060cbb2f466e512c0ce92 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 19:47:01 +0200 Subject: [PATCH 12/21] CR: chain actions to dry things up a little --- .../blobstore/BlobStoreRepository.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index fb8ae62acd9cb..478d8110cde18 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -451,7 +451,10 @@ private void deleteSnapshotBlobs(@Nullable SnapshotInfo snapshot, SnapshotId sn RepositoryData updatedRepositoryData, ActionListener listener) { final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); final ActionListener deleteListener = new GroupedActionListener<>( - ActionListener.wrap(() -> deleteIndices(snapshot, repositoryData, snapshotId, updatedRepositoryData, listener)), 2); + ActionListener.wrap(v -> deleteIndices(snapshot, repositoryData, snapshotId, + ActionListener.wrap( + vv -> deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener), listener::onFailure)), + listener::onFailure), 2); executor.execute(new ActionRunnable(listener) { @Override protected void doRun() { @@ -471,25 +474,16 @@ protected void doRun() { } private void deleteIndices(@Nullable SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, - RepositoryData updatedRepositoryData, ActionListener listener) { + ActionListener listener) { // Now delete all indices if (snapshot != null) { final List indices = snapshot.indices(); if (indices.isEmpty()) { - deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); + listener.onResponse(null); return; } - final ActionListener groupedListener = new GroupedActionListener<>(new ActionListener>() { - @Override - public void onResponse(final Collection voids) { - deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, indices.size()); + final ActionListener groupedListener = + new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); for (String index : indices) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { @@ -521,7 +515,7 @@ protected void doRun() { }); } } else { - deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener); + listener.onResponse(null); } } From d04592011b92523a7af6a880e093b3a9b252180f Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 21:14:06 +0200 Subject: [PATCH 13/21] dry up logic using callback chain --- .../blobstore/BlobStoreRepository.java | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 3e7f784ecd998..b6943a08800f9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -442,18 +442,21 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); return; } - deleteSnapshotBlobs(snapshot, snapshotId, repositoryData, updatedRepositoryData, listener); + final SnapshotInfo finalSnapshotInfo = snapshot; + final ActionListener afterDeleteIndices = ActionListener.wrap( + vv -> deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener), listener::onFailure); + deleteSnapshotBlobs(snapshot, snapshotId, + snapshot == null || snapshot.indices().isEmpty() + ? afterDeleteIndices + : ActionListener.wrap( + v -> deleteIndices(finalSnapshotInfo, repositoryData, snapshotId, afterDeleteIndices), listener::onFailure) + ); } } - private void deleteSnapshotBlobs(@Nullable SnapshotInfo snapshot, SnapshotId snapshotId, RepositoryData repositoryData, - RepositoryData updatedRepositoryData, ActionListener listener) { + private void deleteSnapshotBlobs(@Nullable SnapshotInfo snapshot, SnapshotId snapshotId, ActionListener listener) { final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final ActionListener deleteListener = new GroupedActionListener<>( - ActionListener.wrap(v -> deleteIndices(snapshot, repositoryData, snapshotId, - ActionListener.wrap( - vv -> deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener), listener::onFailure)), - listener::onFailure), 2); + final ActionListener deleteListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), 2); executor.execute(new ActionRunnable(listener) { @Override protected void doRun() { @@ -472,49 +475,40 @@ protected void doRun() { }); } - private void deleteIndices(@Nullable SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, + private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, ActionListener listener) { - // Now delete all indices - if (snapshot != null) { - final List indices = snapshot.indices(); - if (indices.isEmpty()) { - listener.onResponse(null); - return; - } - final ActionListener groupedListener = - new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); - for (String index : indices) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { - - @Override - protected void doRun() { - final IndexId indexId = repositoryData.resolveIndexId(index); - IndexMetaData indexMetaData = null; - try { - indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (Exception ex) { - logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); - } - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); - if (indexMetaData != null) { - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { - try { - final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); - new Context(snapshotId, indexId, sid, sid).delete(); - } catch (SnapshotException ex) { - final int finalShardId = shardId; - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); - } + final List indices = snapshot.indices(); + final ActionListener groupedListener = + new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); + for (String index : indices) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { + + @Override + protected void doRun() { + final IndexId indexId = repositoryData.resolveIndexId(index); + IndexMetaData indexMetaData = null; + try { + indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); + } catch (Exception ex) { + logger.warn(() -> + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + } + deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + if (indexMetaData != null) { + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + try { + final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); + new Context(snapshotId, indexId, sid, sid).delete(); + } catch (SnapshotException ex) { + final int finalShardId = shardId; + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", + snapshotId, index, finalShardId), ex); } } - groupedListener.onResponse(null); } - }); - } - } else { - listener.onResponse(null); + groupedListener.onResponse(null); + } + }); } } From 6bac7c053ad71727a2a93000e298b9d3d53702bf Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 21:47:18 +0200 Subject: [PATCH 14/21] add comment --- .../repositories/blobstore/BlobStoreRepository.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b6943a08800f9..9d2bbaec9e31c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -447,7 +447,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action vv -> deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener), listener::onFailure); deleteSnapshotBlobs(snapshot, snapshotId, snapshot == null || snapshot.indices().isEmpty() - ? afterDeleteIndices + ? afterDeleteIndices // if we don't have any indices to delete move to cleaning up unreferenced indices right away : ActionListener.wrap( v -> deleteIndices(finalSnapshotInfo, repositoryData, snapshotId, afterDeleteIndices), listener::onFailure) ); From e1c816202812aef59a9f3eb2ae8cc62f0d7980a1 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 22:00:56 +0200 Subject: [PATCH 15/21] dry logic up some more --- .../blobstore/BlobStoreRepository.java | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9d2bbaec9e31c..61a76f29583ef 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -443,14 +443,19 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action return; } final SnapshotInfo finalSnapshotInfo = snapshot; - final ActionListener afterDeleteIndices = ActionListener.wrap( - vv -> deleteUnreferencedIndices(repositoryData, updatedRepositoryData, listener), listener::onFailure); + final Collection unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values()); + unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values()); + final ActionListener afterDeleteIndices = unreferencedIndices.isEmpty() + ? listener // if we don't have any newly unreferenced indices we move to the next step directly + : ActionListener.wrap( + vv -> deleteUnreferencedIndices(unreferencedIndices, listener), listener::onFailure); deleteSnapshotBlobs(snapshot, snapshotId, snapshot == null || snapshot.indices().isEmpty() - ? afterDeleteIndices // if we don't have any indices to delete move to cleaning up unreferenced indices right away + ? afterDeleteIndices // if we don't have any indices to delete we move to the next step : ActionListener.wrap( - v -> deleteIndices(finalSnapshotInfo, repositoryData, snapshotId, afterDeleteIndices), listener::onFailure) - ); + v -> deleteIndices( + finalSnapshotInfo.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList()), + snapshotId, afterDeleteIndices), listener::onFailure)); } } @@ -475,25 +480,22 @@ protected void doRun() { }); } - private void deleteIndices(SnapshotInfo snapshot, RepositoryData repositoryData, SnapshotId snapshotId, - ActionListener listener) { - final List indices = snapshot.indices(); + private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); - for (String index : indices) { + for (IndexId indexId: indices) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { @Override protected void doRun() { - final IndexId indexId = repositoryData.resolveIndexId(index); IndexMetaData indexMetaData = null; try { indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); } catch (Exception ex) { logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); } - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId); if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { @@ -502,7 +504,7 @@ protected void doRun() { } catch (SnapshotException ex) { final int finalShardId = shardId; logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); + snapshotId, indexId.getName(), finalShardId), ex); } } } @@ -512,17 +514,9 @@ protected void doRun() { } } - private void deleteUnreferencedIndices(RepositoryData repositoryData, RepositoryData updatedRepositoryData, - ActionListener listener) { - // cleanup indices that are no longer part of the repository - final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); - indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); + // cleanup indices that are no longer part of the repository + private void deleteUnreferencedIndices(Collection indicesToCleanUp, ActionListener listener) { final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); - if (indicesToCleanUp.isEmpty()) { - // We're done, no indices to clean up - listener.onResponse(null); - return; - } final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indicesToCleanUp.size()); for (final IndexId indexId : indicesToCleanUp) { @@ -573,8 +567,7 @@ private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotI } } - private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final IndexId indexId) { - final SnapshotId snapshotId = snapshotInfo.snapshotId(); + private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())); try { indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID()); From def77dcf29e333a99a5a19d068f0d93d98284333 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Apr 2019 22:03:27 +0200 Subject: [PATCH 16/21] dry logic up some more --- .../repositories/blobstore/BlobStoreRepository.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 61a76f29583ef..002570f20b0bb 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -481,8 +481,7 @@ protected void doRun() { } private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { - final ActionListener groupedListener = - new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); + final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); for (IndexId indexId: indices) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { From 0c065255d93bf00d4a7df15c32116fd7ce5c2bc4 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 3 Apr 2019 07:45:39 +0200 Subject: [PATCH 17/21] much simpler --- .../blobstore/BlobStoreFormat.java | 4 +- .../blobstore/BlobStoreRepository.java | 119 +++++------------- 2 files changed, 33 insertions(+), 90 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java index eb9dc41236d8c..dc9f8092e3fc0 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java @@ -100,11 +100,11 @@ public void delete(BlobContainer blobContainer, String name) throws IOException /** * Checks obj in the blob container */ - public boolean exists(BlobContainer blobContainer, String name) throws IOException { + public boolean exists(BlobContainer blobContainer, String name) { return blobContainer.blobExists(blobName(name)); } - protected String blobName(String name) { + public String blobName(String name) { return String.format(Locale.ROOT, blobNameFormat, name); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 408da73248bd8..5fe3e0300a092 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -39,7 +39,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -104,15 +103,16 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.DirectoryNotEmptyException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executor; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -445,42 +445,38 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action final SnapshotInfo finalSnapshotInfo = snapshot; final Collection unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values()); unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values()); - final ActionListener afterDeleteIndices = unreferencedIndices.isEmpty() - ? listener // if we don't have any newly unreferenced indices we move to the next step directly - : ActionListener.wrap( - vv -> deleteUnreferencedIndices(unreferencedIndices, listener), listener::onFailure); - deleteSnapshotBlobs(snapshot, snapshotId, - snapshot == null || snapshot.indices().isEmpty() - ? afterDeleteIndices // if we don't have any indices to delete we move to the next step - : ActionListener.wrap( - v -> deleteIndices( - finalSnapshotInfo.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList()), - snapshotId, afterDeleteIndices), listener::onFailure)); - } - } - - private void deleteSnapshotBlobs(@Nullable SnapshotInfo snapshot, SnapshotId snapshotId, ActionListener listener) { - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final ActionListener deleteListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), 2); - executor.execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - // delete the snapshot file - deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - deleteListener.onResponse(null); - } - }); - executor.execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - // delete the global metadata file - deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - deleteListener.onResponse(null); + try { + blobContainer().deleteBlobsIgnoringIfNotExists( + Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()))); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e); } - }); + deleteIndices( + Optional.ofNullable(finalSnapshotInfo) + .map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList())) + .orElse(Collections.emptyList()), + snapshotId, + ActionListener.wrap(v -> { + try { + blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( + unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); + } catch (IOException e) { + // a different Exception occurred while trying to delete - will just log the issue for now + logger.warn(() -> + new ParameterizedMessage( + "[{}] indices {} are no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e); + } + listener.onResponse(null); + }, listener::onFailure) + ); + } } private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { + if (indices.isEmpty()) { + listener.onResponse(null); + } final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); for (IndexId indexId: indices) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { @@ -513,59 +509,6 @@ protected void doRun() { } } - // cleanup indices that are no longer part of the repository - private void deleteUnreferencedIndices(Collection indicesToCleanUp, ActionListener listener) { - final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); - final ActionListener groupedListener = - new GroupedActionListener<>(ActionListener.map(listener, v -> null), indicesToCleanUp.size()); - for (final IndexId indexId : indicesToCleanUp) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId()); - } catch (DirectoryNotEmptyException dnee) { - // if the directory isn't empty for some reason, it will fail to clean up; - // we'll ignore that and accept that cleanup didn't fully succeed. - // since we are using UUIDs for path names, this won't be an issue for - // snapshotting indices of the same name - logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + - "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee); - } catch (Exception e) { - // a different IOException occurred while trying to delete - will just log the issue for now - logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + - "but failed to clean up its index folder.", metadata.name(), indexId), e); - } finally { - groupedListener.onResponse(null); - } - }); - } - } - - private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { - try { - snapshotFormat.delete(blobContainer(), blobId); - } catch (IOException e) { - if (snapshotInfo != null) { - logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]", - snapshotInfo.snapshotId(), blobId), e); - } else { - logger.warn(() -> new ParameterizedMessage("Unable to delete snapshot file [{}]", blobId), e); - } - } - } - - private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { - try { - globalMetaDataFormat.delete(blobContainer(), blobId); - } catch (IOException e) { - if (snapshotInfo != null) { - logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]", - snapshotInfo.snapshotId(), blobId), e); - } else { - logger.warn(() -> new ParameterizedMessage("Unable to delete global metadata file [{}]", blobId), e); - } - } - } - private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())); try { From 1f921ee261189552278b26ea88baf6e5b1a93387 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 3 Apr 2019 07:54:41 +0200 Subject: [PATCH 18/21] much simpler --- .../repositories/blobstore/BlobStoreRepository.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5fe3e0300a092..f2cfed2bc70e8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -476,6 +476,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { if (indices.isEmpty()) { listener.onResponse(null); + return; } final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); for (IndexId indexId: indices) { From 36b2e3888b18745edf4bb4d6e4af4c5b3165f174 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 3 Apr 2019 18:36:40 +0200 Subject: [PATCH 19/21] remove comment --- .../repositories/blobstore/BlobStoreRepository.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f2cfed2bc70e8..f71e277c60caf 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -461,7 +461,6 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); } catch (IOException e) { - // a different Exception occurred while trying to delete - will just log the issue for now logger.warn(() -> new ParameterizedMessage( "[{}] indices {} are no longer part of any snapshots in the repository, " + From 59bdc975dcf53ce2dfd9b32642004f9eb315b2de Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 4 Apr 2019 15:37:18 +0200 Subject: [PATCH 20/21] CR comments --- .../snapshots/create/TransportCreateSnapshotAction.java | 2 ++ .../snapshots/restore/TransportRestoreSnapshotAction.java | 2 ++ .../repositories/blobstore/BlobStoreRepository.java | 6 +++--- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index 1907e414bb121..73f9a0742a719 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -49,6 +49,8 @@ public TransportCreateSnapshotAction(TransportService transportService, ClusterS @Override protected String executor() { + // Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks + // which would block the request from getting an error response because of the ongoing task return ThreadPool.Names.GENERIC; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index f83f655bc5a23..a0f86719cad70 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -50,6 +50,8 @@ public TransportRestoreSnapshotAction(TransportService transportService, Cluster @Override protected String executor() { + // Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks + // which would block the request from getting an error response because of the ongoing task return ThreadPool.Names.GENERIC; } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f71e277c60caf..97076ac0dfcac 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -456,7 +456,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action .map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList())) .orElse(Collections.emptyList()), snapshotId, - ActionListener.wrap(v -> { + ActionListener.map(listener, v -> { try { blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); @@ -466,8 +466,8 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action "[{}] indices {} are no longer part of any snapshots in the repository, " + "but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e); } - listener.onResponse(null); - }, listener::onFailure) + return null; + }) ); } } From c311311d51a545ee1e4673e2182a3986c330c39a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 4 Apr 2019 15:38:32 +0200 Subject: [PATCH 21/21] CR comments --- .../repositories/blobstore/BlobStoreRepository.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 97076ac0dfcac..9351c5bf84e87 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -166,8 +166,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp protected final RepositoryMetaData metadata; - protected final NamedXContentRegistry namedXContentRegistry; - private final ThreadPool threadPool; private static final int BUFFER_SIZE = 4096; @@ -248,7 +246,6 @@ protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, Na this.settings = settings; this.metadata = metadata; this.threadPool = threadPool; - this.namedXContentRegistry = namedXContentRegistry; this.compress = COMPRESS_SETTING.get(metadata.settings()); snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));