Skip to content

Commit 69b21fe

Browse files
committed
Merge pull request #15748 from jasontedor/shard-failure-no-master-retry
Wait for new master when failing shard. Relates #14252
2 parents 3d329e3 + 386d2ab commit 69b21fe

File tree

5 files changed

+316
-120
lines changed

5 files changed

+316
-120
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.elasticsearch.transport.BaseTransportResponseHandler;
6565
import org.elasticsearch.transport.ConnectTransportException;
6666
import org.elasticsearch.transport.EmptyTransportResponseHandler;
67-
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
6867
import org.elasticsearch.transport.TransportChannel;
6968
import org.elasticsearch.transport.TransportChannelResponseHandler;
7069
import org.elasticsearch.transport.TransportException;
@@ -76,6 +75,7 @@
7675
import java.io.IOException;
7776
import java.util.Collections;
7877
import java.util.List;
78+
import java.util.Locale;
7979
import java.util.Map;
8080
import java.util.concurrent.ConcurrentMap;
8181
import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,16 +92,13 @@
9292
*/
9393
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> extends TransportAction<Request, Response> {
9494

95-
public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout";
96-
9795
protected final TransportService transportService;
9896
protected final ClusterService clusterService;
9997
protected final IndicesService indicesService;
10098
protected final ShardStateAction shardStateAction;
10199
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
102100
protected final TransportRequestOptions transportOptions;
103101
protected final MappingUpdatedAction mappingUpdatedAction;
104-
private final TimeValue shardFailedTimeout;
105102

106103
final String transportReplicaAction;
107104
final String transportPrimaryAction;
@@ -133,8 +130,6 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
133130
this.transportOptions = transportOptions();
134131

135132
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
136-
// TODO: set a default timeout
137-
shardFailedTimeout = settings.getAsTime(SHARD_FAILURE_TIMEOUT, null);
138133
}
139134

140135
@Override
@@ -608,7 +603,7 @@ protected void doRun() throws Exception {
608603
if (logger.isTraceEnabled()) {
609604
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
610605
}
611-
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference, shardFailedTimeout);
606+
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
612607
} catch (Throwable e) {
613608
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
614609
if (logger.isTraceEnabled()) {
@@ -732,15 +727,13 @@ final class ReplicationPhase extends AbstractRunnable {
732727
private final AtomicInteger pending;
733728
private final int totalShards;
734729
private final Releasable indexShardReference;
735-
private final TimeValue shardFailedTimeout;
736730

737731
public ReplicationPhase(ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
738-
TransportChannel channel, Releasable indexShardReference, TimeValue shardFailedTimeout) {
732+
TransportChannel channel, Releasable indexShardReference) {
739733
this.replicaRequest = replicaRequest;
740734
this.channel = channel;
741735
this.finalResponse = finalResponse;
742736
this.indexShardReference = indexShardReference;
743-
this.shardFailedTimeout = shardFailedTimeout;
744737
this.shardId = shardId;
745738

746739
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
@@ -882,15 +875,32 @@ public void handleException(TransportException exp) {
882875
if (ignoreReplicaException(exp)) {
883876
onReplicaFailure(nodeId, exp);
884877
} else {
885-
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
886-
shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
878+
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
879+
logger.warn("{} {}", exp, shardId, message);
880+
shardStateAction.shardFailed(
881+
shard,
882+
indexUUID,
883+
message,
884+
exp,
885+
new ShardStateAction.Listener() {
886+
@Override
887+
public void onSuccess() {
888+
onReplicaFailure(nodeId, exp);
889+
}
890+
891+
@Override
892+
public void onShardFailedFailure(Exception e) {
893+
// TODO: handle catastrophic non-channel failures
894+
onReplicaFailure(nodeId, exp);
895+
}
896+
}
897+
);
887898
}
888899
}
889900
}
890901
);
891902
}
892903

893-
894904
void onReplicaFailure(String nodeId, @Nullable Throwable e) {
895905
// Only version conflict should be ignored from being put into the _shards header?
896906
if (e != null && ignoreReplicaException(e) == false) {
@@ -955,34 +965,6 @@ private void doFinish() {
955965
}
956966
}
957967
}
958-
959-
public class ReplicationFailedShardStateListener implements ShardStateAction.Listener {
960-
private final String nodeId;
961-
private Throwable failure;
962-
963-
public ReplicationFailedShardStateListener(String nodeId, Throwable failure) {
964-
this.nodeId = nodeId;
965-
this.failure = failure;
966-
}
967-
968-
@Override
969-
public void onSuccess() {
970-
onReplicaFailure(nodeId, failure);
971-
}
972-
973-
@Override
974-
public void onShardFailedNoMaster() {
975-
onReplicaFailure(nodeId, failure);
976-
}
977-
978-
@Override
979-
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
980-
if (e instanceof ReceiveTimeoutTransportException) {
981-
logger.trace("timeout sending shard failure to master [{}]", e, master);
982-
}
983-
onReplicaFailure(nodeId, failure);
984-
}
985-
}
986968
}
987969

988970
/**

core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 89 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.cluster.ClusterService;
2424
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ClusterStateObserver;
2526
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2627
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2728
import org.elasticsearch.cluster.ClusterStateTaskListener;
29+
import org.elasticsearch.cluster.MasterNodeChangePredicate;
2830
import org.elasticsearch.cluster.NotMasterException;
2931
import org.elasticsearch.cluster.metadata.IndexMetaData;
3032
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -42,73 +44,118 @@
4244
import org.elasticsearch.common.logging.ESLogger;
4345
import org.elasticsearch.common.settings.Settings;
4446
import org.elasticsearch.common.unit.TimeValue;
47+
import org.elasticsearch.discovery.Discovery;
48+
import org.elasticsearch.node.NodeClosedException;
4549
import org.elasticsearch.threadpool.ThreadPool;
4650
import org.elasticsearch.transport.EmptyTransportResponseHandler;
51+
import org.elasticsearch.transport.NodeDisconnectedException;
4752
import org.elasticsearch.transport.TransportChannel;
4853
import org.elasticsearch.transport.TransportException;
4954
import org.elasticsearch.transport.TransportRequest;
5055
import org.elasticsearch.transport.TransportRequestHandler;
51-
import org.elasticsearch.transport.TransportRequestOptions;
5256
import org.elasticsearch.transport.TransportResponse;
5357
import org.elasticsearch.transport.TransportService;
5458

5559
import java.io.IOException;
5660
import java.util.ArrayList;
61+
import java.util.Arrays;
62+
import java.util.HashSet;
5763
import java.util.List;
5864
import java.util.Locale;
65+
import java.util.Set;
5966

6067
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
6168

6269
public class ShardStateAction extends AbstractComponent {
70+
6371
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
6472
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
6573

6674
private final TransportService transportService;
75+
private final ClusterService clusterService;
6776

6877
@Inject
6978
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
7079
AllocationService allocationService, RoutingService routingService) {
7180
super(settings);
7281
this.transportService = transportService;
82+
this.clusterService = clusterService;
7383

7484
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
7585
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
7686
}
7787

78-
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
79-
shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener);
88+
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
89+
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
90+
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
91+
sendShardFailed(observer, shardRoutingEntry, listener);
8092
}
8193

82-
public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
94+
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
8395
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
84-
shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener);
96+
shardFailed(shardRouting, indexUUID, message, failure, listener);
8597
}
8698

87-
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
88-
DiscoveryNode masterNode = clusterState.nodes().masterNode();
99+
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
100+
DiscoveryNode masterNode = observer.observedState().nodes().masterNode();
89101
if (masterNode == null) {
90-
logger.warn("{} no master known to fail shard [{}]", shardRouting.shardId(), shardRouting);
91-
listener.onShardFailedNoMaster();
92-
return;
93-
}
94-
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
95-
TransportRequestOptions options = TransportRequestOptions.EMPTY;
96-
if (timeout != null) {
97-
options = TransportRequestOptions.builder().withTimeout(timeout).build();
102+
logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting());
103+
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
104+
} else {
105+
transportService.sendRequest(masterNode,
106+
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
107+
@Override
108+
public void handleResponse(TransportResponse.Empty response) {
109+
listener.onSuccess();
110+
}
111+
112+
@Override
113+
public void handleException(TransportException exp) {
114+
assert exp.getCause() != null : exp;
115+
if (isMasterChannelException(exp.getCause())) {
116+
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
117+
} else {
118+
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
119+
listener.onShardFailedFailure(exp);
120+
}
121+
}
122+
});
98123
}
99-
transportService.sendRequest(masterNode,
100-
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
101-
@Override
102-
public void handleResponse(TransportResponse.Empty response) {
103-
listener.onSuccess();
104-
}
124+
}
105125

106-
@Override
107-
public void handleException(TransportException exp) {
108-
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.shardRouting.shardId(), masterNode, shardRoutingEntry);
109-
listener.onShardFailedFailure(masterNode, exp);
126+
private static Set<Class<?>> MASTER_CHANNEL_EXCEPTIONS =
127+
new HashSet<>(Arrays.asList(
128+
NotMasterException.class,
129+
NodeDisconnectedException.class,
130+
Discovery.FailedToCommitClusterStateException.class
131+
));
132+
private static boolean isMasterChannelException(Throwable cause) {
133+
return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass());
134+
}
135+
136+
// visible for testing
137+
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
138+
observer.waitForNextChange(new ClusterStateObserver.Listener() {
139+
@Override
140+
public void onNewClusterState(ClusterState state) {
141+
if (logger.isTraceEnabled()) {
142+
logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), state.prettyPrint(), shardRoutingEntry);
110143
}
111-
});
144+
sendShardFailed(observer, shardRoutingEntry, listener);
145+
}
146+
147+
@Override
148+
public void onClusterServiceClose() {
149+
logger.warn("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting());
150+
listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode()));
151+
}
152+
153+
@Override
154+
public void onTimeout(TimeValue timeout) {
155+
// we wait indefinitely for a new master
156+
assert false;
157+
}
158+
}, MasterNodeChangePredicate.INSTANCE);
112159
}
113160

114161
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@@ -334,10 +381,22 @@ public interface Listener {
334381
default void onSuccess() {
335382
}
336383

337-
default void onShardFailedNoMaster() {
338-
}
339-
340-
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
384+
/**
385+
* Notification for non-channel exceptions that are not handled
386+
* by {@link ShardStateAction}.
387+
*
388+
* The exceptions that are handled by {@link ShardStateAction}
389+
* are:
390+
* - {@link NotMasterException}
391+
* - {@link NodeDisconnectedException}
392+
* - {@link Discovery.FailedToCommitClusterStateException}
393+
*
394+
* Any other exception is communicated to the requester via
395+
* this notification.
396+
*
397+
* @param e the unexpected cause of the failure on the master
398+
*/
399+
default void onShardFailedFailure(final Exception e) {
341400
}
342401
}
343402
}

core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) {
458458
if (!indexService.hasShard(shardId) && shardRouting.started()) {
459459
if (failedShards.containsKey(shardRouting.shardId())) {
460460
if (nodes.masterNode() != null) {
461-
shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(),
461+
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
462462
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
463463
}
464464
} else {
@@ -590,7 +590,7 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
590590
if (!indexService.hasShard(shardId)) {
591591
if (failedShards.containsKey(shardRouting.shardId())) {
592592
if (nodes.masterNode() != null) {
593-
shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(),
593+
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
594594
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
595595
}
596596
return;
@@ -788,7 +788,7 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m
788788
try {
789789
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
790790
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
791-
shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
791+
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
792792
} catch (Throwable e1) {
793793
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
794794
}

0 commit comments

Comments
 (0)