@@ -54,6 +54,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;

@@ -260,7 +261,9 @@ public void testForgetFollower() throws IOException {
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardStats.get(0);
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
assertThat(leases, empty());
for (final Object lease : leases) {
assertThat(((Map<?, ?>) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
Contributor: can we instead assert the absence of CCR leases?

Contributor Author: Not as robustly as I'd like, no. We could say there are no leases with source "ccr", but that's a lot weaker than saying the only remaining leases are PRRLs, similarly to how we previously asserted that there were no leases at all.

Member: Can we use toMapExcludingPeerRecoveryRetentionLeases here?

Contributor Author: Not very easily. Here we are on the other side of the high-level REST API, and this doesn't include indices stats, so we don't have access to a RetentionLeases object. It would be quite some work to build one.

Member: ah, I did not realize that it's a rest test.

}
}
}
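For contrast, the weaker check raised in the thread above (asserting only that no CCR leases remain, rather than that every remaining lease is a peer-recovery lease) might look roughly like the following sketch; the hamcrest not matcher and the "ccr" source string are assumptions here, not part of the change:

```java
for (final Object lease : leases) {
    // weaker: rule out CCR leases only, without constraining what the remaining leases are
    assertThat(((Map<?, ?>) lease).get("source"), not(equalTo("ccr")));
}
```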

@@ -24,6 +24,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -217,10 +218,22 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Set<String> leaseIdsForCurrentPeers
= routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet());
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
.leases()
.stream()
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
.collect(Collectors.groupingBy(lease -> {
if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
Member: Can we make this check a method of RetentionLease?

Contributor Author: As in #43190 (comment) I don't think RetentionLease should know about this special kind of retention lease.

if (leaseIdsForCurrentPeers.contains(lease.id())) {
return false;
}
if (routingTable.allShardsStarted()) {
return true;
}
}
return currentTimeMillis - lease.timestamp() > retentionLeaseMillis;
}));
final Collection<RetentionLease> expiredLeases = partitionByExpiration.get(true);
if (expiredLeases == null) {
// early out as no retention leases have expired
@@ -242,7 +255,7 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
* @param source the source of the retention lease
* @param listener the callback when the retention lease is successfully added and synced to replicas
* @return the new retention lease
* @throws IllegalArgumentException if the specified retention lease already exists
* @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
*/
public RetentionLease addRetentionLease(
final String id,
@@ -253,30 +266,46 @@ public RetentionLease addRetentionLease(
final RetentionLease retentionLease;
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.contains(id)) {
throw new RetentionLeaseAlreadyExistsException(id);
}
retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases);
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
retentionLease = innerAddRetentionLease(id, retainingSequenceNumber, source);
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}

/**
* Adds a new retention lease, but does not synchronise it with the rest of the replication group.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the new retention lease
* @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
*/
private RetentionLease innerAddRetentionLease(String id, long retainingSequenceNumber, String source) {
assert Thread.holdsLock(this);
assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source;
if (retentionLeases.contains(id)) {
throw new RetentionLeaseAlreadyExistsException(id);
}
final RetentionLease retentionLease
= new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases);
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
return retentionLease;
}

/**
* Renews an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the renewed retention lease
* @throws IllegalArgumentException if the specified retention lease does not exist
* @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
*/
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
@@ -390,6 +419,51 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
return true;
}


/**
* Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID
* containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
* with sequence numbers strictly greater than the given global checkpoint.
*/
public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
Member: Can we remove this method and prepare these parameters in IndexShard instead?

Contributor Author: We could but I think it's appropriate to do this here given that you need to do this when working with the ReplicationTracker in isolation, e.g. PeerRecoveryRetentionLeaseExpiryTests.

addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
}
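As the author notes, this entry point is also convenient when exercising the tracker in isolation; a call might look roughly like the sketch below (the tracker variable, node id, checkpoint value, and listener handling are assumptions for illustration):

```java
// hypothetical standalone usage, e.g. from a unit test of peer-recovery lease expiry
final long globalCheckpoint = 42L; // assumed value for illustration
tracker.addPeerRecoveryRetentionLease("node-1", globalCheckpoint,
    ActionListener.wrap(response -> {}, e -> { throw new AssertionError(e); }));
```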

/**
* Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";
Member: How about moving this constant and the two related static methods to the RetentionLease class instead?

Contributor Author: I don't think RetentionLease should know about this special kind of retention lease.


/**
* Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
static String getPeerRecoveryRetentionLeaseId(String nodeId) {
return "peer_recovery/" + nodeId;
}

/**
* Id for a peer recovery retention lease for the given {@link ShardRouting}.
* See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) {
return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId());
}

/**
* Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
* properly. TODO remove this.
*/
public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
Contributor: why are we not automatically advancing the leases when the global checkpoints advance? Is it because it breaks some tests right now?

Contributor Author: Mainly because I think this change is already large enough without this feature too, and we haven't settled for definite on whether these leases should be GCP-based. Advancing the leases is needed in the tests in very few places, but I haven't tried advancing them more eagerly.

assert primaryMode;
for (ShardRouting shardRouting : routingTable) {
if (shardRouting.assignedToNode()) {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), checkpointState.globalCheckpoint + 1,
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
}
}
}

public static class CheckpointState implements Writeable {

/**
@@ -616,6 +690,23 @@ private boolean invariant() {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}

if (primaryMode
&& indexSettings.isSoftDeleteEnabled()
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
: "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source())
: "incorrect source [" + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
+ "] for [" + shardRouting + "] in " + retentionLeases;
}
}
}

return true;
}

@@ -669,6 +760,7 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
Contributor: is this to catch issues where tests have not been properly set up?

Contributor Author: Yes, if this is unset then the crucial assertions are skipped, which is Very Bad™.

assert invariant();
}

@@ -772,6 +864,31 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
primaryMode = true;
updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint);
updateGlobalCheckpointOnPrimary();

if (indexSettings.isSoftDeleteEnabled()) {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
/*
* We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
Contributor: this might also be a recovery from store? What about when we become primary due to a primary relocation? Do we need to do this as well?

Contributor Author: This comment is explaining the following if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)). Covering the cases when the index was created in an earlier version is out of scope here. In a primary relocation the new primary, being a tracked replica, already has a lease.

Member: > Covering the cases when the index was created in an earlier version is out of scope here.
Did you mean there will be another change here? Why don't we do it now ;). The relocating target should not have a lease if the old primary was on an old version.

Contributor Author: This change is already a substantial +665/-185, and I think it's unwise to bring BWC into scope at this time. Note that this PR is against a feature branch, not master, so we're ok with missing features for now.

*/
if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) {
// We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
// this copy must already be in-sync and active and therefore holds a retention lease for itself.
assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards();
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.activeShards() + " vs " + shardAllocationId;
assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));

// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
Contributor: we don't need a sync, but why not do one anyway? This will persist the leases locally on disk.
Contributor Author: Doing a sync on the cluster applier thread isn't possible as things stand because of the reroute phase; it also would mean waiting for the sync to return, which is something we try to avoid on the applier thread. We could explicitly persist the leases when calling activatePrimaryMode but I don't think it's necessary to do so.

// group.
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
}
}
}

assert invariant();
}

@@ -44,7 +44,7 @@ public interface RetentionLeaseSyncer {
RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() {
@Override
public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener<ReplicationResponse> listener) {

listener.onResponse(new ReplicationResponse());
}

@Override
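Completing the listener in the no-op syncer means that callers blocking on the sync result no longer hang; a rough sketch of such a caller follows (the PlainActionFuture usage and the shardId/retentionLeases variables are assumptions, not part of this change):

```java
// without the listener.onResponse(...) call above, actionGet() would block forever
final PlainActionFuture<ReplicationResponse> future = PlainActionFuture.newFuture();
RetentionLeaseSyncer.EMPTY.sync(shardId, retentionLeases, future);
future.actionGet();
```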
@@ -274,14 +274,5 @@ private static Map<String, RetentionLease> toMap(final Collection<RetentionLease
LinkedHashMap::new));
}

/**
* A utility method to convert a retention lease collection to a map from retention lease ID to retention lease.
*
* @param retentionLeases the retention lease collection
* @return the map from retention lease ID to retention lease
*/
static Map<String, RetentionLease> toMap(final RetentionLeases retentionLeases) {
return retentionLeases.leases;
}

}

15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2415,6 +2415,21 @@ public boolean isRelocatedPrimary() {
return replicationTracker.isRelocated();
}

public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
assert assertPrimaryMode();
replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
}

/**
* Test-only method to advance all shards' peer-recovery retention leases to their tracked global checkpoints so that operations
* can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
*/
public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
assert assertPrimaryMode();
replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
syncRetentionLeases();
}

class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

@@ -32,6 +32,8 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
@@ -49,6 +51,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@@ -188,10 +191,30 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
}
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

final StepListener<ReplicationResponse> establishRetentionLeaseStep = new StepListener<>();
if (shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
runUnderPrimaryPermit(() -> {
try {
// conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate to do so
final long globalCheckpoint = startingSeqNo - 1;
// blindly create the lease. TODO integrate this with the recovery process
Contributor: I'm not sure what you mean by "blindly" here and what integration you're referring to.
Contributor Author: With this change, retention leases have no impact on the recovery process, nor do we make any attempt to add a lease for history we've any hope of retaining. E.g. with a file-based recovery we add a lease for all history. In due course the recovery process will be made more dependent on leases.

shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint, establishRetentionLeaseStep);
} catch (RetentionLeaseAlreadyExistsException e) {
logger.debug("peer-recovery retention lease already exists", e);
establishRetentionLeaseStep.onResponse(null);
}
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
} else {
establishRetentionLeaseStep.onResponse(null);
}

final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
establishRetentionLeaseStep.whenComplete(r -> {
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
}, onFailure);
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
prepareEngineStep.whenComplete(prepareEngineTime -> {
/*