Skip to content

Commit 5520bb9

Browse files
Deduplicate RepositoryData on a Best Effort Basis (#67947) (#68015)
Enhance the transport request deduplicator to allow for more general deduplication. Make use of that to enable deduplicating RepositoryData under concurrent request load (which so far has been the only situation where RepositoryData has created unmanageable memory pressure).
1 parent 0c78b0d commit 5520bb9

File tree

7 files changed

+53
-38
lines changed

7 files changed

+53
-38
lines changed
Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.transport;
20+
package org.elasticsearch.action;
2121

22-
import org.elasticsearch.action.ActionListener;
2322
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2423

2524
import java.util.ArrayList;
@@ -28,9 +27,11 @@
2827
import java.util.function.BiConsumer;
2928

3029
/**
31-
* Deduplicator that keeps track of requests that should not be sent/executed in parallel.
30+
* Deduplicator for arbitrary keys and results that can be used to ensure a given action is only executed once at a time for a given
31+
* request.
32+
* @param <T> Request type
3233
*/
33-
public final class TransportRequestDeduplicator<T> {
34+
public final class ResultDeduplicator<T, R> {
3435

3536
private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();
3637

@@ -44,8 +45,8 @@ public final class TransportRequestDeduplicator<T> {
4445
* @param listener Listener to invoke on request completion
4546
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
4647
*/
47-
public void executeOnce(T request, ActionListener<Void> listener, BiConsumer<T, ActionListener<Void>> callback) {
48-
ActionListener<Void> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
48+
public void executeOnce(T request, ActionListener<R> listener, BiConsumer<T, ActionListener<R>> callback) {
49+
ActionListener<R> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
4950
if (completionListener != null) {
5051
callback.accept(request, completionListener);
5152
}
@@ -63,20 +64,21 @@ public int size() {
6364
return requests.size();
6465
}
6566

66-
private final class CompositeListener implements ActionListener<Void> {
67+
private final class CompositeListener implements ActionListener<R> {
6768

68-
private final List<ActionListener<Void>> listeners = new ArrayList<>();
69+
private final List<ActionListener<R>> listeners = new ArrayList<>();
6970

7071
private final T request;
7172

7273
private boolean isNotified;
7374
private Exception failure;
75+
private R response;
7476

7577
CompositeListener(T request) {
7678
this.request = request;
7779
}
7880

79-
CompositeListener addListener(ActionListener<Void> listener) {
81+
CompositeListener addListener(ActionListener<R> listener) {
8082
synchronized (this) {
8183
if (this.isNotified == false) {
8284
listeners.add(listener);
@@ -86,35 +88,35 @@ CompositeListener addListener(ActionListener<Void> listener) {
8688
if (failure != null) {
8789
listener.onFailure(failure);
8890
} else {
89-
listener.onResponse(null);
91+
listener.onResponse(response);
9092
}
9193
return null;
9294
}
9395

94-
private void onCompleted(Exception failure) {
96+
@Override
97+
public void onResponse(R response) {
9598
synchronized (this) {
96-
this.failure = failure;
99+
this.response = response;
97100
this.isNotified = true;
98101
}
99102
try {
100-
if (failure == null) {
101-
ActionListener.onResponse(listeners, null);
102-
} else {
103-
ActionListener.onFailure(listeners, failure);
104-
}
103+
ActionListener.onResponse(listeners, response);
105104
} finally {
106105
requests.remove(request);
107106
}
108107
}
109108

110-
@Override
111-
public void onResponse(final Void aVoid) {
112-
onCompleted(null);
113-
}
114-
115109
@Override
116110
public void onFailure(Exception failure) {
117-
onCompleted(failure);
111+
synchronized (this) {
112+
this.failure = failure;
113+
this.isNotified = true;
114+
}
115+
try {
116+
ActionListener.onFailure(listeners, failure);
117+
} finally {
118+
requests.remove(request);
119+
}
118120
}
119121
}
120122
}

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import org.elasticsearch.transport.TransportChannel;
6767
import org.elasticsearch.transport.TransportException;
6868
import org.elasticsearch.transport.TransportRequest;
69-
import org.elasticsearch.transport.TransportRequestDeduplicator;
69+
import org.elasticsearch.action.ResultDeduplicator;
7070
import org.elasticsearch.transport.TransportRequestHandler;
7171
import org.elasticsearch.transport.TransportResponse;
7272
import org.elasticsearch.transport.TransportService;
@@ -122,7 +122,7 @@ private static Priority parseReroutePriority(String priorityString) {
122122

123123
// a list of shards that failed during replication
124124
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
125-
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
125+
private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();
126126

127127
@Inject
128128
public ShardStateAction(ClusterService clusterService, TransportService transportService,

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
111111
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
112112
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
113113
* if there was an error in reading the data.
114+
* @param listener listener that may be resolved on different kinds of threads including transport and cluster state applier threads
115+
* and therefore must fork to a new thread for executing any long running actions
114116
*/
115117
void getRepositoryData(ActionListener<RepositoryData> listener);
116118

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
import org.elasticsearch.snapshots.SnapshotMissingException;
123123
import org.elasticsearch.snapshots.SnapshotsService;
124124
import org.elasticsearch.threadpool.ThreadPool;
125+
import org.elasticsearch.action.ResultDeduplicator;
125126

126127
import java.io.FilterInputStream;
127128
import java.io.IOException;
@@ -1377,11 +1378,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
13771378
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
13781379
// the latest known repository generation
13791380
if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) {
1380-
try {
1381-
listener.onResponse(cached.repositoryData());
1382-
} catch (Exception e) {
1383-
listener.onFailure(e);
1384-
}
1381+
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached.repositoryData()));
13851382
return;
13861383
}
13871384
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false &&
@@ -1392,7 +1389,12 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
13921389
} else {
13931390
logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(),
13941391
latestKnownRepoGen);
1395-
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
1392+
if (bestEffortConsistency) {
1393+
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
1394+
} else {
1395+
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) ->
1396+
threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
1397+
}
13961398
}
13971399
}
13981400

@@ -1478,10 +1480,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
14781480
}
14791481
}
14801482

1483+
/**
1484+
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
1485+
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
1486+
* unique for a given value of {@link #metadata} at any point in time.
1487+
*/
1488+
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator = new ResultDeduplicator<>();
1489+
14811490
private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
14821491
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
14831492
// Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
14841493
// generation repeatedly.
1494+
14851495
long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN;
14861496
while (true) {
14871497
final long genToLoad;

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.elasticsearch.repositories.ShardGenerations;
5858
import org.elasticsearch.threadpool.ThreadPool;
5959
import org.elasticsearch.transport.TransportException;
60-
import org.elasticsearch.transport.TransportRequestDeduplicator;
60+
import org.elasticsearch.action.ResultDeduplicator;
6161
import org.elasticsearch.transport.TransportResponseHandler;
6262
import org.elasticsearch.transport.TransportService;
6363

@@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
9191
private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();
9292

9393
// A map of snapshots to the shardIds that we already reported to the master as failed
94-
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
95-
new TransportRequestDeduplicator<>();
94+
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator =
95+
new ResultDeduplicator<>();
9696

9797
public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
9898
TransportService transportService, IndicesService indicesService) {

server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.transport.TransportChannel;
3737
import org.elasticsearch.transport.TransportException;
3838
import org.elasticsearch.transport.TransportRequest;
39-
import org.elasticsearch.transport.TransportRequestDeduplicator;
39+
import org.elasticsearch.action.ResultDeduplicator;
4040
import org.elasticsearch.transport.TransportRequestHandler;
4141
import org.elasticsearch.transport.TransportRequestOptions;
4242
import org.elasticsearch.transport.TransportResponse;
@@ -52,7 +52,7 @@ public class TaskCancellationService {
5252
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
5353
private final TransportService transportService;
5454
private final TaskManager taskManager;
55-
private final TransportRequestDeduplicator<CancelRequest> deduplicator = new TransportRequestDeduplicator<>();
55+
private final ResultDeduplicator<CancelRequest, Void> deduplicator = new ResultDeduplicator<>();
5656

5757
public TaskCancellationService(TransportService transportService) {
5858
this.transportService = transportService;
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.lucene.util.SetOnce;
2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.ResultDeduplicator;
2324
import org.elasticsearch.tasks.TaskId;
2425
import org.elasticsearch.test.ESTestCase;
2526

@@ -29,7 +30,7 @@
2930
import static org.hamcrest.CoreMatchers.equalTo;
3031
import static org.hamcrest.CoreMatchers.sameInstance;
3132

32-
public class TransportRequestDeduplicatorTests extends ESTestCase {
33+
public class ResultDeduplicatorTests extends ESTestCase {
3334

3435
public void testRequestDeduplication() throws Exception {
3536
AtomicInteger successCount = new AtomicInteger();
@@ -40,7 +41,7 @@ public void testRequestDeduplication() throws Exception {
4041
public void setParentTask(final TaskId taskId) {
4142
}
4243
};
43-
final TransportRequestDeduplicator<TransportRequest> deduplicator = new TransportRequestDeduplicator<>();
44+
final ResultDeduplicator<TransportRequest, Void> deduplicator = new ResultDeduplicator<>();
4445
final SetOnce<ActionListener<Void>> listenerHolder = new SetOnce<>();
4546
int iterationsPerThread = scaledRandomIntBetween(100, 1000);
4647
Thread[] threads = new Thread[between(1, 4)];

0 commit comments

Comments
 (0)