diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index 3af4c29455a90..6a1b4a70bec9d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -79,9 +79,8 @@ public class ShardStateAction {
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
 
-    // a list of shards that failed during replication
-    // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
-    private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();
+    // we deduplicate these shard state requests in order to avoid sending duplicate failed/started shard requests for a shard
+    private final ResultDeduplicator<TransportRequest, Void> remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>();
 
     @Inject
     public ShardStateAction(
@@ -196,15 +195,26 @@ public void remoteShardFailed(
         ActionListener<Void> listener
     ) {
         assert primaryTerm > 0L : "primary term should be strictly positive";
-        remoteFailedShardsDeduplicator.executeOnce(
+        remoteShardStateUpdateDeduplicator.executeOnce(
             new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale),
             listener,
             (req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener)
         );
     }
 
-    int remoteShardFailedCacheSize() {
-        return remoteFailedShardsDeduplicator.size();
+    int remoteShardRequestsInFlight() {
+        return remoteShardStateUpdateDeduplicator.size();
+    }
+
+    /**
+     * Clears out {@link #remoteShardStateUpdateDeduplicator}. Called by
+     * {@link org.elasticsearch.indices.cluster.IndicesClusterStateService} in case of a master failover to enable sending fresh requests
+     * to the new master right away on master failover.
+     * This method is best effort in so far that it might clear out valid requests in edge cases during master failover. This is not an
+     * issue functionally and merely results in some unnecessary transport requests.
+     */
+    public void clearRemoteShardRequestDeduplicator() {
+        remoteShardStateUpdateDeduplicator.clear();
     }
 
     /**
@@ -588,14 +598,11 @@ public void shardStarted(
         final ActionListener<Void> listener,
         final ClusterState currentState
     ) {
-        final StartedShardEntry entry = new StartedShardEntry(
-            shardRouting.shardId(),
-            shardRouting.allocationId().getId(),
-            primaryTerm,
-            message,
-            timestampRange
+        remoteShardStateUpdateDeduplicator.executeOnce(
+            new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, timestampRange),
+            listener,
+            (req, l) -> sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, req, l)
         );
-        sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
     }
 
     private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
@@ -842,6 +849,23 @@ public String toString() {
                 message
             );
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            StartedShardEntry that = (StartedShardEntry) o;
+            return primaryTerm == that.primaryTerm
+                && shardId.equals(that.shardId)
+                && allocationId.equals(that.allocationId)
+                && message.equals(that.message)
+                && timestampRange.equals(that.timestampRange);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(shardId, allocationId, primaryTerm, message, timestampRange);
+        }
     }
 
     public static class NoLongerPrimaryShardException extends ElasticsearchException {
diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 062dabd4249c8..771e4278cba1a 100644
--- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -197,6 +197,13 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
 
         final ClusterState state = event.state();
 
+        final DiscoveryNode currentMaster = state.nodes().getMasterNode();
+        if (currentMaster != null && currentMaster.equals(event.previousState().nodes().getMasterNode()) == false) {
+            // master node changed, clear request deduplicator so we send out new state update requests right away without waiting for
+            // the in-flight ones to fail first
+            shardStateAction.clearRemoteShardRequestDeduplicator();
+        }
+
         // we need to clean the shards and indices we have on this node, since we
         // are going to recover them again once state persistence is disabled (no master / not recovered)
         // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
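The change above hinges on two properties of the request deduplicator: requests that compare equal share a single in-flight send, which is why StartedShardEntry gains equals()/hashCode(), and clearing the deduplicator only forgets what is currently in flight, so the worst case after a master failover is a redundant shard-started or shard-failed request, as the new javadoc notes. Below is a minimal sketch of those executeOnce()/clear() semantics. It is not the actual org.elasticsearch.action.ResultDeduplicator: the class name SimpleResultDeduplicator, the use of java.util.function.Consumer in place of ActionListener, and the coarse synchronization are illustrative assumptions.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Minimal sketch of the executeOnce()/clear() semantics assumed above; not the real ResultDeduplicator.
final class SimpleResultDeduplicator<T, R> {

    // pending requests, keyed by request equality, each with the listeners waiting on its response
    private final Map<T, List<Consumer<R>>> inFlight = new HashMap<>();

    // Runs executeAction at most once per equal request; later equal requests only register a listener.
    void executeOnce(T request, Consumer<R> listener, BiConsumer<T, Consumer<R>> executeAction) {
        final List<Consumer<R>> waiters;
        final boolean firstCaller;
        synchronized (this) {
            List<Consumer<R>> existing = inFlight.get(request);
            firstCaller = existing == null;
            if (firstCaller) {
                waiters = new ArrayList<>();
                inFlight.put(request, waiters);
            } else {
                waiters = existing;
            }
            waiters.add(listener);
        }
        if (firstCaller) {
            // only the first equal request is actually sent; its single response is fanned out to every waiter
            executeAction.accept(request, response -> {
                synchronized (this) {
                    inFlight.remove(request, waiters); // no-op if clear() already dropped the entry
                }
                // no new listener can join this list once the entry is gone from the map
                for (Consumer<R> waiter : waiters) {
                    waiter.accept(response);
                }
            });
        }
    }

    // number of distinct requests currently in flight
    synchronized int size() {
        return inFlight.size();
    }

    // best effort: forget all pending requests so that new callers send fresh requests immediately;
    // listeners already registered are still completed when the original send responds
    synchronized void clear() {
        inFlight.clear();
    }
}

Under these semantics, the clear() call that IndicesClusterStateService makes on a master change cannot drop a listener or lose a shard-state update; it can only cause an extra transport request to the new master, matching the best-effort caveat in the javadoc above.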
diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
index 4474a6308ec7f..2d987bef32f1d 100644
--- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
@@ -149,7 +149,7 @@ public void tearDown() throws Exception {
         clusterService.close();
         transportService.close();
         super.tearDown();
-        assertThat(shardStateAction.remoteShardFailedCacheSize(), equalTo(0));
+        assertThat(shardStateAction.remoteShardRequestsInFlight(), equalTo(0));
     }
 
     @AfterClass
@@ -382,6 +382,40 @@ public void onFailure(Exception e) {
         assertThat(transport.capturedRequests(), arrayWithSize(0));
     }
 
+    public void testDeduplicateRemoteShardStarted() throws InterruptedException {
+        final String index = "test";
+        setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
+        ShardRouting startedShard = getRandomShardRouting(index);
+        int numListeners = between(1, 100);
+        CountDownLatch latch = new CountDownLatch(numListeners);
+        long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        int expectedRequests = 1;
+        for (int i = 0; i < numListeners; i++) {
+            if (rarely() && i > 0) {
+                expectedRequests++;
+                shardStateAction.clearRemoteShardRequestDeduplicator();
+            }
+            shardStateAction.shardStarted(startedShard, primaryTerm, "started", ShardLongFieldRange.EMPTY, new ActionListener<>() {
+                @Override
+                public void onResponse(Void aVoid) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    latch.countDown();
+                }
+            });
+        }
+        CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
+        assertThat(capturedRequests, arrayWithSize(expectedRequests));
+        for (int i = 0; i < expectedRequests; i++) {
+            transport.handleResponse(capturedRequests[i].requestId, TransportResponse.Empty.INSTANCE);
+        }
+        latch.await();
+        assertThat(transport.capturedRequests(), arrayWithSize(0));
+    }
+
     public void testRemoteShardFailedConcurrently() throws Exception {
         final String index = "test";
         final AtomicBoolean shutdown = new AtomicBoolean(false);
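For illustration, a hypothetical driver for the sketch above mirrors the counting that testDeduplicateRemoteShardStarted asserts: equal started-shard requests collapse into a single send, and clearing the deduplicator (as happens on master failover) makes the next equal request go out again. DeduplicatorDemo and the string request key are invented for this example and assume the class sits alongside SimpleResultDeduplicator; the production code deduplicates StartedShardEntry and FailedShardEntry transport requests.

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical driver for the SimpleResultDeduplicator sketch above; names and the string request key are invented.
public class DeduplicatorDemo {
    public static void main(String[] args) {
        SimpleResultDeduplicator<String, Void> dedup = new SimpleResultDeduplicator<>();
        AtomicInteger sends = new AtomicInteger();

        for (int i = 0; i < 5; i++) {
            // five equal "shard started" requests: executeAction runs only for the first one
            dedup.executeOnce("shard-0/alloc-1/started", r -> {}, (req, respond) -> sends.incrementAndGet());
        }
        System.out.println(sends.get() + " request(s) sent, " + dedup.size() + " in flight"); // 1 request(s) sent, 1 in flight

        dedup.clear(); // as on master failover
        // the next equal request is sent again because the deduplicator forgot the pending one
        dedup.executeOnce("shard-0/alloc-1/started", r -> {}, (req, respond) -> sends.incrementAndGet());
        System.out.println(sends.get() + " request(s) sent"); // 2 request(s) sent
    }
}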