Skip to content

Commit f5c64af

Browse files
Fix SnapshotStatus Transport Action Doing IO on Transport Thread (#68023)
There is a small chance here that #67947 would cause the callback for the repository data to run on a transport or CS updater thread and do a lot of IO to fetch `SnapshotInfo`. Fixed by always forking to the generic pool for the callback. Added test that triggers lots of deserializing repository data from cache on the transport thread concurrently which triggers this bug relatively reliable (more than half the runs) but is still reasonably fast (under 5s).
1 parent c91a808 commit f5c64af

File tree

2 files changed

+38
-4
lines changed

2 files changed

+38
-4
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@
5151
import java.util.stream.Collectors;
5252

5353
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
54+
import static org.hamcrest.Matchers.allOf;
5455
import static org.hamcrest.Matchers.equalTo;
5556
import static org.hamcrest.Matchers.greaterThan;
5657
import static org.hamcrest.Matchers.hasSize;
5758
import static org.hamcrest.Matchers.instanceOf;
5859
import static org.hamcrest.Matchers.is;
60+
import static org.hamcrest.Matchers.not;
5961

6062
public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
6163

@@ -514,6 +516,38 @@ public void testGetSnapshotsRequest() throws Exception {
514516
awaitNoMoreRunningOperations();
515517
}
516518

519+
public void testConcurrentCreateAndStatusAPICalls() throws Exception {
520+
for (int i = 0; i < randomIntBetween(1, 10); i++) {
521+
createIndexWithContent("test-idx-" + i);
522+
}
523+
final String repoName = "test-repo";
524+
createRepository(repoName, "fs");
525+
final int snapshots = randomIntBetween(10, 20);
526+
final List<ActionFuture<SnapshotsStatusResponse>> statuses = new ArrayList<>(snapshots);
527+
final List<ActionFuture<GetSnapshotsResponse>> gets = new ArrayList<>(snapshots);
528+
final Client dataNodeClient = dataNodeClient();
529+
final String[] snapshotNames = createNSnapshots(repoName, snapshots).toArray(Strings.EMPTY_ARRAY);
530+
531+
for (int i = 0; i < snapshots; i++) {
532+
statuses.add(dataNodeClient.admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotNames).execute());
533+
gets.add(dataNodeClient.admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshotNames).execute());
534+
}
535+
536+
for (ActionFuture<SnapshotsStatusResponse> status : statuses) {
537+
assertThat(status.get().getSnapshots(), hasSize(snapshots));
538+
for (SnapshotStatus snapshot : status.get().getSnapshots()) {
539+
assertThat(snapshot.getState(), allOf(not(SnapshotsInProgress.State.FAILED), not(SnapshotsInProgress.State.ABORTED)));
540+
}
541+
}
542+
for (ActionFuture<GetSnapshotsResponse> get : gets) {
543+
final List<SnapshotInfo> snapshotInfos = get.get().getSnapshots(repoName);
544+
assertThat(snapshotInfos, hasSize(snapshots));
545+
for (SnapshotInfo snapshotInfo : snapshotInfos) {
546+
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
547+
}
548+
}
549+
}
550+
517551
private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
518552
return snapshotStatus.getIndices().get(indexName).getShards().get(0);
519553
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.logging.log4j.Logger;
2626
import org.elasticsearch.action.ActionListener;
2727
import org.elasticsearch.action.ActionRunnable;
28-
import org.elasticsearch.action.StepListener;
2928
import org.elasticsearch.action.support.ActionFilters;
3029
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
3130
import org.elasticsearch.client.node.NodeClient;
@@ -39,6 +38,7 @@
3938
import org.elasticsearch.common.Strings;
4039
import org.elasticsearch.common.inject.Inject;
4140
import org.elasticsearch.common.util.CollectionUtils;
41+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
4242
import org.elasticsearch.common.util.set.Sets;
4343
import org.elasticsearch.index.shard.ShardId;
4444
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -238,9 +238,9 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho
238238
List<SnapshotStatus> builder, Set<String> currentSnapshotNames, String repositoryName,
239239
ActionListener<SnapshotsStatusResponse> listener) {
240240
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
241-
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
241+
final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
242242
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
243-
repositoryDataListener.whenComplete(repositoryData -> {
243+
repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> {
244244
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds().stream()
245245
.filter(s -> requestedSnapshotNames.contains(s.getName()))
246246
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
@@ -295,7 +295,7 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho
295295
}
296296
}
297297
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
298-
}, listener::onFailure);
298+
}, listener::onFailure), threadPool.generic());
299299
}
300300

301301
/**

0 commit comments

Comments
 (0)