4747import org .elasticsearch .common .inject .Inject ;
4848import org .elasticsearch .common .io .stream .StreamInput ;
4949import org .elasticsearch .common .io .stream .StreamOutput ;
50- import org .elasticsearch .common .settings .Setting ;
5150import org .elasticsearch .common .unit .TimeValue ;
5251import org .elasticsearch .index .shard .ShardId ;
5352import org .elasticsearch .node .NodeClosedException ;
7271import java .util .Objects ;
7372import java .util .Set ;
7473import java .util .function .Predicate ;
75- import java .util .function .Supplier ;
7674
7775public class ShardStateAction {
7876
@@ -81,34 +79,10 @@ public class ShardStateAction {
8179 public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started" ;
8280 public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure" ;
8381
84- /**
85- * Adjusts the priority of the followup reroute task. NORMAL is right for reasonable clusters, but in a badly configured cluster it may
86- * be necessary to raise this higher to recover the older behaviour of rerouting after processing every shard-started task. Deliberately
87- * undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone, and deprecated
88- * since we will remove it once we have confirmed from experience that this priority is appropriate in all cases.
89- */
90- public static final Setting <Priority > FOLLOW_UP_REROUTE_PRIORITY_SETTING
91- = new Setting <>("cluster.routing.allocation.shard_state.reroute.priority" , Priority .NORMAL .toString (),
92- ShardStateAction ::parseReroutePriority , Setting .Property .NodeScope , Setting .Property .Dynamic , Setting .Property .Deprecated );
93-
94- private static Priority parseReroutePriority (String priorityString ) {
95- final Priority priority = Priority .valueOf (priorityString .toUpperCase (Locale .ROOT ));
96- switch (priority ) {
97- case NORMAL :
98- case HIGH :
99- case URGENT :
100- return priority ;
101- }
102- throw new IllegalArgumentException (
103- "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING .getKey () + "]" );
104- }
105-
10682 private final TransportService transportService ;
10783 private final ClusterService clusterService ;
10884 private final ThreadPool threadPool ;
10985
110- private volatile Priority followUpRerouteTaskPriority ;
111-
11286 // a list of shards that failed during replication
11387 // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
11488 private final TransportRequestDeduplicator <FailedShardEntry > remoteFailedShardsDeduplicator = new TransportRequestDeduplicator <>();
@@ -120,17 +94,13 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
12094 this .clusterService = clusterService ;
12195 this .threadPool = threadPool ;
12296
123- followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING .get (clusterService .getSettings ());
124- clusterService .getClusterSettings ().addSettingsUpdateConsumer (FOLLOW_UP_REROUTE_PRIORITY_SETTING ,
125- this ::setFollowUpRerouteTaskPriority );
126-
12797 transportService .registerRequestHandler (SHARD_STARTED_ACTION_NAME , ThreadPool .Names .SAME , StartedShardEntry ::new ,
12898 new ShardStartedTransportHandler (clusterService ,
129- new ShardStartedClusterStateTaskExecutor (allocationService , rerouteService , () -> followUpRerouteTaskPriority , logger ),
99+ new ShardStartedClusterStateTaskExecutor (allocationService , rerouteService , logger ),
130100 logger ));
131101 transportService .registerRequestHandler (SHARD_FAILED_ACTION_NAME , ThreadPool .Names .SAME , FailedShardEntry ::new ,
132102 new ShardFailedTransportHandler (clusterService ,
133- new ShardFailedClusterStateTaskExecutor (allocationService , rerouteService , () -> followUpRerouteTaskPriority , logger ),
103+ new ShardFailedClusterStateTaskExecutor (allocationService , rerouteService , logger ),
134104 logger ));
135105 }
136106
@@ -248,10 +218,6 @@ public void onTimeout(TimeValue timeout) {
248218 }, changePredicate );
249219 }
250220
251- private void setFollowUpRerouteTaskPriority (Priority followUpRerouteTaskPriority ) {
252- this .followUpRerouteTaskPriority = followUpRerouteTaskPriority ;
253- }
254-
255221 private static class ShardFailedTransportHandler implements TransportRequestHandler <FailedShardEntry > {
256222 private final ClusterService clusterService ;
257223 private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor ;
@@ -319,14 +285,11 @@ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateT
319285 private final AllocationService allocationService ;
320286 private final RerouteService rerouteService ;
321287 private final Logger logger ;
322- private final Supplier <Priority > prioritySupplier ;
323288
324- public ShardFailedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService ,
325- Supplier <Priority > prioritySupplier , Logger logger ) {
289+ public ShardFailedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService , Logger logger ) {
326290 this .allocationService = allocationService ;
327291 this .rerouteService = rerouteService ;
328292 this .logger = logger ;
329- this .prioritySupplier = prioritySupplier ;
330293 }
331294
332295 @ Override
@@ -420,7 +383,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
420383 // assign it again, even if that means putting it back on the node on which it previously failed:
421384 final String reason = String .format (Locale .ROOT , "[%d] unassigned shards after failing shards" , numberOfUnassignedShards );
422385 logger .trace ("{}, scheduling a reroute" , reason );
423- rerouteService .reroute (reason , prioritySupplier . get () , ActionListener .wrap (
386+ rerouteService .reroute (reason , Priority . NORMAL , ActionListener .wrap (
424387 r -> logger .trace ("{}, reroute completed" , reason ),
425388 e -> logger .debug (new ParameterizedMessage ("{}, reroute failed" , reason ), e )));
426389 }
@@ -552,14 +515,11 @@ public static class ShardStartedClusterStateTaskExecutor
552515 private final AllocationService allocationService ;
553516 private final Logger logger ;
554517 private final RerouteService rerouteService ;
555- private final Supplier <Priority > prioritySupplier ;
556518
557- public ShardStartedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService ,
558- Supplier <Priority > prioritySupplier , Logger logger ) {
519+ public ShardStartedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService , Logger logger ) {
559520 this .allocationService = allocationService ;
560521 this .logger = logger ;
561522 this .rerouteService = rerouteService ;
562- this .prioritySupplier = prioritySupplier ;
563523 }
564524
565525 @ Override
@@ -637,7 +597,7 @@ public void onFailure(String source, Exception e) {
637597
638598 @ Override
639599 public void clusterStatePublished (ClusterChangedEvent clusterChangedEvent ) {
640- rerouteService .reroute ("reroute after starting shards" , prioritySupplier . get () , ActionListener .wrap (
600+ rerouteService .reroute ("reroute after starting shards" , Priority . NORMAL , ActionListener .wrap (
641601 r -> logger .trace ("reroute after starting shards succeeded" ),
642602 e -> logger .debug ("reroute after starting shards failed" , e )));
643603 }
0 commit comments