Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
public TransportRestoreSnapshotAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, RestoreService restoreService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
// Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks
// which would block the request from getting an error response because of the ongoing task
super(RestoreSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters,
RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.GENERIC);
RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.SAME);
this.restoreService = restoreService;
}

Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,7 @@ protected Node(final Environment initialEnvironment,
TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService,
transportService, snapshotShardsService, actionModule.getActionFilters());
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
metadataCreateIndexService, indexMetadataVerifier, clusterService.getClusterSettings(), shardLimitValidator);

metadataCreateIndexService, indexMetadataVerifier, shardLimitValidator);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -152,8 +153,7 @@ public class RestoreService implements ClusterStateApplier {

public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService,
AllocationService allocationService, MetadataCreateIndexService createIndexService,
IndexMetadataVerifier indexMetadataVerifier, ClusterSettings clusterSettings,
ShardLimitValidator shardLimitValidator) {
IndexMetadataVerifier indexMetadataVerifier, ShardLimitValidator shardLimitValidator) {
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
this.allocationService = allocationService;
Expand Down Expand Up @@ -191,9 +191,7 @@ public void restoreSnapshot(final RestoreSnapshotRequest request,
// Read snapshot info and metadata from the repository
final String repositoryName = request.repository();
Repository repository = repositoriesService.repository(repositoryName);
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repository.getRepositoryData(repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
ActionListener<RepositoryData> repoDataListener = ActionListener.wrap(repositoryData -> {
final String snapshotName = request.snapshot();
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
.filter(s -> snapshotName.equals(s.getName())).findFirst();
Expand Down Expand Up @@ -515,7 +513,7 @@ private boolean checkPartial(String index) {
}

private void validateExistingIndex(IndexMetadata currentIndexMetadata, IndexMetadata snapshotIndexMetadata,
String renamedIndex, boolean partial) {
String renamedIndex, boolean partial) {
// Index exist - checking that it's closed
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
// TODO: Enable restore for open indices
Expand Down Expand Up @@ -610,6 +608,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}, listener::onFailure);
// fork handling the above listener to the generic pool since it loads various pieces of metadata from the repository over a
// longer period of time
repository.getRepositoryData(new ThreadedActionListener<>(logger, clusterService.getClusterApplierService().threadPool(),
ThreadPool.Names.GENERIC, repoDataListener, false));
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
request.repository() + ":" + request.snapshot()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,7 +1574,6 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi
mapperRegistry,
indexScopedSettings,
null),
clusterSettings,
shardLimitValidator
);
actions.put(PutMappingAction.INSTANCE,
Expand Down