From 969d033e3a131dceccfd8fc57f78eb9db717a1e3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 3 Feb 2021 12:26:40 +0100 Subject: [PATCH] Fix Threading in Snapshot Restore (#68390) Same as #68023 but even less likely (couldn't really find a quick way to write a test for it for that reason). Fix is the same, fork off to the generic pool for listener handling. Also, this allows removing the forking in the transport action since we don't do any long running work on the calling thread any longer in the restore method. --- .../restore/TransportRestoreSnapshotAction.java | 4 +--- .../main/java/org/elasticsearch/node/Node.java | 3 +-- .../elasticsearch/snapshots/RestoreService.java | 16 +++++++++------- .../snapshots/SnapshotResiliencyTests.java | 1 - 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index 4ed8c6845bbc5..c4431c47c30ef 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -31,10 +31,8 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction repositoryDataListener = new StepListener<>(); - repository.getRepositoryData(repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> { + ActionListener repoDataListener = ActionListener.wrap(repositoryData -> { final String snapshotName = request.snapshot(); final Optional matchingSnapshotId = repositoryData.getSnapshotIds().stream() .filter(s -> snapshotName.equals(s.getName())).findFirst(); @@ -515,7 +513,7 @@ private boolean checkPartial(String index) { } private void validateExistingIndex(IndexMetadata currentIndexMetadata, 
IndexMetadata snapshotIndexMetadata, - String renamedIndex, boolean partial) { + String renamedIndex, boolean partial) { // Index exist - checking that it's closed if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { // TODO: Enable restore for open indices @@ -610,6 +608,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); }, listener::onFailure); + // fork handling the above listener to the generic pool since it loads various pieces of metadata from the repository over a + // longer period of time + repository.getRepositoryData(new ThreadedActionListener<>(logger, clusterService.getClusterApplierService().threadPool(), + ThreadPool.Names.GENERIC, repoDataListener, false)); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", request.repository() + ":" + request.snapshot()), e); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 7ae72cecafd05..aecc7b1aa0fab 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1574,7 +1574,6 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi mapperRegistry, indexScopedSettings, null), - clusterSettings, shardLimitValidator ); actions.put(PutMappingAction.INSTANCE,