
Commit 4973887

Make primary-replica resync failures less lenient (#28534)
Today, failures from the primary-replica resync are ignored as a best effort to avoid marking shards as stale during a cluster restart. However, this can be problematic if replicas fail to execute the resync operations but handle subsequent write operations just fine. When this happens, a replica will miss some operations from the new primary, and if its local checkpoint cannot advance because of the missing operations, there are several implications:

1. The global checkpoint won't advance, which causes both the primary and the replicas to retain many index commits.
2. The engine on the replica won't flush periodically, because the uncommitted stats are calculated from the local checkpoint.
3. The replica may use a large number of bitsets to keep track of operation seqnos.

However, we can prevent this issue while still preserving the best-effort behavior: fail replicas that fail to execute resync operations, but do not mark them as stale. The required infrastructure for this change was prepared in #28049 and #28054.

Relates #24841
1 parent d9cc6b9 commit 4973887
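
To make the implications above concrete, here is a minimal sketch (a toy model, not Elasticsearch's actual LocalCheckpointTracker) of why one operation missed during resync stalls a replica's local checkpoint, and with it the global checkpoint, which cannot advance past the lowest local checkpoint of the in-sync copies:

// A toy model (hypothetical class and method names) of local checkpoint mechanics:
// the checkpoint is the highest seqno up to which every operation has been processed,
// so a single missed operation leaves it stuck below the gap no matter how many
// later writes succeed.
import java.util.Set;
import java.util.TreeSet;

class ToyLocalCheckpointTracker {
    private final Set<Long> pending = new TreeSet<>(); // processed seqnos above the checkpoint
    private long checkpoint = -1;                       // highest contiguous processed seqno

    void markSeqNoAsProcessed(long seqNo) {
        pending.add(seqNo);
        while (pending.remove(checkpoint + 1)) {        // advance only through contiguous seqnos
            checkpoint++;
        }
    }

    long getCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        ToyLocalCheckpointTracker replica = new ToyLocalCheckpointTracker();
        replica.markSeqNoAsProcessed(0);
        replica.markSeqNoAsProcessed(1);
        // seqno 2 arrived in the resync that the replica failed to execute; it is never processed
        replica.markSeqNoAsProcessed(3);
        replica.markSeqNoAsProcessed(4);
        System.out.println(replica.getCheckpoint());    // prints 1: stuck below the gap
    }
}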

File tree

4 files changed, +128 -30 lines changed

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 21 additions & 2 deletions
@@ -30,6 +30,7 @@
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -45,6 +46,7 @@
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;

+import java.util.function.Consumer;
 import java.util.function.Supplier;

 public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
@@ -83,8 +85,7 @@ protected ResyncReplicationResponse newResponseInstance() {

     @Override
     protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
-        // We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
-        return new ReplicasProxy(primaryTerm);
+        return new ResyncActionReplicasProxy(primaryTerm);
     }

     @Override
@@ -184,4 +185,22 @@ public void handleException(TransportException exp) {
         });
     }

+    /**
+     * A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted.
+     * Replica shards fail to execute resync operations will be failed but won't be marked as stale.
+     * This avoids marking shards as stale during cluster restart but enforces primary-replica resync mandatory.
+     */
+    class ResyncActionReplicasProxy extends ReplicasProxy {
+
+        ResyncActionReplicasProxy(long primaryTerm) {
+            super(primaryTerm);
+        }
+
+        @Override
+        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
+                                      Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
+            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
+                createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
+        }
+    }
 }
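
The behavioral change boils down to the fourth boolean argument passed to shardStateAction.remoteShardFailed: the resync proxy above passes false, whereas the regular write proxy in TransportWriteAction (below) passes true. Roughly, that flag decides whether the failed copy's allocation id is also dropped from the in-sync set, which the test further down asserts via inSyncAllocationIds. A toy model of that distinction (hypothetical code, not Elasticsearch's routing or allocation classes):

// "Failing" a copy removes it from the set of active copies, so it must recover
// before serving again; "marking it stale" would additionally drop its allocation
// id from the in-sync set. The resync proxy does the former but not the latter.
import java.util.HashSet;
import java.util.Set;

class ToyShardCopies {
    final Set<String> activeCopies = new HashSet<>(); // copies currently assigned and serving
    final Set<String> inSyncIds = new HashSet<>();    // allocation ids considered in sync

    void start(String allocationId) {
        activeCopies.add(allocationId);
        inSyncIds.add(allocationId);
    }

    void fail(String allocationId, boolean markAsStale) {
        activeCopies.remove(allocationId);
        if (markAsStale) {
            inSyncIds.remove(allocationId);
        }
    }

    public static void main(String[] args) {
        ToyShardCopies shard = new ToyShardCopies();
        shard.start("primary");
        shard.start("replica-1");
        shard.fail("replica-1", false);                                // what the resync proxy now does
        System.out.println(shard.activeCopies.contains("replica-1")); // false -> copy is failed
        System.out.println(shard.inSyncIds.contains("replica-1"));    // true  -> not marked stale
    }
}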

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 24 additions & 0 deletions
@@ -1172,6 +1172,30 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R
             // "alive" if it were to be marked as stale.
             onSuccess.run();
         }
+
+        protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess,
+                                                                            final Consumer<Exception> onPrimaryDemoted,
+                                                                            final Consumer<Exception> onIgnoredFailure) {
+            return new ShardStateAction.Listener() {
+                @Override
+                public void onSuccess() {
+                    onSuccess.run();
+                }
+
+                @Override
+                public void onFailure(Exception shardFailedError) {
+                    if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
+                        onPrimaryDemoted.accept(shardFailedError);
+                    } else {
+                        // these can occur if the node is shutting down and are okay
+                        // any other exception here is not expected and merits investigation
+                        assert shardFailedError instanceof TransportException ||
+                            shardFailedError instanceof NodeClosedException : shardFailedError;
+                        onIgnoredFailure.accept(shardFailedError);
+                    }
+                }
+            };
+        }
     }

     /**

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 3 additions & 28 deletions
@@ -384,41 +384,16 @@ class WriteActionReplicasProxy extends ReplicasProxy {
         @Override
         public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                                       Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
-
-            logger.warn((org.apache.logging.log4j.util.Supplier<?>)
-                () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
+            logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
             shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
-                createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
+                createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
         }

         @Override
         public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
                                                  Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
             shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
-                createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
-        }
-
-        private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
-                                                         final Consumer<Exception> onIgnoredFailure) {
-            return new ShardStateAction.Listener() {
-                @Override
-                public void onSuccess() {
-                    onSuccess.run();
-                }
-
-                @Override
-                public void onFailure(Exception shardFailedError) {
-                    if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
-                        onPrimaryDemoted.accept(shardFailedError);
-                    } else {
-                        // these can occur if the node is shutting down and are okay
-                        // any other exception here is not expected and merits investigation
-                        assert shardFailedError instanceof TransportException ||
-                            shardFailedError instanceof NodeClosedException : shardFailedError;
-                        onIgnoredFailure.accept(shardFailedError);
-                    }
-                }
-            };
+                createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
         }
     }
 }

server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 80 additions & 0 deletions
@@ -22,6 +22,7 @@
 import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -31,6 +32,10 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
@@ -43,15 +48,23 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;

+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.not;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class PrimaryAllocationIT extends ESIntegTestCase {
@@ -309,4 +322,71 @@ public void testForceAllocatePrimaryOnNoDecision() throws Exception {
         assertEquals(1, client().admin().cluster().prepareState().get().getState()
             .routingTable().index(indexName).shardsWithState(ShardRoutingState.STARTED).size());
     }
+
+    /**
+     * This test asserts that replicas failed to execute resync operations will be failed but not marked as stale.
+     */
+    public void testPrimaryReplicaResyncFailed() throws Exception {
+        String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
+        final int numberOfReplicas = between(2, 3);
+        final String oldPrimary = internalCluster().startDataOnlyNode();
+        assertAcked(
+            prepareCreate("test", Settings.builder().put(indexSettings())
+                .put(SETTING_NUMBER_OF_SHARDS, 1)
+                .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)));
+        final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0);
+        final Set<String> replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas));
+        ensureGreen();
+        assertAcked(
+            client(master).admin().cluster().prepareUpdateSettings()
+                .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get());
+        logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync");
+        long numDocs = scaledRandomIntBetween(5, 50);
+        for (int i = 0; i < numDocs; i++) {
+            IndexResponse indexResult = index("test", "doc", Long.toString(i));
+            assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
+        }
+        final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId);
+        IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno.
+        long moreDocs = scaledRandomIntBetween(1, 10);
+        for (int i = 0; i < moreDocs; i++) {
+            IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i));
+            assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
+        }
+        final Set<String> replicasSide1 = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes));
+        final Set<String> replicasSide2 = Sets.difference(replicaNodes, replicasSide1);
+        NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide1, replicasSide2), new NetworkDisconnect());
+        internalCluster().setDisruptionScheme(partition);
+        logger.info("--> isolating some replicas during primary-replica resync");
+        partition.startDisrupting();
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(oldPrimary));
+        // Checks that we fails replicas in one side but not mark them as stale.
+        assertBusy(() -> {
+            ClusterState state = client(master).admin().cluster().prepareState().get().getState();
+            final IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId);
+            final String newPrimaryNode = state.getRoutingNodes().node(shardRoutingTable.primary.currentNodeId()).node().getName();
+            assertThat(newPrimaryNode, not(equalTo(oldPrimary)));
+            Set<String> selectedPartition = replicasSide1.contains(newPrimaryNode) ? replicasSide1 : replicasSide2;
+            assertThat(shardRoutingTable.activeShards(), hasSize(selectedPartition.size()));
+            for (ShardRouting activeShard : shardRoutingTable.activeShards()) {
+                assertThat(state.getRoutingNodes().node(activeShard.currentNodeId()).node().getName(), isIn(selectedPartition));
+            }
+            assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1));
+        }, 1, TimeUnit.MINUTES);
+        assertAcked(
+            client(master).admin().cluster().prepareUpdateSettings()
+                .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get());
+        partition.stopDisrupting();
+        logger.info("--> stop disrupting network and re-enable allocation");
+        assertBusy(() -> {
+            ClusterState state = client(master).admin().cluster().prepareState().get().getState();
+            assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas));
+            assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1));
+            for (String node : replicaNodes) {
+                IndexShard shard = internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId);
+                assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
+            }
+        });
+    }
+
 }
