Skip to content

Commit 78dc03c

Browse files
Fix Threading in Snapshot Restore (#68390)
Same as #68023 but even less likely (couldn't really find a quick way to write a test for it for that reason). Fix is the same, fork off to the generic pool for listener handling. Also, this allows removing the forking in the transport action since we don't do any long runnning work on the calling thread any longer in the restore method.
1 parent 44f27b6 commit 78dc03c

File tree

4 files changed

+11
-13
lines changed

4 files changed

+11
-13
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,8 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
3232
public TransportRestoreSnapshotAction(TransportService transportService, ClusterService clusterService,
3333
ThreadPool threadPool, RestoreService restoreService, ActionFilters actionFilters,
3434
IndexNameExpressionResolver indexNameExpressionResolver) {
35-
// Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks
36-
// which would block the request from getting an error response because of the ongoing task
3735
super(RestoreSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters,
38-
RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.GENERIC);
36+
RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.SAME);
3937
this.restoreService = restoreService;
4038
}
4139

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,8 +597,7 @@ protected Node(final Environment initialEnvironment,
597597
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
598598
transportService, indicesService);
599599
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
600-
metadataCreateIndexService, indexMetadataVerifier, clusterService.getClusterSettings(), shardLimitValidator);
601-
600+
metadataCreateIndexService, indexMetadataVerifier, shardLimitValidator);
602601
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
603602
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
604603
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import org.apache.logging.log4j.message.ParameterizedMessage;
1717
import org.elasticsearch.Version;
1818
import org.elasticsearch.action.ActionListener;
19-
import org.elasticsearch.action.StepListener;
2019
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
2120
import org.elasticsearch.action.support.IndicesOptions;
21+
import org.elasticsearch.action.support.ThreadedActionListener;
2222
import org.elasticsearch.cluster.ClusterChangedEvent;
2323
import org.elasticsearch.cluster.ClusterState;
2424
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.repositories.RepositoriesService;
6767
import org.elasticsearch.repositories.Repository;
6868
import org.elasticsearch.repositories.RepositoryData;
69+
import org.elasticsearch.threadpool.ThreadPool;
6970

7071
import java.util.ArrayList;
7172
import java.util.Arrays;
@@ -152,8 +153,7 @@ public class RestoreService implements ClusterStateApplier {
152153

153154
public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService,
154155
AllocationService allocationService, MetadataCreateIndexService createIndexService,
155-
IndexMetadataVerifier indexMetadataVerifier, ClusterSettings clusterSettings,
156-
ShardLimitValidator shardLimitValidator) {
156+
IndexMetadataVerifier indexMetadataVerifier, ShardLimitValidator shardLimitValidator) {
157157
this.clusterService = clusterService;
158158
this.repositoriesService = repositoriesService;
159159
this.allocationService = allocationService;
@@ -191,9 +191,7 @@ public void restoreSnapshot(final RestoreSnapshotRequest request,
191191
// Read snapshot info and metadata from the repository
192192
final String repositoryName = request.repository();
193193
Repository repository = repositoriesService.repository(repositoryName);
194-
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
195-
repository.getRepositoryData(repositoryDataListener);
196-
repositoryDataListener.whenComplete(repositoryData -> {
194+
ActionListener<RepositoryData> repoDataListener = ActionListener.wrap(repositoryData -> {
197195
final String snapshotName = request.snapshot();
198196
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
199197
.filter(s -> snapshotName.equals(s.getName())).findFirst();
@@ -499,7 +497,7 @@ private boolean checkPartial(String index) {
499497
}
500498

501499
private void validateExistingIndex(IndexMetadata currentIndexMetadata, IndexMetadata snapshotIndexMetadata,
502-
String renamedIndex, boolean partial) {
500+
String renamedIndex, boolean partial) {
503501
// Index exist - checking that it's closed
504502
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
505503
// TODO: Enable restore for open indices
@@ -594,6 +592,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
594592
}
595593
});
596594
}, listener::onFailure);
595+
// fork handling the above listener to the generic pool since it loads various pieces of metadata from the repository over a
596+
// longer period of time
597+
repository.getRepositoryData(new ThreadedActionListener<>(logger, clusterService.getClusterApplierService().threadPool(),
598+
ThreadPool.Names.GENERIC, repoDataListener, false));
597599
} catch (Exception e) {
598600
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
599601
request.repository() + ":" + request.snapshot()), e);

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1567,7 +1567,6 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi
15671567
mapperRegistry,
15681568
indexScopedSettings,
15691569
null),
1570-
clusterSettings,
15711570
shardLimitValidator
15721571
);
15731572
actions.put(PutMappingAction.INSTANCE,

0 commit comments

Comments
 (0)