Skip to content

Commit 29d1d25

Browse files
Deduplicate RepositoryData on a Best Effort Basis (#67947)
Enhance transport request deduplicator to allow for more general deduplication. Make use of that to enable deduplicate RepositoryData under concurrent request load (which so far has been the only situation where RepositoryData has created unmanageable memory pressure).
1 parent 03d4ba5 commit 29d1d25

File tree

7 files changed

+53
-38
lines changed

7 files changed

+53
-38
lines changed
Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.transport;
20+
package org.elasticsearch.action;
2121

22-
import org.elasticsearch.action.ActionListener;
2322
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2423

2524
import java.util.ArrayList;
@@ -28,9 +27,11 @@
2827
import java.util.function.BiConsumer;
2928

3029
/**
31-
* Deduplicator that keeps track of requests that should not be sent/executed in parallel.
30+
* Deduplicator for arbitrary keys and results that can be used to ensure a given action is only executed once at a time for a given
31+
* request.
32+
* @param <T> Request type
3233
*/
33-
public final class TransportRequestDeduplicator<T> {
34+
public final class ResultDeduplicator<T, R> {
3435

3536
private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();
3637

@@ -44,8 +45,8 @@ public final class TransportRequestDeduplicator<T> {
4445
* @param listener Listener to invoke on request completion
4546
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
4647
*/
47-
public void executeOnce(T request, ActionListener<Void> listener, BiConsumer<T, ActionListener<Void>> callback) {
48-
ActionListener<Void> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
48+
public void executeOnce(T request, ActionListener<R> listener, BiConsumer<T, ActionListener<R>> callback) {
49+
ActionListener<R> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
4950
if (completionListener != null) {
5051
callback.accept(request, completionListener);
5152
}
@@ -63,20 +64,21 @@ public int size() {
6364
return requests.size();
6465
}
6566

66-
private final class CompositeListener implements ActionListener<Void> {
67+
private final class CompositeListener implements ActionListener<R> {
6768

68-
private final List<ActionListener<Void>> listeners = new ArrayList<>();
69+
private final List<ActionListener<R>> listeners = new ArrayList<>();
6970

7071
private final T request;
7172

7273
private boolean isNotified;
7374
private Exception failure;
75+
private R response;
7476

7577
CompositeListener(T request) {
7678
this.request = request;
7779
}
7880

79-
CompositeListener addListener(ActionListener<Void> listener) {
81+
CompositeListener addListener(ActionListener<R> listener) {
8082
synchronized (this) {
8183
if (this.isNotified == false) {
8284
listeners.add(listener);
@@ -86,35 +88,35 @@ CompositeListener addListener(ActionListener<Void> listener) {
8688
if (failure != null) {
8789
listener.onFailure(failure);
8890
} else {
89-
listener.onResponse(null);
91+
listener.onResponse(response);
9092
}
9193
return null;
9294
}
9395

94-
private void onCompleted(Exception failure) {
96+
@Override
97+
public void onResponse(R response) {
9598
synchronized (this) {
96-
this.failure = failure;
99+
this.response = response;
97100
this.isNotified = true;
98101
}
99102
try {
100-
if (failure == null) {
101-
ActionListener.onResponse(listeners, null);
102-
} else {
103-
ActionListener.onFailure(listeners, failure);
104-
}
103+
ActionListener.onResponse(listeners, response);
105104
} finally {
106105
requests.remove(request);
107106
}
108107
}
109108

110-
@Override
111-
public void onResponse(final Void aVoid) {
112-
onCompleted(null);
113-
}
114-
115109
@Override
116110
public void onFailure(Exception failure) {
117-
onCompleted(failure);
111+
synchronized (this) {
112+
this.failure = failure;
113+
this.isNotified = true;
114+
}
115+
try {
116+
ActionListener.onFailure(listeners, failure);
117+
} finally {
118+
requests.remove(request);
119+
}
118120
}
119121
}
120122
}

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import org.elasticsearch.transport.TransportChannel;
6565
import org.elasticsearch.transport.TransportException;
6666
import org.elasticsearch.transport.TransportRequest;
67-
import org.elasticsearch.transport.TransportRequestDeduplicator;
67+
import org.elasticsearch.action.ResultDeduplicator;
6868
import org.elasticsearch.transport.TransportRequestHandler;
6969
import org.elasticsearch.transport.TransportResponse;
7070
import org.elasticsearch.transport.TransportService;
@@ -93,7 +93,7 @@ public class ShardStateAction {
9393

9494
// a list of shards that failed during replication
9595
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
96-
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
96+
private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();
9797

9898
@Inject
9999
public ShardStateAction(ClusterService clusterService, TransportService transportService,

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
110110
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
111111
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
112112
* if there was an error in reading the data.
113+
* @param listener listener that may be resolved on different kinds of threads including transport and cluster state applier threads
114+
* and therefore must fork to a new thread for executing any long running actions
113115
*/
114116
void getRepositoryData(ActionListener<RepositoryData> listener);
115117

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121
import org.elasticsearch.snapshots.SnapshotMissingException;
122122
import org.elasticsearch.snapshots.SnapshotsService;
123123
import org.elasticsearch.threadpool.ThreadPool;
124+
import org.elasticsearch.action.ResultDeduplicator;
124125

125126
import java.io.FilterInputStream;
126127
import java.io.IOException;
@@ -1361,11 +1362,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
13611362
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
13621363
// the latest known repository generation
13631364
if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) {
1364-
try {
1365-
listener.onResponse(cached.repositoryData());
1366-
} catch (Exception e) {
1367-
listener.onFailure(e);
1368-
}
1365+
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached.repositoryData()));
13691366
return;
13701367
}
13711368
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) {
@@ -1375,7 +1372,12 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
13751372
} else {
13761373
logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(),
13771374
latestKnownRepoGen);
1378-
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
1375+
if (bestEffortConsistency) {
1376+
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
1377+
} else {
1378+
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) ->
1379+
threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
1380+
}
13791381
}
13801382
}
13811383

@@ -1461,10 +1463,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
14611463
}
14621464
}
14631465

1466+
/**
1467+
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
1468+
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
1469+
* unique for a given value of {@link #metadata} at any point in time.
1470+
*/
1471+
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator = new ResultDeduplicator<>();
1472+
14641473
private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
14651474
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
14661475
// Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
14671476
// generation repeatedly.
1477+
14681478
long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN;
14691479
while (true) {
14701480
final long genToLoad;

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.elasticsearch.repositories.ShardGenerations;
5858
import org.elasticsearch.threadpool.ThreadPool;
5959
import org.elasticsearch.transport.TransportException;
60-
import org.elasticsearch.transport.TransportRequestDeduplicator;
60+
import org.elasticsearch.action.ResultDeduplicator;
6161
import org.elasticsearch.transport.TransportResponseHandler;
6262
import org.elasticsearch.transport.TransportService;
6363

@@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
9191
private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();
9292

9393
// A map of snapshots to the shardIds that we already reported to the master as failed
94-
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
95-
new TransportRequestDeduplicator<>();
94+
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator =
95+
new ResultDeduplicator<>();
9696

9797
public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
9898
TransportService transportService, IndicesService indicesService) {

server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.transport.TransportChannel;
3737
import org.elasticsearch.transport.TransportException;
3838
import org.elasticsearch.transport.TransportRequest;
39-
import org.elasticsearch.transport.TransportRequestDeduplicator;
39+
import org.elasticsearch.action.ResultDeduplicator;
4040
import org.elasticsearch.transport.TransportRequestHandler;
4141
import org.elasticsearch.transport.TransportRequestOptions;
4242
import org.elasticsearch.transport.TransportResponse;
@@ -52,7 +52,7 @@ public class TaskCancellationService {
5252
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
5353
private final TransportService transportService;
5454
private final TaskManager taskManager;
55-
private final TransportRequestDeduplicator<CancelRequest> deduplicator = new TransportRequestDeduplicator<>();
55+
private final ResultDeduplicator<CancelRequest, Void> deduplicator = new ResultDeduplicator<>();
5656

5757
public TaskCancellationService(TransportService transportService) {
5858
this.transportService = transportService;
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.lucene.util.SetOnce;
2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.ResultDeduplicator;
2324
import org.elasticsearch.tasks.TaskId;
2425
import org.elasticsearch.test.ESTestCase;
2526

@@ -29,7 +30,7 @@
2930
import static org.hamcrest.CoreMatchers.equalTo;
3031
import static org.hamcrest.CoreMatchers.sameInstance;
3132

32-
public class TransportRequestDeduplicatorTests extends ESTestCase {
33+
public class ResultDeduplicatorTests extends ESTestCase {
3334

3435
public void testRequestDeduplication() throws Exception {
3536
AtomicInteger successCount = new AtomicInteger();
@@ -40,7 +41,7 @@ public void testRequestDeduplication() throws Exception {
4041
public void setParentTask(final TaskId taskId) {
4142
}
4243
};
43-
final TransportRequestDeduplicator<TransportRequest> deduplicator = new TransportRequestDeduplicator<>();
44+
final ResultDeduplicator<TransportRequest, Void> deduplicator = new ResultDeduplicator<>();
4445
final SetOnce<ActionListener<Void>> listenerHolder = new SetOnce<>();
4546
int iterationsPerThread = scaledRandomIntBetween(100, 1000);
4647
Thread[] threads = new Thread[between(1, 4)];

0 commit comments

Comments
 (0)