Skip to content

Commit 095f31b

Browse files
authored
Replica start peer recovery with safe commit (#28181)
Today a replica starts a peer recovery from its last commit. If the last commit is not a safe commit, the replica immediately falls back to a file-based sync, which is more expensive than a sequence-based recovery. This commit changes peer recovery on the replica to start from a safe commit instead. Moreover, the existing translog on the target can be kept when the recovery is sequence-based. Relates #10708
1 parent f2db2a0 commit 095f31b

15 files changed

+196
-133
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,11 @@ public interface Warmer {
15121512
*/
15131513
public abstract Engine recoverFromTranslog() throws IOException;
15141514

1515+
/**
1516+
* Does not replay translog operations, but makes the engine ready.
1517+
*/
1518+
public abstract void skipTranslogRecovery();
1519+
15151520
/**
15161521
* Returns <code>true</code> iff this engine is currently recovering from translog.
15171522
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
401401
return this;
402402
}
403403

404+
@Override
405+
public void skipTranslogRecovery() {
406+
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
407+
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
408+
}
409+
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
410+
pendingTranslogRecovery.set(false); // we are good - now we can commit
411+
}
412+
404413
private IndexCommit getStartingCommitPoint() throws IOException {
405414
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
406415
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,9 +1304,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
13041304
* opens the engine on top of the existing lucene engine and translog.
13051305
* Operations from the translog will be replayed to bring lucene up to date.
13061306
**/
1307-
public void openIndexAndTranslog() throws IOException {
1307+
public void openIndexAndRecoveryFromTranslog() throws IOException {
13081308
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
13091309
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
1310+
getEngine().recoverFromTranslog();
1311+
}
1312+
1313+
/**
1314+
* Opens the engine on top of the existing lucene engine and translog.
1315+
* The translog is kept but its operations won't be replayed.
1316+
*/
1317+
public void openIndexAndSkipTranslogRecovery() throws IOException {
1318+
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
1319+
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
1320+
getEngine().skipTranslogRecovery();
13101321
}
13111322

13121323
private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
@@ -1339,13 +1350,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
13391350
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
13401351
"read from translog checkpoint");
13411352
}
1342-
Engine newEngine = createNewEngine(config);
1353+
createNewEngine(config);
13431354
verifyNotClosed();
13441355
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
13451356
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
13461357
// we still give sync'd flush a chance to run:
13471358
active.set(true);
1348-
newEngine.recoverFromTranslog();
13491359
}
13501360
assertSequenceNumbersInCommit();
13511361
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
401401
logger.debug("failed to list file details", e);
402402
}
403403
if (indexShouldExists) {
404-
indexShard.openIndexAndTranslog();
404+
indexShard.openIndexAndRecoveryFromTranslog();
405405
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
406406
} else {
407407
indexShard.createIndexAndTranslog();

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.logging.log4j.util.Supplier;
24+
import org.apache.lucene.index.DirectoryReader;
25+
import org.apache.lucene.index.IndexCommit;
2426
import org.apache.lucene.store.AlreadyClosedException;
2527
import org.apache.lucene.store.RateLimiter;
2628
import org.elasticsearch.ElasticsearchException;
@@ -39,6 +41,7 @@
3941
import org.elasticsearch.common.util.CancellableThreads;
4042
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4143
import org.elasticsearch.index.IndexNotFoundException;
44+
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
4245
import org.elasticsearch.index.engine.RecoveryEngineException;
4346
import org.elasticsearch.index.mapper.MapperException;
4447
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -60,6 +63,7 @@
6063
import org.elasticsearch.transport.TransportService;
6164

6265
import java.io.IOException;
66+
import java.util.List;
6367
import java.util.concurrent.atomic.AtomicLong;
6468
import java.util.concurrent.atomic.AtomicReference;
6569

@@ -108,8 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
108112
FileChunkTransportRequestHandler());
109113
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
110114
CleanFilesRequestHandler());
111-
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
112-
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
115+
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
116+
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
113117
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
114118
new TranslogOperationsRequestHandler());
115119
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
@@ -353,7 +357,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
353357
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
354358
try {
355359
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
356-
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
360+
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
361+
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
362+
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
357363
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
358364
assert seqNoStats.localCheckpoint <= globalCheckpoint;
359365
/*
@@ -387,7 +393,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
387393
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
388394
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
389395
)) {
390-
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
396+
recoveryRef.target().prepareForTranslogOperations(request.deleteLocalTranslog(), request.totalTranslogOps());
391397
}
392398
channel.sendResponse(TransportResponse.Empty.INSTANCE);
393399
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,33 @@
2828

2929
import java.io.IOException;
3030

31-
public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
31+
class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
3232

33-
private long recoveryId;
34-
private ShardId shardId;
35-
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
33+
private final long recoveryId;
34+
private final ShardId shardId;
35+
private final int totalTranslogOps;
36+
private final boolean deleteLocalTranslog;
3637

37-
public RecoveryPrepareForTranslogOperationsRequest() {
38-
}
39-
40-
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
38+
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean deleteLocalTranslog) {
4139
this.recoveryId = recoveryId;
4240
this.shardId = shardId;
4341
this.totalTranslogOps = totalTranslogOps;
42+
this.deleteLocalTranslog = deleteLocalTranslog;
43+
}
44+
45+
RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
46+
super.readFrom(in);
47+
recoveryId = in.readLong();
48+
shardId = ShardId.readShardId(in);
49+
totalTranslogOps = in.readVInt();
50+
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
51+
in.readLong(); // maxUnsafeAutoIdTimestamp
52+
}
53+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
54+
deleteLocalTranslog = in.readBoolean();
55+
} else {
56+
deleteLocalTranslog = true;
57+
}
4458
}
4559

4660
public long recoveryId() {
@@ -55,15 +69,11 @@ public int totalTranslogOps() {
5569
return totalTranslogOps;
5670
}
5771

58-
@Override
59-
public void readFrom(StreamInput in) throws IOException {
60-
super.readFrom(in);
61-
recoveryId = in.readLong();
62-
shardId = ShardId.readShardId(in);
63-
totalTranslogOps = in.readVInt();
64-
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
65-
in.readLong(); // maxUnsafeAutoIdTimestamp
66-
}
72+
/**
73+
* Whether or not the recovery target should delete its local translog
74+
*/
75+
boolean deleteLocalTranslog() {
76+
return deleteLocalTranslog;
6777
}
6878

6979
@Override
@@ -75,5 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
7585
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
7686
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
7787
}
88+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
89+
out.writeBoolean(deleteLocalTranslog);
90+
}
7891
}
7992
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
150150

151151
final long startingSeqNo;
152152
final long requiredSeqNoRangeStart;
153-
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
153+
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
154154
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
155-
if (isSequenceNumberBasedRecoveryPossible) {
155+
if (isSequenceNumberBasedRecovery) {
156156
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
157157
startingSeqNo = request.startingSeqNo();
158158
requiredSeqNoRangeStart = startingSeqNo;
@@ -188,7 +188,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
188188
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
189189

190190
try {
191-
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
191+
// For a sequence based recovery, the target can keep its local translog
192+
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
192193
} catch (final Exception e) {
193194
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
194195
}
@@ -421,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO
421422
}
422423
}
423424

424-
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
425+
void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
425426
StopWatch stopWatch = new StopWatch().start();
426427
logger.trace("recovery [phase1]: prepare remote engine for translog");
427428
final long startEngineStart = stopWatch.totalTime().millis();
428429
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
429430
// garbage collection (not the JVM's GC!) of tombstone deletes.
430-
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
431+
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps));
431432
stopWatch.stop();
432433

433434
response.startTime = stopWatch.totalTime().millis() - startEngineStart;

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,14 @@ private void ensureRefCount() {
362362
/*** Implementation of {@link RecoveryTargetHandler } */
363363

364364
@Override
365-
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
365+
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
366366
state().getTranslog().totalOperations(totalTranslogOps);
367-
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
368-
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
367+
if (createNewTranslog) {
368+
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
369+
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
370+
} else {
371+
indexShard().openIndexAndSkipTranslogRecovery();
372+
}
369373
}
370374

371375
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {
3232

3333
/**
3434
* Prepares the target to receive translog operations, after all files have been copied
35-
*
36-
* @param totalTranslogOps total translog operations expected to be sent
35+
* @param createNewTranslog whether or not to delete the local translog on the target
36+
* @param totalTranslogOps total translog operations expected to be sent
3737
*/
38-
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
38+
void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;
3939

4040
/**
4141
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
7676
}
7777

7878
@Override
79-
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
79+
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
8080
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
81-
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
81+
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
8282
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
8383
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
8484
}

0 commit comments

Comments
 (0)