164164import java .util .stream .StreamSupport ;
165165
166166import static org .elasticsearch .index .mapper .SourceToParse .source ;
167- import static org .elasticsearch .index .seqno .SequenceNumbers .NO_OPS_PERFORMED ;
168167import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
169168
170169public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService .Shard {
@@ -1307,16 +1306,18 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
13071306 return result ;
13081307 }
13091308
1310- // package-private for testing
1311- int runTranslogRecovery (Engine engine , Translog .Snapshot snapshot ) throws IOException {
1312- recoveryState .getTranslog ().totalOperations (snapshot .totalOperations ());
1313- recoveryState .getTranslog ().totalOperationsOnStart (snapshot .totalOperations ());
1309+ /**
1310+ * Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.
1311+ * The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
1312+ */
1313+ int runTranslogRecovery (Engine engine , Translog .Snapshot snapshot , Engine .Operation .Origin origin ,
1314+ Runnable onOperationRecovered ) throws IOException {
13141315 int opsRecovered = 0 ;
13151316 Translog .Operation operation ;
13161317 while ((operation = snapshot .next ()) != null ) {
13171318 try {
13181319 logger .trace ("[translog] recover op {}" , operation );
1319- Engine .Result result = applyTranslogOperation (operation , Engine . Operation . Origin . LOCAL_TRANSLOG_RECOVERY );
1320+ Engine .Result result = applyTranslogOperation (operation , origin );
13201321 switch (result .getResultType ()) {
13211322 case FAILURE :
13221323 throw result .getFailure ();
@@ -1329,7 +1330,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13291330 }
13301331
13311332 opsRecovered ++;
1332- recoveryState . getTranslog (). incrementRecoveredOperations ();
1333+ onOperationRecovered . run ();
13331334 } catch (Exception e ) {
13341335 if (ExceptionsHelper .status (e ) == RestStatus .BAD_REQUEST ) {
13351336 // mainly for MapperParsingException and Failure to detect xcontent
@@ -1347,8 +1348,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13471348 * Operations from the translog will be replayed to bring lucene up to date.
13481349 **/
13491350 public void openEngineAndRecoverFromTranslog () throws IOException {
1351+ final RecoveryState .Translog translogRecoveryStats = recoveryState .getTranslog ();
// The runner first records the snapshot's operation count in the recovery stats (as both the total and the
// total-on-start), then replays the snapshot with the LOCAL_TRANSLOG_RECOVERY origin; the recovered-operations
// counter is incremented through the per-operation callback.
1352+ final Engine .TranslogRecoveryRunner translogRecoveryRunner = (engine , snapshot ) -> {
1353+ translogRecoveryStats .totalOperations (snapshot .totalOperations ());
1354+ translogRecoveryStats .totalOperationsOnStart (snapshot .totalOperations ());
1355+ return runTranslogRecovery (engine , snapshot , Engine .Operation .Origin .LOCAL_TRANSLOG_RECOVERY ,
1356+ translogRecoveryStats ::incrementRecoveredOperations );
1357+ };
// Open the engine first; the replay below is issued against getEngine().
13501358 innerOpenEngineAndTranslog ();
1351- getEngine ().recoverFromTranslog (this :: runTranslogRecovery , Long .MAX_VALUE );
// NOTE(review): Long.MAX_VALUE presumably means "no upper bound" on the operations replayed - confirm against
// Engine#recoverFromTranslog.
1359+ getEngine ().recoverFromTranslog (translogRecoveryRunner , Long .MAX_VALUE );
13521360 }
13531361
13541362 /**
@@ -1386,11 +1394,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
13861394 final String translogUUID = store .readLastCommittedSegmentsInfo ().getUserData ().get (Translog .TRANSLOG_UUID_KEY );
13871395 final long globalCheckpoint = Translog .readGlobalCheckpoint (translogConfig .getTranslogPath (), translogUUID );
13881396 replicationTracker .updateGlobalCheckpointOnReplica (globalCheckpoint , "read from translog checkpoint" );
1389-
1390- assertMaxUnsafeAutoIdInCommit ();
1391-
1392- final long minRetainedTranslogGen = Translog .readMinTranslogGeneration (translogConfig .getTranslogPath (), translogUUID );
1393- store .trimUnsafeCommits (globalCheckpoint , minRetainedTranslogGen , config .getIndexSettings ().getIndexVersionCreated ());
1397+ trimUnsafeCommits ();
13941398
13951399 createNewEngine (config );
13961400 verifyNotClosed ();
@@ -1401,6 +1405,15 @@ private void innerOpenEngineAndTranslog() throws IOException {
14011405 assert recoveryState .getStage () == RecoveryState .Stage .TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState .getStage ();
14021406 }
14031407
/**
 * Trims unsafe commits from the store. Asserts that no engine is currently referenced.
 * <p>
 * Reads the translog UUID from the user data of the last committed segments info, then reads the global
 * checkpoint and the minimum retained translog generation from the translog path, and hands both to
 * {@code store.trimUnsafeCommits} together with the version the index was created with.
 *
 * @throws IOException propagated from reading the commit or translog data, or from trimming the commits
 */
1408+ private void trimUnsafeCommits () throws IOException {
1409+ assert currentEngineReference .get () == null : "engine is running" ;
1410+ final String translogUUID = store .readLastCommittedSegmentsInfo ().getUserData ().get (Translog .TRANSLOG_UUID_KEY );
1411+ final long globalCheckpoint = Translog .readGlobalCheckpoint (translogConfig .getTranslogPath (), translogUUID );
1412+ final long minRetainedTranslogGen = Translog .readMinTranslogGeneration (translogConfig .getTranslogPath (), translogUUID );
1413+ assertMaxUnsafeAutoIdInCommit ();
1414+ store .trimUnsafeCommits (globalCheckpoint , minRetainedTranslogGen , indexSettings .getIndexVersionCreated ());
1415+ }
1416+
14041417 private boolean assertSequenceNumbersInCommit () throws IOException {
14051418 final Map <String , String > userData = SegmentInfos .readLatestCommit (store .directory ()).getUserData ();
14061419 assert userData .containsKey (SequenceNumbers .LOCAL_CHECKPOINT_KEY ) : "commit point doesn't contains a local checkpoint" ;
@@ -1501,7 +1514,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
15011514 if (origin == Engine .Operation .Origin .PRIMARY ) {
15021515 assert assertPrimaryMode ();
15031516 } else {
1504- assert origin == Engine .Operation .Origin .REPLICA ;
1517+ assert origin == Engine .Operation .Origin .REPLICA || origin == Engine . Operation . Origin . LOCAL_RESET ;
15051518 assert assertReplicationTarget ();
15061519 }
15071520 if (writeAllowedStates .contains (state ) == false ) {
@@ -2207,9 +2220,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
22072220
22082221 private Engine createNewEngine (EngineConfig config ) {
22092222 synchronized (mutex ) {
2210- if (state == IndexShardState .CLOSED ) {
2211- throw new AlreadyClosedException (shardId + " can't create engine - shard is closed" );
2212- }
2223+ verifyNotClosed ();
22132224 assert this .currentEngineReference .get () == null ;
22142225 Engine engine = newEngine (config );
22152226 onNewEngine (engine ); // call this before we pass the memory barrier otherwise actions that happen
@@ -2355,19 +2366,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
23552366 bumpPrimaryTerm (opPrimaryTerm , () -> {
23562367 updateGlobalCheckpointOnReplica (globalCheckpoint , "primary term transition" );
23572368 final long currentGlobalCheckpoint = getGlobalCheckpoint ();
2358- final long localCheckpoint ;
2359- if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO ) {
2360- localCheckpoint = NO_OPS_PERFORMED ;
2369+ final long maxSeqNo = seqNoStats ().getMaxSeqNo ();
2370+ logger .info ("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]" ,
2371+ opPrimaryTerm , currentGlobalCheckpoint , maxSeqNo );
2372+ if (currentGlobalCheckpoint < maxSeqNo ) {
2373+ resetEngineToGlobalCheckpoint ();
23612374 } else {
2362- localCheckpoint = currentGlobalCheckpoint ;
2375+ getEngine (). rollTranslogGeneration () ;
23632376 }
2364- logger .trace (
2365- "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]" ,
2366- opPrimaryTerm ,
2367- getLocalCheckpoint (),
2368- localCheckpoint );
2369- getEngine ().resetLocalCheckpoint (localCheckpoint );
2370- getEngine ().rollTranslogGeneration ();
23712377 });
23722378 }
23732379 }
@@ -2663,4 +2669,26 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
26632669 }
26642670 };
26652671 }
2672+
2673+ /**
2674+ * Rolls back the current engine to the safe commit, then replays the local translog up to the global checkpoint.
2675+ */
2676+ void resetEngineToGlobalCheckpoint () throws IOException {
// No write operations may be in flight while the engine is being replaced.
2677+ assert getActiveOperationsCount () == 0 : "Ongoing writes [" + getActiveOperations () + "]" ;
2678+ sync (); // persist the global checkpoint to disk
// Capture the global checkpoint before swapping engines; it is the bound passed to the replay below.
2679+ final long globalCheckpoint = getGlobalCheckpoint ();
2680+ final Engine newEngine ;
2681+ synchronized (mutex ) {
2682+ verifyNotClosed ();
// Close the current engine and clear the reference: trimUnsafeCommits() asserts that no engine is referenced.
2683+ IOUtils .close (currentEngineReference .getAndSet (null ));
2684+ trimUnsafeCommits ();
2685+ newEngine = createNewEngine (newEngineConfig ());
2686+ active .set (true );
2687+ }
// Replay with the LOCAL_RESET origin; the per-operation callback is intentionally a no-op for now.
2688+ final Engine .TranslogRecoveryRunner translogRunner = (engine , snapshot ) -> runTranslogRecovery (
2689+ engine , snapshot , Engine .Operation .Origin .LOCAL_RESET , () -> {
2690+ // TODO: add dedicated recovery stats for the reset translog
2691+ });
2692+ newEngine .recoverFromTranslog (translogRunner , globalCheckpoint );
2693+ }
26662694}
0 commit comments