From a8e758f9058a191c321e3faa436a155fecc079ad Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 27 Dec 2021 12:37:31 +0100 Subject: [PATCH 1/3] Deduplicate Shard Started Requests Deduplicate shard started requests the same way we deduplicate shard-failed and shard snapshot state updates already. closes #81628 --- .../action/shard/ShardStateAction.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 3af4c29455a90..7278f026ccb7f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -81,7 +81,7 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. 
- private final ResultDeduplicator remoteFailedShardsDeduplicator = new ResultDeduplicator<>(); + private final ResultDeduplicator remoteFailedShardsDeduplicator = new ResultDeduplicator<>(); @Inject public ShardStateAction( @@ -588,14 +588,11 @@ public void shardStarted( final ActionListener listener, final ClusterState currentState ) { - final StartedShardEntry entry = new StartedShardEntry( - shardRouting.shardId(), - shardRouting.allocationId().getId(), - primaryTerm, - message, - timestampRange + remoteFailedShardsDeduplicator.executeOnce( + new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, timestampRange), + listener, + (req, l) -> sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, req, l) ); - sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener); } private static class ShardStartedTransportHandler implements TransportRequestHandler { @@ -842,6 +839,23 @@ public String toString() { message ); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StartedShardEntry that = (StartedShardEntry) o; + return primaryTerm == that.primaryTerm + && shardId.equals(that.shardId) + && allocationId.equals(that.allocationId) + && message.equals(that.message) + && timestampRange.equals(that.timestampRange); + } + + @Override + public int hashCode() { + return Objects.hash(shardId, allocationId, primaryTerm, message, timestampRange); + } } public static class NoLongerPrimaryShardException extends ElasticsearchException { From ca40e0c15ad429a94ff30767d6e67294d9a6b479 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 28 Dec 2021 16:25:10 +0100 Subject: [PATCH 2/3] trivial test and dedup cleanup on failover --- .../action/shard/ShardStateAction.java | 22 +++++++++----- .../cluster/IndicesClusterStateService.java | 7 +++++ .../action/shard/ShardStateActionTests.java | 29 ++++++++++++++++++- 3 files changed, 
50 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 7278f026ccb7f..b54ee301dba90 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -79,9 +79,8 @@ public class ShardStateAction { private final ClusterService clusterService; private final ThreadPool threadPool; - // a list of shards that failed during replication - // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final ResultDeduplicator remoteFailedShardsDeduplicator = new ResultDeduplicator<>(); + // we deduplicate these shard state requests in order to avoid sending duplicate failed/started shard requests for a shard + private final ResultDeduplicator remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>(); @Inject public ShardStateAction( @@ -196,15 +195,24 @@ public void remoteShardFailed( ActionListener listener ) { assert primaryTerm > 0L : "primary term should be strictly positive"; - remoteFailedShardsDeduplicator.executeOnce( + remoteShardStateUpdateDeduplicator.executeOnce( new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale), listener, (req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener) ); } - int remoteShardFailedCacheSize() { - return remoteFailedShardsDeduplicator.size(); + int remoteShardRequestsInFlight() { + return remoteShardStateUpdateDeduplicator.size(); + } + + /** + * Clears out {@link #remoteShardStateUpdateDeduplicator}. Called by + * {@link org.elasticsearch.indices.cluster.IndicesClusterStateService} in case of a master failover to enable sending fresh requests + * to the new master right away on master failover. 
+ */ + public void clearRemoteShardRequestDeduplicator() { + remoteShardStateUpdateDeduplicator.clear(); } /** @@ -588,7 +596,7 @@ public void shardStarted( final ActionListener listener, final ClusterState currentState ) { - remoteFailedShardsDeduplicator.executeOnce( + remoteShardStateUpdateDeduplicator.executeOnce( new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, timestampRange), listener, (req, l) -> sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, req, l) diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 062dabd4249c8..771e4278cba1a 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -197,6 +197,13 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) { final ClusterState state = event.state(); + final DiscoveryNode currentMaster = state.nodes().getMasterNode(); + if (currentMaster != null && currentMaster.equals(event.previousState().nodes().getMasterNode()) == false) { + // master node changed, clear request deduplicator so we send out new state update requests right away without waiting for + // the in-flight ones to fail first + shardStateAction.clearRemoteShardRequestDeduplicator(); + } + // we need to clean the shards and indices we have on this node, since we // are going to recover them again once state persistence is disabled (no master / not recovered) // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks? 
diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 4474a6308ec7f..584c54db2ece4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -149,7 +149,7 @@ public void tearDown() throws Exception { clusterService.close(); transportService.close(); super.tearDown(); - assertThat(shardStateAction.remoteShardFailedCacheSize(), equalTo(0)); + assertThat(shardStateAction.remoteShardRequestsInFlight(), equalTo(0)); } @AfterClass @@ -382,6 +382,33 @@ public void onFailure(Exception e) { assertThat(transport.capturedRequests(), arrayWithSize(0)); } + public void testDeduplicateRemoteShardStarted() throws InterruptedException { + final String index = "test"; + setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); + ShardRouting startedShard = getRandomShardRouting(index); + int numListeners = between(1, 100); + CountDownLatch latch = new CountDownLatch(numListeners); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + for (int i = 0; i < numListeners; i++) { + shardStateAction.shardStarted(startedShard, primaryTerm, "started", ShardLongFieldRange.EMPTY, new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }); + } + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests, arrayWithSize(1)); + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + latch.await(); + assertThat(transport.capturedRequests(), arrayWithSize(0)); + } + public void testRemoteShardFailedConcurrently() throws Exception { final String 
index = "test"; final AtomicBoolean shutdown = new AtomicBoolean(false); From fc1b76845f2818ae1a498529439df05118f36a9d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 28 Dec 2021 18:05:09 +0100 Subject: [PATCH 3/3] update comment + enhance test --- .../cluster/action/shard/ShardStateAction.java | 2 ++ .../cluster/action/shard/ShardStateActionTests.java | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index b54ee301dba90..6a1b4a70bec9d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -210,6 +210,8 @@ int remoteShardRequestsInFlight() { * Clears out {@link #remoteShardStateUpdateDeduplicator}. Called by * {@link org.elasticsearch.indices.cluster.IndicesClusterStateService} in case of a master failover to enable sending fresh requests * to the new master right away on master failover. + * This method is best effort in so far that it might clear out valid requests in edge cases during master failover. This is not an + * issue functionally and merely results in some unnecessary transport requests. 
*/ public void clearRemoteShardRequestDeduplicator() { remoteShardStateUpdateDeduplicator.clear(); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 584c54db2ece4..2d987bef32f1d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -389,7 +389,12 @@ public void testDeduplicateRemoteShardStarted() throws InterruptedException { int numListeners = between(1, 100); CountDownLatch latch = new CountDownLatch(numListeners); long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + int expectedRequests = 1; for (int i = 0; i < numListeners; i++) { + if (rarely() && i > 0) { + expectedRequests++; + shardStateAction.clearRemoteShardRequestDeduplicator(); + } shardStateAction.shardStarted(startedShard, primaryTerm, "started", ShardLongFieldRange.EMPTY, new ActionListener<>() { @Override public void onResponse(Void aVoid) { @@ -403,8 +408,10 @@ public void onFailure(Exception e) { }); } CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests, arrayWithSize(1)); - transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + assertThat(capturedRequests, arrayWithSize(expectedRequests)); + for (int i = 0; i < expectedRequests; i++) { + transport.handleResponse(capturedRequests[i].requestId, TransportResponse.Empty.INSTANCE); + } latch.await(); assertThat(transport.capturedRequests(), arrayWithSize(0)); }