4040import org .elasticsearch .Assertions ;
4141import org .elasticsearch .ElasticsearchException ;
4242import org .elasticsearch .ExceptionsHelper ;
43- import org .elasticsearch .Version ;
4443import org .elasticsearch .action .ActionListener ;
4544import org .elasticsearch .action .admin .indices .flush .FlushRequest ;
4645import org .elasticsearch .action .admin .indices .forcemerge .ForceMergeRequest ;
@@ -326,17 +325,15 @@ public IndexShard(
326325 this .pendingPrimaryTerm = primaryTerm ;
327326 this .globalCheckpointListeners =
328327 new GlobalCheckpointListeners (shardId , threadPool .executor (ThreadPool .Names .LISTENER ), threadPool .scheduler (), logger );
329- final ReplicationTracker replicationTracker =
330- new ReplicationTracker (
331- shardId ,
332- aId ,
333- indexSettings ,
334- primaryTerm ,
335- UNASSIGNED_SEQ_NO ,
336- globalCheckpointListeners ::globalCheckpointUpdated ,
337- threadPool ::absoluteTimeInMillis ,
338- (retentionLeases , listener ) -> retentionLeaseSyncer .sync (shardId , retentionLeases , listener ));
339- this .replicationTracker = replicationTracker ;
328+ this .replicationTracker = new ReplicationTracker (
329+ shardId ,
330+ aId ,
331+ indexSettings ,
332+ primaryTerm ,
333+ UNASSIGNED_SEQ_NO ,
334+ globalCheckpointListeners ::globalCheckpointUpdated ,
335+ threadPool ::absoluteTimeInMillis ,
336+ (retentionLeases , listener ) -> retentionLeaseSyncer .sync (shardId , retentionLeases , listener ));
340337
341338 // the query cache is a node-level thing, however we want the most popular filters
342339 // to be computed on a per-shard basis
@@ -443,16 +440,17 @@ public void updateShardState(final ShardRouting newRouting,
443440 final ShardRouting currentRouting ;
444441 synchronized (mutex ) {
445442 currentRouting = this .shardRouting ;
443+ assert currentRouting != null ;
446444
447445 if (!newRouting .shardId ().equals (shardId ())) {
448446 throw new IllegalArgumentException ("Trying to set a routing entry with shardId " +
449447 newRouting .shardId () + " on a shard with shardId " + shardId ());
450448 }
451- if (( currentRouting == null || newRouting .isSameAllocation (currentRouting ) ) == false ) {
449+ if (newRouting .isSameAllocation (currentRouting ) == false ) {
452450 throw new IllegalArgumentException ("Trying to set a routing entry with a different allocation. Current " +
453451 currentRouting + ", new " + newRouting );
454452 }
455- if (currentRouting != null && currentRouting .primary () && newRouting .primary () == false ) {
453+ if (currentRouting .primary () && newRouting .primary () == false ) {
456454 throw new IllegalArgumentException ("illegal state: trying to move shard from primary mode to replica mode. Current "
457455 + currentRouting + ", new " + newRouting );
458456 }
@@ -586,7 +584,7 @@ public void onFailure(Exception e) {
586584 : "a started primary with non-pending operation term must be in primary mode " + this .shardRouting ;
587585 shardStateUpdated .countDown ();
588586 }
589- if (currentRouting != null && currentRouting .active () == false && newRouting .active ()) {
587+ if (currentRouting .active () == false && newRouting .active ()) {
590588 indexEventListener .afterIndexShardStarted (this );
591589 }
592590 if (newRouting .equals (currentRouting ) == false ) {
@@ -631,8 +629,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
631629 public void relocated (final String targetAllocationId , final Consumer <ReplicationTracker .PrimaryContext > consumer )
632630 throws IllegalIndexShardStateException , IllegalStateException , InterruptedException {
633631 assert shardRouting .primary () : "only primaries can be marked as relocated: " + shardRouting ;
634- final Releasable forceRefreshes = refreshListeners .forceRefreshes ();
635- try {
632+ try (Releasable forceRefreshes = refreshListeners .forceRefreshes ()) {
636633 indexShardOperationPermits .blockOperations (30 , TimeUnit .MINUTES , () -> {
637634 forceRefreshes .close ();
638635 // no shard operation permits are being held here, move state from started to relocated
@@ -665,8 +662,6 @@ public void relocated(final String targetAllocationId, final Consumer<Replicatio
665662 // Fail primary relocation source and target shards.
666663 failShard ("timed out waiting for relocation hand-off to complete" , null );
667664 throw new IndexShardClosedException (shardId (), "timed out waiting for relocation hand-off to complete" );
668- } finally {
669- forceRefreshes .close ();
670665 }
671666 }
672667
@@ -745,7 +740,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
745740 sourceWithResolvedType = new SourceToParse (sourceToParse .index (), resolvedType , sourceToParse .id (),
746741 sourceToParse .source (), sourceToParse .getXContentType (), sourceToParse .routing ());
747742 }
748- operation = prepareIndex (docMapper (resolvedType ), indexSettings . getIndexVersionCreated (), sourceWithResolvedType ,
743+ operation = prepareIndex (docMapper (resolvedType ), sourceWithResolvedType ,
749744 seqNo , opPrimaryTerm , version , versionType , origin , autoGeneratedTimeStamp , isRetry , ifSeqNo , ifPrimaryTerm );
750745 Mapping update = operation .parsedDoc ().dynamicMappingsUpdate ();
751746 if (update != null ) {
@@ -763,7 +758,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
763758 return index (engine , operation );
764759 }
765760
766- public static Engine .Index prepareIndex (DocumentMapperForType docMapper , Version indexCreatedVersion , SourceToParse source , long seqNo ,
761+ public static Engine .Index prepareIndex (DocumentMapperForType docMapper , SourceToParse source , long seqNo ,
767762 long primaryTerm , long version , VersionType versionType , Engine .Operation .Origin origin ,
768763 long autoGeneratedIdTimestamp , boolean isRetry ,
769764 long ifSeqNo , long ifPrimaryTerm ) {
@@ -1529,7 +1524,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
15291524 // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
15301525 // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
15311526 onSettingsChanged ();
1532- assertSequenceNumbersInCommit ();
1527+ assert assertSequenceNumbersInCommit ();
15331528 assert recoveryState .getStage () == RecoveryState .Stage .TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState .getStage ();
15341529 }
15351530
@@ -1546,7 +1541,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
15461541 return true ;
15471542 }
15481543
1549- protected void onNewEngine (Engine newEngine ) {
1544+ private void onNewEngine (Engine newEngine ) {
15501545 refreshListeners .setCurrentRefreshLocationSupplier (newEngine ::getTranslogLastWriteLocation );
15511546 }
15521547
@@ -1858,10 +1853,6 @@ public List<Segment> segments(boolean verbose) {
18581853 return getEngine ().segments (verbose );
18591854 }
18601855
1861- public void flushAndCloseEngine () throws IOException {
1862- getEngine ().flushAndClose ();
1863- }
1864-
18651856 public String getHistoryUUID () {
18661857 return getEngine ().getHistoryUUID ();
18671858 }
@@ -2876,7 +2867,7 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida
28762867 }
28772868 }
28782869 };
2879- };
2870+ }
28802871
28812872 /**
28822873 * Syncs the given location with the underlying storage unless already synced. This method might return immediately without
@@ -2988,7 +2979,7 @@ private RefreshListeners buildRefreshListeners() {
29882979 return new RefreshListeners (
29892980 indexSettings ::getMaxRefreshListeners ,
29902981 () -> refresh ("too_many_listeners" ),
2991- threadPool .executor (ThreadPool .Names .LISTENER ):: execute ,
2982+ threadPool .executor (ThreadPool .Names .LISTENER ),
29922983 logger , threadPool .getThreadContext (),
29932984 externalRefreshMetric );
29942985 }
0 commit comments