Skip to content

Commit 5d1964b

Browse files
authored
Ignore shard started requests when primary term does not match (#37899)
This commit changes the StartedShardEntry so that it also contains the primary term of the shard to start. This way the master node can also check that the primary term from the start request is equal to the current shard's primary term in the cluster state, and it can ignore any shard started request that concerns a previous instance of the shard that was allocated to the same node. Such situations are likely to happen with frozen (or restored) indices and the replication of closed indices, because with replicated closed indices the shards will be initialized again after the index is closed and can potentially be reinitialized again if the index is reopened as a frozen index. In such cases the lifecycle of the shards would be something like: * shard is STARTED * index is closed * shard is INITIALIZING (index state is CLOSED, primary term is X) * index is reopened * shard is INITIALIZING again (index state is OPENED, potentially frozen, primary term is X+1) Adding the primary term to the shard started request allows the master node to discard any StartedShardEntry requests it receives that concern the shard with primary term X, because the shard has been moved/reinitialized in the meantime under the primary term X+1. Relates to #33888
1 parent 7f1784e commit 5d1964b

File tree

7 files changed

+248
-82
lines changed

7 files changed

+248
-82
lines changed

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -494,12 +494,20 @@ public int hashCode() {
494494
}
495495
}
496496

497-
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
498-
shardStarted(shardRouting, message, listener, clusterService.state());
497+
public void shardStarted(final ShardRouting shardRouting,
498+
final long primaryTerm,
499+
final String message,
500+
final Listener listener) {
501+
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
499502
}
500-
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) {
501-
StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message);
502-
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener);
503+
504+
public void shardStarted(final ShardRouting shardRouting,
505+
final long primaryTerm,
506+
final String message,
507+
final Listener listener,
508+
final ClusterState currentState) {
509+
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
510+
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
503511
}
504512

505513
private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
@@ -544,7 +552,7 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
544552
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
545553
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
546554
for (StartedShardEntry task : tasks) {
547-
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
555+
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
548556
if (matched == null) {
549557
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
550558
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
@@ -553,6 +561,19 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
553561
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
554562
builder.success(task);
555563
} else {
564+
if (matched.primary() && task.primaryTerm > 0) {
565+
final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
566+
assert indexMetaData != null;
567+
final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id());
568+
if (currentPrimaryTerm != task.primaryTerm) {
569+
assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " +
570+
"current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
571+
logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})",
572+
task.shardId, task, task.primaryTerm, currentPrimaryTerm);
573+
builder.success(task);
574+
continue;
575+
}
576+
}
556577
if (matched.initializing() == false) {
557578
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
558579
// same as above, this might have been a stale in-flight request, so we just ignore.
@@ -597,15 +618,20 @@ public void onFailure(String source, Exception e) {
597618
public static class StartedShardEntry extends TransportRequest {
598619
final ShardId shardId;
599620
final String allocationId;
621+
final long primaryTerm;
600622
final String message;
601623

602624
StartedShardEntry(StreamInput in) throws IOException {
603625
super(in);
604626
shardId = ShardId.readShardId(in);
605627
allocationId = in.readString();
606628
if (in.getVersion().before(Version.V_6_3_0)) {
607-
final long primaryTerm = in.readVLong();
629+
primaryTerm = in.readVLong();
608630
assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]";
631+
} else if (in.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
632+
primaryTerm = in.readVLong();
633+
} else {
634+
primaryTerm = UNASSIGNED_PRIMARY_TERM;
609635
}
610636
this.message = in.readString();
611637
if (in.getVersion().before(Version.V_6_3_0)) {
@@ -614,9 +640,10 @@ public static class StartedShardEntry extends TransportRequest {
614640
}
615641
}
616642

617-
public StartedShardEntry(ShardId shardId, String allocationId, String message) {
643+
public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) {
618644
this.shardId = shardId;
619645
this.allocationId = allocationId;
646+
this.primaryTerm = primaryTerm;
620647
this.message = message;
621648
}
622649

@@ -627,6 +654,8 @@ public void writeTo(StreamOutput out) throws IOException {
627654
out.writeString(allocationId);
628655
if (out.getVersion().before(Version.V_6_3_0)) {
629656
out.writeVLong(0L);
657+
} else if (out.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
658+
out.writeVLong(primaryTerm);
630659
}
631660
out.writeString(message);
632661
if (out.getVersion().before(Version.V_6_3_0)) {
@@ -636,8 +665,8 @@ public void writeTo(StreamOutput out) throws IOException {
636665

637666
@Override
638667
public String toString() {
639-
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}",
640-
shardId, allocationId, message);
668+
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}",
669+
shardId, allocationId, primaryTerm, message);
641670
}
642671
}
643672

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -575,13 +575,14 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
575575
}
576576

577577
try {
578-
logger.debug("{} creating shard", shardRouting.shardId());
578+
final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
579+
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
579580
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
580581
indicesService.createShard(
581582
shardRouting,
582583
recoveryState,
583584
recoveryTargetService,
584-
new RecoveryListener(shardRouting),
585+
new RecoveryListener(shardRouting, primaryTerm),
585586
repositoriesService,
586587
failedShardHandler,
587588
globalCheckpointSyncer,
@@ -598,9 +599,10 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
598599
"local shard has a different allocation id but wasn't cleaning by removeShards. "
599600
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
600601

602+
final long primaryTerm;
601603
try {
602604
final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex());
603-
final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
605+
primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
604606
final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id());
605607
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
606608
final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards()
@@ -633,7 +635,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
633635
shardRouting.shardId(), state, nodes.getMasterNode());
634636
}
635637
if (nodes.getMasterNode() != null) {
636-
shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() +
638+
shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() +
637639
" marked shard as initializing, but shard state is [" + state + "], mark shard as started",
638640
SHARD_STATE_ACTION_LISTENER, clusterState);
639641
}
@@ -673,15 +675,24 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, Routin
673675

674676
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
675677

678+
/**
679+
* ShardRouting with which the shard was created
680+
*/
676681
private final ShardRouting shardRouting;
677682

678-
private RecoveryListener(ShardRouting shardRouting) {
683+
/**
684+
* Primary term with which the shard was created
685+
*/
686+
private final long primaryTerm;
687+
688+
private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
679689
this.shardRouting = shardRouting;
690+
this.primaryTerm = primaryTerm;
680691
}
681692

682693
@Override
683-
public void onRecoveryDone(RecoveryState state) {
684-
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
694+
public void onRecoveryDone(final RecoveryState state) {
695+
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
685696
}
686697

687698
@Override

0 commit comments

Comments
 (0)