4848import org .elasticsearch .common .inject .Inject ;
4949import org .elasticsearch .common .io .stream .StreamInput ;
5050import org .elasticsearch .common .io .stream .StreamOutput ;
51+ import org .elasticsearch .common .settings .Setting ;
5152import org .elasticsearch .common .unit .TimeValue ;
5253import org .elasticsearch .index .shard .ShardId ;
5354import org .elasticsearch .node .NodeClosedException ;
7273import java .util .Objects ;
7374import java .util .Set ;
7475import java .util .function .Predicate ;
76+ import java .util .function .Supplier ;
7577
7678import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_PRIMARY_TERM ;
7779
@@ -82,10 +84,34 @@ public class ShardStateAction {
8284 public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started" ;
8385 public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure" ;
8486
87+ /**
88+ * Adjusts the priority of the followup reroute task. NORMAL is right for reasonable clusters, but in a badly configured cluster it may
89+ * be necessary to raise this higher to recover the older behaviour of rerouting after processing every shard-started task. Deliberately
90+ * undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone, and deprecated
91+ * since we will remove it once we have confirmed from experience that this priority is appropriate in all cases.
92+ */
93+ public static final Setting <Priority > FOLLOW_UP_REROUTE_PRIORITY_SETTING
94+ = new Setting <>("cluster.routing.allocation.shard_state.reroute.priority" , Priority .NORMAL .toString (),
95+ ShardStateAction ::parseReroutePriority , Setting .Property .NodeScope , Setting .Property .Dynamic , Setting .Property .Deprecated );
96+
97+ private static Priority parseReroutePriority (String priorityString ) {
98+ final Priority priority = Priority .valueOf (priorityString .toUpperCase (Locale .ROOT ));
99+ switch (priority ) {
100+ case NORMAL :
101+ case HIGH :
102+ case URGENT :
103+ return priority ;
104+ }
105+ throw new IllegalArgumentException (
106+ "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING .getKey () + "]" );
107+ }
108+
85109 private final TransportService transportService ;
86110 private final ClusterService clusterService ;
87111 private final ThreadPool threadPool ;
88112
113+ private volatile Priority followUpRerouteTaskPriority ;
114+
89115 // a list of shards that failed during replication
90116 // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
91117 private final TransportRequestDeduplicator <FailedShardEntry > remoteFailedShardsDeduplicator = new TransportRequestDeduplicator <>();
@@ -97,11 +123,18 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
97123 this .clusterService = clusterService ;
98124 this .threadPool = threadPool ;
99125
126+ followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING .get (clusterService .getSettings ());
127+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (FOLLOW_UP_REROUTE_PRIORITY_SETTING ,
128+ this ::setFollowUpRerouteTaskPriority );
129+
100130 transportService .registerRequestHandler (SHARD_STARTED_ACTION_NAME , ThreadPool .Names .SAME , StartedShardEntry ::new ,
101- new ShardStartedTransportHandler (clusterService , new ShardStartedClusterStateTaskExecutor (allocationService , logger ), logger ));
131+ new ShardStartedTransportHandler (clusterService ,
132+ new ShardStartedClusterStateTaskExecutor (allocationService , rerouteService , () -> followUpRerouteTaskPriority , logger ),
133+ logger ));
102134 transportService .registerRequestHandler (SHARD_FAILED_ACTION_NAME , ThreadPool .Names .SAME , FailedShardEntry ::new ,
103135 new ShardFailedTransportHandler (clusterService ,
104- new ShardFailedClusterStateTaskExecutor (allocationService , rerouteService , logger ), logger ));
136+ new ShardFailedClusterStateTaskExecutor (allocationService , rerouteService , () -> followUpRerouteTaskPriority , logger ),
137+ logger ));
105138 }
106139
107140 private void sendShardAction (final String actionName , final ClusterState currentState ,
@@ -218,6 +251,10 @@ public void onTimeout(TimeValue timeout) {
218251 }, changePredicate );
219252 }
220253
254+ private void setFollowUpRerouteTaskPriority (Priority followUpRerouteTaskPriority ) {
255+ this .followUpRerouteTaskPriority = followUpRerouteTaskPriority ;
256+ }
257+
221258 private static class ShardFailedTransportHandler implements TransportRequestHandler <FailedShardEntry > {
222259 private final ClusterService clusterService ;
223260 private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor ;
@@ -285,11 +322,14 @@ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateT
285322 private final AllocationService allocationService ;
286323 private final RerouteService rerouteService ;
287324 private final Logger logger ;
325+ private final Supplier <Priority > prioritySupplier ;
288326
289- public ShardFailedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService , Logger logger ) {
327+ public ShardFailedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService ,
328+ Supplier <Priority > prioritySupplier , Logger logger ) {
290329 this .allocationService = allocationService ;
291330 this .rerouteService = rerouteService ;
292331 this .logger = logger ;
332+ this .prioritySupplier = prioritySupplier ;
293333 }
294334
295335 @ Override
@@ -383,7 +423,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
383423 // assign it again, even if that means putting it back on the node on which it previously failed:
384424 final String reason = String .format (Locale .ROOT , "[%d] unassigned shards after failing shards" , numberOfUnassignedShards );
385425 logger .trace ("{}, scheduling a reroute" , reason );
386- rerouteService .reroute (reason , Priority . HIGH , ActionListener .wrap (
426+ rerouteService .reroute (reason , prioritySupplier . get () , ActionListener .wrap (
387427 r -> logger .trace ("{}, reroute completed" , reason ),
388428 e -> logger .debug (new ParameterizedMessage ("{}, reroute failed" , reason ), e )));
389429 }
@@ -520,10 +560,15 @@ public static class ShardStartedClusterStateTaskExecutor
520560 implements ClusterStateTaskExecutor <StartedShardEntry >, ClusterStateTaskListener {
521561 private final AllocationService allocationService ;
522562 private final Logger logger ;
563+ private final RerouteService rerouteService ;
564+ private final Supplier <Priority > prioritySupplier ;
523565
524- public ShardStartedClusterStateTaskExecutor (AllocationService allocationService , Logger logger ) {
566+ public ShardStartedClusterStateTaskExecutor (AllocationService allocationService , RerouteService rerouteService ,
567+ Supplier <Priority > prioritySupplier , Logger logger ) {
525568 this .allocationService = allocationService ;
526569 this .logger = logger ;
570+ this .rerouteService = rerouteService ;
571+ this .prioritySupplier = prioritySupplier ;
527572 }
528573
529574 @ Override
@@ -598,6 +643,13 @@ public void onFailure(String source, Exception e) {
598643 logger .error (() -> new ParameterizedMessage ("unexpected failure during [{}]" , source ), e );
599644 }
600645 }
646+
647+ @ Override
648+ public void clusterStatePublished (ClusterChangedEvent clusterChangedEvent ) {
649+ rerouteService .reroute ("reroute after starting shards" , prioritySupplier .get (), ActionListener .wrap (
650+ r -> logger .trace ("reroute after starting shards succeeded" ),
651+ e -> logger .debug ("reroute after starting shards failed" , e )));
652+ }
601653 }
602654
603655 public static class StartedShardEntry extends TransportRequest {
0 commit comments