Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public class ShardStateAction {
private final ClusterService clusterService;
private final ThreadPool threadPool;

// a list of shards that failed during replication
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();
// we deduplicate these shard state requests in order to avoid sending duplicate failed/started shard requests for a shard
private final ResultDeduplicator<TransportRequest, Void> remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>();

@Inject
public ShardStateAction(
Expand Down Expand Up @@ -196,15 +195,26 @@ public void remoteShardFailed(
ActionListener<Void> listener
) {
assert primaryTerm > 0L : "primary term should be strictly positive";
remoteFailedShardsDeduplicator.executeOnce(
remoteShardStateUpdateDeduplicator.executeOnce(
new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale),
listener,
(req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener)
);
}

int remoteShardFailedCacheSize() {
return remoteFailedShardsDeduplicator.size();
int remoteShardRequestsInFlight() {
return remoteShardStateUpdateDeduplicator.size();
}

/**
* Clears out {@link #remoteShardStateUpdateDeduplicator}. Called by
* {@link org.elasticsearch.indices.cluster.IndicesClusterStateService} when the master node changes, so that fresh requests can be
* sent to the new master right away instead of waiting for the in-flight ones to fail first.
* This method is best effort insofar as it might clear out valid requests in edge cases during master failover. This is not an
* issue functionally and merely results in some unnecessary transport requests.
*/
public void clearRemoteShardRequestDeduplicator() {
remoteShardStateUpdateDeduplicator.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we call this from multiple threads, this method is somewhat best-effort, and I think that is worth documenting.

For instance, in edge cases this may clear the deduplication entry for a remote shard-failed request that was already sent to the new master. This does no real harm, since we still protect the master.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ added

}

/**
Expand Down Expand Up @@ -588,14 +598,11 @@ public void shardStarted(
final ActionListener<Void> listener,
final ClusterState currentState
) {
final StartedShardEntry entry = new StartedShardEntry(
shardRouting.shardId(),
shardRouting.allocationId().getId(),
primaryTerm,
message,
timestampRange
remoteShardStateUpdateDeduplicator.executeOnce(
new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, timestampRange),
listener,
(req, l) -> sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, req, l)
);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
}

private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
Expand Down Expand Up @@ -842,6 +849,23 @@ public String toString() {
message
);
}

/**
 * Checks whether the given object is an entry of exactly the same class whose primary term, shard id, allocation id, message and
 * timestamp range all match this entry's.
 *
 * @param o the object to compare against, may be {@code null}
 * @return {@code true} if {@code o} is this instance or an equal entry, {@code false} otherwise
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    final StartedShardEntry other = (StartedShardEntry) o;
    // fields identifying the shard copy are compared first; evaluation short-circuits on the first mismatch
    final boolean sameShardCopy = primaryTerm == other.primaryTerm
        && shardId.equals(other.shardId)
        && allocationId.equals(other.allocationId);
    return sameShardCopy && message.equals(other.message) && timestampRange.equals(other.timestampRange);
}

/**
 * Computes the hash code from the same fields that {@link #equals(Object)} compares, so that equal entries hash alike.
 *
 * @return the combined hash of shard id, allocation id, primary term, message and timestamp range
 */
@Override
public int hashCode() {
    final int combinedHash = Objects.hash(
        shardId,
        allocationId,
        primaryTerm,
        message,
        timestampRange
    );
    return combinedHash;
}
}

public static class NoLongerPrimaryShardException extends ElasticsearchException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {

final ClusterState state = event.state();

final DiscoveryNode currentMaster = state.nodes().getMasterNode();
if (currentMaster != null && currentMaster.equals(event.previousState().nodes().getMasterNode()) == false) {
// master node changed, clear request deduplicator so we send out new state update requests right away without waiting for
// the in-flight ones to fail first
shardStateAction.clearRemoteShardRequestDeduplicator();
}

// we need to clean the shards and indices we have on this node, since we
// are going to recover them again once state persistence is disabled (no master / not recovered)
// TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void tearDown() throws Exception {
clusterService.close();
transportService.close();
super.tearDown();
assertThat(shardStateAction.remoteShardFailedCacheSize(), equalTo(0));
assertThat(shardStateAction.remoteShardRequestsInFlight(), equalTo(0));
}

@AfterClass
Expand Down Expand Up @@ -382,6 +382,40 @@ public void onFailure(Exception e) {
assertThat(transport.capturedRequests(), arrayWithSize(0));
}

public void testDeduplicateRemoteShardStarted() throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we either add a test or randomly clear the deduplicator here and then validate we see two requests at the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ done

final String index = "test";
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
ShardRouting startedShard = getRandomShardRouting(index);
int numListeners = between(1, 100);
CountDownLatch latch = new CountDownLatch(numListeners);
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
int expectedRequests = 1;
for (int i = 0; i < numListeners; i++) {
if (rarely() && i > 0) {
expectedRequests++;
shardStateAction.clearRemoteShardRequestDeduplicator();
}
shardStateAction.shardStarted(startedShard, primaryTerm, "started", ShardLongFieldRange.EMPTY, new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
latch.countDown();
}
});
}
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(expectedRequests));
for (int i = 0; i < expectedRequests; i++) {
transport.handleResponse(capturedRequests[i].requestId, TransportResponse.Empty.INSTANCE);
}
latch.await();
assertThat(transport.capturedRequests(), arrayWithSize(0));
}

public void testRemoteShardFailedConcurrently() throws Exception {
final String index = "test";
final AtomicBoolean shutdown = new AtomicBoolean(false);
Expand Down