@@ -236,39 +236,9 @@ protected TransportRequestOptions transportOptions(Settings settings) {
236236 return TransportRequestOptions .EMPTY ;
237237 }
238238
239- private String concreteIndex (final ClusterState state , final ReplicationRequest request ) {
240- return resolveIndex () ? indexNameExpressionResolver .concreteSingleIndex (state , request ).getName () : request .index ();
241- }
242-
243- private ClusterBlockException blockExceptions (final ClusterState state , final String indexName ) {
244- ClusterBlockLevel globalBlockLevel = globalBlockLevel ();
245- if (globalBlockLevel != null ) {
246- ClusterBlockException blockException = state .blocks ().globalBlockedException (globalBlockLevel );
247- if (blockException != null ) {
248- return blockException ;
249- }
250- }
251- ClusterBlockLevel indexBlockLevel = indexBlockLevel ();
252- if (indexBlockLevel != null ) {
253- ClusterBlockException blockException = state .blocks ().indexBlockedException (indexBlockLevel , indexName );
254- if (blockException != null ) {
255- return blockException ;
256- }
257- }
258- return null ;
259- }
260-
261239 protected boolean retryPrimaryException (final Throwable e ) {
262240 return e .getClass () == ReplicationOperation .RetryOnPrimaryException .class
263- || TransportActions .isShardNotAvailableException (e )
264- || isRetryableClusterBlockException (e );
265- }
266-
267- boolean isRetryableClusterBlockException (final Throwable e ) {
268- if (e instanceof ClusterBlockException ) {
269- return ((ClusterBlockException ) e ).retryable ();
270- }
271- return false ;
241+ || TransportActions .isShardNotAvailableException (e );
272242 }
273243
274244 protected class OperationTransportHandler implements TransportRequestHandler <Request > {
@@ -351,15 +321,6 @@ protected void doRun() throws Exception {
351321 @ Override
352322 public void onResponse (PrimaryShardReference primaryShardReference ) {
353323 try {
354- final ClusterState clusterState = clusterService .state ();
355- final IndexMetaData indexMetaData = clusterState .metaData ().getIndexSafe (primaryShardReference .routingEntry ().index ());
356-
357- final ClusterBlockException blockException = blockExceptions (clusterState , indexMetaData .getIndex ().getName ());
358- if (blockException != null ) {
359- logger .trace ("cluster is blocked, action failed on primary" , blockException );
360- throw blockException ;
361- }
362-
363324 if (primaryShardReference .isRelocated ()) {
364325 primaryShardReference .close (); // release shard operation lock as soon as possible
365326 setPhase (replicationTask , "primary_delegation" );
@@ -373,7 +334,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
373334 response .readFrom (in );
374335 return response ;
375336 };
376- DiscoveryNode relocatingNode = clusterState .nodes ().get (primary .relocatingNodeId ());
337+ DiscoveryNode relocatingNode = clusterService . state () .nodes ().get (primary .relocatingNodeId ());
377338 transportService .sendRequest (relocatingNode , transportPrimaryAction ,
378339 new ConcreteShardRequest <>(request , primary .allocationId ().getRelocationId (), primaryTerm ),
379340 transportOptions ,
@@ -752,42 +713,35 @@ public void onFailure(Exception e) {
752713 protected void doRun () {
753714 setPhase (task , "routing" );
754715 final ClusterState state = observer .setAndGetObservedState ();
755- final String concreteIndex = concreteIndex (state , request );
756- final ClusterBlockException blockException = blockExceptions (state , concreteIndex );
757- if (blockException != null ) {
758- if (blockException .retryable ()) {
759- logger .trace ("cluster is blocked, scheduling a retry" , blockException );
760- retry (blockException );
761- } else {
762- finishAsFailed (blockException );
763- }
764- } else {
765- // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
766- final IndexMetaData indexMetaData = state .metaData ().index (concreteIndex );
767- if (indexMetaData == null ) {
768- retry (new IndexNotFoundException (concreteIndex ));
769- return ;
770- }
771- if (indexMetaData .getState () == IndexMetaData .State .CLOSE ) {
772- throw new IndexClosedException (indexMetaData .getIndex ());
773- }
716+ if (handleBlockExceptions (state )) {
717+ return ;
718+ }
719+
720+ // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
721+ final String concreteIndex = concreteIndex (state );
722+ final IndexMetaData indexMetaData = state .metaData ().index (concreteIndex );
723+ if (indexMetaData == null ) {
724+ retry (new IndexNotFoundException (concreteIndex ));
725+ return ;
726+ }
727+ if (indexMetaData .getState () == IndexMetaData .State .CLOSE ) {
728+ throw new IndexClosedException (indexMetaData .getIndex ());
729+ }
774730
775- // resolve all derived request fields, so we can route and apply it
776- resolveRequest (indexMetaData , request );
777- assert request .shardId () != null : "request shardId must be set in resolveRequest" ;
778- assert request .waitForActiveShards () != ActiveShardCount .DEFAULT :
779- "request waitForActiveShards must be set in resolveRequest" ;
731+ // resolve all derived request fields, so we can route and apply it
732+ resolveRequest (indexMetaData , request );
733+ assert request .shardId () != null : "request shardId must be set in resolveRequest" ;
734+ assert request .waitForActiveShards () != ActiveShardCount .DEFAULT : "request waitForActiveShards must be set in resolveRequest" ;
780735
781- final ShardRouting primary = primary (state );
782- if (retryIfUnavailable (state , primary )) {
783- return ;
784- }
785- final DiscoveryNode node = state .nodes ().get (primary .currentNodeId ());
786- if (primary .currentNodeId ().equals (state .nodes ().getLocalNodeId ())) {
787- performLocalAction (state , primary , node , indexMetaData );
788- } else {
789- performRemoteAction (state , primary , node );
790- }
736+ final ShardRouting primary = primary (state );
737+ if (retryIfUnavailable (state , primary )) {
738+ return ;
739+ }
740+ final DiscoveryNode node = state .nodes ().get (primary .currentNodeId ());
741+ if (primary .currentNodeId ().equals (state .nodes ().getLocalNodeId ())) {
742+ performLocalAction (state , primary , node , indexMetaData );
743+ } else {
744+ performRemoteAction (state , primary , node );
791745 }
792746 }
793747
@@ -839,11 +793,44 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
839793 return false ;
840794 }
841795
796+ private String concreteIndex (ClusterState state ) {
797+ return resolveIndex () ? indexNameExpressionResolver .concreteSingleIndex (state , request ).getName () : request .index ();
798+ }
799+
842800 private ShardRouting primary (ClusterState state ) {
843801 IndexShardRoutingTable indexShard = state .getRoutingTable ().shardRoutingTable (request .shardId ());
844802 return indexShard .primaryShard ();
845803 }
846804
805+ private boolean handleBlockExceptions (ClusterState state ) {
806+ ClusterBlockLevel globalBlockLevel = globalBlockLevel ();
807+ if (globalBlockLevel != null ) {
808+ ClusterBlockException blockException = state .blocks ().globalBlockedException (globalBlockLevel );
809+ if (blockException != null ) {
810+ handleBlockException (blockException );
811+ return true ;
812+ }
813+ }
814+ ClusterBlockLevel indexBlockLevel = indexBlockLevel ();
815+ if (indexBlockLevel != null ) {
816+ ClusterBlockException blockException = state .blocks ().indexBlockedException (indexBlockLevel , concreteIndex (state ));
817+ if (blockException != null ) {
818+ handleBlockException (blockException );
819+ return true ;
820+ }
821+ }
822+ return false ;
823+ }
824+
825+ private void handleBlockException (ClusterBlockException blockException ) {
826+ if (blockException .retryable ()) {
827+ logger .trace ("cluster is blocked, scheduling a retry" , blockException );
828+ retry (blockException );
829+ } else {
830+ finishAsFailed (blockException );
831+ }
832+ }
833+
847834 private void performAction (final DiscoveryNode node , final String action , final boolean isPrimaryAction ,
848835 final TransportRequest requestToPerform ) {
849836 transportService .sendRequest (node , action , requestToPerform , transportOptions , new TransportResponseHandler <Response >() {
0 commit comments