From c1ee90356a93f95be533be8c54f4ff66e255f819 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 12 Nov 2015 07:05:51 -0500 Subject: [PATCH] Add timeout mechanism for sending shard failures This commit adds a timeout mechanism for sending shard failures. The requesting thread can attach a listener to the timeout event so that handling it is part of the event chain. Relates #14252 --- .../TransportReplicationAction.java | 61 +++++++++++++------ .../elasticsearch/cluster/ClusterModule.java | 3 +- .../action/shard/ShardStateAction.java | 24 ++++++-- .../TransportReplicationActionTests.java | 28 +++++++-- .../action/shard/ShardStateActionTests.java | 35 +++++++++++ 5 files changed, 122 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 868d1b7eadd66..23bf21bd83a11 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -36,7 +36,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -81,6 +80,8 @@ */ public abstract class TransportReplicationAction extends TransportAction { + public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout"; + protected final TransportService transportService; protected final ClusterService clusterService; protected final IndicesService indicesService; @@ -88,6 +89,7 @@ public abstract class TransportReplicationAction listener) { this.internalRequest = new InternalRequest(request); this.listener = listener; @@ -578,7 +581,7 @@ void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request()); Tuple primaryResponse = shardOperationOnPrimary(observer.observedState(), por); logger.trace("operation completed on primary [{}]", primary); - replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference); + replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference, shardFailedTimeout); } catch (Throwable e) { // shard has not been allocated yet, retry it here if (retryPrimaryException(e)) { @@ -687,7 +690,7 @@ private void failReplicaIfNeeded(String index, int shardId, Throwable t) { /** * inner class is responsible for send the requests to all replica shards and manage the responses */ - final class ReplicationPhase extends AbstractRunnable implements ShardStateAction.Listener { + final class ReplicationPhase extends AbstractRunnable { private final ReplicaRequest replicaRequest; private final Response finalResponse; @@ -702,6 +705,7 @@ final class ReplicationPhase extends AbstractRunnable implements ShardStateActio private final int totalShards; private final ClusterStateObserver observer; private final Releasable indexShardReference; + private final TimeValue shardFailedTimeout; /** * the constructor doesn't take any action, just calculates state. Call {@link #run()} to start @@ -709,7 +713,8 @@ final class ReplicationPhase extends AbstractRunnable implements ShardStateActio */ public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaRequest, Response finalResponse, ClusterStateObserver observer, ShardRouting originalPrimaryShard, - InternalRequest internalRequest, ActionListener listener, Releasable indexShardReference) { + InternalRequest internalRequest, ActionListener listener, Releasable indexShardReference, + TimeValue shardFailedTimeout) { this.replicaRequest = replicaRequest; this.listener = listener; this.finalResponse = finalResponse; @@ -717,6 +722,7 @@ public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaReq this.observer = observer; indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex()); this.indexShardReference = indexShardReference; + this.shardFailedTimeout = shardFailedTimeout; ShardRouting shard; // we double check on the state, if it got changed we need to make sure we take the latest one cause @@ -822,16 +828,6 @@ public void onFailure(Throwable t) { forceFinishAsFailed(t); } - @Override - public void onShardFailedNoMaster() { - - } - - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - - } - /** * start sending current requests to replicas */ @@ -893,14 +889,14 @@ public void handleResponse(TransportResponse.Empty vResponse) { @Override public void handleException(TransportException exp) { - onReplicaFailure(nodeId, exp); logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest); - if (ignoreReplicaException(exp) == false) { + if (ignoreReplicaException(exp)) { + onReplicaFailure(nodeId, exp); + } else { logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node); - shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp, ReplicationPhase.this); + shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp)); } } - }); } else { try { @@ -989,6 +985,33 @@ private void doFinish() { } } + public class ReplicationFailedShardStateListener implements ShardStateAction.Listener { + private final String nodeId; + private Throwable failure; + + public ReplicationFailedShardStateListener(String nodeId, Throwable failure) { + this.nodeId = nodeId; + this.failure = failure; + } + + @Override + public void onSuccess() { + onReplicaFailure(nodeId, failure); + } + + @Override + public void onShardFailedNoMaster() { + onReplicaFailure(nodeId, failure); + } + + @Override + public void onShardFailedFailure(DiscoveryNode master, TransportException e) { + if (e instanceof ReceiveTimeoutTransportException) { + logger.trace("timeout sending shard failure to master [{}]", e, master); + } + onReplicaFailure(nodeId, failure); + } + } } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 4674d143b9eb6..f2797e3b26752 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; @@ -86,7 +87,6 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.DefaultSearchContext; @@ -206,6 +206,7 @@ private void registerBuiltinClusterSettings() { registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE + ".*", Validator.EMPTY); registerClusterDynamicSetting(TransportCloseIndexAction.SETTING_CLUSTER_INDICES_CLOSE_ENABLE, Validator.BOOLEAN); registerClusterDynamicSetting(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, Validator.INTEGER); + registerClusterDynamicSetting(TransportReplicationAction.SHARD_FAILURE_TIMEOUT, Validator.TIME_NON_NEGATIVE); } private void registerBuiltinIndexSettings() { diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 57432f8a4e03c..83897baa50dfa 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -45,6 +46,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -78,24 +80,37 @@ public ShardStateAction(Settings settings, ClusterService clusterService, Transp } public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { + shardFailed(shardRouting, indexUUID, message, failure, null, listener); + } + + public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) { DiscoveryNode masterNode = clusterService.state().nodes().masterNode(); if (masterNode == null) { logger.warn("can't send shard failed for {}, no master known.", shardRouting); listener.onShardFailedNoMaster(); return; } - innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener); + innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, timeout, listener); } public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) { logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message); - innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, listener); + innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, null, listener); } - private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, Listener listener) { + private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, TimeValue timeout, Listener listener) { ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure); + TransportRequestOptions options = TransportRequestOptions.EMPTY; + if (timeout != null) { + options = TransportRequestOptions.builder().withTimeout(timeout).build(); + } transportService.sendRequest(masterNode, - SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty response) { + listener.onSuccess(); + } + @Override public void handleException(TransportException exp) { logger.warn("failed to send failed shard to {}", exp, masterNode); @@ -288,6 +303,7 @@ public String toString() { } public interface Listener { + default void onSuccess() {} default void onShardFailedNoMaster() {} default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {} } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 8e4ef817f3786..79f3853b0a8d0 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -63,6 +63,7 @@ import org.junit.BeforeClass; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -351,10 +352,11 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int as internalRequest.concreteIndex(shardId.index().name()); Releasable reference = getOrCreateIndexShardOperationsCounter(); assertIndexShardCounter(2); + // TODO: set a default timeout TransportReplicationAction.ReplicationPhase replicationPhase = action.new ReplicationPhase(shardIt, request, new Response(), new ClusterStateObserver(clusterService, logger), - primaryShard, internalRequest, listener, reference); + primaryShard, internalRequest, listener, reference, null); assertThat(replicationPhase.totalShards(), equalTo(totalShards)); assertThat(replicationPhase.pending(), equalTo(assignedReplicas)); @@ -368,10 +370,12 @@ action.new ReplicationPhase(shardIt, request, int pending = replicationPhase.pending(); int criticalFailures = 0; // failures that should fail the shard int successful = 1; + List failures = new ArrayList<>(); for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) { if (randomBoolean()) { Throwable t; - if (randomBoolean()) { + boolean criticalFailure = randomBoolean(); + if (criticalFailure) { t = new CorruptIndexException("simulated", (String) null); criticalFailures++; } else { @@ -379,6 +383,14 @@ action.new ReplicationPhase(shardIt, request, } logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName()); transport.handleResponse(capturedRequest.requestId, t); + if (criticalFailure) { + CapturingTransport.CapturedRequest[] shardFailedRequests = transport.capturedRequests(); + transport.clear(); + assertEquals(1, shardFailedRequests.length); + CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; + failures.add(shardFailedRequest); + transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + } } else { successful++; transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE); @@ -395,7 +407,7 @@ action.new ReplicationPhase(shardIt, request, assertThat(shardInfo.getSuccessful(), equalTo(successful)); assertThat(shardInfo.getTotal(), equalTo(totalShards)); - assertThat("failed to see enough shard failures", transport.capturedRequests().length, equalTo(criticalFailures)); + assertThat("failed to see enough shard failures", failures.size(), equalTo(criticalFailures)); for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) { assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME)); } @@ -464,9 +476,15 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx primaryPhase = action.new PrimaryPhase(request, listener); primaryPhase.run(); assertIndexShardCounter(2); - assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest[] replicationRequests = transport.capturedRequests(); + transport.clear(); + assertThat(replicationRequests.length, equalTo(1)); // try with failure response - transport.handleResponse(transport.capturedRequests()[0].requestId, new CorruptIndexException("simulated", (String) null)); + transport.handleResponse(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null)); + CapturingTransport.CapturedRequest[] shardFailedRequests = transport.capturedRequests(); + transport.clear(); + assertEquals(1, shardFailedRequests.length); + transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE); assertIndexShardCounter(1); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index a865f463cfb67..96eea881e9e08 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -27,10 +27,12 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; import org.junit.After; @@ -38,6 +40,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -141,6 +144,38 @@ public void onShardFailedFailure(DiscoveryNode master, TransportException e) { assertTrue(failure.get()); } + public void testTimeout() throws InterruptedException { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + + AtomicBoolean progress = new AtomicBoolean(); + AtomicBoolean timedOut = new AtomicBoolean(); + + TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS); + CountDownLatch latch = new CountDownLatch(1); + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() { + @Override + public void onShardFailedFailure(DiscoveryNode master, TransportException e) { + if (e instanceof ReceiveTimeoutTransportException) { + assertFalse(progress.get()); + timedOut.set(true); + } + latch.countDown(); + } + }); + + latch.await(); + progress.set(true); + assertTrue(timedOut.get()); + + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + assertThat(capturedRequests.length, equalTo(1)); + } + private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();