diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b3f76bbdf31cc..76cd41df18701 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -159,6 +159,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -1359,6 +1360,81 @@ public void prepareForIndexRecovery() {
         assert currentEngineReference.get() == null;
     }
 
+    /**
+     * A best-effort attempt to bring this shard up to the global checkpoint using the local translog before performing a peer recovery.
+     *
+     * @return a sequence number that an operation-based peer recovery can start with.
+     *         This is the first operation after the local checkpoint of the safe commit, if one exists.
+     */
+    public long recoverLocallyUpToGlobalCheckpoint() {
+        if (state != IndexShardState.RECOVERING) {
+            throw new IndexShardNotRecoveringException(shardId, state);
+        }
+        assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
+        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+        final Optional<SequenceNumbers.CommitInfo> safeCommit;
+        final long globalCheckpoint;
+        try {
+            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+            globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+            safeCommit = store.findSafeIndexCommit(globalCheckpoint);
+        } catch (org.apache.lucene.index.IndexNotFoundException e) {
+            logger.trace("skip local recovery as no index commit found");
+            return UNASSIGNED_SEQ_NO;
+        } catch (Exception e) {
+            logger.debug("skip local recovery as failed to find the safe commit", e);
+            return UNASSIGNED_SEQ_NO;
+        }
+        if (safeCommit.isPresent() == false) {
+            logger.trace("skip local recovery as no safe commit found");
+            return UNASSIGNED_SEQ_NO;
+        }
+        assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
+        try {
+            maybeCheckIndex(); // check the index here; we won't check it again if an ops-based recovery occurs
+            recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+            if (safeCommit.get().localCheckpoint == globalCheckpoint) {
+                logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
+                    safeCommit.get(), globalCheckpoint);
+                recoveryState.getTranslog().totalLocal(0);
+                return globalCheckpoint + 1;
+            }
+            try {
+                final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
+                    recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
+                    final int recoveredOps = runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
+                        recoveryState.getTranslog()::incrementRecoveredOperations);
+                    recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
+                    return recoveredOps;
+                };
+                innerOpenEngineAndTranslog();
+                getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
+                logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
+            } finally {
+                synchronized (mutex) {
+                    IOUtils.close(currentEngineReference.getAndSet(null));
+                }
+            }
+        } catch (Exception e) {
+            logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e);
+            return UNASSIGNED_SEQ_NO;
+        }
+        try {
+            // we need to find the safe commit again as we should have created a new one during the local recovery
+            final Optional<SequenceNumbers.CommitInfo> newSafeCommit = store.findSafeIndexCommit(globalCheckpoint);
+            assert newSafeCommit.isPresent() : "no safe commit found after local recovery";
+            return newSafeCommit.get().localCheckpoint + 1;
+        } catch (Exception e) {
+            if (Assertions.ENABLED) {
+                throw new AssertionError(
+                    "failed to find the safe commit after recovering shard locally up to global checkpoint " + globalCheckpoint, e);
+            }
+            logger.debug(new ParameterizedMessage(
+                "failed to find the safe commit after recovering shard locally up to global checkpoint {}", globalCheckpoint), e);
+            return UNASSIGNED_SEQ_NO;
+        }
+    }
+
     public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
         getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
     }
@@ -1462,6 +1538,9 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
      * Operations from the translog will be replayed to bring lucene up to date.
      **/
     public void openEngineAndRecoverFromTranslog() throws IOException {
+        assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
+        maybeCheckIndex();
+        recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
         final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
         final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
             translogRecoveryStats.totalOperations(snapshot.totalOperations());
@@ -1478,6 +1557,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
      * The translog is kept but its operations won't be replayed.
      */
     public void openEngineAndSkipTranslogRecovery() throws IOException {
+        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+        assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
         innerOpenEngineAndTranslog();
         getEngine().skipTranslogRecovery();
     }
@@ -1486,17 +1567,6 @@ private void innerOpenEngineAndTranslog() throws IOException {
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
-        recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
-        // also check here, before we apply the translog
-        if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
-            try {
-                checkIndex();
-            } catch (IOException ex) {
-                throw new RecoveryFailedException(recoveryState, "check index failed", ex);
-            }
-        }
-        recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
-
         final EngineConfig config = newEngineConfig();
 
         // we disable deletes since we allow for operations to be executed against the shard while recovering
@@ -1552,14 +1622,22 @@ private void onNewEngine(Engine newEngine) {
      */
     public void performRecoveryRestart() throws IOException {
         synchronized (mutex) {
-            if (state != IndexShardState.RECOVERING) {
-                throw new IndexShardNotRecoveringException(shardId, state);
-            }
             assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
-            final Engine engine = this.currentEngineReference.getAndSet(null);
-            IOUtils.close(engine);
-            recoveryState().setStage(RecoveryState.Stage.INIT);
+            IOUtils.close(currentEngineReference.getAndSet(null));
+            resetRecoveryStage();
+        }
+    }
+
+    /**
+     * If a file-based recovery occurs, a recovery target calls this method to reset the recovery stage.
+     */
+    public void resetRecoveryStage() {
+        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+        assert currentEngineReference.get() == null;
+        if (state != IndexShardState.RECOVERING) {
+            throw new IndexShardNotRecoveringException(shardId, state);
         }
+        recoveryState().setStage(RecoveryState.Stage.INIT);
     }
 
     /**
@@ -2296,6 +2374,17 @@ public void noopUpdate(String type) {
         internalIndexingStats.noopUpdate(type);
     }
 
+    public void maybeCheckIndex() {
+        recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
+        if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
+            try {
+                checkIndex();
+            } catch (IOException ex) {
+                throw new RecoveryFailedException(recoveryState, "check index failed", ex);
+            }
+        }
+    }
+
     void checkIndex() throws IOException {
         if (store.tryIncRef()) {
             try {
diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java
index 410774114bd78..eae9ca3a8bda6 100644
--- a/server/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -96,6 +96,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1560,6 +1561,22 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
         }
     }
 
+    /**
+     * Returns a {@link org.elasticsearch.index.seqno.SequenceNumbers.CommitInfo} of the safe commit, if one exists.
+     */
+    public Optional<SequenceNumbers.CommitInfo> findSafeIndexCommit(long globalCheckpoint) throws IOException {
+        final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
+        assert commits.isEmpty() == false : "no commit found";
+        final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
+        final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
+        // all operations of the safe commit must be at most the global checkpoint.
+        if (commitInfo.maxSeqNo <= globalCheckpoint) {
+            return Optional.of(commitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
     private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
         final Map<String, String> userData = getUserData(writer);
         userData.putAll(keysToUpdate);
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 8a11cdf5ec961..8ef1ec1ae7249 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -22,8 +22,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.RateLimiter;
 import org.elasticsearch.ElasticsearchException;
@@ -44,18 +42,14 @@ import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.engine.CombinedDeletionPolicy;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.index.translog.TranslogCorruptedException;
 import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -68,12 +62,11 @@ import org.elasticsearch.transport.TransportService;
 import java.io.IOException;
-import java.util.List;
-import java.util.StringJoiner;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 /**
  * The recovery target handles recoveries of peer shards of the shard+node to recover to.
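
For context on the safe-commit contract introduced above (Store#findSafeIndexCommit) and consumed by IndexShard#recoverLocallyUpToGlobalCheckpoint, here is a minimal standalone sketch of the decision it encodes. CommitInfo below is a stand-in for SequenceNumbers.CommitInfo and the sentinel mirrors UNASSIGNED_SEQ_NO; this is an illustration of the rule, not the production code.

```java
/**
 * Illustrative sketch (not production code) of the safe-commit decision:
 * if every operation in the safe commit is at or below the global checkpoint,
 * an operation-based recovery can start right after the commit's local checkpoint;
 * otherwise the caller falls back to a file-based recovery.
 */
final class SafeCommitDecisionSketch {

    static final long UNASSIGNED_SEQ_NO = -2; // same sentinel value as SequenceNumbers.UNASSIGNED_SEQ_NO

    /** Stand-in for SequenceNumbers.CommitInfo: seq-no stats stored in the Lucene commit user data. */
    static final class CommitInfo {
        final long maxSeqNo;
        final long localCheckpoint;

        CommitInfo(long maxSeqNo, long localCheckpoint) {
            this.maxSeqNo = maxSeqNo;
            this.localCheckpoint = localCheckpoint;
        }
    }

    /** Mirrors the check in findSafeIndexCommit combined with the caller's "localCheckpoint + 1". */
    static long startingSeqNo(CommitInfo safeCommit, long globalCheckpoint) {
        if (safeCommit.maxSeqNo <= globalCheckpoint) {
            // the commit contains no operation the primary might not have; replay from here
            return safeCommit.localCheckpoint + 1;
        }
        // the commit may contain operations above the global checkpoint; fall back to file-based recovery
        return UNASSIGNED_SEQ_NO;
    }

    public static void main(String[] args) {
        System.out.println(startingSeqNo(new CommitInfo(42, 40), 45)); // 41: ops-based recovery
        System.out.println(startingSeqNo(new CommitInfo(50, 42), 45)); // -2: file-based recovery
    }
}
```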
@@ -178,9 +171,12 @@ private void doRecovery(final long recoveryId) {
             cancellableThreads = recoveryTarget.cancellableThreads();
             try {
                 assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
-                request = getStartRecoveryRequest(recoveryTarget);
                 logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                 recoveryTarget.indexShard().prepareForIndexRecovery();
+                final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
+                assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
+                    "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [" + startingSeqNo + "]";
+                request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
             } catch (final Exception e) {
                 // this will be logged as warning later on...
                 logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
@@ -319,7 +315,7 @@ public RecoveryResponse read(StreamInput in) throws IOException {
      * @param recoveryTarget the target of the recovery
      * @return a snapshot of the store metadata
      */
-    private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+    private static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
         try {
             return recoveryTarget.indexShard().snapshotStoreMetadata();
         } catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -335,38 +331,25 @@ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget rec
     /**
      * Prepare the start recovery request.
      *
+     * @param logger         the logger
+     * @param localNode      the local node of the recovery target
      * @param recoveryTarget the target of the recovery
+     * @param startingSeqNo  a sequence number that an operation-based peer recovery can start with.
+     *                       This is the first operation after the local checkpoint of the safe commit, if one exists.
      * @return a start recovery request
      */
-    private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
+    public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,
+                                                               RecoveryTarget recoveryTarget, long startingSeqNo) {
         final StartRecoveryRequest request;
         logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
 
-        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
         logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
-
-        final long startingSeqNo;
-        if (metadataSnapshot.size() > 0) {
-            startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
-        } else {
-            startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
-        }
-
-        if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
-            logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
-        } else {
-            logger.trace(
-                "{} preparing for sequence-number-based recovery starting at sequence number [{}] from [{}]",
-                recoveryTarget.shardId(),
-                startingSeqNo,
-                recoveryTarget.sourceNode());
-        }
-
         request = new StartRecoveryRequest(
             recoveryTarget.shardId(),
             recoveryTarget.indexShard().routingEntry().allocationId().getId(),
             recoveryTarget.sourceNode(),
-            clusterService.localNode(),
+            localNode,
             metadataSnapshot,
             recoveryTarget.state().getPrimary(),
             recoveryTarget.recoveryId(),
@@ -374,50 +357,6 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
         return request;
     }
 
-    /**
-     * Get the starting sequence number for a sequence-number-based request.
-     *
-     * @param recoveryTarget the target of the recovery
-     * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
-     *         failed
-     */
-    public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
-        try {
-            final Store store = recoveryTarget.store();
-            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-            final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
-            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
-            final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
-            final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
-            if (logger.isTraceEnabled()) {
-                final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
-                for (IndexCommit commit : existingCommits) {
-                    descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
-                }
-                logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
-                    globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
-            }
-            if (seqNoStats.maxSeqNo <= globalCheckpoint) {
-                assert seqNoStats.localCheckpoint <= globalCheckpoint;
-                /*
-                 * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
-                 * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
-                 * after the local checkpoint stored in the commit.
-                 */
-                return seqNoStats.localCheckpoint + 1;
-            } else {
-                return SequenceNumbers.UNASSIGNED_SEQ_NO;
-            }
-        } catch (final TranslogCorruptedException | IOException e) {
-            /*
-             * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
-             * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
-             * proceeds to attempt a sequence-number-based recovery.
-             */
-            return SequenceNumbers.UNASSIGNED_SEQ_NO;
-        }
-    }
-
     public interface RecoveryListener {
         void onRecoveryDone(RecoveryState state);
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
index eacbcd37a958c..8f398db2f9211 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
@@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
@@ -459,6 +461,7 @@ public static class Translog extends Timer implements ToXContentFragment, Writea
         private int recovered;
         private int total = UNKNOWN;
         private int totalOnStart = UNKNOWN;
+        private int totalLocal = UNKNOWN;
 
         public Translog() {
         }
@@ -468,6 +471,9 @@ public Translog(StreamInput in) throws IOException {
             recovered = in.readVInt();
             total = in.readVInt();
             totalOnStart = in.readVInt();
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+                totalLocal = in.readVInt();
+            }
         }
 
         @Override
@@ -476,6 +482,9 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeVInt(recovered);
             out.writeVInt(total);
             out.writeVInt(totalOnStart);
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                out.writeVInt(totalLocal);
+            }
         }
 
         public synchronized void reset() {
@@ -483,6 +492,7 @@ public synchronized void reset() {
             recovered = 0;
             total = UNKNOWN;
             totalOnStart = UNKNOWN;
+            totalLocal = UNKNOWN;
         }
 
         public synchronized void incrementRecoveredOperations() {
@@ -524,8 +534,8 @@ public synchronized int totalOperations() {
         }
 
         public synchronized void totalOperations(int total) {
-            this.total = total;
-            assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total +
+            this.total = totalLocal == UNKNOWN ? total : totalLocal + total;
+            assert total == UNKNOWN || this.total >= recovered : "total, if known, should be > recovered. total [" + total +
                 "], recovered [" + recovered + "]";
         }
 
@@ -540,7 +550,20 @@ public synchronized int totalOperationsOnStart() {
         }
 
         public synchronized void totalOperationsOnStart(int total) {
-            this.totalOnStart = total;
+            this.totalOnStart = totalLocal == UNKNOWN ? total : totalLocal + total;
+        }
+
+        /**
+         * Sets the total number of translog operations to be recovered locally before performing peer recovery
+         * @see IndexShard#recoverLocallyUpToGlobalCheckpoint()
+         */
+        public synchronized void totalLocal(int totalLocal) {
+            assert totalLocal >= recovered : totalLocal + " < " + recovered;
+            this.totalLocal = totalLocal;
+        }
+
+        public synchronized int totalLocal() {
+            return totalLocal;
         }
 
         public synchronized float recoveredPercent() {
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 0b3f95b2ec4a2..2c3243ccf9cd3 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -380,6 +380,8 @@ public void receiveFileInfo(List phase1FileNames, int totalTranslogOps, ActionListener listener) {
         ActionListener.completeWith(listener, () -> {
+            indexShard.resetRecoveryStage();
+            indexShard.prepareForIndexRecovery();
             final RecoveryState.Index index = state().getIndex();
             for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
                 index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
@@ -417,7 +419,8 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
             } else {
                 assert indexShard.assertRetentionLeasesPersisted();
             }
-
+            indexShard.maybeCheckIndex();
+            state().setStage(RecoveryState.Stage.TRANSLOG);
         } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
             // this is a fatal exception at this stage.
             // this means we transferred files from the remote that have not be checksummed and they are
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index c60f32132c646..9c6340459f5f0 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -66,6 +66,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -114,31 +115,26 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
         int docs = shards.indexDocs(randomInt(50));
         shards.flush();
         final IndexShard originalReplica = shards.getReplicas().get(0);
-        long replicaCommittedLocalCheckpoint = docs - 1;
-        boolean replicaHasDocsSinceLastFlushedCheckpoint = false;
         for (int i = 0; i < randomInt(2); i++) {
             final int indexedDocs = shards.indexDocs(randomInt(5));
             docs += indexedDocs;
-            if (indexedDocs > 0) {
-                replicaHasDocsSinceLastFlushedCheckpoint = true;
-            }
 
             final boolean flush = randomBoolean();
             if (flush) {
                 originalReplica.flush(new FlushRequest());
-                replicaHasDocsSinceLastFlushedCheckpoint = false;
-                replicaCommittedLocalCheckpoint = docs - 1;
             }
         }
 
         // simulate a background global checkpoint sync at which point we expect the global checkpoint to advance on the replicas
         shards.syncGlobalCheckpoint();
-
+        long globalCheckpointOnReplica = originalReplica.getLastSyncedGlobalCheckpoint();
+        Optional<SequenceNumbers.CommitInfo> safeCommitOnReplica =
+            originalReplica.store().findSafeIndexCommit(globalCheckpointOnReplica);
+        assertTrue(safeCommitOnReplica.isPresent());
         shards.removeReplica(originalReplica);
 
         final int missingOnReplica = shards.indexDocs(randomInt(5));
         docs += missingOnReplica;
-        replicaHasDocsSinceLastFlushedCheckpoint |= missingOnReplica > 0;
 
         final boolean translogTrimmed;
         if (randomBoolean()) {
@@ -157,14 +153,15 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
         final IndexShard recoveredReplica =
             shards.addReplicaWithExistingPath(originalReplica.shardPath(), originalReplica.routingEntry().currentNodeId());
         shards.recoverReplica(recoveredReplica);
-        if (translogTrimmed && replicaHasDocsSinceLastFlushedCheckpoint) {
+        if (translogTrimmed && missingOnReplica > 0) {
             // replica has something to catch up with, but since we trimmed the primary translog, we should fall back to full recovery
             assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
         } else {
             assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
-            assertThat(
-                recoveredReplica.recoveryState().getTranslog().recoveredOperations(),
-                equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1))));
+            assertThat(recoveredReplica.recoveryState().getTranslog().recoveredOperations(),
+                equalTo(Math.toIntExact(docs - 1 - safeCommitOnReplica.get().localCheckpoint)));
+            assertThat(recoveredReplica.recoveryState().getTranslog().totalLocal(),
+                equalTo(Math.toIntExact(globalCheckpointOnReplica - safeCommitOnReplica.get().localCheckpoint)));
         }
 
         docs += shards.indexDocs(randomInt(5));
@@ -231,10 +228,9 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
         try (ReplicationGroup shards = createGroup(2)) {
             shards.startAll();
             int totalDocs = shards.indexDocs(randomInt(10));
-            int committedDocs = 0;
+            shards.syncGlobalCheckpoint();
             if (randomBoolean()) {
                 shards.flush();
-                committedDocs = totalDocs;
             }
 
             final IndexShard oldPrimary = shards.getPrimary();
@@ -254,7 +250,10 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
                     oldPrimary.flush(new FlushRequest(index.getName()));
                 }
             }
-
+            long globalCheckpointOnOldPrimary = oldPrimary.getLastSyncedGlobalCheckpoint();
+            Optional<SequenceNumbers.CommitInfo> safeCommitOnOldPrimary =
+                oldPrimary.store().findSafeIndexCommit(globalCheckpointOnOldPrimary);
+            assertTrue(safeCommitOnOldPrimary.isPresent());
             shards.promoteReplicaToPrimary(newPrimary).get();
 
             // check that local checkpoint of new primary is properly tracked after primary promotion
@@ -310,7 +309,10 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
             if (expectSeqNoRecovery) {
                 assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty());
-                assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
+                assertThat(newReplica.recoveryState().getTranslog().totalLocal(),
+                    equalTo(Math.toIntExact(globalCheckpointOnOldPrimary - safeCommitOnOldPrimary.get().localCheckpoint)));
+                assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(),
+                    equalTo(Math.toIntExact(totalDocs - 1 - safeCommitOnOldPrimary.get().localCheckpoint)));
             } else {
                 assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
                 assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index f5d72a01eb0a6..5894e9c364fb8 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -52,9 +53,11 @@ import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
 import org.elasticsearch.index.analysis.TokenFilterFactory;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.IndicesService;
@@ -99,6 +102,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
@@ -997,6 +1001,63 @@ public void testRecoveryFlushReplica() throws Exception {
         assertThat(syncIds, hasSize(1));
     }
 
+    public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        List<String> nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
+            .map(node -> node.value.getName()).collect(Collectors.toSet()));
+        String indexName = "test-index";
+        createIndex(indexName, Settings.builder()
+            .put("index.number_of_shards", 1)
+            .put("index.number_of_replicas", 1)
+            // disable global checkpoint background sync so we can verify the start recovery request
+            .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "12h")
+            .put("index.routing.allocation.include._name", String.join(",", nodes))
+            .build());
+        ensureGreen(indexName);
+        int numDocs = randomIntBetween(0, 100);
+        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs)
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+        client().admin().indices().prepareRefresh(indexName).get(); // avoid refresh when we are failing a shard
+        String failingNode = randomFrom(nodes);
+        PlainActionFuture<StartRecoveryRequest> startRecoveryRequestFuture = new PlainActionFuture<>();
+        for (String node : nodes) {
+            MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
+            transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+                if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) {
+                    startRecoveryRequestFuture.onResponse((StartRecoveryRequest) request);
+                }
+                connection.sendRequest(requestId, action, request, options);
+            });
+        }
+        IndexShard shard = internalCluster().getInstance(IndicesService.class, failingNode)
+            .getShardOrNull(new ShardId(resolveIndex(indexName), 0));
+        final long lastSyncedGlobalCheckpoint = shard.getLastSyncedGlobalCheckpoint();
+        final long localCheckpointOfSafeCommit;
+        try (Engine.IndexCommitRef safeCommitRef = shard.acquireSafeIndexCommit()) {
+            localCheckpointOfSafeCommit =
+                SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommitRef.getIndexCommit().getUserData().entrySet()).localCheckpoint;
+        }
+        final long maxSeqNo = shard.seqNoStats().getMaxSeqNo();
+        shard.failShard("test", new IOException("simulated"));
+        StartRecoveryRequest startRecoveryRequest = startRecoveryRequestFuture.actionGet();
+        SequenceNumbers.CommitInfo commitInfoAfterLocalRecovery = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+            startRecoveryRequest.metadataSnapshot().getCommitUserData().entrySet());
+        assertThat(commitInfoAfterLocalRecovery.localCheckpoint, equalTo(lastSyncedGlobalCheckpoint));
+        assertThat(commitInfoAfterLocalRecovery.maxSeqNo, equalTo(lastSyncedGlobalCheckpoint));
+        assertThat(startRecoveryRequest.startingSeqNo(), equalTo(lastSyncedGlobalCheckpoint + 1));
+        ensureGreen(indexName);
+        for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) {
+            if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) {
+                assertThat("total recovered translog operations must include both local and remote recovery",
+                    recoveryState.getTranslog().recoveredOperations(), equalTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
+            }
+        }
+        for (String node : nodes) {
+            MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
+            transportService.clearAllRules();
+        }
+    }
+
     public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin {
         final AtomicBoolean throwParsingError = new AtomicBoolean();
         @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 1154ce99078b0..e3d299067910f 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -19,21 +19,22 @@ package org.elasticsearch.indices.recovery;
 
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -41,93 +42,23 @@ import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CyclicBarrier;
 import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 
 public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
 
-    public void testGetStartingSeqNo() throws Exception {
-        final IndexShard replica = newShard(false);
-        try {
-            // Empty store
-            {
-                recoveryEmptyReplica(replica, true);
-                final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
-                recoveryTarget.decRef();
-            }
-            // Last commit is good - use it.
-            final long initDocs = scaledRandomIntBetween(1, 10);
-            {
-                for (int i = 0; i < initDocs; i++) {
-                    indexDoc(replica, "_doc", Integer.toString(i));
-                    if (randomBoolean()) {
-                        flushShard(replica);
-                    }
-                }
-                flushShard(replica);
-                replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
-                replica.sync();
-                final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
-                recoveryTarget.decRef();
-            }
-            // Global checkpoint does not advance, last commit is not good - use the previous commit
-            final int moreDocs = randomIntBetween(1, 10);
-            {
-                for (int i = 0; i < moreDocs; i++) {
-                    indexDoc(replica, "_doc", Long.toString(i));
-                    if (randomBoolean()) {
-                        flushShard(replica);
-                    }
-                }
-                flushShard(replica);
-                final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
-                recoveryTarget.decRef();
-            }
-            // Advances the global checkpoint, a safe commit also advances
-            {
-                replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
-                replica.sync();
-                final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs));
-                recoveryTarget.decRef();
-            }
-            // Different translogUUID, fallback to file-based
-            {
-                replica.close("test", false);
-                final List<IndexCommit> commits = DirectoryReader.listCommits(replica.store().directory());
-                IndexWriterConfig iwc = new IndexWriterConfig(null)
-                    .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
-                    .setCommitOnClose(false)
-                    .setMergePolicy(NoMergePolicy.INSTANCE)
-                    .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
-                try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) {
-                    final Map<String, String> userData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
-                    userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID());
-                    writer.setLiveCommitData(userData.entrySet());
-                    writer.commit();
-                }
-                final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
-                recoveryTarget.decRef();
-            }
-        } finally {
-            closeShards(replica);
-        }
-    }
-
     public void testWriteFileChunksConcurrently() throws Exception {
         IndexShard sourceShard = newStartedShard(true);
         int numDocs = between(20, 100);
@@ -202,4 +133,86 @@ public void testWriteFileChunksConcurrently() throws Exception {
         assertThat(diff.different, empty());
         closeShards(sourceShard, targetShard);
     }
+
+    public void testPrepareIndexForPeerRecovery() throws Exception {
+        CheckedFunction<IndexShard, Long, Exception> populateData = shard -> {
+            List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
+            Randomness.shuffle(seqNos);
+            for (long seqNo : seqNos) {
+                shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
+                    shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
+                if (randomInt(100) < 5) {
+                    shard.flush(new FlushRequest().waitIfOngoing(true));
+                }
+            }
+            shard.sync();
+            long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
+            shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
+            shard.sync();
+            return globalCheckpoint;
+        };
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+
+        // empty copy
+        IndexShard shard = newShard(false);
+        shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, localNode));
+        shard.prepareForIndexRecovery();
+        assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
+        assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        closeShards(shard);
+
+        // good copy
+        shard = newStartedShard(false);
+        long globalCheckpoint = populateData.apply(shard);
+        Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
+        assertTrue(safeCommit.isPresent());
+        IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
+            RecoverySource.PeerRecoverySource.INSTANCE));
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        replica.prepareForIndexRecovery();
+        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(),
+            equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(),
+            equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
+        closeShards(replica);
+
+        // corrupted copy
+        shard = newStartedShard(false);
+        if (randomBoolean()) {
+            populateData.apply(shard);
+        }
+        shard.store().markStoreCorrupted(new IOException("test"));
+        replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
+            RecoverySource.PeerRecoverySource.INSTANCE));
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        replica.prepareForIndexRecovery();
+        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        closeShards(replica);
+
+        // copy with truncated translog
+        shard = newStartedShard(false);
+        globalCheckpoint = populateData.apply(shard);
+        replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
+            RecoverySource.PeerRecoverySource.INSTANCE));
+        String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
+            replica.shardId(), replica.getPendingPrimaryTerm());
+        replica.store().associateIndexWithNewTranslog(translogUUID);
+        safeCommit = replica.store().findSafeIndexCommit(globalCheckpoint);
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        replica.prepareForIndexRecovery();
+        if (safeCommit.isPresent()) {
+            assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
+            assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
+        } else {
+            assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+            assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
+        }
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        closeShards(replica);
+    }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 2c6dff473caba..e0e2aee60e61f 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -627,18 +627,9 @@ protected final void recoverUnstartedReplica(final IndexShard replica,
         }
         replica.prepareForIndexRecovery();
         final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
-        final String targetAllocationId = recoveryTarget.indexShard().routingEntry().allocationId().getId();
-
-        final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
-        final long startingSeqNo;
-        if (snapshot.size() > 0) {
-            startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget);
-        } else {
-            startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
-        }
-
-        final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
-            pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
+        final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
+        final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
+            logger, rNode, recoveryTarget, startingSeqNo);
         final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
             new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool,
             request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
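
As a closing note on the RecoveryState.Translog changes exercised by the tests above, the sketch below illustrates how the new totalLocal counter folds operations replayed from the local translog into the totals reported for the whole recovery. Field and method names mirror the diff, but this is a simplified standalone illustration, not the production class.

```java
/**
 * Simplified illustration of the totalLocal accounting added to RecoveryState.Translog:
 * operations replayed locally before the peer recovery and operations replayed from the
 * remote primary are reported as a single combined total.
 */
final class TranslogRecoveryStatsSketch {

    static final int UNKNOWN = -1;

    private int totalLocal = UNKNOWN;
    private int total = UNKNOWN;
    private int recovered;

    /** Set by the local recovery step (IndexShard#recoverLocallyUpToGlobalCheckpoint in the diff). */
    synchronized void totalLocal(int totalLocal) {
        this.totalLocal = totalLocal;
    }

    /** Set by the remote phase; folds in the locally recovered operations when known. */
    synchronized void totalOperations(int remoteTotal) {
        this.total = totalLocal == UNKNOWN ? remoteTotal : totalLocal + remoteTotal;
    }

    synchronized void incrementRecoveredOperations() {
        recovered++;
    }

    synchronized String summary() {
        return recovered + "/" + total;
    }

    public static void main(String[] args) {
        TranslogRecoveryStatsSketch stats = new TranslogRecoveryStatsSketch();
        stats.totalLocal(7);       // 7 ops replayed from the local translog
        stats.totalOperations(3);  // 3 more ops shipped by the primary during phase 2
        for (int i = 0; i < 10; i++) {
            stats.incrementRecoveredOperations();
        }
        System.out.println(stats.summary()); // prints 10/10
    }
}
```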