Skip to content

Commit aba52cc

Browse files
committed
Fail replica shards locally upon failures
When a replication operation (index/delete/update) fails to be executed properly, we fail the replica and allow master to allocate a new copy of it. At the moment, the node hosting the primary shard is responsible of notifying the master of a failed replica. However, if the replica shard is initializing (`POST_RECOVERY` state), we have a racing condition between the failed shard message and moving the shard into the `STARTED` state. If the latter happen first, master will fail to resolve the fail shard message. This commit builds on #5800 and fails the engine of the replica shard if a replication operation fails. This protects us against the above as the shard will reject the `STARTED` command from master. It also makes us more resilient to other racing conditions in this area. Closes #5847
1 parent b18114b commit aba52cc

File tree

8 files changed

+108
-70
lines changed

8 files changed

+108
-70
lines changed

src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.collect.Lists;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.action.ActionListener;
25-
import org.elasticsearch.action.ActionRequest;
2625
import org.elasticsearch.action.ActionResponse;
2726
import org.elasticsearch.action.ShardOperationFailedException;
2827
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
@@ -46,7 +45,7 @@
4645
/**
4746
*
4847
*/
49-
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
48+
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
5049
extends TransportAction<Request, Response> {
5150

5251
protected final ClusterService clusterService;

src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.action.ActionListener;
24-
import org.elasticsearch.action.ActionRequest;
2524
import org.elasticsearch.action.ActionResponse;
2625
import org.elasticsearch.action.support.TransportAction;
2726
import org.elasticsearch.cluster.ClusterService;
@@ -42,7 +41,7 @@
4241
/**
4342
*/
4443
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
45-
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
44+
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
4645
extends TransportAction<Request, Response> {
4746

4847
protected final ClusterService clusterService;

src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ElasticsearchIllegalStateException;
2424
import org.elasticsearch.ExceptionsHelper;
25-
import org.elasticsearch.action.*;
25+
import org.elasticsearch.action.ActionListener;
26+
import org.elasticsearch.action.ActionResponse;
27+
import org.elasticsearch.action.UnavailableShardsException;
28+
import org.elasticsearch.action.WriteConsistencyLevel;
2629
import org.elasticsearch.action.support.TransportAction;
2730
import org.elasticsearch.action.support.TransportActions;
2831
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -44,6 +47,8 @@
4447
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4548
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
4649
import org.elasticsearch.index.engine.VersionConflictEngineException;
50+
import org.elasticsearch.index.service.IndexService;
51+
import org.elasticsearch.index.shard.service.IndexShard;
4752
import org.elasticsearch.indices.IndicesService;
4853
import org.elasticsearch.node.NodeClosedException;
4954
import org.elasticsearch.rest.RestStatus;
@@ -54,11 +59,9 @@
5459
import java.util.concurrent.atomic.AtomicBoolean;
5560
import java.util.concurrent.atomic.AtomicInteger;
5661

57-
import static org.elasticsearch.ExceptionsHelper.detailedMessage;
58-
5962
/**
6063
*/
61-
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
64+
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
6265

6366
protected final TransportService transportService;
6467
protected final ClusterService clusterService;
@@ -242,7 +245,12 @@ public boolean isForceExecution() {
242245

243246
@Override
244247
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
245-
shardOperationOnReplica(request);
248+
try {
249+
shardOperationOnReplica(request);
250+
} catch (Throwable t) {
251+
failReplicaIfNeeded(request.request.index(), request.shardId, t);
252+
throw t;
253+
}
246254
channel.sendResponse(TransportResponse.Empty.INSTANCE);
247255
}
248256
}
@@ -700,7 +708,7 @@ void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response,
700708

701709
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
702710
if (!nodeId.equals(clusterState.nodes().localNodeId())) {
703-
DiscoveryNode node = clusterState.nodes().get(nodeId);
711+
final DiscoveryNode node = clusterState.nodes().get(nodeId);
704712
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
705713
@Override
706714
public void handleResponse(TransportResponse.Empty vResponse) {
@@ -710,9 +718,9 @@ public void handleResponse(TransportResponse.Empty vResponse) {
710718
@Override
711719
public void handleException(TransportException exp) {
712720
if (!ignoreReplicaException(exp.unwrapCause())) {
713-
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
721+
logger.warn("Failed to perform " + transportAction + " on remote replica " + node + shardIt.shardId(), exp);
714722
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
715-
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
723+
"Failed to perform [" + transportAction + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
716724
}
717725
finishIfPossible();
718726
}
@@ -733,11 +741,7 @@ public void run() {
733741
try {
734742
shardOperationOnReplica(shardRequest);
735743
} catch (Throwable e) {
736-
if (!ignoreReplicaException(e)) {
737-
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
738-
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
739-
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
740-
}
744+
failReplicaIfNeeded(shard.index(), shard.id(), e);
741745
}
742746
if (counter.decrementAndGet() == 0) {
743747
listener.onResponse(response.response());
@@ -751,11 +755,7 @@ public boolean isForceExecution() {
751755
}
752756
});
753757
} catch (Throwable e) {
754-
if (!ignoreReplicaException(e)) {
755-
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
756-
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
757-
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
758-
}
758+
failReplicaIfNeeded(shard.index(), shard.id(), e);
759759
// we want to decrement the counter here, in teh failure handling, cause we got rejected
760760
// from executing on the thread pool
761761
if (counter.decrementAndGet() == 0) {
@@ -766,18 +766,31 @@ public boolean isForceExecution() {
766766
try {
767767
shardOperationOnReplica(shardRequest);
768768
} catch (Throwable e) {
769-
if (!ignoreReplicaException(e)) {
770-
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
771-
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
772-
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
773-
}
769+
failReplicaIfNeeded(shard.index(), shard.id(), e);
774770
}
775771
if (counter.decrementAndGet() == 0) {
776772
listener.onResponse(response.response());
777773
}
778774
}
779775
}
780776
}
777+
778+
}
779+
780+
private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
781+
if (!ignoreReplicaException(t)) {
782+
IndexService indexService = indicesService.indexService(index);
783+
if (indexService == null) {
784+
logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
785+
return;
786+
}
787+
IndexShard indexShard = indexService.shard(shardId);
788+
if (indexShard == null) {
789+
logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
790+
return;
791+
}
792+
indexShard.failShard(transportAction + " failed on replica", t);
793+
}
781794
}
782795

783796
public static class PrimaryResponse<Response, ReplicaRequest> {

src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
134134

135135
void recover(RecoveryHandler recoveryHandler) throws EngineException;
136136

137+
/** fail engine due to some error. the engine will also be closed. */
138+
void failEngine(String reason, @Nullable Throwable failure);
139+
137140
static interface FailedEngineListener {
138-
void onFailedEngine(ShardId shardId, Throwable t);
141+
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
139142
}
140143

141144
/**

0 commit comments

Comments
 (0)