 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.EngineFactory;
+import org.elasticsearch.index.engine.ReadOnlyEngine;
 import org.elasticsearch.index.engine.RefreshFailedEngineException;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.index.engine.SegmentsStats;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -712,18 +714,18 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
 
     public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
                                                            long autoGeneratedTimestamp, boolean isRetry) throws IOException {
-        return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
-            isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
+        return applyIndexOperation(getEngine(), SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType,
+            autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
     }
 
     public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
                                                            long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
        throws IOException {
-        return applyIndexOperation(seqNo, operationPrimaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
+        return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
            Engine.Operation.Origin.REPLICA, sourceToParse);
     }
 
-    private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
+    private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, VersionType versionType,
                                                    long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
                                                    SourceToParse sourceToParse) throws IOException {
        assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
@@ -748,7 +750,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l
            return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
        }
 
-        return index(getEngine(), operation);
+        return index(engine, operation);
     }
 
     public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo,
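
Reviewer note (not part of the diff): the pattern in the hunks above and below is that the public `applyIndexOperationOnPrimary`/`OnReplica` entry points still resolve the shard's current engine via `getEngine()`, while the private `applyIndexOperation` now receives that `Engine` as an explicit argument and uses it all the way down to `index(engine, operation)`. The same treatment is given to no-ops, deletes, and translog replay further down, which is what later lets `runTranslogRecovery` feed operations into an engine that has not yet been published as the shard's current engine. A minimal, self-contained sketch of the resolve-once-and-pass-down shape; `FakeEngine` and the method names are illustrative stand-ins, not IndexShard's API:

```java
import java.util.concurrent.atomic.AtomicReference;

final class ApplyWithExplicitEngineSketch {
    /** Stand-in for Engine; the real one indexes documents instead of collecting strings. */
    static final class FakeEngine {
        final StringBuilder appliedOps = new StringBuilder();
    }

    private final AtomicReference<FakeEngine> currentEngine = new AtomicReference<>(new FakeEngine());

    /** Public entry point: resolves the published engine exactly once. */
    void applyOnPrimary(String op) {
        apply(currentEngine.get(), op);
    }

    /** Recovery path: the caller hands in an engine that may not be the published one yet. */
    void applyDuringRecovery(FakeEngine recoveringEngine, String op) {
        apply(recoveringEngine, op);
    }

    /** Private helper: never re-resolves the engine, so one logical operation stays on one instance. */
    private void apply(FakeEngine engine, String op) {
        engine.appliedOps.append(op).append(';');
    }
}
```
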
@@ -789,17 +791,17 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
     }
 
     public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
-        return markSeqNoAsNoop(seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
+        return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
     }
 
-    private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason,
+    private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason,
                                               Engine.Operation.Origin origin) throws IOException {
        assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
            + "]";
        long startTime = System.nanoTime();
        ensureWriteAllowed(origin);
        final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);
-        return noOp(getEngine(), noOp);
+        return noOp(engine, noOp);
     }
 
     private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
@@ -820,16 +822,17 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
 
     public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
        throws IOException {
-        return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
+        return applyDeleteOperation(getEngine(), SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
            Engine.Operation.Origin.PRIMARY);
     }
 
     public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
                                                              VersionType versionType) throws IOException {
-        return applyDeleteOperation(seqNo, operationPrimaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA);
+        return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, versionType,
+            Engine.Operation.Origin.REPLICA);
     }
 
-    private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
+    private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
                                                      VersionType versionType, Engine.Operation.Origin origin) throws IOException {
        assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
            + "]";
@@ -855,7 +858,7 @@ private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm,
        final Term uid = extractUidForDelete(type, id);
        final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,
            versionType, origin);
-        return delete(getEngine(), delete);
+        return delete(engine, delete);
     }
 
     private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
@@ -1305,25 +1308,32 @@ public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimar
     }
 
     public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
+        return applyTranslogOperation(getEngine(), operation, origin);
+    }
+
+    private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation,
+                                                 Engine.Operation.Origin origin) throws IOException {
+        // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
+        final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
        final Engine.Result result;
        switch (operation.opType()) {
            case INDEX:
                final Translog.Index index = (Translog.Index) operation;
                // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
                // autoGeneratedID docs that are coming from the primary are updated correctly.
-                result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
+                result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),
                    index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
                    source(shardId.getIndexName(), index.type(), index.id(), index.source(),
                        XContentHelper.xContentType(index.source())).routing(index.routing()).parent(index.parent()));
                break;
            case DELETE:
                final Translog.Delete delete = (Translog.Delete) operation;
-                result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
+                result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
                    delete.versionType().versionTypeForReplicationAndRecovery(), origin);
                break;
            case NO_OP:
                final Translog.NoOp noOp = (Translog.NoOp) operation;
-                result = markSeqNoAsNoop(noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
+                result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
                break;
            default:
                throw new IllegalStateException("No operation defined for [" + operation + "]");
@@ -1342,7 +1352,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
        while ((operation = snapshot.next()) != null) {
            try {
                logger.trace("[translog] recover op {}", operation);
-                Engine.Result result = applyTranslogOperation(operation, origin);
+                Engine.Result result = applyTranslogOperation(engine, operation, origin);
                switch (result.getResultType()) {
                    case FAILURE:
                        throw result.getFailure();
@@ -1422,18 +1432,26 @@ private void innerOpenEngineAndTranslog() throws IOException {
        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
        trimUnsafeCommits();
-
-        createNewEngine(config);
-        verifyNotClosed();
-        // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
-        // we still give sync'd flush a chance to run:
-        active.set(true);
+        synchronized (mutex) {
+            verifyNotClosed();
+            assert currentEngineReference.get() == null : "engine is running";
+            // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
+            final Engine newEngine = engineFactory.newReadWriteEngine(config);
+            onNewEngine(newEngine);
+            currentEngineReference.set(newEngine);
+            // We set active because we are now writing operations to the engine; this way,
+            // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+            active.set(true);
+        }
+        // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
+        // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
+        onSettingsChanged();
        assertSequenceNumbersInCommit();
        assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }
 
     private void trimUnsafeCommits() throws IOException {
-        assert currentEngineReference.get() == null : "engine is running";
+        assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
        final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
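
Reviewer note (not part of the diff): with `createNewEngine` deleted further down, this hunk inlines its two invariants at the call site: the engine is built and published while holding `mutex` (the added comment points at `IndexShard#snapshotStoreMetadata`, which reads the engine reference under the same lock), and `onSettingsChanged()` is called after publication because the `EngineConfig` was captured before the engine became visible, so a settings update could have raced with the creation. A small, self-contained sketch of that publish-then-refresh pattern; the class, field, and setting names are stand-ins, not Elasticsearch APIs:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

final class PublishThenRefreshSketch {
    /** Stand-in for an engine that caches one tunable value from the config it was built with. */
    static final class FakeEngine {
        volatile long appliedRefreshIntervalMillis;
        FakeEngine(long configuredRefreshIntervalMillis) {
            this.appliedRefreshIntervalMillis = configuredRefreshIntervalMillis;
        }
    }

    private final Object mutex = new Object();
    private final AtomicLong refreshIntervalSetting = new AtomicLong(1_000);
    private final AtomicReference<FakeEngine> currentEngine = new AtomicReference<>();

    void openEngine() {
        long configSnapshot = refreshIntervalSetting.get();   // config captured before the engine is visible
        synchronized (mutex) {
            currentEngine.set(new FakeEngine(configSnapshot)); // publish under the lock
        }
        onSettingsChanged();                                   // close the race: re-apply the current value
    }

    void updateRefreshInterval(long millis) {
        refreshIntervalSetting.set(millis);
        onSettingsChanged();                                   // no-op until an engine has been published
    }

    private void onSettingsChanged() {
        FakeEngine engine = currentEngine.get();
        if (engine != null) {
            engine.appliedRefreshIntervalMillis = refreshIntervalSetting.get();
        }
    }
}
```
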
@@ -2266,31 +2284,6 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
        }
    }
 
-    private Engine createNewEngine(EngineConfig config) {
-        synchronized (mutex) {
-            verifyNotClosed();
-            assert this.currentEngineReference.get() == null;
-            Engine engine = newEngine(config);
-            onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
-            // inside the callback are not visible. This one enforces happens-before
-            this.currentEngineReference.set(engine);
-        }
-
-        // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
-        // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
-        Engine engine = getEngineOrNull();
-
-        // engine could perhaps be null if we were e.g. concurrently closed:
-        if (engine != null) {
-            engine.onSettingsChanged();
-        }
-        return engine;
-    }
-
-    protected Engine newEngine(EngineConfig config) {
-        return engineFactory.newReadWriteEngine(config);
-    }
-
     private static void persistMetadata(
         final ShardPath shardPath,
         final IndexSettings indexSettings,
@@ -2814,21 +2807,47 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
     void resetEngineToGlobalCheckpoint() throws IOException {
        assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
        sync(); // persist the global checkpoint to disk
-        final long globalCheckpoint = getGlobalCheckpoint();
-        final Engine newEngine;
+        final SeqNoStats seqNoStats = seqNoStats();
+        final TranslogStats translogStats = translogStats();
+        // flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations.
+        flush(new FlushRequest().waitIfOngoing(true));
        synchronized (mutex) {
            verifyNotClosed();
-            IOUtils.close(currentEngineReference.getAndSet(null));
+            // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
+            final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity());
+            IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+        }
+
+        Engine newEngine = null;
+        try {
+            final long globalCheckpoint = getGlobalCheckpoint();
            trimUnsafeCommits();
-            newEngine = createNewEngine(newEngineConfig());
-            active.set(true);
+            synchronized (mutex) {
+                verifyNotClosed();
+                // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
+                newEngine = engineFactory.newReadWriteEngine(newEngineConfig());
+                onNewEngine(newEngine);
+            }
+            newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
+            final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
+                engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
+                    // TODO: add a dedicate recovery stats for the reset translog
+                });
+            newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
+            synchronized (mutex) {
+                verifyNotClosed();
+                IOUtils.close(currentEngineReference.getAndSet(newEngine));
+                // We set active because we are now writing operations to the engine; this way,
+                // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+                active.set(true);
+                newEngine = null;
+            }
+            // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
+            // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
+            onSettingsChanged();
+        } finally {
+            IOUtils.close(newEngine);
        }
-        newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
-        final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
-            engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                // TODO: add a dedicate recovery stats for the reset translog
-            });
-        newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
     }
 
     /**
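
Reviewer note (not part of the diff): the rewritten `resetEngineToGlobalCheckpoint` now proceeds in three phases: flush and publish a `ReadOnlyEngine` over the latest commit so the shard keeps serving reads, then build a fresh read/write engine off to the side and replay the translog up to the global checkpoint into it, and only then swap it in under `mutex`, mark the shard active, and push any raced settings changes; the `finally` block closes the half-built engine if anything fails before the swap. The sketch below mirrors only that ordering and ownership hand-off; `Engine`, the helper names, and the recovery signature are stand-ins, not the Elasticsearch classes:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

final class EngineResetSketch {
    /** Stand-in for the engine abstraction; the real one is org.elasticsearch.index.engine.Engine. */
    interface Engine extends Closeable {
        void recoverUpTo(long globalCheckpoint) throws IOException;
    }

    private final Object mutex = new Object();
    private final AtomicReference<Engine> currentEngine = new AtomicReference<>();

    void resetToGlobalCheckpoint(Engine readOnlyEngine, Engine writableEngine, long globalCheckpoint) throws IOException {
        // Phase 1: publish a read-only view so readers keep working while the writable engine is rebuilt.
        synchronized (mutex) {
            closeQuietly(currentEngine.getAndSet(readOnlyEngine));
        }
        Engine pending = writableEngine;
        try {
            // Phase 2: replay operations up to the global checkpoint before the engine becomes visible.
            pending.recoverUpTo(globalCheckpoint);
            // Phase 3: swap the recovered engine in under the same mutex; the read-only placeholder is closed.
            synchronized (mutex) {
                closeQuietly(currentEngine.getAndSet(pending));
                pending = null; // ownership transferred; the finally block must not close it
            }
        } finally {
            closeQuietly(pending); // on failure before the swap, release the half-built engine
        }
    }

    private static void closeQuietly(Engine engine) {
        if (engine != null) {
            try {
                engine.close();
            } catch (IOException e) {
                // best-effort close in this sketch
            }
        }
    }
}
```
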