Skip to content

Commit 8b9dfeb

Browse files
Fix Threading in Snapshot Restore (#68390) (#68438)
Same as #68023 but even less likely (couldn't really find a quick way to write a test for it for that reason). Fix is the same, fork off to the generic pool for listener handling. Also, this allows removing the forking in the transport action since we don't do any long runnning work on the calling thread any longer in the restore method.
1 parent c1dc10a commit 8b9dfeb

File tree

4 files changed

+11
-13
lines changed

4 files changed

+11
-13
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
3131
public TransportRestoreSnapshotAction(TransportService transportService, ClusterService clusterService,
3232
ThreadPool threadPool, RestoreService restoreService, ActionFilters actionFilters,
3333
IndexNameExpressionResolver indexNameExpressionResolver) {
34-
// Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks
35-
// which would block the request from getting an error response because of the ongoing task
3634
super(RestoreSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters,
37-
RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.GENERIC);
35+
RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.SAME);
3836
this.restoreService = restoreService;
3937
}
4038

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,7 @@ protected Node(final Environment initialEnvironment,
612612
TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService,
613613
transportService, snapshotShardsService, actionModule.getActionFilters());
614614
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
615-
metadataCreateIndexService, indexMetadataVerifier, clusterService.getClusterSettings(), shardLimitValidator);
616-
615+
metadataCreateIndexService, indexMetadataVerifier, shardLimitValidator);
617616
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
618617
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
619618
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import org.apache.logging.log4j.message.ParameterizedMessage;
1717
import org.elasticsearch.Version;
1818
import org.elasticsearch.action.ActionListener;
19-
import org.elasticsearch.action.StepListener;
2019
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
2120
import org.elasticsearch.action.support.IndicesOptions;
21+
import org.elasticsearch.action.support.ThreadedActionListener;
2222
import org.elasticsearch.cluster.ClusterChangedEvent;
2323
import org.elasticsearch.cluster.ClusterState;
2424
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.repositories.RepositoriesService;
6767
import org.elasticsearch.repositories.Repository;
6868
import org.elasticsearch.repositories.RepositoryData;
69+
import org.elasticsearch.threadpool.ThreadPool;
6970

7071
import java.util.ArrayList;
7172
import java.util.Arrays;
@@ -152,8 +153,7 @@ public class RestoreService implements ClusterStateApplier {
152153

153154
public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService,
154155
AllocationService allocationService, MetadataCreateIndexService createIndexService,
155-
IndexMetadataVerifier indexMetadataVerifier, ClusterSettings clusterSettings,
156-
ShardLimitValidator shardLimitValidator) {
156+
IndexMetadataVerifier indexMetadataVerifier, ShardLimitValidator shardLimitValidator) {
157157
this.clusterService = clusterService;
158158
this.repositoriesService = repositoriesService;
159159
this.allocationService = allocationService;
@@ -191,9 +191,7 @@ public void restoreSnapshot(final RestoreSnapshotRequest request,
191191
// Read snapshot info and metadata from the repository
192192
final String repositoryName = request.repository();
193193
Repository repository = repositoriesService.repository(repositoryName);
194-
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
195-
repository.getRepositoryData(repositoryDataListener);
196-
repositoryDataListener.whenComplete(repositoryData -> {
194+
ActionListener<RepositoryData> repoDataListener = ActionListener.wrap(repositoryData -> {
197195
final String snapshotName = request.snapshot();
198196
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
199197
.filter(s -> snapshotName.equals(s.getName())).findFirst();
@@ -515,7 +513,7 @@ private boolean checkPartial(String index) {
515513
}
516514

517515
private void validateExistingIndex(IndexMetadata currentIndexMetadata, IndexMetadata snapshotIndexMetadata,
518-
String renamedIndex, boolean partial) {
516+
String renamedIndex, boolean partial) {
519517
// Index exist - checking that it's closed
520518
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
521519
// TODO: Enable restore for open indices
@@ -610,6 +608,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
610608
}
611609
});
612610
}, listener::onFailure);
611+
// fork handling the above listener to the generic pool since it loads various pieces of metadata from the repository over a
612+
// longer period of time
613+
repository.getRepositoryData(new ThreadedActionListener<>(logger, clusterService.getClusterApplierService().threadPool(),
614+
ThreadPool.Names.GENERIC, repoDataListener, false));
613615
} catch (Exception e) {
614616
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
615617
request.repository() + ":" + request.snapshot()), e);

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1574,7 +1574,6 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi
15741574
mapperRegistry,
15751575
indexScopedSettings,
15761576
null),
1577-
clusterSettings,
15781577
shardLimitValidator
15791578
);
15801579
actions.put(PutMappingAction.INSTANCE,

0 commit comments

Comments
 (0)