Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -81,13 +80,16 @@
*/
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {

public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout";

protected final TransportService transportService;
protected final ClusterService clusterService;
protected final IndicesService indicesService;
protected final ShardStateAction shardStateAction;
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
protected final MappingUpdatedAction mappingUpdatedAction;
private final TimeValue shardFailedTimeout;

final String transportReplicaAction;
final String executor;
Expand Down Expand Up @@ -117,6 +119,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
this.transportOptions = transportOptions();

this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
// TODO: set a default timeout
shardFailedTimeout = settings.getAsTime(SHARD_FAILURE_TIMEOUT, null);
}

@Override
Expand Down Expand Up @@ -351,7 +355,6 @@ final class PrimaryPhase extends AbstractRunnable {
private final AtomicBoolean finished = new AtomicBoolean(false);
private volatile Releasable indexShardReference;


PrimaryPhase(Request request, ActionListener<Response> listener) {
this.internalRequest = new InternalRequest(request);
this.listener = listener;
Expand Down Expand Up @@ -578,7 +581,7 @@ void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt)
PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
logger.trace("operation completed on primary [{}]", primary);
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference, shardFailedTimeout);
} catch (Throwable e) {
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
Expand Down Expand Up @@ -687,7 +690,7 @@ private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
/**
* inner class is responsible for send the requests to all replica shards and manage the responses
*/
final class ReplicationPhase extends AbstractRunnable implements ShardStateAction.Listener {
final class ReplicationPhase extends AbstractRunnable {

private final ReplicaRequest replicaRequest;
private final Response finalResponse;
Expand All @@ -702,21 +705,24 @@ final class ReplicationPhase extends AbstractRunnable implements ShardStateActio
private final int totalShards;
private final ClusterStateObserver observer;
private final Releasable indexShardReference;
private final TimeValue shardFailedTimeout;

/**
* the constructor doesn't take any action, just calculates state. Call {@link #run()} to start
* replicating.
*/
public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaRequest, Response finalResponse,
ClusterStateObserver observer, ShardRouting originalPrimaryShard,
InternalRequest internalRequest, ActionListener<Response> listener, Releasable indexShardReference) {
InternalRequest internalRequest, ActionListener<Response> listener, Releasable indexShardReference,
TimeValue shardFailedTimeout) {
this.replicaRequest = replicaRequest;
this.listener = listener;
this.finalResponse = finalResponse;
this.originalPrimaryShard = originalPrimaryShard;
this.observer = observer;
indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex());
this.indexShardReference = indexShardReference;
this.shardFailedTimeout = shardFailedTimeout;

ShardRouting shard;
// we double check on the state, if it got changed we need to make sure we take the latest one cause
Expand Down Expand Up @@ -822,16 +828,6 @@ public void onFailure(Throwable t) {
forceFinishAsFailed(t);
}

@Override
public void onShardFailedNoMaster() {

}

@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {

}

/**
* start sending current requests to replicas
*/
Expand Down Expand Up @@ -893,14 +889,14 @@ public void handleResponse(TransportResponse.Empty vResponse) {

@Override
public void handleException(TransportException exp) {
onReplicaFailure(nodeId, exp);
logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
if (ignoreReplicaException(exp) == false) {
if (ignoreReplicaException(exp)) {
onReplicaFailure(nodeId, exp);
} else {
logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node);
shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp, ReplicationPhase.this);
shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
}
}

});
} else {
try {
Expand Down Expand Up @@ -989,6 +985,33 @@ private void doFinish() {
}
}

public class ReplicationFailedShardStateListener implements ShardStateAction.Listener {
private final String nodeId;
private Throwable failure;

public ReplicationFailedShardStateListener(String nodeId, Throwable failure) {
this.nodeId = nodeId;
this.failure = failure;
}

@Override
public void onSuccess() {
onReplicaFailure(nodeId, failure);
}

@Override
public void onShardFailedNoMaster() {
onReplicaFailure(nodeId, failure);
}

@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
if (e instanceof ReceiveTimeoutTransportException) {
logger.trace("timeout sending shard failure to master [{}]", e, master);
}
onReplicaFailure(nodeId, failure);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we swallow the exception from the master here. We should do something with (log it?).

Also - I'm only OK with committing this now as is (i.e., potentially ignoring shard failures due to timeouts) if we plan to follow up with some proper retry mechanism in the next step. If not, I rather not have a timeout for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the plan is to tackle the core of handling the various failure scenarios in another pull request. This and the prior pull request are just setting the stage.

}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
Expand Down Expand Up @@ -86,7 +87,6 @@
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.DefaultSearchContext;
Expand Down Expand Up @@ -206,6 +206,7 @@ private void registerBuiltinClusterSettings() {
registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE + ".*", Validator.EMPTY);
registerClusterDynamicSetting(TransportCloseIndexAction.SETTING_CLUSTER_INDICES_CLOSE_ENABLE, Validator.BOOLEAN);
registerClusterDynamicSetting(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, Validator.INTEGER);
registerClusterDynamicSetting(TransportReplicationAction.SHARD_FAILURE_TIMEOUT, Validator.TIME_NON_NEGATIVE);
}

private void registerBuiltinIndexSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
Expand All @@ -45,6 +46,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;

Expand Down Expand Up @@ -78,24 +80,37 @@ public ShardStateAction(Settings settings, ClusterService clusterService, Transp
}

public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(shardRouting, indexUUID, message, failure, null, listener);
}

public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
listener.onShardFailedNoMaster();
return;
}
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, timeout, listener);
}

public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, null, listener);
}

private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, Listener listener) {
private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, TimeValue timeout, Listener listener) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
TransportRequestOptions options = TransportRequestOptions.EMPTY;
if (timeout != null) {
options = TransportRequestOptions.builder().withTimeout(timeout).build();
}
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
}

@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
Expand Down Expand Up @@ -288,6 +303,7 @@ public String toString() {
}

public interface Listener {
default void onSuccess() {}
default void onShardFailedNoMaster() {}
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -351,10 +352,11 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int as
internalRequest.concreteIndex(shardId.index().name());
Releasable reference = getOrCreateIndexShardOperationsCounter();
assertIndexShardCounter(2);
// TODO: set a default timeout
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(shardIt, request,
new Response(), new ClusterStateObserver(clusterService, logger),
primaryShard, internalRequest, listener, reference);
primaryShard, internalRequest, listener, reference, null);

assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
Expand All @@ -368,17 +370,27 @@ action.new ReplicationPhase(shardIt, request,
int pending = replicationPhase.pending();
int criticalFailures = 0; // failures that should fail the shard
int successful = 1;
List<CapturingTransport.CapturedRequest> failures = new ArrayList<>();
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
if (randomBoolean()) {
Throwable t;
if (randomBoolean()) {
boolean criticalFailure = randomBoolean();
if (criticalFailure) {
t = new CorruptIndexException("simulated", (String) null);
criticalFailures++;
} else {
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName());
transport.handleResponse(capturedRequest.requestId, t);
if (criticalFailure) {
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.capturedRequests();
transport.clear();
assertEquals(1, shardFailedRequests.length);
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
failures.add(shardFailedRequest);
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
}
} else {
successful++;
transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE);
Expand All @@ -395,7 +407,7 @@ action.new ReplicationPhase(shardIt, request,
assertThat(shardInfo.getSuccessful(), equalTo(successful));
assertThat(shardInfo.getTotal(), equalTo(totalShards));

assertThat("failed to see enough shard failures", transport.capturedRequests().length, equalTo(criticalFailures));
assertThat("failed to see enough shard failures", failures.size(), equalTo(criticalFailures));
for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) {
assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
}
Expand Down Expand Up @@ -464,9 +476,15 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertIndexShardCounter(2);
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest[] replicationRequests = transport.capturedRequests();
transport.clear();
assertThat(replicationRequests.length, equalTo(1));
// try with failure response
transport.handleResponse(transport.capturedRequests()[0].requestId, new CorruptIndexException("simulated", (String) null));
transport.handleResponse(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.capturedRequests();
transport.clear();
assertEquals(1, shardFailedRequests.length);
transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
assertIndexShardCounter(1);
}

Expand Down
Loading