3939import org .elasticsearch .common .inject .Inject ;
4040import org .elasticsearch .common .io .stream .StreamInput ;
4141import org .elasticsearch .common .io .stream .StreamOutput ;
42+ import org .elasticsearch .common .logging .ESLogger ;
4243import org .elasticsearch .common .settings .Settings ;
4344import org .elasticsearch .common .unit .TimeValue ;
4445import org .elasticsearch .threadpool .ThreadPool ;
5859
5960import static org .elasticsearch .cluster .routing .ShardRouting .readShardRoutingEntry ;
6061
61-
6262public class ShardStateAction extends AbstractComponent {
6363 public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started" ;
6464 public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure" ;
6565
6666 private final TransportService transportService ;
67- private final ClusterService clusterService ;
68- private final AllocationService allocationService ;
69- private final RoutingService routingService ;
7067
7168 @ Inject
7269 public ShardStateAction (Settings settings , ClusterService clusterService , TransportService transportService ,
7370 AllocationService allocationService , RoutingService routingService ) {
7471 super (settings );
75- this .clusterService = clusterService ;
7672 this .transportService = transportService ;
77- this .allocationService = allocationService ;
78- this .routingService = routingService ;
7973
80- transportService .registerRequestHandler (SHARD_STARTED_ACTION_NAME , ShardRoutingEntry ::new , ThreadPool .Names .SAME , new ShardStartedTransportHandler ());
81- transportService .registerRequestHandler (SHARD_FAILED_ACTION_NAME , ShardRoutingEntry ::new , ThreadPool .Names .SAME , new ShardFailedTransportHandler ());
74+ transportService .registerRequestHandler (SHARD_STARTED_ACTION_NAME , ShardRoutingEntry ::new , ThreadPool .Names .SAME , new ShardStartedTransportHandler (clusterService , new ShardStartedClusterStateTaskExecutor (allocationService , logger ), logger ));
75+ transportService .registerRequestHandler (SHARD_FAILED_ACTION_NAME , ShardRoutingEntry ::new , ThreadPool .Names .SAME , new ShardFailedTransportHandler (clusterService , new ShardFailedClusterStateTaskExecutor (allocationService , routingService , logger ), logger ));
76+ }
77+
78+ public void shardFailed (final ClusterState clusterState , final ShardRouting shardRouting , final String indexUUID , final String message , @ Nullable final Throwable failure , Listener listener ) {
79+ shardFailed (clusterState , shardRouting , indexUUID , message , failure , null , listener );
8280 }
8381
84- public void shardFailed (final ShardRouting shardRouting , final String indexUUID , final String message , @ Nullable final Throwable failure , Listener listener ) {
85- shardFailed (shardRouting , indexUUID , message , failure , null , listener );
82+ public void resendShardFailed (final ClusterState clusterState , final ShardRouting shardRouting , final String indexUUID , final String message , @ Nullable final Throwable failure , Listener listener ) {
83+ logger .trace ("{} re-sending failed shard [{}], index UUID [{}], reason [{}]" , shardRouting .shardId (), failure , shardRouting , indexUUID , message );
84+ shardFailed (clusterState , shardRouting , indexUUID , message , failure , listener );
8685 }
8786
88- public void shardFailed (final ShardRouting shardRouting , final String indexUUID , final String message , @ Nullable final Throwable failure , TimeValue timeout , Listener listener ) {
89- DiscoveryNode masterNode = clusterService . state () .nodes ().masterNode ();
87+ public void shardFailed (final ClusterState clusterState , final ShardRouting shardRouting , final String indexUUID , final String message , @ Nullable final Throwable failure , TimeValue timeout , Listener listener ) {
88+ DiscoveryNode masterNode = clusterState .nodes ().masterNode ();
9089 if (masterNode == null ) {
91- logger .warn ("can't send shard failed for {}, no master known." , shardRouting );
90+ logger .warn ("{} no master known to fail shard [{}]" , shardRouting . shardId () , shardRouting );
9291 listener .onShardFailedNoMaster ();
9392 return ;
9493 }
95- innerShardFailed (shardRouting , indexUUID , masterNode , message , failure , timeout , listener );
96- }
97-
98- public void resendShardFailed (final ShardRouting shardRouting , final String indexUUID , final DiscoveryNode masterNode , final String message , @ Nullable final Throwable failure , Listener listener ) {
99- logger .trace ("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]" , failure , shardRouting .shardId (), shardRouting , indexUUID , message );
100- innerShardFailed (shardRouting , indexUUID , masterNode , message , failure , null , listener );
101- }
102-
103- private void innerShardFailed (final ShardRouting shardRouting , final String indexUUID , final DiscoveryNode masterNode , final String message , final Throwable failure , TimeValue timeout , Listener listener ) {
10494 ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry (shardRouting , indexUUID , message , failure );
10595 TransportRequestOptions options = TransportRequestOptions .EMPTY ;
10696 if (timeout != null ) {
@@ -115,33 +105,49 @@ public void handleResponse(TransportResponse.Empty response) {
115105
116106 @ Override
117107 public void handleException (TransportException exp ) {
118- logger .warn ("unexpected failure while sending request to [{}] to fail shard [{}]" , exp , masterNode , shardRoutingEntry );
108+ logger .warn ("{} unexpected failure while sending request to [{}] to fail shard [{}]" , exp , shardRoutingEntry . shardRouting . shardId () , masterNode , shardRoutingEntry );
119109 listener .onShardFailedFailure (masterNode , exp );
120110 }
121111 });
122112 }
123113
124- private class ShardFailedTransportHandler implements TransportRequestHandler <ShardRoutingEntry > {
114+ private static class ShardFailedTransportHandler implements TransportRequestHandler <ShardRoutingEntry > {
115+ private final ClusterService clusterService ;
116+ private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor ;
117+ private final ESLogger logger ;
118+
119+ public ShardFailedTransportHandler (ClusterService clusterService , ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor , ESLogger logger ) {
120+ this .clusterService = clusterService ;
121+ this .shardFailedClusterStateTaskExecutor = shardFailedClusterStateTaskExecutor ;
122+ this .logger = logger ;
123+ }
124+
125125 @ Override
126126 public void messageReceived (ShardRoutingEntry request , TransportChannel channel ) throws Exception {
127- handleShardFailureOnMaster (request , new ClusterStateTaskListener () {
127+ logger .warn ("{} received shard failed for {}" , request .failure , request .shardRouting .shardId (), request );
128+ clusterService .submitStateUpdateTask (
129+ "shard-failed (" + request .shardRouting + "), message [" + request .message + "]" ,
130+ request ,
131+ ClusterStateTaskConfig .build (Priority .HIGH ),
132+ shardFailedClusterStateTaskExecutor ,
133+ new ClusterStateTaskListener () {
128134 @ Override
129135 public void onFailure (String source , Throwable t ) {
130- logger .error ("unexpected failure while failing shard [{}]" , t , request .shardRouting );
136+ logger .error ("{} unexpected failure while failing shard [{}]" , t , request . shardRouting . shardId () , request .shardRouting );
131137 try {
132138 channel .sendResponse (t );
133139 } catch (Throwable channelThrowable ) {
134- logger .warn ("failed to send failure [{}] while failing shard [{}]" , channelThrowable , t , request .shardRouting );
140+ logger .warn ("{} failed to send failure [{}] while failing shard [{}]" , channelThrowable , request . shardRouting . shardId () , t , request .shardRouting );
135141 }
136142 }
137143
138144 @ Override
139145 public void onNoLongerMaster (String source ) {
140- logger .error ("no longer master while failing shard [{}]" , request .shardRouting );
146+ logger .error ("{} no longer master while failing shard [{}]" , request . shardRouting . shardId () , request .shardRouting );
141147 try {
142148 channel .sendResponse (new NotMasterException (source ));
143149 } catch (Throwable channelThrowable ) {
144- logger .warn ("failed to send no longer master while failing shard [{}]" , channelThrowable , request .shardRouting );
150+ logger .warn ("{} failed to send no longer master while failing shard [{}]" , channelThrowable , request . shardRouting . shardId () , request .shardRouting );
145151 }
146152 }
147153
@@ -150,15 +156,25 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
150156 try {
151157 channel .sendResponse (TransportResponse .Empty .INSTANCE );
152158 } catch (Throwable channelThrowable ) {
153- logger .warn ("failed to send response while failing shard [{}]" , channelThrowable , request .shardRouting );
159+ logger .warn ("{} failed to send response while failing shard [{}]" , channelThrowable , request . shardRouting . shardId () , request .shardRouting );
154160 }
155161 }
156162 }
157163 );
158164 }
159165 }
160166
161- class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor <ShardRoutingEntry > {
167+ private static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor <ShardRoutingEntry > {
168+ private final AllocationService allocationService ;
169+ private final RoutingService routingService ;
170+ private final ESLogger logger ;
171+
172+ public ShardFailedClusterStateTaskExecutor (AllocationService allocationService , RoutingService routingService , ESLogger logger ) {
173+ this .allocationService = allocationService ;
174+ this .routingService = routingService ;
175+ this .logger = logger ;
176+ }
177+
162178 @ Override
163179 public BatchResult <ShardRoutingEntry > execute (ClusterState currentState , List <ShardRoutingEntry > tasks ) throws Exception {
164180 BatchResult .Builder <ShardRoutingEntry > batchResultBuilder = BatchResult .builder ();
@@ -192,48 +208,56 @@ public void clusterStatePublished(ClusterState newClusterState) {
192208 }
193209 }
194210
195- private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler ();
196-
197- private void handleShardFailureOnMaster (final ShardRoutingEntry shardRoutingEntry , ClusterStateTaskListener listener ) {
198- logger .warn ("{} received shard failed for {}" , shardRoutingEntry .failure , shardRoutingEntry .shardRouting .shardId (), shardRoutingEntry );
199- clusterService .submitStateUpdateTask (
200- "shard-failed (" + shardRoutingEntry .shardRouting + "), message [" + shardRoutingEntry .message + "]" ,
201- shardRoutingEntry ,
202- ClusterStateTaskConfig .build (Priority .HIGH ),
203- shardFailedClusterStateHandler ,
204- listener );
205- }
206-
207- public void shardStarted (final ShardRouting shardRouting , String indexUUID , final String reason ) {
208- DiscoveryNode masterNode = clusterService .state ().nodes ().masterNode ();
211+ public void shardStarted (final ClusterState clusterState , final ShardRouting shardRouting , String indexUUID , final String reason ) {
212+ DiscoveryNode masterNode = clusterState .nodes ().masterNode ();
209213 if (masterNode == null ) {
210- logger .warn ("{} can't send shard started for {}, no master known. " , shardRouting .shardId (), shardRouting );
214+ logger .warn ("{} no master known to start shard [{}] " , shardRouting .shardId (), shardRouting );
211215 return ;
212216 }
213- shardStarted (shardRouting , indexUUID , reason , masterNode );
214- }
215-
216- public void shardStarted (final ShardRouting shardRouting , String indexUUID , final String reason , final DiscoveryNode masterNode ) {
217217 ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry (shardRouting , indexUUID , reason , null );
218- logger .debug ("{} sending shard started for {}" , shardRoutingEntry . shardRouting . shardId () , shardRoutingEntry );
218+ logger .debug ("sending start shard [{}]" , shardRoutingEntry );
219219 transportService .sendRequest (masterNode ,
220220 SHARD_STARTED_ACTION_NAME , new ShardRoutingEntry (shardRouting , indexUUID , reason , null ), new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
221221 @ Override
222222 public void handleException (TransportException exp ) {
223- logger .warn ("failed to send shard started to [{}]" , exp , masterNode );
223+ logger .warn ("{} failure sending start shard [{}] to [{}]" , exp , shardRouting . shardId (), masterNode , shardRouting );
224224 }
225225 });
226226 }
227227
228- class ShardStartedTransportHandler implements TransportRequestHandler <ShardRoutingEntry > {
228+ private static class ShardStartedTransportHandler implements TransportRequestHandler <ShardRoutingEntry > {
229+ private final ClusterService clusterService ;
230+ private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor ;
231+ private final ESLogger logger ;
232+
233+ public ShardStartedTransportHandler (ClusterService clusterService , ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor , ESLogger logger ) {
234+ this .clusterService = clusterService ;
235+ this .shardStartedClusterStateTaskExecutor = shardStartedClusterStateTaskExecutor ;
236+ this .logger = logger ;
237+ }
238+
229239 @ Override
230240 public void messageReceived (ShardRoutingEntry request , TransportChannel channel ) throws Exception {
231- handleShardStartedOnMaster (request );
241+ logger .debug ("{} received shard started for [{}]" , request .shardRouting .shardId (), request );
242+ clusterService .submitStateUpdateTask (
243+ "shard-started (" + request .shardRouting + "), reason [" + request .message + "]" ,
244+ request ,
245+ ClusterStateTaskConfig .build (Priority .URGENT ),
246+ shardStartedClusterStateTaskExecutor ,
247+ shardStartedClusterStateTaskExecutor );
232248 channel .sendResponse (TransportResponse .Empty .INSTANCE );
233249 }
234250 }
235251
236- class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor <ShardRoutingEntry >, ClusterStateTaskListener {
252+ private static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor <ShardRoutingEntry >, ClusterStateTaskListener {
253+ private final AllocationService allocationService ;
254+ private final ESLogger logger ;
255+
256+ public ShardStartedClusterStateTaskExecutor (AllocationService allocationService , ESLogger logger ) {
257+ this .allocationService = allocationService ;
258+ this .logger = logger ;
259+ }
260+
237261 @ Override
238262 public BatchResult <ShardRoutingEntry > execute (ClusterState currentState , List <ShardRoutingEntry > tasks ) throws Exception {
239263 BatchResult .Builder <ShardRoutingEntry > builder = BatchResult .builder ();
@@ -262,19 +286,6 @@ public void onFailure(String source, Throwable t) {
262286 }
263287 }
264288
265- private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler ();
266-
267- private void handleShardStartedOnMaster (final ShardRoutingEntry shardRoutingEntry ) {
268- logger .debug ("received shard started for {}" , shardRoutingEntry );
269-
270- clusterService .submitStateUpdateTask (
271- "shard-started (" + shardRoutingEntry .shardRouting + "), reason [" + shardRoutingEntry .message + "]" ,
272- shardRoutingEntry ,
273- ClusterStateTaskConfig .build (Priority .URGENT ),
274- shardStartedClusterStateHandler ,
275- shardStartedClusterStateHandler );
276- }
277-
278289 public static class ShardRoutingEntry extends TransportRequest {
279290 ShardRouting shardRouting ;
280291 String indexUUID = IndexMetaData .INDEX_UUID_NA_VALUE ;
0 commit comments