Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,17 @@ public void execute() throws Exception {
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
}

successfulShards.incrementAndGet(); // mark primary as successful
decPendingAndFinishIfNeeded();
}

private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<String> inSyncAllocationIds,
IndexShardRoutingTable indexShardRoutingTable) {
private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) {
// mark copy as stale
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
Expand All @@ -140,22 +138,16 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<Str
}

private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final IndexShardRoutingTable indexShardRoutingTable) {
final String localNodeId = primary.routingEntry().currentNodeId();
// If the index gets deleted after primary operation, we skip replication
for (final ShardRouting shard : indexShardRoutingTable) {
if (shard.unassigned()) {
assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard;
totalShards.incrementAndGet();
continue;
}
final ReplicationGroup replicationGroup) {
// for total stats, add number of unassigned shards and
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
totalShards.addAndGet(replicationGroup.getSkippedShards().size());

if (shard.currentNodeId().equals(localNodeId) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint);
}
final ShardRouting primaryRouting = primary.routingEntry();

if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) {
performOnReplica(shard.getTargetRelocatingShard(), replicaRequest, globalCheckpoint);
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -48,15 +49,17 @@
import java.util.stream.LongStream;

/**
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
* equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
* This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
*
* The global checkpoint is the highest sequence number for which all lower (or equal) sequence number have been processed
* on all shards that are currently active. Since shards count as "active" when the master starts
* them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
* have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
* shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent implements LongSupplier {
public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {

/**
* The allocation ID for the shard to which this tracker is a component of.
Expand Down Expand Up @@ -146,30 +149,49 @@ public static class CheckpointState implements Writeable {
*/
boolean inSync;

public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync) {
/**
* whether this shard is tracked in the replication group, i.e., should receive document updates from the primary.
*/
boolean tracked;

public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) {
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
this.inSync = inSync;
this.tracked = tracked;
}

public CheckpointState(StreamInput in) throws IOException {
this.localCheckpoint = in.readZLong();
this.globalCheckpoint = in.readZLong();
this.inSync = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
this.tracked = in.readBoolean();
} else {
// Every in-sync shard copy is also tracked (see invariant). This was the case even in earlier ES versions.
// Non in-sync shard copies might be tracked or not. As this information here is only serialized during relocation hand-off,
// after which replica recoveries cannot complete anymore (i.e. they cannot move from in-sync == false to in-sync == true),
// we can treat non in-sync replica shard copies as untracked. They will go through a fresh recovery against the new
// primary and will become tracked again under this primary before they are marked as in-sync.
this.tracked = inSync;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
out.writeBoolean(inSync);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeBoolean(tracked);
}
}

/**
* Returns a full copy of this object
*/
public CheckpointState copy() {
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync);
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked);
}

public long getLocalCheckpoint() {
Expand All @@ -186,6 +208,7 @@ public String toString() {
"localCheckpoint=" + localCheckpoint +
", globalCheckpoint=" + globalCheckpoint +
", inSync=" + inSync +
", tracked=" + tracked +
'}';
}

Expand All @@ -198,14 +221,16 @@ public boolean equals(Object o) {

if (localCheckpoint != that.localCheckpoint) return false;
if (globalCheckpoint != that.globalCheckpoint) return false;
return inSync == that.inSync;
if (inSync != that.inSync) return false;
return tracked == that.tracked;
}

@Override
public int hashCode() {
int result = Long.hashCode(localCheckpoint);
result = 31 * result + Long.hashCode(globalCheckpoint);
result = 31 * result + Boolean.hashCode(inSync);
result = 31 * result + Boolean.hashCode(tracked);
return result;
}
}
Expand Down Expand Up @@ -301,6 +326,9 @@ private boolean invariant() {
// blocking global checkpoint advancement only happens for shards that are not in-sync
assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync :
"shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync";
// in-sync shard copies are tracked
assert !entry.getValue().inSync || entry.getValue().tracked :
"shard copy " + entry.getKey() + " is in-sync but not tracked";
}

return true;
Expand Down Expand Up @@ -330,7 +358,7 @@ private static long inSyncCheckpointStates(
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
public GlobalCheckpointTracker(
public ReplicationTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
Expand All @@ -342,7 +370,7 @@ public GlobalCheckpointTracker(
this.handoffInProgress = false;
this.appliedClusterStateVersion = -1L;
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false));
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Expand All @@ -361,7 +389,8 @@ public ReplicationGroup getReplicationGroup() {

private ReplicationGroup calculateReplicationGroup() {
return new ReplicationGroup(routingTable,
checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()));
checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()));
}

/**
Expand Down Expand Up @@ -481,7 +510,7 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync));
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
}
}
} else {
Expand All @@ -490,18 +519,20 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false));
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
}
}
for (String inSyncId : inSyncAllocationIds) {
if (shardAllocationId.equals(inSyncId)) {
// current shard is initially marked as not in-sync because we don't know better at that point
checkpoints.get(shardAllocationId).inSync = true;
CheckpointState checkpointState = checkpoints.get(shardAllocationId);
checkpointState.inSync = true;
checkpointState.tracked = true;
} else {
final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true));
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
}
}
}
Expand All @@ -516,19 +547,22 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
}

/**
* Called when the recovery process for a shard is ready to open the engine on the target shard. Ensures that the right data structures
* have been set up locally to track local checkpoint information for the shard.
* Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
* have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
*
* @param allocationId the allocation ID of the shard for which recovery was initiated
*/
public synchronized void initiateTracking(final String allocationId) {
assert invariant();
assert primaryMode;
assert handoffInProgress == false;
CheckpointState cps = checkpoints.get(allocationId);
if (cps == null) {
// can happen if replica was removed from cluster but recovery process is unaware of it yet
throw new IllegalStateException("no local checkpoint tracking information available");
}
cps.tracked = true;
replicationGroup = calculateReplicationGroup();
assert invariant();
}

Expand All @@ -551,6 +585,7 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin
assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
"expected known local checkpoint for " + allocationId + " but was " + localCheckpoint;
assert pendingInSync.contains(allocationId) == false : "shard copy " + allocationId + " is already marked as pending in-sync";
assert cps.tracked : "shard copy " + allocationId + " cannot be marked as in-sync as it's not tracked";
updateLocalCheckpoint(allocationId, cps, localCheckpoint);
// if it was already in-sync (because of a previously failed recovery attempt), global checkpoint must have been
// stuck from advancing
Expand Down
Loading