From dc6bb3fb22f733b6f297d074dc273982ca0abe6a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 28 Jan 2021 06:58:55 +0100 Subject: [PATCH] Fix SnapshotStatus Transport Action Doing IO on Transport Thread (#68023) There is a small chance here that #67947 would cause the callback for the repository data to run on a transport or CS updater thread and do a lot of IO to fetch `SnapshotInfo`. Fixed by always forking to the generic pool for the callback. Added a test that triggers lots of deserializing repository data from cache on the transport thread concurrently, which triggers this bug relatively reliably (more than half the runs) but is still reasonably fast (under 5s). --- .../snapshots/SnapshotStatusApisIT.java | 34 +++++++++++++++++++ .../TransportSnapshotsStatusAction.java | 8 ++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java index a73795cf676b7..27b35dfc20792 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java @@ -49,10 +49,12 @@ import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { @@ -417,6 +419,38 @@ public void testGetSnapshotsRequest() throws Exception { awaitNoMoreRunningOperations(); } + public void testConcurrentCreateAndStatusAPICalls() throws Exception { + for (int i = 0; i < randomIntBetween(1, 10); i++) { + 
createIndexWithContent("test-idx-" + i); + } + final String repoName = "test-repo"; + createRepository(repoName, "fs"); + final int snapshots = randomIntBetween(10, 20); + final List> statuses = new ArrayList<>(snapshots); + final List> gets = new ArrayList<>(snapshots); + final Client dataNodeClient = dataNodeClient(); + final String[] snapshotNames = createNSnapshots(repoName, snapshots).toArray(Strings.EMPTY_ARRAY); + + for (int i = 0; i < snapshots; i++) { + statuses.add(dataNodeClient.admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotNames).execute()); + gets.add(dataNodeClient.admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshotNames).execute()); + } + + for (ActionFuture status : statuses) { + assertThat(status.get().getSnapshots(), hasSize(snapshots)); + for (SnapshotStatus snapshot : status.get().getSnapshots()) { + assertThat(snapshot.getState(), allOf(not(SnapshotsInProgress.State.FAILED), not(SnapshotsInProgress.State.ABORTED))); + } + } + for (ActionFuture get : gets) { + final List snapshotInfos = get.get().getSnapshots(); + assertThat(snapshotInfos, hasSize(snapshots)); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + } + } + } + private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { return snapshotStatus.getIndices().get(indexName).getShards().get(0); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 00654979a5a81..154db4ddf434e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -25,7 +25,6 @@ import 
org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; @@ -38,6 +37,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -237,9 +237,9 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho List builder, Set currentSnapshotNames, String repositoryName, ActionListener listener) { final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); - final StepListener repositoryDataListener = new StepListener<>(); + final ListenableFuture repositoryDataListener = new ListenableFuture<>(); repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> { + repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> { final Map matchedSnapshotIds = repositoryData.getSnapshotIds().stream() .filter(s -> requestedSnapshotNames.contains(s.getName())) .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); @@ -294,7 +294,7 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho } } listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder))); - }, listener::onFailure); + }, listener::onFailure), threadPool.generic()); } /**