Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
Expand All @@ -76,6 +75,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -92,16 +92,13 @@
*/
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> extends TransportAction<Request, Response> {

public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout";

protected final TransportService transportService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

protected final ClusterService clusterService;
protected final IndicesService indicesService;
protected final ShardStateAction shardStateAction;
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
protected final MappingUpdatedAction mappingUpdatedAction;
private final TimeValue shardFailedTimeout;

final String transportReplicaAction;
final String transportPrimaryAction;
Expand Down Expand Up @@ -133,8 +130,6 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
this.transportOptions = transportOptions();

this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
// TODO: set a default timeout
shardFailedTimeout = settings.getAsTime(SHARD_FAILURE_TIMEOUT, null);
}

@Override
Expand Down Expand Up @@ -608,7 +603,7 @@ protected void doRun() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference, shardFailedTimeout);
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
} catch (Throwable e) {
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -732,15 +727,13 @@ final class ReplicationPhase extends AbstractRunnable {
private final AtomicInteger pending;
private final int totalShards;
private final Releasable indexShardReference;
private final TimeValue shardFailedTimeout;

public ReplicationPhase(ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
TransportChannel channel, Releasable indexShardReference, TimeValue shardFailedTimeout) {
TransportChannel channel, Releasable indexShardReference) {
this.replicaRequest = replicaRequest;
this.channel = channel;
this.finalResponse = finalResponse;
this.indexShardReference = indexShardReference;
this.shardFailedTimeout = shardFailedTimeout;
this.shardId = shardId;

// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
Expand Down Expand Up @@ -882,15 +875,32 @@ public void handleException(TransportException exp) {
if (ignoreReplicaException(exp)) {
onReplicaFailure(nodeId, exp);
} else {
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
logger.warn("{} {}", exp, shardId, message);
shardStateAction.shardFailed(
shard,
indexUUID,
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}

@Override
public void onShardFailedFailure(Exception e) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
}
}
);
}
}
}
);
}


void onReplicaFailure(String nodeId, @Nullable Throwable e) {
// Only version conflict should be ignored from being put into the _shards header?
if (e != null && ignoreReplicaException(e) == false) {
Expand Down Expand Up @@ -955,34 +965,6 @@ private void doFinish() {
}
}
}

/**
 * Listener passed to {@link ShardStateAction} when a replica operation has failed and the
 * replica shard is being reported as failed.
 *
 * Every callback ends by forwarding the original replica failure to
 * {@code onReplicaFailure} for the node the replica lives on, regardless of whether the
 * shard-failed notification itself succeeded.
 */
public class ReplicationFailedShardStateListener implements ShardStateAction.Listener {
    /** Id of the node on which the replica operation failed. */
    private final String nodeId;
    /** The failure of the replica operation; set once at construction and never reassigned. */
    private final Throwable failure;

    /**
     * @param nodeId  id of the node whose replica operation failed
     * @param failure the exception raised by the replica operation
     */
    public ReplicationFailedShardStateListener(String nodeId, Throwable failure) {
        this.nodeId = nodeId;
        this.failure = failure;
    }

    /** The shard-failed request was answered; record the replica failure. */
    @Override
    public void onSuccess() {
        onReplicaFailure(nodeId, failure);
    }

    /** No master was known to send the shard-failed request to; still record the replica failure. */
    @Override
    public void onShardFailedNoMaster() {
        onReplicaFailure(nodeId, failure);
    }

    /**
     * Sending the shard-failed request failed. A receive timeout is logged at trace level;
     * in every case the original replica failure is then recorded.
     *
     * @param master the master node the request was sent to
     * @param e      the transport-level failure of the shard-failed request
     */
    @Override
    public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
        if (e instanceof ReceiveTimeoutTransportException) {
            logger.trace("timeout sending shard failure to master [{}]", e, master);
        }
        onReplicaFailure(nodeId, failure);
    }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -42,73 +44,118 @@
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;

public class ShardStateAction extends AbstractComponent {

public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";

private final TransportService transportService;
private final ClusterService clusterService;

@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;

transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}

public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener);
/**
 * Reports a shard as failed: wraps the arguments in a {@code ShardRoutingEntry} and passes
 * it, together with a newly created {@code ClusterStateObserver}, to {@code sendShardFailed}.
 *
 * @param shardRouting the routing entry of the shard to fail
 * @param indexUUID    UUID of the index the shard belongs to
 * @param message      description of why the shard is being failed
 * @param failure      the underlying cause, may be {@code null}
 * @param listener     notified of the outcome of the request
 */
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
    // NOTE(review): the observer is created with a null second argument, presumably "no timeout" - confirm against ClusterStateObserver
    sendShardFailed(
            new ClusterStateObserver(clusterService, null, logger),
            new ShardRoutingEntry(shardRouting, indexUUID, message, failure),
            listener);
}

public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener);
shardFailed(shardRouting, indexUUID, message, failure, listener);
}

public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to fail shard [{}]", shardRouting.shardId(), shardRouting);
listener.onShardFailedNoMaster();
return;
}
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
TransportRequestOptions options = TransportRequestOptions.EMPTY;
if (timeout != null) {
options = TransportRequestOptions.builder().withTimeout(timeout).build();
logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
}

@Override
public void handleException(TransportException exp) {
assert exp.getCause() != null : exp;
if (isMasterChannelException(exp.getCause())) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(exp);
}
}
});
}
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
}
}

@Override
public void handleException(TransportException exp) {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.shardRouting.shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(masterNode, exp);
/**
 * Exception classes that {@code isMasterChannelException} looks a failure cause up in.
 * The lookup uses the exact runtime class of the cause, so subclasses of these types
 * are not matched.
 *
 * Declared {@code final} so the reference cannot be reassigned after class initialisation.
 */
private static final Set<Class<?>> MASTER_CHANNEL_EXCEPTIONS =
        new HashSet<>(Arrays.asList(
                NotMasterException.class,
                NodeDisconnectedException.class,
                Discovery.FailedToCommitClusterStateException.class
        ));
private static boolean isMasterChannelException(Throwable cause) {
return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to unwrap the cause (it's a TransportException now)? If so, I think we need to strengthen our integration tests here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes The cause is unwrapped in ShardStateAction#sendShardFailed.

}

// visible for testing
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (logger.isTraceEnabled()) {
logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), state.prettyPrint(), shardRoutingEntry);
}
});
sendShardFailed(observer, shardRoutingEntry, listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a trace log here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed fe39d11.

}

@Override
public void onClusterServiceClose() {
logger.warn("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting());
listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
// we wait indefinitely for a new master
assert false;
}
}, MasterNodeChangePredicate.INSTANCE);
}

private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
Expand Down Expand Up @@ -334,10 +381,22 @@ public interface Listener {
default void onSuccess() {
}

default void onShardFailedNoMaster() {
}

default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
/**
* Notification for non-channel exceptions that are not handled
* by {@link ShardStateAction}.
*
* The exceptions that are handled by {@link ShardStateAction}
* are:
* - {@link NotMasterException}
* - {@link NodeDisconnectedException}
* - {@link Discovery.FailedToCommitClusterStateException}
*
* Any other exception is communicated to the requester via
* this notification.
*
* @param e the unexpected cause of the failure on the master
*/
default void onShardFailedFailure(final Exception e) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) {
if (!indexService.hasShard(shardId) && shardRouting.started()) {
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(),
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
}
} else {
Expand Down Expand Up @@ -590,7 +590,7 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(),
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
}
return;
Expand Down Expand Up @@ -788,7 +788,7 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
}
Expand Down
Loading