2727import org .elasticsearch .action .ActionListener ;
2828import org .elasticsearch .action .UnavailableShardsException ;
2929import org .elasticsearch .action .support .ActiveShardCount ;
30+ import org .elasticsearch .action .support .RetryableAction ;
3031import org .elasticsearch .action .support .TransportActions ;
3132import org .elasticsearch .cluster .action .shard .ShardStateAction ;
3233import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
3334import org .elasticsearch .cluster .routing .ShardRouting ;
3435import org .elasticsearch .common .Nullable ;
36+ import org .elasticsearch .common .breaker .CircuitBreakingException ;
3537import org .elasticsearch .common .io .stream .StreamInput ;
38+ import org .elasticsearch .common .unit .TimeValue ;
39+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
3640import org .elasticsearch .index .seqno .SequenceNumbers ;
3741import org .elasticsearch .index .shard .ReplicationGroup ;
3842import org .elasticsearch .index .shard .ShardId ;
3943import org .elasticsearch .node .NodeClosedException ;
4044import org .elasticsearch .rest .RestStatus ;
45+ import org .elasticsearch .threadpool .ThreadPool ;
46+ import org .elasticsearch .transport .ConnectTransportException ;
4147
4248import java .io .IOException ;
4349import java .util .ArrayList ;
@@ -54,6 +60,7 @@ public class ReplicationOperation<
5460 PrimaryResultT extends ReplicationOperation .PrimaryResult <ReplicaRequest >
5561 > {
5662 private final Logger logger ;
63+ private final ThreadPool threadPool ;
5764 private final Request request ;
5865 private final String opType ;
5966 private final AtomicInteger totalShards = new AtomicInteger ();
@@ -72,6 +79,8 @@ public class ReplicationOperation<
7279 private final Primary <Request , ReplicaRequest , PrimaryResultT > primary ;
7380 private final Replicas <ReplicaRequest > replicasProxy ;
7481 private final AtomicBoolean finished = new AtomicBoolean ();
82+ private final TimeValue initialRetryBackoffBound ;
83+ private final TimeValue retryTimeout ;
7584 private final long primaryTerm ;
7685
7786 // exposed for tests
@@ -84,14 +93,18 @@ public class ReplicationOperation<
/**
 * Builds a replication operation for a single request. The constructor only records its
 * collaborators; no work is started here.
 *
 * @param request                  the request this operation is executed for
 * @param primary                  the primary the operation runs against
 * @param listener                 kept as the result listener of the operation
 * @param replicas                 proxy used to send the operation to replicas and to fail replica copies
 * @param logger                   logger used for trace output and handed to the per-replica retryable action
 * @param threadPool               thread pool handed to the per-replica retryable action
 * @param opType                   operation name used in log and failure messages
 * @param primaryTerm              primary term passed along with every replica request
 * @param initialRetryBackoffBound forwarded unchanged to the per-replica retryable action
 * @param retryTimeout             forwarded unchanged to the per-replica retryable action
 */
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
                            ActionListener<PrimaryResultT> listener,
                            Replicas<ReplicaRequest> replicas,
                            Logger logger, ThreadPool threadPool, String opType, long primaryTerm,
                            TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
    // what is being replicated, and under which primary term
    this.request = request;
    this.opType = opType;
    this.primaryTerm = primaryTerm;

    // collaborators the operation talks to
    this.primary = primary;
    this.replicasProxy = replicas;
    this.resultListener = listener;

    // infrastructure
    this.logger = logger;
    this.threadPool = threadPool;

    // retry configuration for replica requests
    this.initialRetryBackoffBound = initialRetryBackoffBound;
    this.retryTimeout = retryTimeout;
}
96109
97110 public void execute () throws Exception {
@@ -130,8 +143,9 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
130143 final long maxSeqNoOfUpdatesOrDeletes = primary .maxSeqNoOfUpdatesOrDeletes ();
131144 assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers .UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized" ;
132145 final ReplicationGroup replicationGroup = primary .getReplicationGroup ();
146+ final PendingReplicationActions pendingReplicationActions = primary .getPendingReplicationActions ();
133147 markUnavailableShardsAsStale (replicaRequest , replicationGroup );
134- performOnReplicas (replicaRequest , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes , replicationGroup );
148+ performOnReplicas (replicaRequest , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes , replicationGroup , pendingReplicationActions );
135149 }
136150 primaryResult .runPostReplicationActions (new ActionListener <Void >() {
137151
@@ -165,7 +179,8 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
165179 }
166180
167181 private void performOnReplicas (final ReplicaRequest replicaRequest , final long globalCheckpoint ,
168- final long maxSeqNoOfUpdatesOrDeletes , final ReplicationGroup replicationGroup ) {
182+ final long maxSeqNoOfUpdatesOrDeletes , final ReplicationGroup replicationGroup ,
183+ final PendingReplicationActions pendingReplicationActions ) {
169184 // for total stats, add number of unassigned shards and
170185 // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
171186 totalShards .addAndGet (replicationGroup .getSkippedShards ().size ());
@@ -174,52 +189,78 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g
174189
175190 for (final ShardRouting shard : replicationGroup .getReplicationTargets ()) {
176191 if (shard .isSameAllocation (primaryRouting ) == false ) {
177- performOnReplica (shard , replicaRequest , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes );
192+ performOnReplica (shard , replicaRequest , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes , pendingReplicationActions );
178193 }
179194 }
180195 }
181196
182197 private void performOnReplica (final ShardRouting shard , final ReplicaRequest replicaRequest ,
183- final long globalCheckpoint , final long maxSeqNoOfUpdatesOrDeletes ) {
198+ final long globalCheckpoint , final long maxSeqNoOfUpdatesOrDeletes ,
199+ final PendingReplicationActions pendingReplicationActions ) {
184200 if (logger .isTraceEnabled ()) {
185201 logger .trace ("[{}] sending op [{}] to replica {} for request [{}]" , shard .shardId (), opType , shard , replicaRequest );
186202 }
187-
188203 totalShards .incrementAndGet ();
189204 pendingActions .incrementAndGet ();
190- replicasProxy .performOn (shard , replicaRequest , primaryTerm , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes ,
191- new ActionListener <ReplicaResponse >() {
192- @ Override
193- public void onResponse (ReplicaResponse response ) {
194- successfulShards .incrementAndGet ();
195- try {
196- updateCheckPoints (shard , response ::localCheckpoint , response ::globalCheckpoint );
197- } finally {
198- decPendingAndFinishIfNeeded ();
199- }
205+ final ActionListener <ReplicaResponse > replicationListener = new ActionListener <ReplicaResponse >() {
206+ @ Override
207+ public void onResponse (ReplicaResponse response ) {
208+ successfulShards .incrementAndGet ();
209+ try {
210+ updateCheckPoints (shard , response ::localCheckpoint , response ::globalCheckpoint );
211+ } finally {
212+ decPendingAndFinishIfNeeded ();
200213 }
214+ }
201215
202- @ Override
203- public void onFailure (Exception replicaException ) {
204- logger .trace (() -> new ParameterizedMessage (
205- "[{}] failure while performing [{}] on replica {}, request [{}]" ,
206- shard .shardId (), opType , shard , replicaRequest ), replicaException );
207- // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
208- if (TransportActions .isShardNotAvailableException (replicaException ) == false ) {
209- RestStatus restStatus = ExceptionsHelper .status (replicaException );
210- shardReplicaFailures .add (new ReplicationResponse .ShardInfo .Failure (
211- shard .shardId (), shard .currentNodeId (), replicaException , restStatus , false ));
212- }
213- String message = String .format (Locale .ROOT , "failed to perform %s on replica %s" , opType , shard );
214- replicasProxy .failShardIfNeeded (shard , primaryTerm , message , replicaException ,
215- ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
216+ @ Override
217+ public void onFailure (Exception replicaException ) {
218+ logger .trace (() -> new ParameterizedMessage (
219+ "[{}] failure while performing [{}] on replica {}, request [{}]" ,
220+ shard .shardId (), opType , shard , replicaRequest ), replicaException );
221+ // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
222+ if (TransportActions .isShardNotAvailableException (replicaException ) == false ) {
223+ RestStatus restStatus = ExceptionsHelper .status (replicaException );
224+ shardReplicaFailures .add (new ReplicationResponse .ShardInfo .Failure (
225+ shard .shardId (), shard .currentNodeId (), replicaException , restStatus , false ));
216226 }
227+ String message = String .format (Locale .ROOT , "failed to perform %s on replica %s" , opType , shard );
228+ replicasProxy .failShardIfNeeded (shard , primaryTerm , message , replicaException ,
229+ ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
230+ }
217231
218- @ Override
219- public String toString () {
220- return "[" + replicaRequest + "][" + shard + "]" ;
221- }
222- });
232+ @ Override
233+ public String toString () {
234+ return "[" + replicaRequest + "][" + shard + "]" ;
235+ }
236+ };
237+
238+ final String allocationId = shard .allocationId ().getId ();
239+ final RetryableAction <ReplicaResponse > replicationAction = new RetryableAction <ReplicaResponse >(logger , threadPool ,
240+ initialRetryBackoffBound , retryTimeout , replicationListener ) {
241+
242+ @ Override
243+ public void tryAction (ActionListener <ReplicaResponse > listener ) {
244+ replicasProxy .performOn (shard , replicaRequest , primaryTerm , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes , listener );
245+ }
246+
247+ @ Override
248+ public void onFinished () {
249+ super .onFinished ();
250+ pendingReplicationActions .removeReplicationAction (allocationId , this );
251+ }
252+
253+ @ Override
254+ public boolean shouldRetry (Exception e ) {
255+ final Throwable cause = ExceptionsHelper .unwrapCause (e );
256+ return cause instanceof CircuitBreakingException ||
257+ cause instanceof EsRejectedExecutionException ||
258+ cause instanceof ConnectTransportException ;
259+ }
260+ };
261+
262+ pendingReplicationActions .addPendingAction (allocationId , replicationAction );
263+ replicationAction .run ();
223264 }
224265
225266 private void updateCheckPoints (ShardRouting shard , LongSupplier localCheckpointSupplier , LongSupplier globalCheckpointSupplier ) {
@@ -396,6 +437,13 @@ public interface Primary<
396437 * @return the replication group
397438 */
398439 ReplicationGroup getReplicationGroup ();
440+
441+ /**
442+ * Returns the pending replication actions on the primary shard; the operation registers each in-flight replica action there under the replica's allocation id
443+ *
444+ * @return the pending replication actions
445+ */
446+ PendingReplicationActions getPendingReplicationActions ();
399447 }
400448
401449 /**
0 commit comments