diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index bcab18ba33ea3..4aa49929a252c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException; /** - * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog + * Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog) */ public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8662de260d068..773fc02c881cc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -514,10 +514,15 @@ public void syncTranslog() throws IOException { } /** - * Creates a new history snapshot for reading operations since the provided seqno from the translog. + * Creates a new history snapshot for reading operations since the provided seqno. + * The returned snapshot can be retrieved from either the Lucene index or the translog files. */ @Override public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); + } + return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } @@ -525,7 +530,14 @@ public Translog.Snapshot readHistoryOperations(String source, MapperService mapp * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
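
readHistoryOperations above and estimateNumberOfHistoryOperations just below both hinge on the same dispatch: with soft deletes enabled, history is served from the Lucene index, otherwise it still comes from the translog. A minimal sketch of that shape, using hypothetical OpSnapshot/LuceneHistory/TranslogHistory stand-ins rather than the real engine types:

    import java.io.IOException;

    interface OpSnapshot extends AutoCloseable {
        int totalOperations();
        @Override
        void close() throws IOException;
    }

    interface LuceneHistory {
        OpSnapshot changesSnapshot(long fromSeqNo, long toSeqNo);
    }

    interface TranslogHistory {
        OpSnapshot snapshotFrom(long minSeqNo);
        int estimateOpsFrom(long minSeqNo);
    }

    final class HistorySource {
        private final boolean softDeletesEnabled;
        private final LuceneHistory lucene;
        private final TranslogHistory translog;

        HistorySource(boolean softDeletesEnabled, LuceneHistory lucene, TranslogHistory translog) {
            this.softDeletesEnabled = softDeletesEnabled;
            this.lucene = lucene;
            this.translog = translog;
        }

        // mirrors readHistoryOperations: choose the history source once, up front
        OpSnapshot read(long startingSeqNo) {
            return softDeletesEnabled
                ? lucene.changesSnapshot(Math.max(0, startingSeqNo), Long.MAX_VALUE)
                : translog.snapshotFrom(startingSeqNo);
        }

        // mirrors estimateNumberOfHistoryOperations: counting via Lucene means opening
        // and closing a snapshot, which is why that signature now throws IOException
        int estimate(long startingSeqNo) throws IOException {
            if (softDeletesEnabled) {
                try (OpSnapshot snapshot = read(startingSeqNo)) {
                    return snapshot.totalOperations();
                }
            }
            return translog.estimateOpsFrom(startingSeqNo);
        }
    }
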
*/ @Override - public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) { + public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), + Long.MAX_VALUE, false)) { + return snapshot.totalOperations(); + } + } + return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); } @@ -2573,6 +2585,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + return getMinRetainedSeqNo() <= startingSeqNo; + } + final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); // avoid scanning translog if not necessary if (startingSeqNo > currentLocalCheckpoint) { @@ -2602,15 +2618,7 @@ public final long getMinRetainedSeqNo() { @Override public Closeable acquireRetentionLock() { if (softDeleteEnabled) { - final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); - final Closeable translogRetentionLock; - try { - translogRetentionLock = translog.acquireRetentionLock(); - } catch (Exception e) { - softDeletesRetentionLock.close(); - throw e; - } - return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock); + return softDeletesPolicy.acquireRetentionLock(); } else { return translog.acquireRetentionLock(); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 715b25a5175fa..7185fd4319af4 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -290,6 +290,36 @@ public RetentionLease addRetentionLease( return retentionLease; } + /** + * Atomically clones an existing retention lease to a new ID. 
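
The clone operation introduced here is a check-then-copy performed entirely under the tracker's mutex (see the method body below); only the sync to replicas happens outside the lock. The same invariants in isolation, as a sketch built on hypothetical LeaseStore/Lease stand-ins, not the actual Elasticsearch classes:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.NoSuchElementException;

    final class LeaseStore {
        static final class Lease {
            final String id;
            final long retainingSeqNo;
            final String source;

            Lease(String id, long retainingSeqNo, String source) {
                this.id = id;
                this.retainingSeqNo = retainingSeqNo;
                this.source = source;
            }
        }

        private final Map<String, Lease> leases = new HashMap<>();

        // stand-in for innerAddRetentionLease: refuses to overwrite an existing ID
        synchronized Lease add(String id, long retainingSeqNo, String source) {
            if (leases.containsKey(id)) {
                throw new IllegalStateException("retention lease with ID [" + id + "] already exists");
            }
            final Lease lease = new Lease(id, retainingSeqNo, source);
            leases.put(id, lease);
            return lease;
        }

        // stand-in for cloneRetentionLease: the source must exist, the target must not,
        // and the clone retains exactly the same history as its source under a new ID
        synchronized Lease cloneTo(String sourceId, String targetId) {
            final Lease existing = leases.get(sourceId);
            if (existing == null) {
                throw new NoSuchElementException("retention lease with ID [" + sourceId + "] not found");
            }
            return add(targetId, existing.retainingSeqNo, existing.source);
        }
    }

Doing the existence checks and the copy under one mutex is what makes the clone atomic with respect to concurrent adds, removals and renewals.
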
+ * + * @param sourceLeaseId the identifier of the source retention lease + * @param targetLeaseId the identifier of the retention lease to create + * @param listener the callback when the retention lease is successfully added and synced to replicas + * @return the new retention lease + * @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist + * @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists + */ + RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) { + Objects.requireNonNull(listener); + final RetentionLease retentionLease; + final RetentionLeases currentRetentionLeases; + synchronized (this) { + assert primaryMode; + if (getRetentionLeases().contains(sourceLeaseId) == false) { + throw new RetentionLeaseNotFoundException(sourceLeaseId); + } + final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId); + retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source()); + currentRetentionLeases = retentionLeases; + } + + // Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously + // retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease. + onSyncRetentionLeases.accept(currentRetentionLeases, listener); + return retentionLease; + } + /** * Adds a new retention lease, but does not synchronise it with the rest of the replication group. * @@ -442,8 +472,16 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations * with sequence numbers strictly greater than the given global checkpoint.
*/ - public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) { - addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener); + public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, + ActionListener<ReplicationResponse> listener) { + return addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, + PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener); + } + + public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) { + return cloneRetentionLease( + getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()), + getPeerRecoveryRetentionLeaseId(nodeId), listener); } public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9fe76fbc78fa5..dc3795f160677 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -40,6 +40,7 @@ import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -2597,9 +2598,17 @@ public boolean isRelocatedPrimary() { return replicationTracker.isRelocated(); } - public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) { + public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, + ActionListener<ReplicationResponse> listener) { assert assertPrimaryMode(); - replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); + // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: + assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); + return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); + } + + public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) { + assert assertPrimaryMode(); + return replicationTracker.cloneLocalPeerRecoveryRetentionLease(nodeId, listener); } public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d812dedbc5cf9..77c86c6e02946 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,7 +29,9 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ThreadedActionListener; @@ -52,7 +54,8 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import
org.elasticsearch.index.engine.RecoveryEngineException; -import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -149,6 +152,10 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) { IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); }; + final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled() + && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE; + final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>(); + runUnderPrimaryPermit(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); @@ -158,16 +165,37 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; + retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); final Closeable retentionLock = shard.acquireRetentionLock(); resources.add(retentionLock); final long startingSeqNo; - final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && - isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); + final boolean isSequenceNumberBasedRecovery + = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && isTargetSameHistory() + && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) + && (useRetentionLeases == false + || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); + // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, + // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's + // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold. + // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery + // without having a complete history. + + if (isSequenceNumberBasedRecovery && useRetentionLeases) { + // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock + retentionLock.close(); + logger.trace("history is retained by {}", retentionLeaseRef.get()); + } else { + // all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations() + // and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's + // local checkpoint will be retained for the duration of this recovery.
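
The gate computed above reduces to a single predicate. A sketch as a standalone method, with hypothetical parameters standing in for the state the handler reads from the shard, the request and the lease:

    final class RecoveryModeSketch {
        static final long UNASSIGNED_SEQ_NO = -2; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

        static boolean isSequenceNumberBasedRecovery(long requestedStartingSeqNo,
                                                     boolean targetHasSameHistory,
                                                     boolean hasCompleteHistory,
                                                     boolean useRetentionLeases,
                                                     Long leaseRetainingSeqNo) { // null == no lease found
            return requestedStartingSeqNo != UNASSIGNED_SEQ_NO
                && targetHasSameHistory
                // checked even when a lease exists: leases created during a rolling
                // upgrade from before 7.4 may be temporarily unsatisfied
                && hasCompleteHistory
                && (useRetentionLeases == false
                    || (leaseRetainingSeqNo != null && leaseRetainingSeqNo <= requestedStartingSeqNo));
        }
    }
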
+ logger.trace("history is retained by retention lock"); + } final StepListener sendFileStep = new StepListener<>(); - final StepListener establishRetentionLeaseStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); final StepListener finalizeStep = new StepListener<>(); @@ -184,9 +212,22 @@ public void recoverToTarget(ActionListener listener) { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will - // still filter out legacy operations without seqNo. - startingSeqNo = 0; + + // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being + // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we + // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the + // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains + // at least as much history as anything else. The safe commit will often contain all the history retained by the current set + // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a + // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint + // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can + // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled + // down. + startingSeqNo = useRetentionLeases + ? 
Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L + : 0; + logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); + try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); shard.store().incRef(); @@ -201,8 +242,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) { }); final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>(); - if (shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { + if (useRetentionLeases) { runUnderPrimaryPermit(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global @@ -223,7 +263,15 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) { deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep); + + final Consumer<ActionListener<RetentionLease>> createRetentionLeaseAsync; + if (useRetentionLeases) { + createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); + } else { + createRetentionLeaseAsync = l -> l.onResponse(null); + } + + phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -233,28 +281,6 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) { assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; sendFileStep.whenComplete(r -> { - if (shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { - runUnderPrimaryPermit(() -> { - try { - // conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate - final long globalCheckpoint = startingSeqNo - 1; - // blindly create the lease.
TODO integrate this with the recovery process - shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint, - new ThreadedActionListener<>(logger, shard.getThreadPool(), - ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); - } catch (RetentionLeaseAlreadyExistsException e) { - logger.debug("peer-recovery retention lease already exists", e); - establishRetentionLeaseStep.onResponse(null); - } - }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", - shard, cancellableThreads, logger); - } else { - establishRetentionLeaseStep.onResponse(null); - } - }, onFailure); - - establishRetentionLeaseStep.whenComplete(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, @@ -273,14 +299,16 @@ public void recoverToTarget(ActionListener listener) { shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); - } + logger.trace("snapshot translog for recovery; current size is [{}]", + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); - // we can release the retention lock here because the snapshot itself will retain the required operations. - retentionLock.close(); + + if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) { + // we can release the retention lock here because the snapshot itself will retain the required operations. + retentionLock.close(); + } + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values // are at least as high as the corresponding values on the primary when any of these operations were executed on it. final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); @@ -399,16 +427,9 @@ static final class SendFileResult { * segments that are missing. 
Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) { + void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> createRetentionLease, + IntSupplier translogOps, ActionListener<SendFileResult> listener) { cancellableThreads.checkForCancel(); - // Total size of segment files that are recovered - long totalSizeInBytes = 0; - // Total size of segment files that were able to be re-used - long existingTotalSizeInBytes = 0; - final List<String> phase1FileNames = new ArrayList<>(); - final List<Long> phase1FileSizes = new ArrayList<>(); - final List<String> phase1ExistingFileNames = new ArrayList<>(); - final List<Long> phase1ExistingFileSizes = new ArrayList<>(); final Store store = shard.store(); try { StopWatch stopWatch = new StopWatch().start(); @@ -428,6 +449,16 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps } } if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { + final List<String> phase1FileNames = new ArrayList<>(); + final List<Long> phase1FileSizes = new ArrayList<>(); + final List<String> phase1ExistingFileNames = new ArrayList<>(); + final List<Long> phase1ExistingFileSizes = new ArrayList<>(); + + // Total size of segment files that are recovered + long totalSizeInBytes = 0; + // Total size of segment files that were able to be re-used + long existingTotalSizeInBytes = 0; + // Generate a "diff" of all the identical, different, and missing // segment files on the target node, using the existing files on // the source node @@ -462,6 +493,7 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); final StepListener<Void> sendFileInfoStep = new StepListener<>(); final StepListener<Void> sendFilesStep = new StepListener<>(); + final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>(); final StepListener<Void> cleanFilesStep = new StepListener<>(); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, @@ -470,8 +502,20 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps sendFileInfoStep.whenComplete(r -> sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); - sendFilesStep.whenComplete(r -> - cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure); + sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure); + + createRetentionLeaseStep.whenComplete(retentionLease -> + { + final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); + assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint + : retentionLease + " vs " + lastKnownGlobalCheckpoint; + // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want + // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica + // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on + // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
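
The chain above (file info, then files, then the retention lease, then cleaning files) is continuation-passing over StepListener. The same shape sketched with CompletableFuture as a stand-in, all stage methods hypothetical:

    import java.util.concurrent.CompletableFuture;

    final class Phase1ChainSketch {
        // hypothetical stages standing in for receiveFileInfo/sendFiles/createRetentionLease/cleanFiles
        static CompletableFuture<Void> sendFileInfo()         { return CompletableFuture.completedFuture(null); }
        static CompletableFuture<Void> sendFiles()            { return CompletableFuture.completedFuture(null); }
        static CompletableFuture<Long> createRetentionLease() { return CompletableFuture.completedFuture(8L); }
        static CompletableFuture<Void> cleanFiles(long globalCheckpoint) { return CompletableFuture.completedFuture(null); }
        static long lastKnownGlobalCheckpoint()               { return 9L; }

        static CompletableFuture<Void> phase1() {
            return sendFileInfo()
                .thenCompose(ignored -> sendFiles())
                .thenCompose(ignored -> createRetentionLease()) // the step this patch adds
                .thenCompose(retainingSeqNo -> {
                    final long globalCheckpoint = lastKnownGlobalCheckpoint();
                    // same invariant as the assertion above: the lease must not retain
                    // history beyond the global checkpoint we seed the new translog with
                    assert retainingSeqNo - 1 <= globalCheckpoint : retainingSeqNo + " vs " + globalCheckpoint;
                    return cleanFiles(globalCheckpoint);
                });
        }
    }
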
+ cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep); + }, + listener::onFailure); final long totalSize = totalSizeInBytes; final long existingTotalSize = existingTotalSizeInBytes; @@ -482,18 +526,59 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps phase1ExistingFileSizes, existingTotalSize, took)); }, listener::onFailure); } else { - logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", - recoverySourceMetadata.getSyncId()); - final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); - listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames, - phase1ExistingFileSizes, existingTotalSizeInBytes, took)); + logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); + + // but we must still create a retention lease + final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>(); + createRetentionLease.accept(createRetentionLeaseStep); + createRetentionLeaseStep.whenComplete(retentionLease -> { + final TimeValue took = stopWatch.totalTime(); + logger.trace("recovery [phase1]: took [{}]", took); + listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(), + Collections.emptyList(), 0L, took)); + }, listener::onFailure); + } } catch (Exception e) { - throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e); + throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e); } } + private void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLease> listener) { + runUnderPrimaryPermit(() -> { + // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the local + // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an + // existing lease we (approximately) know that all our peers are also retaining history as requested by the cloned lease. If + // the recovery now fails before copying enough history over then a subsequent attempt will find this lease, determine it is + // not enough, and fall back to a file-based recovery. + // + // (approximately) because we do not guarantee to be able to satisfy every lease on every peer. + logger.trace("cloning primary's retention lease"); + try { + final StepListener<ReplicationResponse> cloneRetentionLeaseStep = new StepListener<>(); + final RetentionLease clonedLease + = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)); + logger.trace("cloned primary's retention lease as [{}]", clonedLease); + cloneRetentionLeaseStep.whenComplete(rr -> listener.onResponse(clonedLease), listener::onFailure); + } catch (RetentionLeaseNotFoundException e) { + // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before + // 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for + // recovery as a conservative estimate for the global checkpoint.
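
This is a clone-with-fallback: prefer cloning the primary's own peer-recovery lease, and only if that lease does not exist (rolling upgrade from before 7.4) create a fresh lease at a conservative estimate of the global checkpoint. In terms of the hypothetical LeaseStore sketched earlier:

    import java.util.NoSuchElementException;

    final class EstablishLeaseSketch {
        static LeaseStore.Lease establish(LeaseStore store, String primaryLeaseId,
                                          String targetLeaseId, long startingSeqNo) {
            try {
                return store.cloneTo(primaryLeaseId, targetLeaseId);
            } catch (NoSuchElementException e) { // stands in for RetentionLeaseNotFoundException
                // addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, ...) retains from
                // globalCheckpoint + 1, so with this estimate the new lease retains from
                // exactly startingSeqNo, the first operation the recovering copy may lack
                final long estimatedGlobalCheckpoint = startingSeqNo - 1;
                return store.add(targetLeaseId, estimatedGlobalCheckpoint + 1, "peer recovery");
            }
        }
    }
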
+ assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0); + final StepListener addRetentionLeaseStep = new StepListener<>(); + final long estimatedGlobalCheckpoint = startingSeqNo - 1; + final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), + estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)); + addRetentionLeaseStep.whenComplete(rr -> listener.onResponse(newLease), listener::onFailure); + logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); + } + }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", + shard, cancellableThreads, logger); + } + boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot target) { if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) { return false; diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 962788f09d23b..4656c2da54155 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.Engine; @@ -427,8 +428,12 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { .setSettings(Settings.builder() .put("number_of_shards", 1) .put("number_of_replicas", 1) + // disable merges to keep segments the same - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false") + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + + // expire retention leases quickly + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") ).get(); logger.info("--> indexing docs"); @@ -472,10 +477,13 @@ public Settings onNodeStopped(String nodeName) throws Exception { .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "0s") ).get(); - client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); + assertBusy(() -> assertThat(client().admin().indices().prepareStats("test").get().getShards()[0] + .getRetentionLeaseStats().retentionLeases().leases().size(), equalTo(1))); + client().admin().indices().prepareFlush("test").setForce(true).get(); if (softDeleteEnabled) { // We need an extra flush to advance the min_retained_seqno of the SoftDeletesPolicy - client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); + client().admin().indices().prepareFlush("test").setForce(true).get(); } return super.onNodeStopped(nodeName); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 9c6340459f5f0..f05ddce567a8a 100644 --- 
a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -79,6 +80,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isIn; @@ -290,6 +292,15 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { // We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen. // The min_retained_seqno only advances when a merge asks for the retention query. newPrimary.flush(new FlushRequest().force(true)); + + // We also need to make sure that there is no retention lease holding on to any history. The lease for the old primary + // expires since there are no unassigned shards in this replication group. + assertBusy(() -> { + newPrimary.syncRetentionLeases(); + //noinspection OptionalGetWithoutIsPresent since there must be at least one lease + assertThat(newPrimary.getRetentionLeases().leases().stream().mapToLong(RetentionLease::retainingSequenceNumber) + .min().getAsLong(), greaterThan(newPrimary.seqNoStats().getMaxSeqNo())); + }); } uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); totalDocs += uncommittedOpsOnPrimary; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 393ff44ef5c66..7611fad5a7e43 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -247,6 +249,103 @@ public void testRemoveRetentionLease() { } } + public void testCloneRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final AtomicReference<ReplicationTracker> replicationTrackerRef = new AtomicReference<>(); + final AtomicLong timeReference = new AtomicLong(); + final AtomicBoolean synced = new AtomicBoolean(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomLongBetween(1, Long.MAX_VALUE), + UNASSIGNED_SEQ_NO, + value -> {}, + timeReference::get, + (leases, listener) -> { + assertFalse(Thread.holdsLock(replicationTrackerRef.get())); + assertTrue(synced.compareAndSet(false, true));
listener.onResponse(new ReplicationResponse()); }); replicationTrackerRef.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId)); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final long addTime = randomLongBetween(timeReference.get(), Long.MAX_VALUE); + timeReference.set(addTime); + final long minimumRetainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final PlainActionFuture<ReplicationResponse> addFuture = new PlainActionFuture<>(); + replicationTracker.addRetentionLease("source", minimumRetainingSequenceNumber, "test-source", addFuture); + addFuture.actionGet(); + assertTrue(synced.get()); + synced.set(false); + + final long cloneTime = randomLongBetween(timeReference.get(), Long.MAX_VALUE); + timeReference.set(cloneTime); + final PlainActionFuture<ReplicationResponse> cloneFuture = new PlainActionFuture<>(); + final RetentionLease clonedLease = replicationTracker.cloneRetentionLease("source", "target", cloneFuture); + cloneFuture.actionGet(); + assertTrue(synced.get()); + synced.set(false); + + assertThat(clonedLease.id(), equalTo("target")); + assertThat(clonedLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumber)); + assertThat(clonedLease.timestamp(), equalTo(cloneTime)); + assertThat(clonedLease.source(), equalTo("test-source")); + + assertThat(replicationTracker.getRetentionLeases().get("target"), equalTo(clonedLease)); + } + + public void testCloneNonexistentRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomLongBetween(1, Long.MAX_VALUE), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> { }); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + assertThat(expectThrows(RetentionLeaseNotFoundException.class, + () -> replicationTracker.cloneRetentionLease("nonexistent-lease-id", "target", ActionListener.wrap(() -> {}))).getMessage(), + equalTo("retention lease with ID [nonexistent-lease-id] not found")); + } + + public void testCloneDuplicateRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomLongBetween(1, Long.MAX_VALUE), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> { }); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId)); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + replicationTracker.addRetentionLease("source", randomLongBetween(0L, Long.MAX_VALUE), "test-source", ActionListener.wrap(() -> {})); + replicationTracker.addRetentionLease("exists", randomLongBetween(0L, Long.MAX_VALUE), "test-source", ActionListener.wrap(() -> {})); + + assertThat(expectThrows(RetentionLeaseAlreadyExistsException.class, + () ->
replicationTracker.cloneRetentionLease("source", "exists", ActionListener.wrap(() -> {}))).getMessage(), + equalTo("retention lease with ID [exists] already exists")); + } + public void testRemoveNotFound() { final AllocationId allocationId = AllocationId.newInitializing(); long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 69f0078c3f672..9482054486dc2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1558,14 +1558,17 @@ public String[] listAll() throws IOException { public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); - assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery + // refresh on: finalize and end of recovery + // finalizing a replica involves two refreshes with soft deletes because of estimateNumberOfHistoryOperations() + final long initialRefreshes = shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false ? 2L : 3L; + assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes)); long initialTotalTime = shard.refreshStats().getTotalTimeInMillis(); // check time advances for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) { indexDoc(shard, "_doc", "test"); - assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1)); + assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes + i - 1)); shard.refresh("test"); - assertThat(shard.refreshStats().getTotal(), equalTo(2L + i)); + assertThat(shard.refreshStats().getTotal(), equalTo(initialRefreshes + i)); assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime)); } long refreshCount = shard.refreshStats().getTotal(); @@ -1592,18 +1595,18 @@ public void testExternalRefreshMetric() throws IOException { assertThat(shard.refreshStats().getExternalTotal(), equalTo(2L + i)); assertThat(shard.refreshStats().getExternalTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime)); } - long externalRefreshCount = shard.refreshStats().getExternalTotal(); - + final long externalRefreshCount = shard.refreshStats().getExternalTotal(); + final long extraInternalRefreshes = shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false ? 
0 : 1; indexDoc(shard, "_doc", "test"); try (Engine.GetResult ignored = shard.get(new Engine.Get(true, false, "_doc", "test", new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { assertThat(shard.refreshStats().getExternalTotal(), equalTo(externalRefreshCount)); - assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 1)); + assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 1 - extraInternalRefreshes)); } indexDoc(shard, "_doc", "test"); shard.writeIndexingBuffer(); assertThat(shard.refreshStats().getExternalTotal(), equalTo(externalRefreshCount)); - assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 2)); + assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 2 - extraInternalRefreshes)); closeShards(shard); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 481aaa233caed..617fffa6d1b16 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -124,10 +124,16 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { - int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included - assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); - assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); - assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); + if (shard.indexSettings.isSoftDeleteEnabled()) { + assertThat(resyncTask.getSkippedOperations(), equalTo(0)); + assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); + assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); + } else { + int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included + assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); + assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); + assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 
0 : numDocs)); + } } else { assertThat(resyncTask.getSkippedOperations(), equalTo(0)); assertThat(resyncTask.getResyncedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 60659b78d98cf..9ab56df76d5c3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -36,15 +36,22 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -52,12 +59,14 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.analysis.AbstractTokenFilterFactory; import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.recovery.RecoveryStats; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -868,6 +877,7 @@ public void testHistoryRetention() throws Exception { internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop)); final long desyncNanoTime = System.nanoTime(); + //noinspection StatementWithEmptyBody while (System.nanoTime() <= desyncNanoTime) { // time passes } @@ -1007,6 +1017,40 @@ public void testRecoveryFlushReplica() throws Exception { assertThat(syncIds, hasSize(1)); } + public void testRecoveryUsingSyncedFlushWithoutRetentionLease() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "24h") // do not reallocate the lost shard + 
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "100ms") // expire leases quickly + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") // sync frequently + .build()); + int numDocs = randomIntBetween(0, 10); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + assertThat(SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId).successfulShards(), equalTo(2)); + + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final ShardRouting shardToResync = randomFrom(clusterState.routingTable().shardRoutingTable(shardId).activeShards()); + internalCluster().restartNode(clusterState.nodes().get(shardToResync.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertBusy(() -> assertFalse(client().admin().indices().prepareStats(indexName).get() + .getShards()[0].getRetentionLeaseStats().retentionLeases().contains( + ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardToResync)))); + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + } + public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); List<String> nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false) @@ -1065,6 +1109,139 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { } } + public void testUsesFileBasedRecoveryIfRetentionLeaseMissing() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final IndexShard primary = internalCluster().getInstance(IndicesService.class, + discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId); + + final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + final PlainActionFuture<ReplicationResponse> future = new PlainActionFuture<>(); +
primary.removeRetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting), future); + future.get(); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + + public void testUsesFileBasedRecoveryIfRetentionLeaseAheadOfGlobalCheckpoint() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final IndexShard primary = internalCluster().getInstance(IndicesService.class, + discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId); + + final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + // We do not guarantee that the replica can recover locally all the way to its own global checkpoint before starting + // to recover from the primary, so we must be careful not to perform an operations-based recovery if this would require + // some operations that are not being retained. 
Emulate this by advancing the lease ahead of the replica's GCP: + primary.renewRetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting), + primary.seqNoStats().getMaxSeqNo() + 1, ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + + public void testDoesNotCopyOperationsInSafeCommit() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final IndexShard primary = internalCluster().getInstance(IndicesService.class, + discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId); + final long maxSeqNoBeforeRecovery = primary.seqNoStats().getMaxSeqNo(); + assertBusy(() -> assertThat(primary.getLastSyncedGlobalCheckpoint(), equalTo(maxSeqNoBeforeRecovery))); + assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), is(0)); // makes a safe commit + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(indexName); + final long maxSeqNoAfterRecovery = primary.seqNoStats().getMaxSeqNo(); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat((long)recoveryState.getTranslog().recoveredOperations(), + lessThanOrEqualTo(maxSeqNoAfterRecovery - maxSeqNoBeforeRecovery)); + } + public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin { final AtomicBoolean throwParsingError = new AtomicBoolean(); @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index f6e1de0233bf7..a175699ad2cf6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -64,6 +64,7 @@ 
import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -78,6 +79,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -100,6 +102,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.zip.CRC32; @@ -441,6 +444,18 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {}); return null; }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + + final IndexMetaData.Builder indexMetaData = IndexMetaData.builder("test").settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(0,5)) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1,5)) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) + .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))); + if (randomBoolean()) { + indexMetaData.state(IndexMetaData.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetaData.build(), Settings.EMPTY)); + final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); @@ -453,9 +468,10 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) { + void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> createRetentionLease, + IntSupplier translogOps, ActionListener<SendFileResult> listener) { phase1Called.set(true); - super.phase1(snapshot, globalCheckpoint, translogOps, listener); + super.phase1(snapshot, createRetentionLease, translogOps, listener); } @Override @@ -670,7 +686,9 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada final StepListener<RecoverySourceHandler.SendFileResult> phase1Listener = new StepListener<>(); try { final CountDownLatch latch = new CountDownLatch(1); - handler.phase1(DirectoryReader.listCommits(dir).get(0), randomNonNegativeLong(), () -> 0, + handler.phase1(DirectoryReader.listCommits(dir).get(0), + l -> recoveryExecutor.execute(() -> l.onResponse(null)), + () -> 0, new LatchedActionListener<>(phase1Listener, latch)); latch.await(); phase1Listener.result(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index cecd7dcd80a4c..1ef039ba168f5 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -79,7 +80,8 @@ public
void testTranslogHistoryTransferred() throws Exception { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs)); shards.assertAllEqual(docs + moreDocs); } } @@ -294,7 +295,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -410,7 +412,8 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs)); + boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); + assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); shards.assertAllEqual(numDocs); } }
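
The ReplicationTrackerRetentionLeaseTests changes above also pin down the calling convention for the new API: the caller passes an ActionListener<ReplicationResponse> that fires once the lease change has been synced, and receives the lease itself synchronously. Condensed from testCloneRetentionLease, with the tracker setup elided:

    // replicationTracker as set up in testCloneRetentionLease above; retainingSeqNo is arbitrary
    final PlainActionFuture<ReplicationResponse> addFuture = new PlainActionFuture<>();
    replicationTracker.addRetentionLease("source", retainingSeqNo, "test-source", addFuture);
    addFuture.actionGet(); // blocks until the add has been synced to replicas

    final PlainActionFuture<ReplicationResponse> cloneFuture = new PlainActionFuture<>();
    final RetentionLease cloned = replicationTracker.cloneRetentionLease("source", "target", cloneFuture);
    cloneFuture.actionGet(); // the clone also triggers a (not strictly necessary) sync

    assert cloned.retainingSequenceNumber() == retainingSeqNo; // a clone retains the same history
    assert cloned.source().equals("test-source");
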